We highly recommend reading the original article for additional context and perspectives on working with WebSocket data and ClickHouse.
Model used to build the code: Claude 3.5 Sonnet
Total time taken: 2 hours for installing ClickHouse, writing prompts, testing, and documentation
Real-time market data is essential for traders and investors making informed decisions. This guide will walk you through the process of capturing WebSocket stock market data and storing it efficiently in ClickHouse, a powerful column-oriented database.
- Prerequisites
- Why ClickHouse for Stock Market Data?
- Setting Up the Environment
- The Python Script to Fetch Websockets and Store in ClickHouse
- Decoding the Websocket Data
- Understanding the Code
- Checking Data in ClickHouse
- Scalability and Performance
- The Value of Storing Market Data
- Applications for Traders
- Conclusion
Prerequisites
Before you begin, ensure you have the following:
Brokerage Account: You need an account with a broker that provides WebSocket market data.
API Credentials: Obtain the following from your broker’s API portal:
- Client ID
- API Key
- Client PIN
- TOTP Secret (for generating Time-based One-Time Passwords)
System Requirements:
- Python 3.7 or higher
- ClickHouse database installed on your system or a remote server
- Stable internet connection for WebSocket data streaming
Basic Knowledge:
- Familiarity with Python programming
- Understanding of WebSocket concepts
- Basic SQL knowledge for querying ClickHouse
Additional Information (Optional):
- Your local and public IP addresses
- Your system’s MAC address
- Any state variable required by your broker’s API
Once you have these prerequisites in place, you’re ready to set up the environment and start capturing WebSocket stock market data.
Complete source code is available on GitHub: Storing Websocket Feed to Clickhouse using Python
Why ClickHouse for Stock Market Data?
ClickHouse offers several advantages for storing and analyzing market data:
- High-speed Queries: Process billions of rows in seconds for quick market analysis.
- Efficient Storage: Column-oriented structure allows for better data compression.
- Real-time Data Handling: Efficiently manages high-frequency data inserts from live market feeds.
- Scalability: Easily accommodate growing data volumes by adding more servers.
- SQL Compatibility: Familiar query language for most financial analysts and developers.
Setting Up the Environment
Before we dive into the code, ensure you have the following set up:
- Install ClickHouse on your system.
- Install required Python libraries:
pip install clickhouse-connect websockets aiohttp python-dotenv backoff websocket-client logzero
- Set up a .env file to store your broker credentials and ClickHouse configuration securely:
# AngelOne credentials
ANGEL_CLIENT_ID=YOUR_CLIENT_ID
ANGEL_CLIENT_PIN=YOUR_PIN
ANGEL_TOTP_CODE=YOUR_TOTP_CODE
ANGEL_API_KEY=YOUR_API_KEY
ANGEL_CLIENT_LOCAL_IP=YOUR_LOCAL_IP
ANGEL_CLIENT_PUBLIC_IP=YOUR_PUBLIC_IP
ANGEL_MAC_ADDRESS=YOUR_MAC_ADDRESS
ANGEL_STATE_VARIABLE=YOUR_STATE_VARIABLE
# ClickHouse configuration
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=8123
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=
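The script sends ANGEL_TOTP_CODE to the login endpoint unchanged (the "totp" field in generate_tokens()). A TOTP code is only valid for a short time window (typically 30 seconds), so a value saved in .env will be rejected on later logins, including the restarts that main() performs. Since the prerequisites call for the TOTP secret, one option is to store the secret and derive the current code at login time. Below is a minimal standard-library sketch of RFC 6238, assuming the common defaults (HMAC-SHA1, 30-second step, 6 digits) and a base32-encoded secret as used by authenticator apps; check these against your broker's settings. The pyotp package offers the same through pyotp.TOTP(secret).now().

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def totp(secret_b32: str, for_time: Optional[float] = None,
         digits: int = 6, step: int = 30) -> str:
    """Return the RFC 6238 TOTP code (HMAC-SHA1) for a base32-encoded secret."""
    # Normalise the secret: drop spaces, upper-case it, restore '=' padding
    secret = secret_b32.replace(" ", "").upper()
    secret += "=" * (-len(secret) % 8)
    key = base64.b32decode(secret)
    # Counter = number of whole time steps since the Unix epoch
    now = time.time() if for_time is None else for_time
    counter = int(now // step)
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation (RFC 4226, section 5.3)
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % (10 ** digits)).zfill(digits)
```

For example, generate_tokens() could send totp(os.getenv('ANGEL_TOTP_SECRET')) instead of a stored code, where ANGEL_TOTP_SECRET is a hypothetical .env entry holding the secret. Computing the code inside generate_tokens() means every restart gets a fresh one.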
The Python Script to Fetch Websockets and Store in ClickHouse
Here’s the Python script clickhouse-websockets-angelone.py that captures WebSocket market data and stores it in ClickHouse:
import asyncio
import websockets
import json
import aiohttp
from datetime import datetime
import clickhouse_connect
from smartWebSocketV2 import SmartWebSocketV2
import os
from dotenv import load_dotenv
import backoff
# Load environment variables
load_dotenv()
# AngelOne credentials and configuration
client_id = os.getenv('ANGEL_CLIENT_ID')
client_pin = os.getenv('ANGEL_CLIENT_PIN')
totp_code = os.getenv('ANGEL_TOTP_CODE')
api_key = os.getenv('ANGEL_API_KEY')
client_local_ip = os.getenv('ANGEL_CLIENT_LOCAL_IP')
client_public_ip = os.getenv('ANGEL_CLIENT_PUBLIC_IP')
mac_address = os.getenv('ANGEL_MAC_ADDRESS')
state_var = os.getenv('ANGEL_STATE_VARIABLE')
# Global variables for tokens
AUTH_TOKEN = None
FEED_TOKEN = None
# ClickHouse configuration (defaults match the .env example)
CLICKHOUSE_HOST = os.getenv('CLICKHOUSE_HOST', 'localhost')
CLICKHOUSE_PORT = int(os.getenv('CLICKHOUSE_PORT', '8123'))
CLICKHOUSE_USER = os.getenv('CLICKHOUSE_USER', 'default')
CLICKHOUSE_PASSWORD = os.getenv('CLICKHOUSE_PASSWORD', '')
# Initialize ClickHouse client
client = clickhouse_connect.get_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT,
username=CLICKHOUSE_USER, password=CLICKHOUSE_PASSWORD)
def create_clickhouse_table():
    query = '''
    CREATE TABLE IF NOT EXISTS angelone_market_data (
        token String,
        timestamp DateTime64(3),
        last_traded_price Float64,
        open_price Float64,
        high_price Float64,
        low_price Float64,
        close_price Float64,
        volume Float64
    ) ENGINE = MergeTree()
    ORDER BY timestamp
    '''
    client.command(query)
    print("Connected to ClickHouse. Server version:", client.server_version)
def store_data_in_clickhouse(data):
    # One row per tick, in the column order of angelone_market_data
    try:
        values = [
            (data['token'],
             datetime.now(),  # local arrival time, not the exchange timestamp
             data['last_traded_price'],
             data['open_price_of_the_day'],
             data['high_price_of_the_day'],
             data['low_price_of_the_day'],
             data['closed_price'],
             data['volume_trade_for_the_day'])
        ]
        client.insert('angelone_market_data', values)
    except Exception as e:
        print(f"Error storing data in ClickHouse: {e}")
async def generate_tokens():
    global AUTH_TOKEN, FEED_TOKEN
    url = "https://apiconnect.angelone.in/rest/auth/angelbroking/user/v1/loginByPassword"
    payload = json.dumps({
        "clientcode": client_id,
        "password": client_pin,
        "totp": totp_code,
        "state": state_var
    })
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'X-UserType': 'USER',
        'X-SourceID': 'WEB',
        'X-ClientLocalIP': client_local_ip,
        'X-ClientPublicIP': client_public_ip,
        'X-MACAddress': mac_address,
        'X-PrivateKey': api_key
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=payload, headers=headers) as response:
            if response.status == 200:
                data = await response.json()
                tokens = data.get('data') or {}
                # Guard against a 200 response whose body carries no tokens
                if 'jwtToken' not in tokens or 'feedToken' not in tokens:
                    raise Exception(f"Token generation failed: {data.get('message')}")
                AUTH_TOKEN = tokens['jwtToken']
                FEED_TOKEN = tokens['feedToken']
                print("Tokens generated successfully.")
            else:
                print(f"Failed to generate tokens. Status: {response.status}")
                print(await response.text())
                raise Exception("Token generation failed")
@backoff.on_exception(backoff.expo, Exception, max_tries=10)
async def connect_with_retry():
    return await connect_websocket()


async def connect_websocket():
    if not AUTH_TOKEN or not FEED_TOKEN:
        # Raise so that backoff and main() can react instead of silently returning
        raise RuntimeError("Tokens are not set. Make sure to generate tokens first.")
    uri = "wss://smartapisocket.angelone.in/smart-stream"
    headers = {
        "Authorization": f"Bearer {AUTH_TOKEN}",
        "x-api-key": api_key,
        "x-client-code": client_id,
        "x-feed-token": FEED_TOKEN
    }
    # Only the parser's _parse_binary_data() is used, so create it once
    ws_parser = SmartWebSocketV2(AUTH_TOKEN, api_key, client_id, FEED_TOKEN)
    while True:
        try:
            # websockets 14+ takes additional_headers (the legacy client used extra_headers)
            async with websockets.connect(uri, additional_headers=headers,
                                          ping_interval=20, ping_timeout=10) as websocket:
                print("WebSocket connected")
                await subscribe_to_quotes(websocket)
                while True:
                    try:
                        message = await asyncio.wait_for(websocket.recv(), timeout=30)
                        # Market data arrives as binary frames; text frames are ignored
                        if isinstance(message, bytes):
                            parsed_message = ws_parser._parse_binary_data(message)
                            if parsed_message:
                                process_and_store_data(parsed_message)
                    except asyncio.TimeoutError:
                        print("No data received for 30 seconds, sending ping...")
                        pong_waiter = await websocket.ping()
                        await asyncio.wait_for(pong_waiter, timeout=10)
                    except websockets.exceptions.ConnectionClosed:
                        print("WebSocket connection closed, reconnecting...")
                        break
        except Exception as e:
            print(f"Error in WebSocket connection: {e}")
            print("Attempting to reconnect in 5 seconds...")
            await asyncio.sleep(5)
def process_and_store_data(data):
    try:
        # Prices arrive in paise; divide by 100 to get rupees
        adjusted_data = {
            'token': str(data['token']),
            'last_traded_price': float(data['last_traded_price']) / 100,
            'open_price_of_the_day': float(data.get('open_price_of_the_day', 0)) / 100,
            'high_price_of_the_day': float(data.get('high_price_of_the_day', 0)) / 100,
            'low_price_of_the_day': float(data.get('low_price_of_the_day', 0)) / 100,
            'closed_price': float(data.get('closed_price', 0)) / 100,
            'volume_trade_for_the_day': float(data.get('volume_trade_for_the_day', 0))
        }
        store_data_in_clickhouse(adjusted_data)
        now = datetime.now()
        current_time = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        minute = now.strftime("%H:%M")
        output = f"[{current_time}] {adjusted_data['token']} - minute: {minute} | "
        output += f"open: {adjusted_data['open_price_of_the_day']:.1f} | "
        output += f"ltp: {adjusted_data['last_traded_price']:.1f} | "
        output += f"high: {adjusted_data['high_price_of_the_day']:.1f} | "
        output += f"low: {adjusted_data['low_price_of_the_day']:.1f} | "
        output += f"volume: {adjusted_data['volume_trade_for_the_day']:.3f}"
        print(output, flush=True)
    except Exception as e:
        print(f"Error in process_and_store_data: {e}")
        print(f"Error occurred with data: {data}")
async def subscribe_to_quotes(websocket):
    subscribe_message = json.dumps({
        "correlationID": "ws_test",
        "action": 1,  # 1 = subscribe
        "params": {
            "mode": 2,  # 2 = QUOTE mode
            "tokenList": [{"exchangeType": 1, "tokens": ["2885"]}]  # NSE_CM, RELIANCE token
        }
    })
    await websocket.send(subscribe_message)
    print("Subscribed to RELIANCE quotes")
async def main():
    create_clickhouse_table()
    while True:
        try:
            await generate_tokens()
            await connect_with_retry()
        except Exception as e:
            print(f"An error occurred: {e}")
            print("Restarting the entire process in 10 seconds...")
            await asyncio.sleep(10)


if __name__ == "__main__":
    asyncio.run(main())
Decoding the Websocket Data
Save the following code as smartWebSocketV2.py in the same folder; the main script imports it to decode the binary WebSocket data. The decoding logic is extracted from AngelOne's SmartAPI Python library.
import struct
import time
import ssl
import json
import websocket
import os
import logging
import logzero
from logzero import logger
class SmartWebSocketV2(object):
    """
    SmartAPI Web Socket version 2
    """
    ROOT_URI = "wss://smartapisocket.angelone.in/smart-stream"
    HEART_BEAT_MESSAGE = "ping"
    HEART_BEAT_INTERVAL = 10  # Adjusted to 10s
    LITTLE_ENDIAN_BYTE_ORDER = "<"
    RESUBSCRIBE_FLAG = False
    # HB_THREAD_FLAG = True

    # Available Actions
    SUBSCRIBE_ACTION = 1
    UNSUBSCRIBE_ACTION = 0

    # Possible Subscription Mode
    LTP_MODE = 1
    QUOTE = 2
    SNAP_QUOTE = 3
    DEPTH = 4

    # Exchange Type
    NSE_CM = 1
    NSE_FO = 2
    BSE_CM = 3
    BSE_FO = 4
    MCX_FO = 5
    NCX_FO = 7
    CDE_FO = 13

    # Subscription Mode Map
    SUBSCRIPTION_MODE_MAP = {
        1: "LTP",
        2: "QUOTE",
        3: "SNAP_QUOTE",
        4: "DEPTH"
    }

    wsapp = None
    input_request_dict = {}
    current_retry_attempt = 0
    def __init__(self, auth_token, api_key, client_code, feed_token, max_retry_attempt=1,
                 retry_strategy=0, retry_delay=10, retry_multiplier=2, retry_duration=60):
        """
        Initialise the SmartWebSocketV2 instance

        Parameters
        ------
        auth_token: string
            jwt auth token received from Login API
        api_key: string
            api key from Smart API account
        client_code: string
            angel one account id
        feed_token: string
            feed token received from Login API
        """
        self.auth_token = auth_token
        self.api_key = api_key
        self.client_code = client_code
        self.feed_token = feed_token
        self.DISCONNECT_FLAG = True
        self.last_pong_timestamp = None
        self.MAX_RETRY_ATTEMPT = max_retry_attempt
        self.retry_strategy = retry_strategy
        self.retry_delay = retry_delay
        self.retry_multiplier = retry_multiplier
        self.retry_duration = retry_duration

        # Create a log folder based on the current date
        log_folder = time.strftime("%Y-%m-%d", time.localtime())
        log_folder_path = os.path.join("logs", log_folder)  # Construct the full path to the log folder
        os.makedirs(log_folder_path, exist_ok=True)  # Create the log folder if it doesn't exist
        log_path = os.path.join(log_folder_path, "app.log")  # Construct the full path to the log file
        logzero.logfile(log_path, loglevel=logging.INFO)  # Output logs to a date-wise log file

        if not self._sanity_check():
            logger.error("Invalid initialization parameters. Provide valid values for all the tokens.")
            raise Exception("Provide valid value for all the tokens")
    def _sanity_check(self):
        if not all([self.auth_token, self.api_key, self.client_code, self.feed_token]):
            return False
        return True

    def _on_message(self, wsapp, message):
        logger.info(f"Received message: {message}")
        if message != "pong":
            parsed_message = self._parse_binary_data(message)
            # Check if it's a control message (e.g., heartbeat)
            if self._is_control_message(parsed_message):
                self._handle_control_message(parsed_message)
            else:
                self.on_data(wsapp, parsed_message)
        else:
            self.on_message(wsapp, message)

    def _is_control_message(self, parsed_message):
        return "subscription_mode" not in parsed_message

    def _handle_control_message(self, parsed_message):
        if parsed_message["subscription_mode"] == 0:
            self._on_pong(self.wsapp, "pong")
        elif parsed_message["subscription_mode"] == 1:
            self._on_ping(self.wsapp, "ping")
        # Invoke on_control_message callback with the control message data
        if hasattr(self, 'on_control_message'):
            self.on_control_message(self.wsapp, parsed_message)

    def _on_data(self, wsapp, data, data_type, continue_flag):
        if data_type == 2:
            parsed_message = self._parse_binary_data(data)
            self.on_data(wsapp, parsed_message)

    def _on_open(self, wsapp):
        if self.RESUBSCRIBE_FLAG:
            self.resubscribe()
        else:
            self.on_open(wsapp)

    def _on_pong(self, wsapp, data):
        if data == self.HEART_BEAT_MESSAGE:
            timestamp = time.time()
            formatted_timestamp = time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp))
            logger.info(f"In on pong function ==> {data}, Timestamp: {formatted_timestamp}")
            self.last_pong_timestamp = timestamp

    def _on_ping(self, wsapp, data):
        timestamp = time.time()
        formatted_timestamp = time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp))
        logger.info(f"In on ping function ==> {data}, Timestamp: {formatted_timestamp}")
        self.last_ping_timestamp = timestamp
    def subscribe(self, correlation_id, mode, token_list):
        """
        Subscribe to price data for the given tokens

        Parameters
        ------
        correlation_id: string
            A 10-character alphanumeric ID the client may provide; the server returns it in an
            error response to indicate which request generated the error.
            Clients can use this optional ID to match requests with their error responses.
        mode: integer
            It denotes the subscription type
            possible values -> 1, 2, 3 and 4
            1 -> LTP
            2 -> Quote
            3 -> Snap Quote
            4 -> Depth (exchangeType 1 only, at most 50 tokens)
        token_list: list of dict
            Sample Value ->
                [
                    { "exchangeType": 1, "tokens": ["10626", "5290"]},
                    {"exchangeType": 5, "tokens": [ "234230", "234235", "234219"]}
                ]
            exchangeType: integer
            possible values ->
                1 -> nse_cm
                2 -> nse_fo
                3 -> bse_cm
                4 -> bse_fo
                5 -> mcx_fo
                7 -> ncx_fo
                13 -> cde_fo
            tokens: list of string
        """
        try:
            request_data = {
                "correlationID": correlation_id,
                "action": self.SUBSCRIBE_ACTION,
                "params": {
                    "mode": mode,
                    "tokenList": token_list
                }
            }
            if mode == 4:
                for token in token_list:
                    if token.get('exchangeType') != 1:
                        error_message = (f"Invalid ExchangeType:{token.get('exchangeType')}. Please check the "
                                         f"exchange type and try again; only exchange type 1 is supported")
                        logger.error(error_message)
                        raise ValueError(error_message)
            if self.input_request_dict.get(mode) is None:
                self.input_request_dict[mode] = {}
            for token in token_list:
                if token['exchangeType'] in self.input_request_dict[mode]:
                    self.input_request_dict[mode][token['exchangeType']].extend(token["tokens"])
                else:
                    self.input_request_dict[mode][token['exchangeType']] = token["tokens"]
            if mode == self.DEPTH:
                total_tokens = sum(len(token["tokens"]) for token in token_list)
                quota_limit = 50
                if total_tokens > quota_limit:
                    error_message = f"Quota exceeded: You can subscribe to a maximum of {quota_limit} tokens only."
                    logger.error(error_message)
                    raise Exception(error_message)
            self.wsapp.send(json.dumps(request_data))
            self.RESUBSCRIBE_FLAG = True
        except Exception as e:
            logger.error(f"Error occurred during subscribe: {e}")
            raise e
    def unsubscribe(self, correlation_id, mode, token_list):
        """
        Unsubscribe from data for the given tokens

        Parameters
        ------
        correlation_id: string
            A 10-character alphanumeric ID the client may provide; the server returns it in an
            error response to indicate which request generated the error.
            Clients can use this optional ID to match requests with their error responses.
        mode: integer
            It denotes the subscription type
            possible values -> 1, 2 and 3
            1 -> LTP
            2 -> Quote
            3 -> Snap Quote
        token_list: list of dict
            Sample Value ->
                [
                    { "exchangeType": 1, "tokens": ["10626", "5290"]},
                    {"exchangeType": 5, "tokens": [ "234230", "234235", "234219"]}
                ]
            exchangeType: integer
            possible values ->
                1 -> nse_cm
                2 -> nse_fo
                3 -> bse_cm
                4 -> bse_fo
                5 -> mcx_fo
                7 -> ncx_fo
                13 -> cde_fo
            tokens: list of string
        """
        try:
            request_data = {
                "correlationID": correlation_id,
                "action": self.UNSUBSCRIBE_ACTION,
                "params": {
                    "mode": mode,
                    "tokenList": token_list
                }
            }
            # Remove these tokens from the {mode: {exchangeType: [tokens]}} cache
            # that resubscribe() replays after a reconnect
            for token in token_list:
                cached = self.input_request_dict.get(mode, {}).get(token['exchangeType'], [])
                for t in token["tokens"]:
                    if t in cached:
                        cached.remove(t)
            self.wsapp.send(json.dumps(request_data))
            self.RESUBSCRIBE_FLAG = True
        except Exception as e:
            logger.error(f"Error occurred during unsubscribe: {e}")
            raise e
    def resubscribe(self):
        try:
            for key, val in self.input_request_dict.items():
                token_list = []
                for key1, val1 in val.items():
                    temp_data = {
                        'exchangeType': key1,
                        'tokens': val1
                    }
                    token_list.append(temp_data)
                request_data = {
                    "action": self.SUBSCRIBE_ACTION,
                    "params": {
                        "mode": key,
                        "tokenList": token_list
                    }
                }
                self.wsapp.send(json.dumps(request_data))
        except Exception as e:
            logger.error(f"Error occurred during resubscribe: {e}")
            raise e
    def connect(self):
        """
        Make the web socket connection with the server
        """
        headers = {
            "Authorization": self.auth_token,
            "x-api-key": self.api_key,
            "x-client-code": self.client_code,
            "x-feed-token": self.feed_token
        }
        try:
            self.wsapp = websocket.WebSocketApp(self.ROOT_URI, header=headers, on_open=self._on_open,
                                                on_error=self._on_error, on_close=self._on_close,
                                                on_data=self._on_data, on_ping=self._on_ping,
                                                on_pong=self._on_pong)
            # TLS certificate verification stays enabled (do not pass ssl.CERT_NONE)
            self.wsapp.run_forever(ping_interval=self.HEART_BEAT_INTERVAL,
                                   ping_payload=self.HEART_BEAT_MESSAGE)
        except Exception as e:
            logger.error(f"Error occurred during WebSocket connection: {e}")
            raise e

    def close_connection(self):
        """
        Closes the connection
        """
        self.RESUBSCRIBE_FLAG = False
        self.DISCONNECT_FLAG = True
        if self.wsapp:
            self.wsapp.close()
    def _on_error(self, wsapp, error):
        self.RESUBSCRIBE_FLAG = True
        if self.current_retry_attempt < self.MAX_RETRY_ATTEMPT:
            logger.warning(f"Attempting to resubscribe/reconnect (Attempt {self.current_retry_attempt + 1})...")
            self.current_retry_attempt += 1
            if self.retry_strategy == 0:  # retry_strategy for simple
                time.sleep(self.retry_delay)
            elif self.retry_strategy == 1:  # retry_strategy for exponential
                delay = self.retry_delay * (self.retry_multiplier ** (self.current_retry_attempt - 1))
                time.sleep(delay)
            else:
                logger.error(f"Invalid retry strategy {self.retry_strategy}")
                raise Exception(f"Invalid retry strategy {self.retry_strategy}")
            try:
                self.close_connection()
                self.connect()
            except Exception as e:
                logger.error(f"Error occurred during resubscribe/reconnect: {e}")
                if hasattr(self, 'on_error'):
                    self.on_error("Reconnect Error", str(e) if str(e) else "Unknown error")
        else:
            self.close_connection()
            if hasattr(self, 'on_error'):
                self.on_error("Max retry attempt reached", "Connection closed")
            if self.retry_duration is not None and (self.last_pong_timestamp is not None and
                                                    time.time() - self.last_pong_timestamp > self.retry_duration * 60):
                logger.warning("Connection closed due to inactivity.")
            else:
                logger.warning("Connection closed due to max retry attempts reached.")

    def _on_close(self, wsapp, *args):
        # websocket-client 1.x also passes close_status_code and close_msg
        self.on_close(wsapp)
    def _parse_binary_data(self, binary_data):
        parsed_data = {
            "subscription_mode": self._unpack_data(binary_data, 0, 1, byte_format="B")[0],
            "exchange_type": self._unpack_data(binary_data, 1, 2, byte_format="B")[0],
            "token": SmartWebSocketV2._parse_token_value(binary_data[2:27]),
            "sequence_number": self._unpack_data(binary_data, 27, 35, byte_format="q")[0],
            "exchange_timestamp": self._unpack_data(binary_data, 35, 43, byte_format="q")[0],
            "last_traded_price": self._unpack_data(binary_data, 43, 51, byte_format="q")[0]
        }
        try:
            parsed_data["subscription_mode_val"] = self.SUBSCRIPTION_MODE_MAP.get(parsed_data["subscription_mode"])

            if parsed_data["subscription_mode"] in [self.QUOTE, self.SNAP_QUOTE]:
                parsed_data["last_traded_quantity"] = self._unpack_data(binary_data, 51, 59, byte_format="q")[0]
                parsed_data["average_traded_price"] = self._unpack_data(binary_data, 59, 67, byte_format="q")[0]
                parsed_data["volume_trade_for_the_day"] = self._unpack_data(binary_data, 67, 75, byte_format="q")[0]
                parsed_data["total_buy_quantity"] = self._unpack_data(binary_data, 75, 83, byte_format="d")[0]
                parsed_data["total_sell_quantity"] = self._unpack_data(binary_data, 83, 91, byte_format="d")[0]
                parsed_data["open_price_of_the_day"] = self._unpack_data(binary_data, 91, 99, byte_format="q")[0]
                parsed_data["high_price_of_the_day"] = self._unpack_data(binary_data, 99, 107, byte_format="q")[0]
                parsed_data["low_price_of_the_day"] = self._unpack_data(binary_data, 107, 115, byte_format="q")[0]
                parsed_data["closed_price"] = self._unpack_data(binary_data, 115, 123, byte_format="q")[0]

            if parsed_data["subscription_mode"] == self.SNAP_QUOTE:
                parsed_data["last_traded_timestamp"] = self._unpack_data(binary_data, 123, 131, byte_format="q")[0]
                parsed_data["open_interest"] = self._unpack_data(binary_data, 131, 139, byte_format="q")[0]
                parsed_data["open_interest_change_percentage"] = self._unpack_data(binary_data, 139, 147, byte_format="q")[0]
                parsed_data["upper_circuit_limit"] = self._unpack_data(binary_data, 347, 355, byte_format="q")[0]
                parsed_data["lower_circuit_limit"] = self._unpack_data(binary_data, 355, 363, byte_format="q")[0]
                parsed_data["52_week_high_price"] = self._unpack_data(binary_data, 363, 371, byte_format="q")[0]
                parsed_data["52_week_low_price"] = self._unpack_data(binary_data, 371, 379, byte_format="q")[0]
                best_5_buy_and_sell_data = self._parse_best_5_buy_and_sell_data(binary_data[147:347])
                parsed_data["best_5_buy_data"] = best_5_buy_and_sell_data["best_5_sell_data"]
                parsed_data["best_5_sell_data"] = best_5_buy_and_sell_data["best_5_buy_data"]

            if parsed_data["subscription_mode"] == self.DEPTH:
                parsed_data.pop("sequence_number", None)
                parsed_data.pop("last_traded_price", None)
                parsed_data.pop("subscription_mode_val", None)
                parsed_data["packet_received_time"] = self._unpack_data(binary_data, 35, 43, byte_format="q")[0]
                depth_data_start_index = 43
                depth_20_data = self._parse_depth_20_buy_and_sell_data(binary_data[depth_data_start_index:])
                parsed_data["depth_20_buy_data"] = depth_20_data["depth_20_buy_data"]
                parsed_data["depth_20_sell_data"] = depth_20_data["depth_20_sell_data"]

            return parsed_data
        except Exception as e:
            logger.error(f"Error occurred during binary data parsing: {e}")
            raise e
    def _unpack_data(self, binary_data, start, end, byte_format="I"):
        """
        Unpack Binary Data to the integer according to the specified byte_format.
        This function returns the tuple
        """
        return struct.unpack(self.LITTLE_ENDIAN_BYTE_ORDER + byte_format, binary_data[start:end])

    @staticmethod
    def _parse_token_value(binary_packet):
        token = ""
        for i in range(len(binary_packet)):
            if chr(binary_packet[i]) == '\x00':
                return token
            token += chr(binary_packet[i])
        return token
    def _parse_best_5_buy_and_sell_data(self, binary_data):

        def split_packets(binary_packets):
            packets = []
            i = 0
            while i < len(binary_packets):
                packets.append(binary_packets[i: i + 20])
                i += 20
            return packets

        best_5_buy_sell_packets = split_packets(binary_data)
        best_5_buy_data = []
        best_5_sell_data = []
        for packet in best_5_buy_sell_packets:
            each_data = {
                "flag": self._unpack_data(packet, 0, 2, byte_format="H")[0],
                "quantity": self._unpack_data(packet, 2, 10, byte_format="q")[0],
                "price": self._unpack_data(packet, 10, 18, byte_format="q")[0],
                "no of orders": self._unpack_data(packet, 18, 20, byte_format="H")[0]
            }
            if each_data["flag"] == 0:
                best_5_buy_data.append(each_data)
            else:
                best_5_sell_data.append(each_data)
        return {
            "best_5_buy_data": best_5_buy_data,
            "best_5_sell_data": best_5_sell_data
        }
    def _parse_depth_20_buy_and_sell_data(self, binary_data):
        depth_20_buy_data = []
        depth_20_sell_data = []
        for i in range(20):
            buy_start_idx = i * 10
            sell_start_idx = 200 + i * 10
            # Parse buy data
            buy_packet_data = {
                "quantity": self._unpack_data(binary_data, buy_start_idx, buy_start_idx + 4, byte_format="i")[0],
                "price": self._unpack_data(binary_data, buy_start_idx + 4, buy_start_idx + 8, byte_format="i")[0],
                "num_of_orders": self._unpack_data(binary_data, buy_start_idx + 8, buy_start_idx + 10, byte_format="h")[0],
            }
            # Parse sell data
            sell_packet_data = {
                "quantity": self._unpack_data(binary_data, sell_start_idx, sell_start_idx + 4, byte_format="i")[0],
                "price": self._unpack_data(binary_data, sell_start_idx + 4, sell_start_idx + 8, byte_format="i")[0],
                "num_of_orders": self._unpack_data(binary_data, sell_start_idx + 8, sell_start_idx + 10, byte_format="h")[0],
            }
            depth_20_buy_data.append(buy_packet_data)
            depth_20_sell_data.append(sell_packet_data)
        return {
            "depth_20_buy_data": depth_20_buy_data,
            "depth_20_sell_data": depth_20_sell_data
        }
    def on_message(self, wsapp, message):
        pass

    def on_data(self, wsapp, data):
        pass

    def on_control_message(self, wsapp, message):
        pass

    def on_close(self, wsapp):
        pass

    def on_open(self, wsapp):
        pass

    def on_error(self, error_type, error_message):
        # Called by _on_error() with a short error type and a message
        pass
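To make the byte offsets in _parse_binary_data() concrete, the sketch below packs a synthetic packet header with the same little-endian layout the class reads (subscription mode at byte 0, exchange type at byte 1, a NUL-padded 25-byte token at bytes 2 to 26, then 8-byte signed integers for sequence number, exchange timestamp and last traded price starting at bytes 27, 35 and 43) and decodes it with a single struct.Struct. The offsets are taken from the class above rather than from an official protocol specification, the values are made up, and build_header/parse_header are hypothetical helpers.

```python
import struct

# Layout read by SmartWebSocketV2._parse_binary_data ("<" = little-endian, no padding):
# B   subscription mode    (byte 0)
# B   exchange type        (byte 1)
# 25s token, NUL-padded    (bytes 2-26)
# q   sequence number      (bytes 27-34)
# q   exchange timestamp   (bytes 35-42)
# q   last traded price    (bytes 43-50, in paise)
HEADER = struct.Struct("<BB25sqqq")


def build_header(mode: int, exchange: int, token: str, seq: int,
                 exch_ts: int, ltp_paise: int) -> bytes:
    """Pack a synthetic 51-byte header (made-up values, for illustration only)."""
    return HEADER.pack(mode, exchange, token.encode("ascii"), seq, exch_ts, ltp_paise)


def parse_header(packet: bytes) -> dict:
    """Decode the common header; longer QUOTE/SNAP_QUOTE packets also work."""
    mode, exchange, raw_token, seq, exch_ts, ltp = HEADER.unpack_from(packet, 0)
    return {
        "subscription_mode": mode,
        "exchange_type": exchange,
        # Same rule as _parse_token_value: stop at the first NUL byte
        "token": raw_token.split(b"\x00", 1)[0].decode("ascii"),
        "sequence_number": seq,
        "exchange_timestamp": exch_ts,
        "last_traded_price": ltp,
    }
```

For example, parse_header(build_header(2, 1, "2885", 42, 1700000000000, 295050)) returns the token "2885" and a last_traded_price of 295050; dividing by 100, as process_and_store_data() does, gives 2950.5.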
Understanding the Code
Let’s break down the key components of this script:
- Environment Setup: We use python-dotenv to load sensitive credentials from a .env file.
- ClickHouse Connection: We establish a connection to ClickHouse using the clickhouse_connect library.
- Table Creation: The create_clickhouse_table() function ensures our table exists, creating it if necessary.
- Token Generation: generate_tokens() authenticates with the broker and obtains the JWT and feed tokens needed for the WebSocket connection.
- WebSocket Connection: connect_websocket() establishes a connection to the broker’s WebSocket feed, subscribes to quotes, and continuously receives binary market data, which it decodes with SmartWebSocketV2._parse_binary_data().
- Data Processing: process_and_store_data() takes each tick, converts prices from paise to rupees (dividing by 100), and stores it in ClickHouse.
- Resilience: connect_websocket() reconnects on its own after a dropped connection (waiting 5 seconds after an error), and connect_with_retry() wraps it with the backoff library’s exponential backoff (up to 10 tries) for failures that escape that loop.
- Main Loop: The main() function ties everything together: it creates the table, generates tokens, and maintains the WebSocket connection, restarting the whole process after 10 seconds if an error escapes.
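store_data_in_clickhouse() stamps each row with datetime.now(), i.e. the time the tick reached your machine. The parsed packet also carries exchange_timestamp, which may be preferable for backtesting. The sketch below converts a parsed QUOTE-mode tick into a row tuple in the column order of angelone_market_data, assuming (verify this against AngelOne's documentation) that exchange_timestamp is epoch milliseconds, and keeping the same divide-by-100 price scaling the script uses. tick_to_row is a hypothetical helper; the timestamp column is DateTime64(3), so millisecond precision fits.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tick_to_row(tick: dict) -> tuple:
    """Map a parsed QUOTE-mode tick to the angelone_market_data column order.

    Assumptions: exchange_timestamp is epoch milliseconds, and prices are in
    paise (divided by 100, as process_and_store_data() does).
    """
    # Integer millisecond arithmetic avoids float rounding in the timestamp
    ts = EPOCH + timedelta(milliseconds=tick["exchange_timestamp"])
    return (
        str(tick["token"]),
        ts,
        tick["last_traded_price"] / 100,
        tick.get("open_price_of_the_day", 0) / 100,
        tick.get("high_price_of_the_day", 0) / 100,
        tick.get("low_price_of_the_day", 0) / 100,
        tick.get("closed_price", 0) / 100,
        float(tick.get("volume_trade_for_the_day", 0)),
    )
```

The result could be passed to the script's client as client.insert('angelone_market_data', [tick_to_row(parsed_message)]), in place of the dictionary built in process_and_store_data().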
Checking Data in ClickHouse
After running your script, you can verify that data is being stored correctly in ClickHouse using the command-line client. Here’s how:
- Open a new terminal window.
- Start the ClickHouse client:
clickhouse-client
If your ClickHouse server is on a different machine or uses non-default settings, you may need to specify connection details:
clickhouse-client --host=your_host --port=your_port --user=your_user --password=your_password
- Once connected, run these SQL queries to check your data:
Count the number of rows:
SELECT COUNT(*)
FROM angelone_market_data;
View the latest 50 entries:
SELECT *
FROM angelone_market_data
ORDER BY timestamp DESC
LIMIT 50;
- To exit the ClickHouse client, type exit or press Ctrl+D.
These queries will give you a quick overview of how much data has been collected and allow you to inspect the most recent entries, ensuring that your WebSocket connection is working correctly and market data is being stored as expected.
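If you would rather run the same checks from Python, the clickhouse_connect client the script already creates can do it: command() returns the value of a single-value result, and query() returns a result object whose result_rows attribute is a list of row tuples. A minimal sketch; row_count and latest_ticks are hypothetical helper names.

```python
def row_count(client) -> int:
    """Return the number of stored ticks (same as the SELECT COUNT(*) query)."""
    # command() returns the single value of a one-row, one-column result
    return int(client.command("SELECT count() FROM angelone_market_data"))


def latest_ticks(client, limit: int = 50):
    """Return the newest `limit` rows as a list of tuples, newest first."""
    # int() ensures only a number is interpolated into the SQL text
    sql = f"SELECT * FROM angelone_market_data ORDER BY timestamp DESC LIMIT {int(limit)}"
    return client.query(sql).result_rows
```

With client = clickhouse_connect.get_client(host="localhost", port=8123, username="default", password=""), calling row_count(client) and then iterating over latest_ticks(client, 5) prints the same information as the two SQL queries above.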
Scalability and Performance
This setup is designed for scalability:
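- ClickHouse can handle large volumes of data efficiently, especially when rows arrive in batches rather than one at a time.
- The script uses asyncio for the WebSocket connection, so waiting on the network does not block it; note, however, that each tick is written with its own blocking client.insert() call, which becomes the bottleneck for high-frequency feeds.
- By adjusting the table design (such as the ORDER BY key) and the ClickHouse server configuration, you can optimize for even larger datasets.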
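store_data_in_clickhouse() calls client.insert() once per tick. ClickHouse works best with fewer, larger inserts (each insert creates a new data part that must later be merged), so buffering rows is a common pattern. The sketch below shows a hypothetical TickBatcher that collects rows and hands them to a flush function once it holds max_rows rows or the oldest buffered row is max_age seconds old. In the script, flush_fn could be lambda rows: client.insert('angelone_market_data', rows), called from the place where store_data_in_clickhouse() runs today.

```python
import time
from typing import Callable, List, Optional, Sequence


class TickBatcher:
    """Buffer rows and pass them to `flush_fn` in batches (hypothetical helper)."""

    def __init__(self, flush_fn: Callable[[List[Sequence]], None],
                 max_rows: int = 1000, max_age: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self.flush_fn = flush_fn
        self.max_rows = max_rows
        self.max_age = max_age
        self.clock = clock  # injectable so the timing can be tested
        self.rows: List[Sequence] = []
        self.first_ts: Optional[float] = None

    def add(self, row: Sequence) -> None:
        if not self.rows:
            self.first_ts = self.clock()
        self.rows.append(row)
        # Flush when the batch is full or its oldest row is too old
        if len(self.rows) >= self.max_rows or self.clock() - self.first_ts >= self.max_age:
            self.flush()

    def flush(self) -> None:
        if not self.rows:
            return
        # The buffer is emptied before flush_fn runs, so a failing insert
        # loses this batch unless flush_fn handles its own errors
        batch, self.rows = self.rows, []
        self.first_ts = None
        self.flush_fn(batch)
```

The age check only runs when a new row arrives, so call flush() on shutdown (or from a periodic task) to avoid leaving rows in the buffer during quiet periods. ClickHouse’s server-side async_insert setting is an alternative that moves the batching to the server.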
The Value of Storing Market Data
Storing detailed market data offers several benefits:
- Historical Analysis: Analyze past market behavior to inform future strategies.
- Strategy Backtesting: Test trading algorithms against real historical data.
- Compliance: Maintain detailed records for regulatory requirements.
- Machine Learning: Use the data to train predictive models.
Applications for Traders
Traders can leverage this setup in several ways:
- Real-time Analytics: Query recent data to inform immediate trading decisions.
- Custom Indicators: Develop and test custom technical indicators.
- Risk Management: Analyze market volatility and liquidity patterns.
- Performance Tracking: Monitor the performance of specific stocks or sectors over time.
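As a starting point for custom indicators, stored ticks can be rolled up into one-minute OHLCV bars. The sketch below assumes time-ordered (timestamp, last_traded_price, volume) rows for a single token, as selected from angelone_market_data, and assumes that volume (volume_trade_for_the_day) is the day’s running total, as its name suggests; per-bar volume is therefore the change in that total, and the first bar only counts volume after the first tick seen. one_minute_bars is a hypothetical helper. In ClickHouse itself, the same grouping can be expressed with toStartOfMinute(), argMin() and argMax().

```python
from datetime import datetime
from typing import Dict, Iterable, List, Tuple

Tick = Tuple[datetime, float, float]  # (timestamp, last_traded_price, cumulative day volume)


def one_minute_bars(ticks: Iterable[Tick]) -> List[Dict]:
    """Aggregate time-ordered ticks into 1-minute OHLCV bars."""
    bars: List[Dict] = []
    prev_cum = None  # cumulative volume at the end of the previous bar
    for ts, price, cum_vol in ticks:
        minute = ts.replace(second=0, microsecond=0)
        if not bars or bars[-1]["minute"] != minute:
            if bars:
                prev_cum = bars[-1]["_cum"]
            bars.append({"minute": minute, "open": price, "high": price,
                         "low": price, "close": price,
                         "_start": cum_vol if prev_cum is None else prev_cum,
                         "_cum": cum_vol})
        else:
            bar = bars[-1]
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price
            bar["_cum"] = cum_vol
    # Per-bar volume = change in the cumulative day volume during the bar
    return [{"minute": b["minute"], "open": b["open"], "high": b["high"],
             "low": b["low"], "close": b["close"],
             "volume": b["_cum"] - b["_start"]} for b in bars]
```

The resulting bars can feed moving averages, volatility measures, or any other indicator that works on regular time intervals.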
Conclusion
This guide provides a robust solution for capturing and storing WebSocket stock market data using ClickHouse and Python. By implementing this system, you’re setting up a powerful infrastructure for data-driven trading and analysis.
Remember to handle your broker credentials securely and comply with all relevant data usage policies. As you become more familiar with ClickHouse and this data pipeline, you’ll likely discover additional ways to leverage this setup to gain insights and inform your trading strategies.
The next step is to start exploring your data. Consider developing custom queries to extract meaningful insights, or connecting this database to your favorite analysis tools. Happy trading!