MongoDB Atlas and Live Chart Update on plotly-dash

Hi,
I am trying to build a customized dashboard using python plotly-dash which needs to get updated whenever a new record is inserted into the MongoDB Atlas collection on the cloud. I was looking for examples online but could not find any. For example, lets assume the data that gets inserted into mongoDB (asynchronously at random time intervals) is a time series data like:

{
"time" : "14.04.2022 21:50:59",
"balance" : 10500.50
}

I understand I have to use change_streams to watch for new insertions into the collection, but can anyone show a simple example of showing a simple plotly -DASH line chart & may be a plotly dash datatable that gets updated using plotly callbacks and pymongo.change_streams() whenever a new record is inserted into a MongoDB collection?

Best Regards,
Dilip

Hey

I just tried creating something similar today, and this is the solution I have come up with so far, using a Thread and a Queue to handle the dynamically updated data at intervals.

in app.py file:

from queue import Queue
from threading import Thread

import dash
import plotly.graph_objects as go
import pymongo
from dash import dcc, html
from dash.dependencies import Input, Output
from pymongo.change_stream import ChangeStream
from queue import Empty

client = pymongo.MongoClient("YourAtlasDB")
stream = client.graph_stream.scatter.watch()


def read_stream(stream: ChangeStream, q: Queue):
    for change in stream:
        x, y = change["fullDocument"]["x"], change["fullDocument"]["y"]
        q.put((x, y))


def update_figure(figure, q):
    try:
        x, y = q.get_nowait()
        figure["data"][0].x += tuple([x])
        figure["data"][0].y += tuple([y])
    except Empty:
        pass


x = [0]
y = [0]
graph = go.Scatter(x=x, y=y)
fig = go.Figure(graph)
fig.layout.autosize = True

q = Queue()

x = Thread(target=read_stream, args=(stream, q))
x.start()

app = dash.Dash()
app.layout = html.Div(
    [
        dcc.Graph(figure=fig, id="live-update-graph"),
        dcc.Interval(
            id="interval-component", interval=1 * 1000, n_intervals=0  # in milliseconds
        ),
    ]
)


@app.callback(
    Output("live-update-graph", "figure"), Input("interval-component", "n_intervals")
)
def update_graph(n):
    update_figure(fig, q)
    return fig


if __name__ == "__main__":
    app.run_server(debug=True, use_reloader=False)

And then an update_db.py file:

import pymongo
import time
client = pymongo.MongoClient("YourAtlasDB")
for x,y in zip(range(50), range(50)):
    db.scatter.insert_one({'x':x, 'y':y*y})
    time.sleep(1)
1 Like

Hi Tobias,
Thank you for your reply. I have some queries regarding your code:

client = pymongo.MongoClient("YourAtlasDB")
stream = client.graph_stream.scatter.watch()


def read_stream(stream: ChangeStream, q: Queue):
    for change in stream:
        x, y = change["fullDocument"]["x"], change["fullDocument"]["y"]
        q.put((x, y))

I can’t find any documentation for this graph_stream() pymongo function, all i could find was documentation for change_stream. So,
Can you please clarify what is your database name, collection name and field names in this script?

1.) Is graph_stream the name of your database in your MongoDB cluster?
2.) Is scatter the name of your collection in the graph_stream database?
3.) What is “fullDocument” and [“x”]? in the def read_stream() function?

A screenshot your mongoDB → Browse collections page with this collection selected, would also be helpful?

Lastly, in my case the x-axis is a time series and the y-axis a floating number, so any guidance on appropriate alterations of your code for that will be very helpful

Thanks and Best Regards,
Dilip