Efficiently retrieving broker data is critical for traders, especially when working with large datasets or multiple stock symbols. OpenAlgo, simplifies this process with its robust API capabilities. However, adhering to rate limits is essential to ensure uninterrupted access.
In this blog, you’ll learn:
1. How to fetch historical broker data using OpenAlgo.
2. How to implement rate-limit protection to avoid API throttling.
Fetching Historical Broker Data with OpenAlgo
OpenAlgo provides a simple yet powerful API to retrieve historical broker data. Here’s a quick example to fetch historical data for the SBIN stock:
from openalgo import api
# Initialize the API client
client = api(api_key='openalgo-api-key', host='http://127.0.0.1:5000')
# Fetch historical data for SBIN
df = client.history(
symbol="SBIN",
exchange="NSE",
interval="5m",
start_date="2025-01-08",
end_date="2025-01-08"
)
print(df)
Output in Pandas Dataframe
How It Works:
1. API Initialization:
• The api object is initialized with an API key and host URL.
2. Fetching Historical Data:
• Use the client.history method to specify the symbol, exchange, interval, and date range.
The result is a Pandas DataFrame containing the historical price and volume data for the given stock symbol.
The Need for Rate Limit Protection
When working with broker APIs, hitting the rate limit can result in:
• Temporary throttling or blocking.
• Loss of critical data during trading hours.
To address this, we introduce rate-limit protection using Python’s asyncio and a custom RateLimiter.
Implementing Rate-Limit Protection in OpenAlgo
Here’s how you can fetch data for multiple symbols concurrently while staying within the rate limits.
Configuration
We define:
• SYMBOLS: A list of stock symbols to fetch data for.
• RATE_LIMIT: Maximum number of requests per second (e.g., 8).
• REQUEST_WINDOW: The time window for rate-limiting (e.g., 1 second).
Rate Limiter Class
The RateLimiter ensures no more than RATE_LIMIT requests are made within the REQUEST_WINDOW:
from collections import deque
import time
import asyncio
class RateLimiter:
def __init__(self, rate_limit: int, window: float):
self.rate_limit = rate_limit
self.window = window
self.requests = deque()
async def acquire(self):
now = time.time()
while self.requests and self.requests[0] < now - self.window:
self.requests.popleft()
if len(self.requests) >= self.rate_limit:
wait_time = self.requests[0] + self.window - now
if wait_time > 0:
await asyncio.sleep(wait_time)
self.requests.append(now)
Market Hours Awareness
The script fetches data only during Indian stock market hours (9:15 AM to 3:30 PM IST):
from datetime import datetime, time as dt_time
import pytz
def is_market_hours():
ist = pytz.timezone('Asia/Kolkata')
current_time = datetime.now(ist).time()
market_start = dt_time(9, 15)
market_end = dt_time(15, 30)
return market_start <= current_time <= market_end
Fetching Data Asynchronously
The script uses asyncio to fetch data for multiple symbols concurrently while enforcing rate limits.
Fetch Data for a Single Symbol:
async def fetch_symbol_data(client, symbol: str, current_date: str, rate_limiter: RateLimiter):
await rate_limiter.acquire()
try:
return client.history(
symbol=symbol,
exchange="NSE",
interval="1m",
start_date=current_date,
end_date=current_date
)
except Exception as e:
print(f"Error fetching data for {symbol}: {e}")
return None
Fetch Data for All Symbols:
async def fetch_all_symbols(client, symbols, current_date, rate_limiter):
tasks = [
asyncio.create_task(fetch_symbol_data(client, symbol, current_date, rate_limiter))
for symbol in symbols
]
results = {}
for symbol, task in zip(symbols, tasks):
results[symbol] = await task
return results
Main Loop
The main loop:
1. Checks if the current time is within market hours.
2. Fetches data for all symbols.
3. Prints the results.
async def main_loop():
client = api(api_key='YOUR_API_KEY', host='http://127.0.0.1:5000')
rate_limiter = RateLimiter(rate_limit=8, window=1.0)
while True:
if is_market_hours():
ist = pytz.timezone('Asia/Kolkata')
current_date = datetime.now(ist).strftime('%Y-%m-%d')
results = await fetch_all_symbols(client, SYMBOLS, current_date, rate_limiter)
for symbol, df in results.items():
if df is not None:
print(f"Data fetched for {symbol}:\n{df}\n{'-'*50}")
else:
print(f"[{datetime.now(pytz.timezone('Asia/Kolkata'))}] Outside market hours. Waiting...")
await asyncio.sleep(1)
Running the Script
Save the code in a Python file marketdata.py and execute it
import time
from datetime import datetime, time as dt_time
import pytz
from openalgo import api
import asyncio
from collections import deque
from typing import List, Dict
import pandas as pd
# Configuration
SYMBOLS = ["SBIN", "RELIANCE", "TCS", "HDFCBANK", "INFY", "ICICIBANK"] # Add your symbols here
RATE_LIMIT = 8 # Requests per second (conservative from 10)
REQUEST_WINDOW = 1.0 # Window in seconds
class RateLimiter:
def __init__(self, rate_limit: int, window: float):
self.rate_limit = rate_limit
self.window = window
self.requests = deque()
async def acquire(self):
now = time.time()
# Remove old requests outside the window
while self.requests and self.requests[0] < now - self.window:
self.requests.popleft()
# If we're at the rate limit, wait until we can make another request
if len(self.requests) >= self.rate_limit:
wait_time = self.requests[0] + self.window - now
if wait_time > 0:
await asyncio.sleep(wait_time)
self.requests.append(now)
def is_market_hours():
ist = pytz.timezone('Asia/Kolkata')
current_time = datetime.now(ist).time()
market_start = dt_time(9, 15) # 9:15 AM
market_end = dt_time(15, 30) # 3:30 PM
return market_start <= current_time <= market_end
async def fetch_symbol_data(client, symbol: str, current_date: str, rate_limiter: RateLimiter) -> pd.DataFrame:
await rate_limiter.acquire()
try:
df = client.history(
symbol=symbol,
exchange="NSE",
interval="1m",
start_date=current_date,
end_date=current_date
)
return df
except Exception as e:
print(f"Error fetching data for {symbol}: {str(e)}")
return None
async def fetch_all_symbols(client, symbols: List[str], current_date: str, rate_limiter: RateLimiter) -> Dict[str, pd.DataFrame]:
tasks = []
for symbol in symbols:
task = asyncio.create_task(fetch_symbol_data(client, symbol, current_date, rate_limiter))
tasks.append((symbol, task))
results = {}
for symbol, task in tasks:
try:
df = await task
if df is not None:
results[symbol] = df
except Exception as e:
print(f"Task failed for {symbol}: {str(e)}")
return results
async def main_loop():
# Initialize the API client
client = api(
api_key='YOU_OPENALGO_API_KEY',
host='http://127.0.0.1:5000'
)
rate_limiter = RateLimiter(RATE_LIMIT, REQUEST_WINDOW)
while True:
try:
if is_market_hours():
# Get current date in IST
ist = pytz.timezone('Asia/Kolkata')
current_date = datetime.now(ist).strftime('%Y-%m-%d')
# Fetch data for all symbols
results = await fetch_all_symbols(client, SYMBOLS, current_date, rate_limiter)
# Process and print results
for symbol, df in results.items():
print(f"[{datetime.now(ist)}] Data fetched for {symbol}")
print(df)
print("-" * 50)
else:
print(f"[{datetime.now(pytz.timezone('Asia/Kolkata'))}] Outside market hours. Waiting...")
# Wait before next iteration
await asyncio.sleep(1)
except Exception as e:
print(f"Error in main loop: {str(e)}")
await asyncio.sleep(10) # Wait before retrying
if __name__ == "__main__":
asyncio.run(main_loop())
Key Features
1. Rate-Limit Compliance:
• The RateLimiter ensures no more than 8 requests per second, preventing API throttling.
2. Concurrent Fetching:
• Data for all symbols is fetched concurrently using asyncio.
3. Market Hours Awareness:
• Data is fetched only during active trading hours (IST).
4. Error Handling:
• Errors during data fetching are logged without interrupting the script.
Use Cases
This solution can be used for:
• Trading Dashboards: Real-time data updates for multiple stocks.
• Backtesting: Fetching historical data for strategy evaluation.
• Automated Trading: Real-time data feeds for trading bots.
Final Thoughts
Using OpenAlgo, you can fetch broker historical data efficiently while adhering to rate limits. The combination of rate-limit protection, market hours awareness, and asynchronous programming ensures a robust, scalable solution for traders and developers.
Get started today and take your trading data retrieval to the next level!