Python Async Multithreaded TCP Server to receive data from QRadar and ingest into MongoDB

I have an open-ended question with a reasonably broad scope before I move on to a more specific one. I am fairly new to developing applications, so please forgive any noobness.

Context:
I am writing a Python Socket Server to accept multiple connections from QRadar event processors. These endpoints forward JSON logs over TCP. I need to ingest each log into MongoDB (the database name depends on the domainName field in each log).
I do something like this to insert data:

log: dict = json.loads(decoded_received_data_line)
log["createdAt"] = datetime.now()
customer = log.get("domainName")

database = mongoclient.get_database(customer)
#I create a Dump Collection for each client. I will later run aggregation on this Dump collection.
collection_name = "Dump" 
database.get_collection(collection_name).insert_one(log)

Question 1: Is there a better way to achieve reliable, lossless data transfer between QRadar and MongoDB? Is there any useful middleware? MongoDB will be my data lake and is for internal use only. What does a continuously running socket server look like in production, and how do I deploy it properly and professionally? Any suggestions, books, courses, or articles are welcome. I expect 15,000 logs per second, each around 1 KB in size.

The rest of my code is posted below.

mongoclient.py


from pymongo import MongoClient
import os
import urllib.parse
from dotenv import load_dotenv

load_dotenv(".env")

HOST = os.environ.get("MONGO_HOST")
PORT = int(os.environ.get("MONGO_PORT"))
MONGO_USER = os.environ.get("MONGO_USER")
MONGO_PWD = urllib.parse.quote_plus(os.environ.get("MONGO_PWD"))

class MyMongoClient:
    def __init__(self):
        self.client: MongoClient = MongoClient(f"mongodb://{MONGO_USER}:{MONGO_PWD}@{HOST}:{PORT}")
        self.coll = "Dump"

    def get_client(self):
        return self.client

    def insert_data(self, log: dict):
        # .get() may return None if domainName is missing; fall back to a default
        customer = (log.get("domainName") or "unknown").replace(" ", "")
        database = self.client.get_database(customer)
        document_id = database.get_collection(self.coll).insert_one(log)
        print(f"ID: {document_id.inserted_id}")

connectionhandler.py


import json
import socketserver
import threading
from mongoclient import MyMongoClient
from datetime import datetime


def handle_line(data: str, address: str, mongo_client: MyMongoClient):
    stripped_data = data.removeprefix("<01>- hostname ")
    log: dict = json.loads(stripped_data)
    log["createdAt"] = datetime.now()
    log["eventProcessor"] = address[0]
    mongo_client.insert_data(log)


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    daemon_threads = True
    allow_reuse_address = True


class ConnectionHandler(socketserver.StreamRequestHandler):
    def handle(self):
        mongo_client = MyMongoClient()
        client = f"{self.client_address} on {threading.current_thread().name}"
        print(f"Connected: {client}")
        while True:
            data = self.rfile.readline().decode("utf-8")
            if not data:
                print(f"DATA STREAM WAS EMPTY: {data}")
                break
            else:
                handle_line(data, self.client_address[0], mongo_client)

main.py


from connectionhandler import ThreadedTCPServer, ConnectionHandler
from dotenv import load_dotenv
import os

load_dotenv(".env")

SERVER_HOST = os.environ.get("SERVER_HOST")
SERVER_PORT = int(os.environ.get("SERVER_PORT"))  # TCPServer expects an int port


def main():
    with ThreadedTCPServer((SERVER_HOST, SERVER_PORT), ConnectionHandler) as server:
        server.serve_forever()


if __name__ == '__main__':
    main()

This solution gets the data into MongoDB as required, but I encounter TCP zero-window conditions according to Wireshark. I understand this means my receive buffer fills up because the application cannot process data fast enough. Unfortunately, I have not found any way other than inserting a single document at a time into MongoDB, since I need to know which customer each log belongs to.
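Batching does not actually require inserting everything into one collection. One approach (a sketch, not the original code) is to group buffered logs by their `domainName` field and issue one `insert_many` per customer instead of one `insert_one` per document; the pymongo call is shown as a comment since it needs a live server:

```python
from collections import defaultdict


def group_by_customer(logs: list[dict]) -> dict[str, list[dict]]:
    """Group parsed logs by sanitized domainName so each batch can be
    written with a single insert_many call per customer."""
    batches: dict[str, list[dict]] = defaultdict(list)
    for log in logs:
        # Fall back to a default bucket if domainName is missing
        customer = (log.get("domainName") or "unknown").replace(" ", "")
        batches[customer].append(log)
    return dict(batches)


# For each batch, one unordered bulk write per customer, e.g.:
#   for customer, batch in group_by_customer(buffered_logs).items():
#       client[customer]["Dump"].insert_many(batch, ordered=False)
```

With `ordered=False`, one malformed document does not abort the rest of the batch, and a batch of ~1000 documents is a single round-trip to MongoDB instead of 1000.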

I have also tried an async version of the same thing, but its performance is much lower (measuring roughly by eyeballing the Ethernet speed in Task Manager: around 14-20 Mbps with multithreading versus 5-12 Mbps with async).

Question 2: How can I build an async, multithreaded server to achieve my goal, and how do I deploy it into production professionally so that it can run forever? We have no DevOps at all, so no pipelines or anything. I am willing to set up something small just for my team if someone can guide me.

Hi @Vikram_Tatke1 and welcome to the MongoDB community forum!!

The best practice for inserting data into the database depends on a combination of the hardware and the drivers you are using.

Firstly, insert performance depends on what the document structure looks like.

Secondly, in your use case, it also depends on the performance of the API layer/middleware you use. From the numbers you posted earlier (15,000 logs per second at roughly 1 KB each), it should need to push about 15 MB per second of data.

Another major factor is the type of deployment you have in your setup.

However, to answer your questions,

If you need recommendations regarding middleware that can connect QRadar to MongoDB, I believe you'll get more opinions and experience on programming-related sites such as Stack Overflow or Server Fault.

In terms of deploying a middleware/API layer on top of MongoDB, you can create your own, like your Python code example. However, if you don't have to use Python, a ready-made package like RestHeart, which is Java-based, might be worth considering.

Let us know if you have any further queries.

Best Regards
Aasawari


Hello Aasawari,
Thank you so much for your informative and helpful suggestions. Today I explored RESTheart with Docker (my first time using Docker, so I took my time to understand how it works). I have yet to fully understand how RESTheart benefits from change streams and implements concurrency.
It seems like something I can use with Docker in production; it is quick and easy to set up and monitor.
However, I can't see how it satisfies my use case, where I need to read data from a socket and ingest it on a per-customer basis, as mentioned in my post.

QRadar just forwards data to hostip:port. It keeps forwarding forever, and I am not required to send any form of request. I only need to open the port and create a socket that listens on it.

Can you guide me, if not through code, then conceptually, on how I might achieve this? It is a simple task, but I can't see how RESTheart will help me out here.

Hi @Vikram_Tatke1

Could you confirm whether your use case matches Adding Forwarding Destinations, which would make that documentation a good resource for what you are looking for?

Based on the above documentation, QRadar appears to make a TCP connection to a specified IP and begin sending events, which can be configured to be in JSON format.

I believe your assumption is right about having a middleware to receive the events and send them to MongoDB.

Having said that, we don't really have expertise in QRadar's product. Specifically, we cannot confirm the form of this data or the method the forwarding takes when seen from the client side. Perhaps the people in the QRadar forum might have a better idea on this?

Let us know if you have any further questions.

Best Regards
Aasawari