Rajandran R: Creator of OpenAlgo, an open-source algo trading framework for Indian traders. Building GenAI applications. Telecom engineer turned full-time derivatives trader, mostly trading Nifty, Banknifty and highly liquid stock derivatives. Trading the markets since 2006 and using Market Profile and order flow for more than a decade. Designed and published 100+ open-source trading systems on various trading tools. Strongly believes that market understanding and robust trading frameworks are the key to trading success. Building algo platforms and writing about markets, trading system design, market sentiment, trading software and trading nuances since 2007. Author of Marketcalls.in

Storing WebSocket Stock Market Tick Data in ClickHouse using Python

Credit & Inspiration
This guide was inspired by the article “Storing tick by tick Webscocket data into ClickHouse” by Ravindra Elicherla. The original article, published on Medium, provided valuable insights and served as the foundation for this expanded tutorial.

We highly recommend reading the original article for additional context and perspectives on working with WebSocket data and ClickHouse.

Model used to build the code: Claude 3.5 Sonnet

Total time taken: 2 hours for installing ClickHouse, prompting, testing and documentation

Real-time market data is essential for traders and investors making informed decisions. This guide will walk you through the process of capturing WebSocket stock market data and storing it efficiently in ClickHouse, a powerful column-oriented database.

Prerequisites

Before you begin, ensure you have the following:

Brokerage Account: You need an account with a broker that provides WebSocket market data.

API Credentials: Obtain the following from your broker’s API portal:

    • Client ID
    • API Key
    • Client PIN
    • TOTP Secret (for generating Time-based One-Time Passwords)

System Requirements:

    • Python 3.9 or higher (recent releases of the websockets library require it)
    • ClickHouse database installed on your system or a remote server
    • Stable internet connection for WebSocket data streaming

Basic Knowledge:

    • Familiarity with Python programming
    • Understanding of WebSocket concepts
    • Basic SQL knowledge for querying ClickHouse

Additional Information (Optional):

    • Your local and public IP addresses
    • Your system’s MAC address
    • Any state variable required by your broker’s API

Once you have these prerequisites in place, you’re ready to set up the environment and start capturing WebSocket stock market data.

The complete source code is available on GitHub: Storing Websocket Feed to Clickhouse using Python

Why ClickHouse for Stock Market Data?

ClickHouse offers several advantages for storing and analyzing market data:

    • High-speed Queries: Scans billions of rows in seconds, which keeps market analysis fast.
    • Efficient Storage: Its column-oriented structure stores similar values together, so they compress well.
    • Real-time Data Handling: Sustains high insert rates from live market feeds, especially when rows are inserted in batches.
    • Scalability: Accommodates growing data volumes as you add more servers.
    • SQL Compatibility: Uses a SQL dialect that most financial analysts and developers already know.

Setting Up the Environment

Before we dive into the code, make sure you have set up the following:

1. Install ClickHouse on your system.
2. Install the required Python libraries. asyncio ships with Python, so it does not need to be installed. smartWebSocketV2.py also needs websocket-client and logzero:
          pip install clickhouse-connect websockets aiohttp python-dotenv backoff websocket-client logzero
3. Create a .env file to store your broker credentials and ClickHouse configuration securely.
          # AngelOne credentials
          ANGEL_CLIENT_ID=YOUR_CLIENT_ID
          ANGEL_CLIENT_PIN=YOUR_PIN
          ANGEL_TOTP_CODE=YOUR_TOTP_CODE
          ANGEL_API_KEY=YOUR_API_KEY
          ANGEL_CLIENT_LOCAL_IP=YOUR_LOCAL_IP
          ANGEL_CLIENT_PUBLIC_IP=YOUR_PUBLIC_IP
          ANGEL_MAC_ADDRESS=YOUR_MAC_ADDRESS
          ANGEL_STATE_VARIABLE=YOUR_STATE_VARIABLE
          
          # ClickHouse configuration
          CLICKHOUSE_HOST=localhost
          CLICKHOUSE_PORT=8123
          CLICKHOUSE_USER=default
          CLICKHOUSE_PASSWORD=
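Under the usual settings, a TOTP code changes every 30 seconds. The fixed ANGEL_TOTP_CODE value above is therefore only valid briefly. Any later login made by the script's generate_tokens(), for example when main() restarts the process, would send an expired code.

One option is to keep the TOTP secret in .env and derive the current code at login time. Below is a minimal sketch of the standard RFC 6238 algorithm using only the Python standard library. It assumes your broker uses the common defaults (HMAC-SHA1, 30-second steps, 6 digits) and a Base32-encoded secret. The function current_totp and the ANGEL_TOTP_SECRET variable are hypothetical names, not part of the original script.

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def current_totp(secret_b32: str, for_time: Optional[float] = None,
                 step: int = 30, digits: int = 6) -> str:
    """Return the RFC 6238 TOTP code (HMAC-SHA1) for a Base32 secret.

    Hypothetical helper: assumes the broker uses the RFC defaults
    (SHA1, 30-second step, 6 digits).
    """
    # Base32 secrets are often shown in lowercase, with spaces, or without padding
    cleaned = secret_b32.replace(" ", "").upper()
    cleaned += "=" * (-len(cleaned) % 8)
    key = base64.b32decode(cleaned)

    # Number of whole time steps since the Unix epoch
    now = time.time() if for_time is None else for_time
    counter = int(now // step)

    # HOTP (RFC 4226): HMAC-SHA1 over the 8-byte big-endian counter
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()

    # Dynamic truncation: the low 4 bits of the last byte pick a 4-byte window
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % (10 ** digits)).zfill(digits)


if __name__ == "__main__":
    import os

    # Hypothetical variable name: the original .env stores a code, not a secret
    secret = os.getenv("ANGEL_TOTP_SECRET")
    if secret:
        print(current_totp(secret))
```

Inside generate_tokens() you could then build the payload with something like "totp": current_totp(os.getenv('ANGEL_TOTP_SECRET')), so a fresh code is computed for every login. If you prefer a maintained library, the third-party pyotp package performs the same calculation with pyotp.TOTP(secret).now().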

The Python Script to Fetch WebSocket Data and Store It in ClickHouse

Here’s the Python script, clickhouse-websockets-angelone.py, that captures WebSocket market data and stores it in ClickHouse:

          import asyncio
          import websockets
          import json
          import aiohttp
          from datetime import datetime
          import clickhouse_connect
          from smartWebSocketV2 import SmartWebSocketV2
          import os
          from dotenv import load_dotenv
          import backoff
          
          # Load environment variables
          load_dotenv()
          
          # AngelOne credentials and configuration
          client_id = os.getenv('ANGEL_CLIENT_ID')
          client_pin = os.getenv('ANGEL_CLIENT_PIN')
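          # NOTE: a TOTP code expires after about 30 seconds, so a fixed value only
          # works if the login happens while that code is still valid
          totp_code = os.getenv('ANGEL_TOTP_CODE')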
          api_key = os.getenv('ANGEL_API_KEY')
          client_local_ip = os.getenv('ANGEL_CLIENT_LOCAL_IP')
          client_public_ip = os.getenv('ANGEL_CLIENT_PUBLIC_IP')
          mac_address = os.getenv('ANGEL_MAC_ADDRESS')
          state_var = os.getenv('ANGEL_STATE_VARIABLE')
          
          # Global variables for tokens
          AUTH_TOKEN = None
          FEED_TOKEN = None
          
          # ClickHouse configuration
          CLICKHOUSE_HOST = os.getenv('CLICKHOUSE_HOST', 'localhost')
          CLICKHOUSE_PORT = int(os.getenv('CLICKHOUSE_PORT', '8123'))  # 8123 = ClickHouse HTTP port
          CLICKHOUSE_USER = os.getenv('CLICKHOUSE_USER')
          CLICKHOUSE_PASSWORD = os.getenv('CLICKHOUSE_PASSWORD')
          
          # Initialize ClickHouse client
          client = clickhouse_connect.get_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT, 
                                                 username=CLICKHOUSE_USER, password=CLICKHOUSE_PASSWORD)
          
          def create_clickhouse_table():
              query = '''
              CREATE TABLE IF NOT EXISTS angelone_market_data (
                  token String,
                  timestamp DateTime64(3),
                  last_traded_price Float64,
                  open_price Float64,
                  high_price Float64,
                  low_price Float64,
                  close_price Float64,
                  volume Float64
              ) ENGINE = MergeTree()
              ORDER BY timestamp
              '''
              client.command(query)
              print("Connected to ClickHouse. Server version:", client.server_version)
          
          def store_data_in_clickhouse(data):
              try:
                  values = [
                      (data['token'],
                       datetime.now(),
                       data['last_traded_price'],
                       data['open_price_of_the_day'],
                       data['high_price_of_the_day'],
                       data['low_price_of_the_day'],
                       data['closed_price'],
                       data['volume_trade_for_the_day'])
                  ]
                  client.insert('angelone_market_data', values)
              except Exception as e:
                  print(f"Error storing data in ClickHouse: {e}")
          
          async def generate_tokens():
              global AUTH_TOKEN, FEED_TOKEN
              
              url = "https://apiconnect.angelone.in/rest/auth/angelbroking/user/v1/loginByPassword"
              
              payload = json.dumps({
                  "clientcode": client_id,
                  "password": client_pin,
                  "totp": totp_code,
                  "state": state_var
              })
              
              headers = {
                  'Content-Type': 'application/json',
                  'Accept': 'application/json',
                  'X-UserType': 'USER',
                  'X-SourceID': 'WEB',
                  'X-ClientLocalIP': client_local_ip,
                  'X-ClientPublicIP': client_public_ip,
                  'X-MACAddress': mac_address,
                  'X-PrivateKey': api_key
              }
              
              async with aiohttp.ClientSession() as session:
                  async with session.post(url, data=payload, headers=headers) as response:
                      if response.status == 200:
                          data = await response.json()
                          AUTH_TOKEN = data['data']['jwtToken']
                          FEED_TOKEN = data['data']['feedToken']
                          print("Tokens generated successfully.")
                      else:
                          print(f"Failed to generate tokens. Status: {response.status}")
                          print(await response.text())
                          raise Exception("Token generation failed")
          
          def log_retry(details):
              # Called by backoff before each retry; details includes 'tries' and 'wait'
              print(f"WebSocket error, retry #{details['tries']} in {details['wait']:.1f}s...")
          
          # Retry the WebSocket session with exponential backoff. backoff only retries when
          # connect_websocket() raises, so connection errors are allowed to propagate. After
          # 10 failed attempts the error reaches main(), which regenerates the tokens.
          @backoff.on_exception(backoff.expo, Exception, max_tries=10, on_backoff=log_retry)
          async def connect_with_retry():
              return await connect_websocket()
          
          async def connect_websocket():
              if not AUTH_TOKEN or not FEED_TOKEN:
                  print("Tokens are not set. Make sure to generate tokens first.")
                  return
          
              uri = "wss://smartapisocket.angelone.in/smart-stream"
              headers = {
                  "Authorization": f"Bearer {AUTH_TOKEN}",
                  "x-api-key": api_key,
                  "x-client-code": client_id,
                  "x-feed-token": FEED_TOKEN
              }
          
              # websockets >= 14 takes additional_headers (older releases used extra_headers)
              async with websockets.connect(uri, additional_headers=headers,
                                            ping_interval=20, ping_timeout=10) as websocket:
                  print("WebSocket connected")
                  await subscribe_to_quotes(websocket)
          
                  # Used only for its binary decoder (_parse_binary_data)
                  ws_parser = SmartWebSocketV2(AUTH_TOKEN, api_key, client_id, FEED_TOKEN)
          
                  while True:
                      try:
                          message = await asyncio.wait_for(websocket.recv(), timeout=30)
                      except asyncio.TimeoutError:
                          print("No data received for 30 seconds, sending ping...")
                          pong_waiter = await websocket.ping()
                          await asyncio.wait_for(pong_waiter, timeout=10)
                          continue
                      # Ticks arrive as binary frames; text frames such as "pong" are skipped.
                      # ConnectionClosed raised by recv() propagates to the backoff decorator.
                      if isinstance(message, bytes):
                          parsed_message = ws_parser._parse_binary_data(message)
                          if parsed_message:
                              process_and_store_data(parsed_message)
          
          def process_and_store_data(data):
              try:
                  adjusted_data = {
                      'token': str(data['token']),
                      'last_traded_price': float(data['last_traded_price']) / 100,
                      'open_price_of_the_day': float(data.get('open_price_of_the_day', 0)) / 100,
                      'high_price_of_the_day': float(data.get('high_price_of_the_day', 0)) / 100,
                      'low_price_of_the_day': float(data.get('low_price_of_the_day', 0)) / 100,
                      'closed_price': float(data.get('closed_price', 0)) / 100,
                      'volume_trade_for_the_day': float(data.get('volume_trade_for_the_day', 0))
                  }
                  
                  store_data_in_clickhouse(adjusted_data)
                  
                  # open/high/low/volume below are day-level values from the feed; "close" shows the LTP
                  now = datetime.now()
                  current_time = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                  minute = now.strftime("%H:%M")
                  
                  output = f"[{current_time}] {adjusted_data['token']} - minute: {minute} | "
                  output += f"open: {adjusted_data['open_price_of_the_day']:.1f} | "
                  output += f"close: {adjusted_data['last_traded_price']:.1f} | "
                  output += f"high: {adjusted_data['high_price_of_the_day']:.1f} | "
                  output += f"low: {adjusted_data['low_price_of_the_day']:.1f} | "
                  output += f"volume: {adjusted_data['volume_trade_for_the_day']:.3f}"
                  
                  print(output, flush=True)
              except Exception as e:
                  print(f"Error in process_and_store_data: {e}")
                  print(f"Error occurred with data: {data}")
          
          async def subscribe_to_quotes(websocket):
              subscribe_message = json.dumps({
                  "correlationID": "ws_test",
                  "action": 1,
                  "params": {
                      "mode": 2,
                      "tokenList": [{"exchangeType": 1, "tokens": ["2885"]}]  # RELIANCE token
                  }
              })
              await websocket.send(subscribe_message)
              print("Subscribed to RELIANCE quotes")
          
          async def main():
              create_clickhouse_table()
              while True:
                  try:
                      await generate_tokens()
                      await connect_with_retry()
                  except Exception as e:
                      print(f"An error occurred: {e}")
                  print("Restarting the entire process in 10 seconds...")
                  await asyncio.sleep(10)
          
          if __name__ == "__main__":
              asyncio.run(main())
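The script above inserts one row per tick. ClickHouse generally performs better with fewer, larger inserts. Each INSERT into a MergeTree table creates a new data part that background merges must later combine, so at high tick rates per-row inserts can end in "too many parts" errors.

Here is a minimal sketch of a buffer that collects rows and flushes them in batches, either when a row limit is reached or when a time interval has passed. TickBuffer, max_rows and max_seconds are hypothetical names, not part of the original script. The insert function is passed in, so with the script's client you could supply lambda rows: client.insert('angelone_market_data', rows).

```python
import time
from typing import Any, Callable, List, Sequence


class TickBuffer:
    """Collect rows in memory and hand them to insert_fn in batches.

    Hypothetical helper: flushes when max_rows rows are buffered, or when
    max_seconds have passed since the last flush. The time check only runs
    inside add(), so call flush() yourself on shutdown.
    """

    def __init__(self, insert_fn: Callable[[List[Sequence[Any]]], None],
                 max_rows: int = 1000, max_seconds: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self.insert_fn = insert_fn
        self.max_rows = max_rows
        self.max_seconds = max_seconds
        self.clock = clock
        self.rows: List[Sequence[Any]] = []
        self.last_flush = clock()

    def add(self, row: Sequence[Any]) -> None:
        self.rows.append(row)
        too_many = len(self.rows) >= self.max_rows
        too_old = self.clock() - self.last_flush >= self.max_seconds
        if too_many or too_old:
            self.flush()

    def flush(self) -> None:
        if self.rows:
            # If insert_fn raises, the exception propagates and the rows
            # stay buffered, so they are retried on the next flush
            self.insert_fn(self.rows)
            self.rows = []
        self.last_flush = self.clock()
```

In store_data_in_clickhouse() you would call buffer.add(row) instead of client.insert(...), and call buffer.flush() when the program stops so the last partial batch is written. The buffer grows without limit while ClickHouse is unreachable, so a production version would need a cap. ClickHouse's server-side async_insert setting is an alternative that batches on the server; check the ClickHouse documentation for its trade-offs.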

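Decoding the WebSocket Data

Use the following Python module, smartWebSocketV2.py, to decode the WebSocket data. Save it in the same directory as the main script so that the import from smartWebSocketV2 works. This decoding code is extracted from AngelOne's SmartAPI Python library.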
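Before the full module, it helps to see the layout it decodes. In _parse_binary_data (in the module below), every packet starts with a fixed 51-byte header:

    • byte 0: subscription mode
    • byte 1: exchange type
    • [2:27]: the token
    • [27:35]: sequence number (signed 64-bit integer)
    • [35:43]: exchange timestamp (signed 64-bit integer)
    • [43:51]: last traded price (signed 64-bit integer)

QUOTE and SNAP_QUOTE packets append further fields after offset 51. The sketch below decodes just that header with struct, assuming little-endian byte order as the class's LITTLE_ENDIAN_BYTE_ORDER constant indicates. Two further assumptions are not confirmed by the code shown here. First, the token is treated as a NUL-padded ASCII string, because the real _parse_token_value lies outside this excerpt. Second, the exchange timestamp is treated as epoch milliseconds. The division by 100 mirrors the main script, which converts paise to rupees. HEADER and parse_header are hypothetical names.

```python
import struct
from datetime import datetime, timezone
from typing import Any, Dict

# "<" = little-endian, no padding: mode (B), exchange (B), token (25s),
# sequence number (q), exchange timestamp (q), last traded price (q)
HEADER = struct.Struct("<BB25sqqq")  # 51 bytes, same offsets as _parse_binary_data


def parse_header(packet: bytes) -> Dict[str, Any]:
    """Decode the first 51 bytes of a SmartAPI v2 tick (sketch, see assumptions)."""
    mode, exchange, raw_token, seq, exch_ts, ltp = HEADER.unpack_from(packet, 0)
    return {
        "subscription_mode": mode,
        "exchange_type": exchange,
        # Assumption: the token is ASCII padded with NUL bytes
        "token": raw_token.split(b"\x00", 1)[0].decode("ascii"),
        "sequence_number": seq,
        # Assumption: the exchange timestamp is in epoch milliseconds
        "exchange_time": datetime.fromtimestamp(exch_ts / 1000, tz=timezone.utc),
        "last_traded_price": ltp / 100,  # paise -> rupees, as in the main script
    }
```

The main script stores datetime.now(), the time the tick was received, in the timestamp column. If you would rather store exchange time, the decoded exchange_timestamp could be converted the same way before insertion.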

          import struct
          import time
          import ssl
          import json
          import websocket
          import os
          import logging
          import logzero
          from logzero import logger
          
          class SmartWebSocketV2(object):
              """
              SmartAPI Web Socket version 2
              """
          
              ROOT_URI = "wss://smartapisocket.angelone.in/smart-stream"
              HEART_BEAT_MESSAGE = "ping"
              HEART_BEAT_INTERVAL = 10  # Adjusted to 10s
              LITTLE_ENDIAN_BYTE_ORDER = "<"
              RESUBSCRIBE_FLAG = False
              # HB_THREAD_FLAG = True
          
              # Available Actions
              SUBSCRIBE_ACTION = 1
              UNSUBSCRIBE_ACTION = 0
          
              # Possible Subscription Mode
              LTP_MODE = 1
              QUOTE = 2
              SNAP_QUOTE = 3
              DEPTH = 4
          
              # Exchange Type
              NSE_CM = 1
              NSE_FO = 2
              BSE_CM = 3
              BSE_FO = 4
              MCX_FO = 5
              NCX_FO = 7
              CDE_FO = 13
          
              # Subscription Mode Map
              SUBSCRIPTION_MODE_MAP = {
                  1: "LTP",
                  2: "QUOTE",
                  3: "SNAP_QUOTE",
                  4: "DEPTH"
              }
          
              wsapp = None
              input_request_dict = {}
              current_retry_attempt = 0
          
              def __init__(self, auth_token, api_key, client_code, feed_token, max_retry_attempt=1,retry_strategy=0, retry_delay=10, retry_multiplier=2, retry_duration=60):
                  """
                      Initialise the SmartWebSocketV2 instance
                      Parameters
                      ------
                      auth_token: string
                          jwt auth token received from Login API
                      api_key: string
                          api key from Smart API account
                      client_code: string
                          angel one account id
                      feed_token: string
                          feed token received from Login API
                  """
                  self.auth_token = auth_token
                  self.api_key = api_key
                  self.client_code = client_code
                  self.feed_token = feed_token
                  self.DISCONNECT_FLAG = True
                  self.last_pong_timestamp = None
                  self.MAX_RETRY_ATTEMPT = max_retry_attempt
                  self.retry_strategy = retry_strategy
                  self.retry_delay = retry_delay
                  self.retry_multiplier = retry_multiplier
                  self.retry_duration = retry_duration        
                  # Create a log folder based on the current date
                  log_folder = time.strftime("%Y-%m-%d", time.localtime())
                  log_folder_path = os.path.join("logs", log_folder)  # Construct the full path to the log folder
                  os.makedirs(log_folder_path, exist_ok=True) # Create the log folder if it doesn't exist
                  log_path = os.path.join(log_folder_path, "app.log") # Construct the full path to the log file
                  logzero.logfile(log_path, loglevel=logging.INFO)  # Output logs to a date-wise log file
                  
                  if not self._sanity_check():
                      logger.error("Invalid initialization parameters. Provide valid values for all the tokens.")
                      raise Exception("Provide valid value for all the tokens")
          
              def _sanity_check(self):
                  if not all([self.auth_token, self.api_key, self.client_code, self.feed_token]):
                      return False
                  return True
          
              def _on_message(self, wsapp, message):
                  logger.info(f"Received message: {message}")
                  if message != "pong":
                      parsed_message = self._parse_binary_data(message)
                      # Check if it's a control message (e.g., heartbeat)
                      if self._is_control_message(parsed_message):
                          self._handle_control_message(parsed_message)
                      else:
                          self.on_data(wsapp, parsed_message)
                  else:
                      self.on_message(wsapp, message)
          
              def _is_control_message(self, parsed_message):
                  return "subscription_mode" not in parsed_message
          
              def _handle_control_message(self, parsed_message):
                  if parsed_message["subscription_mode"] == 0:
                      self._on_pong(self.wsapp, "pong")
                  elif parsed_message["subscription_mode"] == 1:
                      self._on_ping(self.wsapp, "ping")
                  # Invoke on_control_message callback with the control message data
                  if hasattr(self, 'on_control_message'):
                      self.on_control_message(self.wsapp, parsed_message)
          
              def _on_data(self, wsapp, data, data_type, continue_flag):
                  if data_type == 2:
                      parsed_message = self._parse_binary_data(data)
                      self.on_data(wsapp, parsed_message)
          
              def _on_open(self, wsapp):
                  if self.RESUBSCRIBE_FLAG:
                      self.resubscribe()
                  else:
                      self.on_open(wsapp)
          
              def _on_pong(self, wsapp, data):
                  if data == self.HEART_BEAT_MESSAGE:
                      timestamp = time.time()
                      formatted_timestamp = time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp))
                      logger.info(f"In on pong function ==> {data}, Timestamp: {formatted_timestamp}")
                      self.last_pong_timestamp = timestamp
          
              def _on_ping(self, wsapp, data):
                  timestamp = time.time()
                  formatted_timestamp = time.strftime("%d-%m-%y %H:%M:%S", time.localtime(timestamp))
                  logger.info(f"In on ping function ==> {data}, Timestamp: {formatted_timestamp}")
                  self.last_ping_timestamp = timestamp
          
              def subscribe(self, correlation_id, mode, token_list):
                  """
                      This Function subscribe the price data for the given token
                      Parameters
                      ------
                      correlation_id: string
                          A 10 character alphanumeric ID client may provide which will be returned by the server in error response
                          to indicate which request generated error response.
                          Clients can use this optional ID for tracking purposes between request and corresponding error response.
                      mode: integer
                          It denotes the subscription type
                          possible values -> 1, 2 and 3
                          1 -> LTP
                          2 -> Quote
                          3 -> Snap Quote
                      token_list: list of dict
                          Sample Value ->
                              [
                                  { "exchangeType": 1, "tokens": ["10626", "5290"]},
                                  {"exchangeType": 5, "tokens": [ "234230", "234235", "234219"]}
                              ]
                              exchangeType: integer
                              possible values ->
                                  1 -> nse_cm
                                  2 -> nse_fo
                                  3 -> bse_cm
                                  4 -> bse_fo
                                  5 -> mcx_fo
                                  7 -> ncx_fo
                                  13 -> cde_fo
                              tokens: list of string
                  """
                  try:
                      request_data = {
                          "correlationID": correlation_id,
                          "action": self.SUBSCRIBE_ACTION,
                          "params": {
                              "mode": mode,
                              "tokenList": token_list
                          }
                      }
                      if mode == 4:
                          for token in token_list:
                                  if token.get('exchangeType') != 1:
                                      error_message = f"Invalid ExchangeType:{token.get('exchangeType')} Please check the exchange type and try again it support only 1 exchange type"
                                      logger.error(error_message)
                                      raise ValueError(error_message)
                      
                      if self.input_request_dict.get(mode) is None:
                          self.input_request_dict[mode] = {}
          
                      for token in token_list:
                          if token['exchangeType'] in self.input_request_dict[mode]:
                              self.input_request_dict[mode][token['exchangeType']].extend(token["tokens"])
                          else:
                              self.input_request_dict[mode][token['exchangeType']] = token["tokens"]
          
                      if mode == self.DEPTH:
                          total_tokens = sum(len(token["tokens"]) for token in token_list)
                          quota_limit = 50
                          if total_tokens > quota_limit:
                              error_message = f"Quota exceeded: You can subscribe to a maximum of {quota_limit} tokens only."
                              logger.error(error_message)
                              raise Exception(error_message)
          
                      self.wsapp.send(json.dumps(request_data))
                      self.RESUBSCRIBE_FLAG = True
          
                  except Exception as e:
                      logger.error(f"Error occurred during subscribe: {e}")
                      raise e
                  
              def unsubscribe(self, correlation_id, mode, token_list):
                  """
                      This function unsubscribe the data for given token
                      Parameters
                      ------
                      correlation_id: string
                          A 10 character alphanumeric ID client may provide which will be returned by the server in error response
                          to indicate which request generated error response.
                          Clients can use this optional ID for tracking purposes between request and corresponding error response.
                      mode: integer
                          It denotes the subscription type
                          possible values -> 1, 2 and 3
                          1 -> LTP
                          2 -> Quote
                          3 -> Snap Quote
                      token_list: list of dict
                          Sample Value ->
                              [
                                  { "exchangeType": 1, "tokens": ["10626", "5290"]},
                                  {"exchangeType": 5, "tokens": [ "234230", "234235", "234219"]}
                              ]
                              exchangeType: integer
                              possible values ->
                                  1 -> nse_cm
                                  2 -> nse_fo
                                  3 -> bse_cm
                                  4 -> bse_fo
                                  5 -> mcx_fo
                                  7 -> ncx_fo
                                  13 -> cde_fo
                              tokens: list of string
                  """
                  try:
                      request_data = {
                          "correlationID": correlation_id,
                          "action": self.UNSUBSCRIBE_ACTION,
                          "params": {
                              "mode": mode,
                              "tokenList": token_list
                          }
                      }
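                      # Drop the unsubscribed tokens from the cache used by resubscribe()
                      cached_mode = self.input_request_dict.get(mode, {})
                      for token in token_list:
                          cached_tokens = cached_mode.get(token['exchangeType'], [])
                          for t in token['tokens']:
                              if t in cached_tokens:
                                  cached_tokens.remove(t)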
                      self.wsapp.send(json.dumps(request_data))
                      self.RESUBSCRIBE_FLAG = True
                  except Exception as e:
                      logger.error(f"Error occurred during unsubscribe: {e}")
                      raise e
          
              def resubscribe(self):
                  try:
                      for key, val in self.input_request_dict.items():
                          token_list = []
                          for key1, val1 in val.items():
                              temp_data = {
                                  'exchangeType': key1,
                                  'tokens': val1
                              }
                              token_list.append(temp_data)
                          request_data = {
                              "action": self.SUBSCRIBE_ACTION,
                              "params": {
                                  "mode": key,
                                  "tokenList": token_list
                              }
                          }
                          self.wsapp.send(json.dumps(request_data))
                  except Exception as e:
                      logger.error(f"Error occurred during resubscribe: {e}")
                      raise e
          
              def connect(self):
                  """
                      Make the web socket connection with the server
                  """
                  headers = {
                      "Authorization": self.auth_token,
                      "x-api-key": self.api_key,
                      "x-client-code": self.client_code,
                      "x-feed-token": self.feed_token
                  }
          
                  try:
                      self.wsapp = websocket.WebSocketApp(self.ROOT_URI, header=headers, on_open=self._on_open,
                                                          on_error=self._on_error, on_close=self._on_close, on_data=self._on_data,
                                                          on_ping=self._on_ping,
                                                          on_pong=self._on_pong)
                      self.wsapp.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=self.HEART_BEAT_INTERVAL,
                                             ping_payload=self.HEART_BEAT_MESSAGE)
                  except Exception as e:
                      logger.error(f"Error occurred during WebSocket connection: {e}")
                      raise e
          
              def close_connection(self):
                  """
                  Closes the connection
                  """
                  self.RESUBSCRIBE_FLAG = False
                  self.DISCONNECT_FLAG = True
                  if self.wsapp:
                      self.wsapp.close()
          
              def _on_error(self, wsapp, error):
                  self.RESUBSCRIBE_FLAG = True
                  if self.current_retry_attempt < self.MAX_RETRY_ATTEMPT:
                      logger.warning(f"Attempting to resubscribe/reconnect (Attempt {self.current_retry_attempt + 1})...")
                      self.current_retry_attempt += 1
                      if self.retry_strategy == 0: #retry_strategy for simple
                          time.sleep(self.retry_delay)
                      elif self.retry_strategy == 1: #retry_strategy for exponential
                          delay = self.retry_delay * (self.retry_multiplier ** (self.current_retry_attempt - 1))
                          time.sleep(delay)
                      else:
                          logger.error(f"Invalid retry strategy {self.retry_strategy}")
                          raise Exception(f"Invalid retry strategy {self.retry_strategy}")
                      try:
                          self.close_connection()
                          self.connect()
                      except Exception as e:
                          logger.error(f"Error occurred during resubscribe/reconnect: {e}")
                          if hasattr(self, 'on_error'):
                              self.on_error("Reconnect Error", str(e) if str(e) else "Unknown error")
                  else:
                      self.close_connection()
                      if hasattr(self, 'on_error'):
                          self.on_error("Max retry attempt reached", "Connection closed")
                      if self.retry_duration is not None and (self.last_pong_timestamp is not None and time.time() - self.last_pong_timestamp > self.retry_duration * 60):
                          logger.warning("Connection closed due to inactivity.")
                      else:
                          logger.warning("Connection closed due to max retry attempts reached.")
          
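              def _on_close(self, wsapp, close_status_code=None, close_msg=None):
                  # websocket-client 1.x passes the close status code and message as extra arguments
                  self.on_close(wsapp)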
          
              def _parse_binary_data(self, binary_data):
                  parsed_data = {
                      "subscription_mode": self._unpack_data(binary_data, 0, 1, byte_format="B")[0],
                      "exchange_type": self._unpack_data(binary_data, 1, 2, byte_format="B")[0],
                      "token": SmartWebSocketV2._parse_token_value(binary_data[2:27]),
                      "sequence_number": self._unpack_data(binary_data, 27, 35, byte_format="q")[0],
                      "exchange_timestamp": self._unpack_data(binary_data, 35, 43, byte_format="q")[0],
                      "last_traded_price": self._unpack_data(binary_data, 43, 51, byte_format="q")[0]
                  }
                  try:
                      parsed_data["subscription_mode_val"] = self.SUBSCRIPTION_MODE_MAP.get(parsed_data["subscription_mode"])
          
                      if parsed_data["subscription_mode"] in [self.QUOTE, self.SNAP_QUOTE]:
                          parsed_data["last_traded_quantity"] = self._unpack_data(binary_data, 51, 59, byte_format="q")[0]
                          parsed_data["average_traded_price"] = self._unpack_data(binary_data, 59, 67, byte_format="q")[0]
                          parsed_data["volume_trade_for_the_day"] = self._unpack_data(binary_data, 67, 75, byte_format="q")[0]
                          parsed_data["total_buy_quantity"] = self._unpack_data(binary_data, 75, 83, byte_format="d")[0]
                          parsed_data["total_sell_quantity"] = self._unpack_data(binary_data, 83, 91, byte_format="d")[0]
                          parsed_data["open_price_of_the_day"] = self._unpack_data(binary_data, 91, 99, byte_format="q")[0]
                          parsed_data["high_price_of_the_day"] = self._unpack_data(binary_data, 99, 107, byte_format="q")[0]
                          parsed_data["low_price_of_the_day"] = self._unpack_data(binary_data, 107, 115, byte_format="q")[0]
                          parsed_data["closed_price"] = self._unpack_data(binary_data, 115, 123, byte_format="q")[0]
          
                      if parsed_data["subscription_mode"] == self.SNAP_QUOTE:
                          parsed_data["last_traded_timestamp"] = self._unpack_data(binary_data, 123, 131, byte_format="q")[0]
                          parsed_data["open_interest"] = self._unpack_data(binary_data, 131, 139, byte_format="q")[0]
                          parsed_data["open_interest_change_percentage"] = self._unpack_data(binary_data, 139, 147, byte_format="q")[0]
                          parsed_data["upper_circuit_limit"] = self._unpack_data(binary_data, 347, 355, byte_format="q")[0]
                          parsed_data["lower_circuit_limit"] = self._unpack_data(binary_data, 355, 363, byte_format="q")[0]
                          parsed_data["52_week_high_price"] = self._unpack_data(binary_data, 363, 371, byte_format="q")[0]
                          parsed_data["52_week_low_price"] = self._unpack_data(binary_data, 371, 379, byte_format="q")[0]
                          best_5_buy_and_sell_data = self._parse_best_5_buy_and_sell_data(binary_data[147:347])
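                          # Note: the buy/sell keys are crossed on purpose. _parse_best_5_buy_and_sell_data
                          # files packets with flag == 0 under "best_5_buy_data", and this swap appears to
                          # remap them to the feed's flag convention. Verify against the broker's packet
                          # spec before "fixing" it.
                          parsed_data["best_5_buy_data"] = best_5_buy_and_sell_data["best_5_sell_data"]
                          parsed_data["best_5_sell_data"] = best_5_buy_and_sell_data["best_5_buy_data"]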
          
                      if parsed_data["subscription_mode"] == self.DEPTH:
                          parsed_data.pop("sequence_number", None)
                          parsed_data.pop("last_traded_price", None)
                          parsed_data.pop("subscription_mode_val", None)
                          parsed_data["packet_received_time"] = self._unpack_data(binary_data, 35, 43, byte_format="q")[0]
                          depth_data_start_index = 43
                          depth_20_data = self._parse_depth_20_buy_and_sell_data(binary_data[depth_data_start_index:])
                          parsed_data["depth_20_buy_data"] = depth_20_data["depth_20_buy_data"]
                          parsed_data["depth_20_sell_data"] = depth_20_data["depth_20_sell_data"]
          
                      return parsed_data
                  except Exception as e:
                      logger.error(f"Error occurred during binary data parsing: {e}")
                      raise e
          
              def _unpack_data(self, binary_data, start, end, byte_format="I"):
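                  """
                  Unpack binary_data[start:end] with struct.unpack, using byte_format prefixed
                  with self.LITTLE_ENDIAN_BYTE_ORDER. Integer and float formats ("q", "d", ...)
                  are both supported. Always returns a tuple, so callers take [0] for one value.
                  """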
                  return struct.unpack(self.LITTLE_ENDIAN_BYTE_ORDER + byte_format, binary_data[start:end])
          
              @staticmethod
              def _parse_token_value(binary_packet):
                  token = ""
                  for i in range(len(binary_packet)):
                      if chr(binary_packet[i]) == '\x00':
                          return token
                      token += chr(binary_packet[i])
                  return token
          
              def _parse_best_5_buy_and_sell_data(self, binary_data):
          
                  def split_packets(binary_packets):
                      packets = []
          
                      i = 0
                      while i < len(binary_packets):
                          packets.append(binary_packets[i: i + 20])
                          i += 20
                      return packets
          
                  best_5_buy_sell_packets = split_packets(binary_data)
          
                  best_5_buy_data = []
                  best_5_sell_data = []
          
                  for packet in best_5_buy_sell_packets:
                      each_data = {
                          "flag": self._unpack_data(packet, 0, 2, byte_format="H")[0],
                          "quantity": self._unpack_data(packet, 2, 10, byte_format="q")[0],
                          "price": self._unpack_data(packet, 10, 18, byte_format="q")[0],
                          "no of orders": self._unpack_data(packet, 18, 20, byte_format="H")[0]
                      }
          
                      if each_data["flag"] == 0:
                          best_5_buy_data.append(each_data)
                      else:
                          best_5_sell_data.append(each_data)
          
                  return {
                      "best_5_buy_data": best_5_buy_data,
                      "best_5_sell_data": best_5_sell_data
                  }
          
              def _parse_depth_20_buy_and_sell_data(self, binary_data):
                  depth_20_buy_data = []
                  depth_20_sell_data = []
          
                  for i in range(20):
                      buy_start_idx = i * 10
                      sell_start_idx = 200 + i * 10
          
                      # Parse buy data
                      buy_packet_data = {
                          "quantity": self._unpack_data(binary_data, buy_start_idx, buy_start_idx + 4, byte_format="i")[0],
                          "price": self._unpack_data(binary_data, buy_start_idx + 4, buy_start_idx + 8, byte_format="i")[0],
                          "num_of_orders": self._unpack_data(binary_data, buy_start_idx + 8, buy_start_idx + 10, byte_format="h")[0],
                      }
          
                      # Parse sell data
                      sell_packet_data = {
                          "quantity": self._unpack_data(binary_data, sell_start_idx, sell_start_idx + 4, byte_format="i")[0],
                          "price": self._unpack_data(binary_data, sell_start_idx + 4, sell_start_idx + 8, byte_format="i")[0],
                          "num_of_orders": self._unpack_data(binary_data, sell_start_idx + 8, sell_start_idx + 10, byte_format="h")[0],
                      }
          
                      depth_20_buy_data.append(buy_packet_data)
                      depth_20_sell_data.append(sell_packet_data)
          
                  return {
                      "depth_20_buy_data": depth_20_buy_data,
                      "depth_20_sell_data": depth_20_sell_data
                  }
          
              def on_message(self, wsapp, message):
                  pass
          
              def on_data(self, wsapp, data):
                  pass
          
              def on_control_message(self, wsapp, message):
                  pass
          
              def on_close(self, wsapp):
                  pass
          
              def on_open(self, wsapp):
                  pass
          
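              def on_error(self, wsapp, error):
                  # Two arguments, matching the calls in the reconnect logic,
                  # e.g. self.on_error("Reconnect Error", ...)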
                  pass
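The byte offsets in _parse_binary_data are easier to follow with a concrete packet. The sketch below packs a synthetic LTP-style header with Python's struct module (little-endian, as in _unpack_data) and decodes it again with the same layout: 1 byte subscription mode, 1 byte exchange type, a 25-byte null-padded token, then three 8-byte signed integers. build_ltp_packet, parse_ltp_packet and all field values are hypothetical and made up for illustration; they are not part of the SmartAPI library.

```python
import struct

# Layout of the first 51 bytes, mirroring the offsets used in _parse_binary_data:
#   [0]      subscription_mode   uint8
#   [1]      exchange_type       uint8
#   [2:27]   token               25 bytes, ASCII, null-padded
#   [27:35]  sequence_number     int64
#   [35:43]  exchange_timestamp  int64
#   [43:51]  last_traded_price   int64
LTP_FORMAT = "<BB25sqqq"  # "<" = little-endian with no padding between fields


def build_ltp_packet(mode, exchange_type, token, seq, ts, ltp):
    """Hypothetical helper: pack a synthetic LTP header for testing."""
    # struct pads the 25s field with null bytes when the token is shorter
    return struct.pack(LTP_FORMAT, mode, exchange_type, token.encode("ascii"), seq, ts, ltp)


def parse_ltp_packet(packet):
    """Hypothetical helper: decode the same header with one struct call."""
    mode, exch, raw_token, seq, ts, ltp = struct.unpack(LTP_FORMAT, packet[:51])
    return {
        "subscription_mode": mode,
        "exchange_type": exch,
        # Stop at the first null byte, as _parse_token_value does
        "token": raw_token.split(b"\x00", 1)[0].decode("ascii"),
        "sequence_number": seq,
        "exchange_timestamp": ts,
        "last_traded_price": ltp,
    }


packet = build_ltp_packet(1, 1, "2885", 42, 1700000000000, 245050)
print(len(packet))  # 51
print(parse_ltp_packet(packet))
```

Packing a known packet and asserting on the parsed fields is a cheap way to unit-test a parser against the broker's packet specification without a live WebSocket connection.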

          Understanding the Code

          Let’s break down the key components of this script:

          1. Environment Setup: We use python-dotenv to load sensitive credentials from a .env file.
          2. ClickHouse Connection: We establish a connection to ClickHouse using the clickhouse_connect library.
          3. Table Creation: The create_clickhouse_table() function ensures our table exists, creating it if necessary.
          4. Token Generation: generate_tokens() authenticates with the broker and obtains the necessary tokens for the WebSocket connection.
          5. WebSocket Connection: connect_websocket() establishes a connection to the broker’s WebSocket feed and continuously receives market data.
          6. Data Processing: process_and_store_data() takes each tick of data, adjusts it, and stores it in ClickHouse.
          7. Resilience: We use the backoff library to implement exponential backoff for reconnection attempts.
          8. Main Loop: The main() function ties everything together, creating the table, generating tokens, and maintaining the WebSocket connection.
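Step 6 is where parsed ticks become database rows. The script's own process_and_store_data() is the reference; the sketch below only illustrates that kind of transformation. It assumes (verify against your broker's WebSocket documentation) that exchange_timestamp is epoch milliseconds and that prices arrive as integer paise. The tick_to_row name and the column list are hypothetical and must match whatever your create_clickhouse_table() defines.

```python
from datetime import datetime, timezone

# Hypothetical column order; it must match the table created by create_clickhouse_table()
COLUMNS = ["timestamp", "token", "ltp", "volume"]


def tick_to_row(tick):
    """Convert one parsed tick dict into a row tuple (illustrative sketch).

    Assumes exchange_timestamp is epoch milliseconds and prices are integer
    paise; adjust both conversions if your feed differs.
    """
    ts = datetime.fromtimestamp(tick["exchange_timestamp"] / 1000, tz=timezone.utc)
    ltp = tick["last_traded_price"] / 100  # paise -> rupees (assumption)
    # volume_trade_for_the_day is only present in QUOTE / SNAP_QUOTE mode
    volume = tick.get("volume_trade_for_the_day", 0)
    return (ts, tick["token"], ltp, volume)


tick = {"token": "2885", "exchange_timestamp": 1700000000000,
        "last_traded_price": 245050, "volume_trade_for_the_day": 1200}
row = tick_to_row(tick)
print(row)
# With clickhouse_connect, a batch of such rows could then be written with:
# client.insert("angelone_market_data", [row], column_names=COLUMNS)
```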

          Checking Data in ClickHouse

          After running your script, you can verify that data is being stored correctly in ClickHouse using the command-line client. Here’s how:

          1. Open a new terminal window.
          2. Start the ClickHouse client:
          clickhouse-client

          If your ClickHouse server is on a different machine or uses non-default settings, you may need to specify connection details:

          clickhouse-client --host=your_host --port=your_port --user=your_user --password=your_password
          3. Once connected, run these SQL queries to check your data:

          Count the number of rows:

          SELECT COUNT(*)
          FROM angelone_market_data;

          View the latest 50 entries:

          SELECT *
          FROM angelone_market_data
          ORDER BY timestamp DESC
          LIMIT 50;

          Screenshot: Storing Reliance WebSocket real-time data into ClickHouse DB

          4. To exit the ClickHouse client, type exit or press Ctrl+D.

          These queries will give you a quick overview of how much data has been collected and allow you to inspect the most recent entries, ensuring that your WebSocket connection is working correctly and market data is being stored as expected.
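The same two checks can also be run from Python, which is handy inside a monitoring script. This is a sketch that assumes the clickhouse_connect client used by the ingestion script; check_ingestion is a hypothetical helper, and it relies only on the client's query() method and the result_rows attribute of the returned result.

```python
def check_ingestion(client, table="angelone_market_data", limit=50):
    """Hypothetical helper: return (row_count, latest_rows) for a tick table.

    `client` is expected to be a clickhouse_connect client; only its
    query() method and the result's .result_rows attribute are used.
    """
    # The table name is interpolated directly, so only pass trusted values here
    count = client.query(f"SELECT count() FROM {table}").result_rows[0][0]
    latest = client.query(
        f"SELECT * FROM {table} ORDER BY timestamp DESC LIMIT {int(limit)}"
    ).result_rows
    return count, latest


# Example usage against a local server (connection details are placeholders):
# import clickhouse_connect
# client = clickhouse_connect.get_client(host="localhost", username="default", password="")
# count, latest = check_ingestion(client)
# print(count, latest[:3])
```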

          Scalability and Performance

          This setup is designed for scalability:

          • ClickHouse is column-oriented and compresses data, so it can store and scan large volumes of tick data efficiently.
          • The script uses asynchronous programming, allowing it to handle high-frequency data updates.
          • By tuning the table design (sorting key, partitioning) and the ClickHouse server configuration, you can optimize for even larger datasets.
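In practice, insert batching often matters more than server tuning. ClickHouse's documentation recommends sending inserts in batches (it suggests at least 1,000 rows per insert) rather than one row per tick, because each insert into a MergeTree table creates a new data part that must be merged later. The sketch below buffers rows and flushes them when either a row-count or an age threshold is reached. TickBuffer, its thresholds, and the flush callback are hypothetical; in a real script the callback would wrap something like client.insert(...).

```python
import time


class TickBuffer:
    """Collect rows and hand them to `flush_fn` in batches (illustrative sketch)."""

    def __init__(self, flush_fn, max_rows=1000, max_age_s=1.0, clock=time.monotonic):
        self.flush_fn = flush_fn      # e.g. lambda rows: client.insert(table, rows, column_names=cols)
        self.max_rows = max_rows
        self.max_age_s = max_age_s
        self.clock = clock
        self.rows = []
        self.first_ts = None          # arrival time of the oldest buffered row

    def add(self, row):
        if not self.rows:
            self.first_ts = self.clock()
        self.rows.append(row)
        # Flush when the batch is full or the oldest row has waited long enough
        if len(self.rows) >= self.max_rows or self.clock() - self.first_ts >= self.max_age_s:
            self.flush()

    def flush(self):
        if self.rows:
            batch, self.rows = self.rows, []
            self.flush_fn(batch)


batches = []
buf = TickBuffer(batches.append, max_rows=3, max_age_s=60)
for i in range(7):
    buf.add((i,))
buf.flush()  # flush the remainder, e.g. on shutdown
print([len(b) for b in batches])  # [3, 3, 1]
```

The age check only runs when a new tick arrives, so call flush() on shutdown (or from a timer) to avoid holding the last few rows indefinitely during a quiet market.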

          The Value of Storing Market Data

          Storing detailed market data offers several benefits:

          • Historical Analysis: Analyze past market behavior to inform future strategies.
          • Strategy Backtesting: Test trading algorithms against real historical data.
          • Compliance: Maintain detailed records for regulatory requirements.
          • Machine Learning: Use the data to train predictive models.

          Applications for Traders

          Traders can leverage this setup in several ways:

          • Real-time Analytics: Query recent data to inform immediate trading decisions.
          • Custom Indicators: Develop and test custom technical indicators.
          • Risk Management: Analyze market volatility and liquidity patterns.
          • Performance Tracking: Monitor the performance of specific stocks or sectors over time.
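Most custom indicators start from bars rather than raw ticks. As an illustration, here's a pure-Python sketch that rolls ticks into 1-minute OHLCV bars. In production you would more likely push this aggregation into ClickHouse itself (for example with toStartOfMinute, argMin and argMax). The input shape, time-sorted (timestamp, price, quantity) tuples where quantity could come from last_traded_quantity in QUOTE mode, is an assumption, and ticks_to_minute_bars is a hypothetical helper.

```python
from datetime import datetime


def ticks_to_minute_bars(ticks):
    """Hypothetical helper: aggregate (timestamp, price, qty) ticks into 1-minute OHLCV bars.

    `ticks` must be sorted by timestamp. Returns one dict per minute that
    actually had trades (empty minutes are not filled in).
    """
    bars = []
    for ts, price, qty in ticks:
        minute = ts.replace(second=0, microsecond=0)  # bucket start, like toStartOfMinute
        if bars and bars[-1]["minute"] == minute:
            bar = bars[-1]
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price
            bar["volume"] += qty
        else:
            bars.append({"minute": minute, "open": price, "high": price,
                         "low": price, "close": price, "volume": qty})
    return bars


ticks = [
    (datetime(2024, 1, 1, 9, 15, 5), 100.0, 10),
    (datetime(2024, 1, 1, 9, 15, 40), 101.5, 5),
    (datetime(2024, 1, 1, 9, 15, 59), 99.5, 8),
    (datetime(2024, 1, 1, 9, 16, 2), 100.5, 3),
]
for bar in ticks_to_minute_bars(ticks):
    print(bar)
```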

          Conclusion

          This guide provides a robust solution for capturing and storing WebSocket stock market data using ClickHouse and Python. By implementing this system, you’re setting up a powerful infrastructure for data-driven trading and analysis.

          Remember to handle your broker credentials securely and comply with all relevant data usage policies. As you become more familiar with ClickHouse and this data pipeline, you’ll likely discover additional ways to leverage this setup to gain insights and inform your trading strategies.

          The next step is to start exploring your data. Consider developing custom queries to extract meaningful insights, or connecting this database to your favorite analysis tools. Happy trading!
