I have an open-ended question with a reasonably broad scope before I move on to a more specific one. I am fairly new to developing applications, so please forgive any noobness.
Context:
I am writing a Python Socket Server to accept multiple connections from QRadar event processors. These endpoints forward JSON logs over TCP. I need to ingest each log into MongoDB (the database name depends on the domainName field in each log).
I do something like this to insert data:
log: dict = json.loads(decoded_received_data_line)
log["createdAt"] = datetime.now()
customer = log.get("domainName")
database = mongoclient.get_database(customer)
#I create a Dump Collection for each client. I will later run aggregation on this Dump collection.
collection_name = "Dump"
database.get_collection(collection_name).insert_one(log)
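One idea I have been toying with to cut per-document overhead is buffering logs per customer and flushing with `insert_many` instead of one `insert_one` per log. A rough sketch of the buffering part (the batch size, the `LogBatcher` name, and the writer callback are all my own guesses, not tested at scale):

```python
from collections import defaultdict
from typing import Callable

BATCH_SIZE = 500  # guessed batch size; would need tuning against the 15,000 logs/sec load


class LogBatcher:
    """Buffers logs per customer and hands full batches to a writer callback."""

    def __init__(self, write_batch: Callable[[str, list], None], batch_size: int = BATCH_SIZE):
        # write_batch would be something like:
        #   lambda db, docs: client[db]["Dump"].insert_many(docs, ordered=False)
        self.write_batch = write_batch
        self.batch_size = batch_size
        self.buffers: dict[str, list] = defaultdict(list)

    def add(self, log: dict) -> None:
        customer = (log.get("domainName") or "unknown").replace(" ", "")
        buf = self.buffers[customer]
        buf.append(log)
        if len(buf) >= self.batch_size:
            self.flush(customer)

    def flush(self, customer: str) -> None:
        batch = self.buffers.pop(customer, [])
        if batch:
            self.write_batch(customer, batch)

    def flush_all(self) -> None:
        # e.g. call periodically from a timer so slow customers still get written
        for customer in list(self.buffers):
            self.flush(customer)
```

`ordered=False` on `insert_many` would let MongoDB keep inserting the rest of a batch even if one document fails, which seems right for a dump collection.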
Question 1: Is there a better way to achieve reliable, lossless data transfer between QRadar and MongoDB? Any useful middleware? MongoDB will be my data lake and is for internal use only. What does a continuously running socket server look like in production, and how do I deploy it properly and professionally? Any suggestions, books, courses, or articles are welcome. I expect around 15,000 logs per second, each roughly 1 KB in size.
The rest of my code is posted below.
mongoclient.py
from pymongo import MongoClient
import os
import urllib
from dotenv import load_dotenv

load_dotenv(".env")
HOST = os.environ.get("MONGO_HOST")
PORT = int(os.environ.get("MONGO_PORT"))
MONGO_USER = os.environ.get("MONGO_USER")
MONGO_PWD = urllib.parse.quote_plus(os.environ.get("MONGO_PWD"))
class MyMongoClient:
    def __init__(self):
        self.client: MongoClient = MongoClient(f"mongodb://{MONGO_USER}:{MONGO_PWD}@{HOST}:{PORT}")
        self.coll = "Dump"

    def get_client(self):
        return self.client

    def insert_data(self, log: dict):
        customer = log.get("domainName").replace(" ", "")
        database = self.client.get_database(customer)
        result = database.get_collection(self.coll).insert_one(log)
        print(f"ID: {result.inserted_id}")
connectionhandler.py
import json
import socketserver
import threading
from mongoclient import MyMongoClient
from datetime import datetime
def handle_line(data: str, address: str, mongo_client: MyMongoClient):
    stripped_data = data.removeprefix("<01>- hostname ")
    log: dict = json.loads(stripped_data)
    log["createdAt"] = datetime.now()
    log["eventProcessor"] = address  # address is already the IP string passed in from handle()
    mongo_client.insert_data(log)


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    daemon_threads = True
    allow_reuse_address = True


class ConnectionHandler(socketserver.StreamRequestHandler):
    def handle(self):
        mongo_client = MyMongoClient()
        client = f"{self.client_address} on {threading.current_thread().name}"
        print(f"Connected: {client}")
        while True:
            data = self.rfile.readline().decode("utf-8")
            if not data:  # empty read means the peer closed the connection
                print(f"Connection closed: {client}")
                break
            handle_line(data, self.client_address[0], mongo_client)
main.py
from connectionhandler import ThreadedTCPServer, ConnectionHandler
from dotenv import load_dotenv
import os

load_dotenv(".env")
SERVER_HOST = os.environ.get("SERVER_HOST")
SERVER_PORT = int(os.environ.get("SERVER_PORT"))  # TCPServer needs the port as an int


def main():
    with ThreadedTCPServer((SERVER_HOST, SERVER_PORT), ConnectionHandler) as server:
        server.serve_forever()


if __name__ == '__main__':
    main()
This solution gets the data into MongoDB as required, but Wireshark shows TCP Zero Window messages. I understand this means my receive buffer fills up because the application cannot process the data fast enough. Unfortunately, I have not found any way other than inserting one document at a time into MongoDB, since I need to know which customer each document belongs to.
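One direction I am considering to relieve the back-pressure is decoupling the socket reads from the MongoDB writes with a bounded queue and a dedicated writer thread, so the handler threads never block on an insert. A rough sketch (the queue size, drain cap, and function names are my own guesses):

```python
import queue
import threading

# Bounded queue: if MongoDB falls behind, the socket threads block here instead
# of memory growing without limit, so back-pressure still reaches TCP in bursts.
log_queue: "queue.Queue[dict]" = queue.Queue(maxsize=50_000)  # size is a guess


def reader_side(log: dict) -> None:
    """Called from the socket handler: cheap, never touches MongoDB."""
    log_queue.put(log)


def writer_loop(insert, stop: threading.Event) -> None:
    """Single writer thread: drain the queue and insert grouped by customer."""
    while not stop.is_set() or not log_queue.empty():
        try:
            log = log_queue.get(timeout=0.5)
        except queue.Empty:
            continue
        batch = {log.get("domainName", "unknown"): [log]}
        # Drain whatever else is already queued, up to a cap per round.
        for _ in range(999):
            try:
                log = log_queue.get_nowait()
            except queue.Empty:
                break
            batch.setdefault(log.get("domainName", "unknown"), []).append(log)
        for customer, docs in batch.items():
            # insert would wrap e.g. client[customer]["Dump"].insert_many(docs)
            insert(customer, docs)
```

Would grouping writes like this be enough to keep up with the load, or is dedicated middleware (a message broker, for instance) the more standard answer here?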
I have also tried an async version of the same logic, but its performance is much lower (measured by eyeballing the Ethernet speed in Task Manager: around 14-20 Mbps with multithreading versus 5-12 Mbps with async).
Question 2: How can I build an async multithreaded server that achieves my goal, and professionally deploy it into production so that it can run forever? We have no DevOps at all, so no pipelines or anything. I am willing to set up something small just for my team if someone can guide me.
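For reference, this is roughly the shape my asyncio attempt took: one coroutine per connection feeding an `asyncio.Queue`, and a writer task pushing the blocking PyMongo insert onto a thread with `asyncio.to_thread`. The names and the queue size are mine, and I am not sure this is the idiomatic structure:

```python
import asyncio
import json


async def handle_conn(reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                      out_queue: asyncio.Queue) -> None:
    """One coroutine per QRadar connection: parse lines, hand off to the queue."""
    peer = writer.get_extra_info("peername")
    try:
        while line := await reader.readline():
            log = json.loads(line)
            log["eventProcessor"] = peer[0]
            await out_queue.put(log)  # applies back-pressure if the writer falls behind
    finally:
        writer.close()
        await writer.wait_closed()


async def mongo_writer(out_queue: asyncio.Queue, insert) -> None:
    """Drain the queue; insert wraps the synchronous PyMongo call, pushed to a
    thread so the event loop is never blocked by the driver."""
    while True:
        log = await out_queue.get()
        await asyncio.to_thread(insert, log)


async def serve(host: str, port: int, insert) -> None:
    q: asyncio.Queue = asyncio.Queue(maxsize=10_000)  # guessed bound
    asyncio.create_task(mongo_writer(q, insert))
    server = await asyncio.start_server(lambda r, w: handle_conn(r, w, q), host, port)
    async with server:
        await server.serve_forever()
```

Is a hybrid like this (async I/O plus threaded inserts) the right direction, or should the writes themselves be async too?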