
WebSocket Implementation for Real-Time Data Streaming Using Python

Real-time data has become an essential factor across various fields, including Data Science, the financial sector, IoT, telecom and beyond. WebSocket is a communication protocol that facilitates real-time data streaming by enabling full-duplex (two-way) communication channels between a client and a server.

Why WebSocket?

WebSocket plays a crucial role in real-time communication on the web by providing a bidirectional, full-duplex channel over a single, long-lived connection.

Real-Time Data Streaming: WebSocket is especially useful in scenarios that require continuous data streams, such as live sports updates, stock market feeds, IoT sensor data, and telemetry systems.

Full-Duplex Communication: WebSocket provides full-duplex communication, meaning both the client and server can send and receive data simultaneously, allowing for more interactive and real-time applications. In contrast, HTTP is request-response based, where the client sends a request, and the server responds, which is less suitable for real-time communication.

Low Latency: WebSocket reduces latency by keeping a single connection open between the client and server. This avoids repeatedly establishing and tearing down connections, as is typical with HTTP polling, which makes it well suited to applications that need quick response times, such as financial data streaming, IoT, and multiplayer gaming.

Scalability: Because each client holds one persistent connection with little per-message overhead, a WebSocket server can keep a large number of clients connected simultaneously. This makes it well suited to applications like chat systems, online gaming, and financial tickers, where multiple clients need to stay connected in real time.

Real-Time Data Streaming Design

Below is a simple logical design of a WebSocket implementation on the server, which streams real-time data from an external source:

[Figure: WebSocket design]

  1. Real-time data from the external source is stored in a database table on the server.
  2. A trigger is set up to notify the system whenever new data is inserted into the database.
  3. In the WebSocket implementation, we will use the publish/subscribe (pub/sub) model. The database publishes new data to a notification channel, while the WebSocket function acts as a subscriber. The WebSocket function listens to this notification channel for updates from the corresponding database table. When the system stores new data, the WebSocket function immediately triggers and sends the updated data to connected clients.
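
The three steps above can be sketched without a database or a network. The snippet below is only an in-memory illustration of the publish/subscribe flow: an `asyncio.Queue` stands in for the notification channel, and plain lists stand in for connected clients. The names `publisher`, `subscriber`, `run_demo`, and the sample rows are hypothetical and not part of the implementation that follows.

```python
import asyncio
import json


async def publisher(channel: asyncio.Queue, rows: list) -> None:
    """Stand-in for the database trigger: publish each new row as JSON."""
    for row in rows:
        await channel.put(json.dumps(row))
    await channel.put(None)  # sentinel: no more notifications in this demo


async def subscriber(channel: asyncio.Queue, clients: dict) -> None:
    """Stand-in for the WebSocket function: forward each payload to every client."""
    while True:
        payload = await channel.get()
        if payload is None:
            break
        message = json.loads(payload)
        for inbox in clients.values():
            inbox.append(message)


async def run_demo() -> dict:
    channel: asyncio.Queue = asyncio.Queue()
    clients = {"client_a": [], "client_b": []}  # hypothetical connected clients
    rows = [{"token": 101, "price": 10.5}, {"token": 102, "price": 20.0}]
    # Publisher and subscriber run concurrently, linked only by the channel.
    await asyncio.gather(publisher(channel, rows), subscriber(channel, clients))
    return clients


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

The publisher never needs to know who is listening, which is the property the real design relies on: the database only publishes to a channel, and the WebSocket server decides which clients receive each update.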
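
Database Setup

Before writing any Python code, we need a database trigger that publishes changes in the table to a notification channel. The trigger below runs notify_market_data_change() after every insert on public.app_market_data; that function is expected to publish the new row to the channel the WebSocket server listens on.
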
CREATE TRIGGER market_data_channel 
AFTER INSERT
ON public.app_market_data
FOR EACH ROW
EXECUTE FUNCTION notify_market_data_change();
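
The trigger above calls notify_market_data_change(), whose body is not shown. As a rough sketch only, such a function could publish the inserted row with PostgreSQL's `pg_notify`. The payload format (the whole row as JSON via `row_to_json`) and the helper name `install_notify_function` are assumptions made for illustration; the channel name is taken from the `notification_channel` variable used later in the Python code. The SQL is kept in a Python string so it can be applied through a psycopg2 connection.

```python
# Assumed definition of the trigger function referenced by the trigger above.
# The payload format (the whole new row as JSON) is an assumption.
NOTIFY_FUNCTION_SQL = """
CREATE OR REPLACE FUNCTION notify_market_data_change()
RETURNS trigger AS $$
BEGIN
    -- Publish the inserted row as JSON on the notification channel.
    PERFORM pg_notify('market_data_channel', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""


def install_notify_function(conn) -> None:
    """Create the function through an open psycopg2 connection (hypothetical helper)."""
    with conn.cursor() as cur:
        cur.execute(NOTIFY_FUNCTION_SQL)
    conn.commit()
```

The function has to exist before the CREATE TRIGGER statement runs. Note also that PostgreSQL limits a notification payload to just under 8000 bytes in the default configuration, so for wide rows it may be safer to publish only a key and let the listener query the row.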

1. WebSocket Initialisation

The first step in implementing WebSocket is understanding how clients connect to the socket and tell the server what they want. The connection itself is established by the WebSocket handshake; once it is open, the client sends a message with parameters that specify which data it expects to receive. In the example below, the client sends a “subscribe” message after connecting, containing a single token or a list of tokens it wishes to receive updates for.

    async def websocket_handler(self, websocket: WebSocketServerProtocol, path: str):
        # Register the new connection with an empty set of subscriptions.
        # Note: the (websocket, path) signature matches older releases of the
        # websockets library; newer releases pass only the connection object.
        self.clients[websocket] = set()
        try:
            # Each incoming message may add subscriptions for this client.
            # NB: adapt this logic to your specific needs.
            async for message in websocket:
                try:
                    data = json.loads(message)
                except json.JSONDecodeError:
                    continue  # ignore messages that are not valid JSON
                if not isinstance(data, dict) or "subscribe" not in data:
                    continue
                tokens = data["subscribe"]
                if isinstance(tokens, int):
                    # Single token subscription
                    self.clients[websocket].add(tokens)
                elif isinstance(tokens, list) and all(isinstance(t, int) for t in tokens):
                    # Multiple tokens subscription
                    self.clients[websocket].update(tokens)

        except websockets.exceptions.ConnectionClosedOK:
            # Client closed the connection normally; log or handle
            pass
        except Exception:
            # Unexpected error; log or handle
            pass
        finally:
            # Always forget the client once its connection ends.
            self.clients.pop(websocket, None)

    async def start_websocket_server(self):
        # 8766 is an example; use any available port
        self.websockets_server = await websockets.serve(self.websocket_handler, "localhost", 8766)
        await asyncio.Future()  # Never completes, so the server keeps running

    async def start_servers(self):
        try:
            # Connect (and LISTEN) first, so the notification loop never
            # polls a connection that does not exist yet.
            await self.connect_to_database()
            await asyncio.gather(
                asyncio.shield(self.start_websocket_server()),
                asyncio.shield(self.listen_pg_notifications())
            )

        except KeyboardInterrupt:
            # log or handle
            pass
        finally:
            # Cleanup
            if self.websockets_server:
                self.websockets_server.close()
                await self.websockets_server.wait_closed()
            if self.pg_conn:
                self.pg_conn.close()
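
The subscription check in the handler can also be written as a small pure function, which makes the expected message shapes explicit and easy to test. This is a sketch under assumptions: the function name `parse_subscription` is hypothetical, and rejecting booleans is an extra precaution not present in the handler above (in Python, `True` and `False` count as `int`).

```python
import json


def parse_subscription(message: str) -> set:
    """Return the tokens requested by a subscribe message, or an empty set."""
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        return set()
    if not isinstance(data, dict):
        return set()
    requested = data.get("subscribe")
    if isinstance(requested, bool):
        return set()  # bool is a subclass of int, so reject it explicitly
    if isinstance(requested, int):
        return {requested}
    if isinstance(requested, list) and all(
        isinstance(t, int) and not isinstance(t, bool) for t in requested
    ):
        return set(requested)
    return set()


print(sorted(parse_subscription('{"subscribe": 101}')))         # [101]
print(sorted(parse_subscription('{"subscribe": [101, 102]}')))  # [101, 102]
print(sorted(parse_subscription('not json')))                   # []
```

A client would therefore send either `{"subscribe": 101}` for one token or `{"subscribe": [101, 102]}` for several; anything else is ignored rather than closing the connection.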
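
2. Socket class initialisation and database connection

The SocketServer class holds the server state, and connect_to_database() opens the PostgreSQL connection with your credentials and starts listening on the notification channel.
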
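import asyncio
import json

import psycopg2
import websockets
from psycopg2 import extensions
# Import path for the legacy websockets API; adjust it to your installed version
from websockets.server import WebSocketServerProtocol

# db_name, username, password, db_server and db_port are expected to be
# defined elsewhere (for example, loaded from your configuration).

notification_channel = "market_data_channel"  # channel the database trigger publishes to

class SocketServer:
    def __init__(self) -> None:
        self.pg_conn = None
        self.pg_cur = None
        self.websockets_server = None
        self.clients = {}  # maps each connected websocket to its set of subscriptions

    async def connect_to_database(self):
        # psycopg2.connect() is a blocking call; it runs once at startup
        self.pg_conn = psycopg2.connect(
            dbname=db_name,
            user=username,
            password=password,
            host=db_server,
            port=db_port
        )
        # Autocommit is required so that notifications are delivered immediately
        self.pg_conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.pg_cur = self.pg_conn.cursor()
        self.pg_cur.execute(f"LISTEN {notification_channel};")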

3. Listening to the notification channel and processing the data

This function listens to the notification channel and passes the payload of each notification to process_notification(), which prepares the data and hands it to broadcast().

    async def listen_pg_notifications(self):
        while True:
            await asyncio.sleep(1)  # Polling delay
            self.pg_conn.poll()  # Collect pending notifications from the server
            while self.pg_conn.notifies:
                notify = self.pg_conn.notifies.pop(0)  # oldest notification first
                await self.process_notification(notify.payload)

    async def process_notification(self, payload):
        # The payload is the JSON string published by the database trigger
        data = json.loads(payload)
        # Placeholder: reshape `data` here into the dict you want to send to clients
        data_dict = data
        await self.broadcast(data_dict)
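
What the processing step looks like depends entirely on the table schema. As a hedged illustration only, the sketch below assumes the payload is a JSON object with hypothetical columns `token` and `last_price`, and maps the token to the `user_data` key that the broadcast step in the next section looks up. The function name `build_message` is also hypothetical.

```python
import json
from typing import Optional


def build_message(payload: str) -> Optional[dict]:
    """Turn a NOTIFY payload into the dict handed to broadcast().

    Assumes the payload is a JSON object with hypothetical columns
    "token" and "last_price"; adjust this to the real table schema.
    """
    try:
        row = json.loads(payload)
    except json.JSONDecodeError:
        return None  # ignore malformed payloads instead of crashing the listener
    if not isinstance(row, dict) or "token" not in row:
        return None
    return {"user_data": row["token"], "last_price": row.get("last_price")}
```

Returning `None` for unusable payloads lets the caller skip the broadcast, so one bad notification does not stop the listening loop.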

4. Broadcast to clients

This function handles the logic of broadcasting the data to the connected clients. In this example, an update is sent only to the clients that subscribed to it.

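    async def broadcast(self, data_dict: dict):
        # Example: send the update only to clients subscribed to this value
        user_data = data_dict.get("user_data")
        if user_data is not None:
            message = json.dumps(data_dict)  # serialise once for all clients
            # Iterate over a copy, since clients may disconnect while we await
            for client, subscribed_user_data in list(self.clients.items()):
                if client.open and user_data in subscribed_user_data:
                    try:
                        await client.send(message)
                    except websockets.exceptions.ConnectionClosed:
                        pass  # client left mid-send; its handler removes it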
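
Sending to clients one after another means a slow client delays everyone behind it. One possible variation, sketched here with a hypothetical `FakeClient` class instead of real WebSocket connections, sends to all matching clients concurrently with `asyncio.gather` and counts the successful deliveries. This is an alternative for illustration, not the method used above.

```python
import asyncio
import json


class FakeClient:
    """Hypothetical stand-in for a WebSocket connection, used only in this demo."""

    def __init__(self, fail: bool = False) -> None:
        self.sent = []
        self.fail = fail

    async def send(self, message: str) -> None:
        if self.fail:
            raise ConnectionError("client went away")
        self.sent.append(message)


async def broadcast(clients: dict, data_dict: dict) -> int:
    """Send data_dict to every client subscribed to its "user_data" value.

    Returns the number of successful deliveries.
    """
    key = data_dict.get("user_data")
    if key is None:
        return 0
    message = json.dumps(data_dict)  # serialise once, not once per client
    targets = [c for c, subscribed in clients.items() if key in subscribed]
    # return_exceptions=True keeps one failing client from cancelling the rest.
    results = await asyncio.gather(
        *(c.send(message) for c in targets), return_exceptions=True
    )
    return sum(1 for r in results if not isinstance(r, Exception))


async def run_demo() -> int:
    clients = {
        FakeClient(): {101},           # subscribed, healthy
        FakeClient(): {102},           # not subscribed to 101
        FakeClient(fail=True): {101},  # subscribed, but the send fails
    }
    return await broadcast(clients, {"user_data": 101, "last_price": 10.5})


if __name__ == "__main__":
    print(asyncio.run(run_demo()))  # 1
```

Only the first client receives the update: the second is not subscribed to 101, and the third raises during the send, which is recorded as a failure rather than stopping the broadcast.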