Read change streams with multiple threads

Hi everyone,

I have a question. Is there any way to read change streams with multiple threads?

Ex: we have 3 messages in a change stream
messages = [A, B, C]
and my client has 2 threads. I want these 2 threads to receive messages in parallel, e.g.:
thread 1: receive [A, C]
thread 2: receive [B]

Assume that I don’t care about message ordering (i.e., timestamps). I just want to read as fast as possible.

You can have multiple threads receiving different change stream messages if the queries used to set up the streams are different.

If the query is the same then all receive the message.

If you want to load-balance, you can easily do it internally or with any message queue framework.
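A minimal sketch of the "do it internally" option, assuming the per-event processing (not reading the cursor) is what needs to go faster. One thread iterates the stream and a bounded `BlockingQueue` spreads events across worker threads, so each event is handled by exactly one worker and ordering is not preserved. A plain `Iterable<T>` stands in for `collection.watch()`; the `FanOut` class and the queue capacity of 1,000 are illustrative choices, not part of the MongoDB driver, and error handling is left out for brevity.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Sketch: a single reader thread drains one stream and hands each event to exactly one of
// N worker threads through a bounded queue. "source" stands in for collection.watch();
// FanOut and the capacity are illustrative, not a MongoDB API.
class FanOut {
    static <T> void run(Iterable<T> source, int workers, Consumer<? super T> handler)
            throws InterruptedException {
        // Bounded queue: if workers fall behind, put() blocks the reader (backpressure)
        // instead of buffering an unbounded backlog of events in memory.
        BlockingQueue<Optional<T>> queue = new ArrayBlockingQueue<>(1_000);
        Thread[] pool = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            pool[i] = new Thread(() -> {
                try {
                    while (true) {
                        Optional<T> item = queue.take();
                        if (!item.isPresent()) {
                            return; // end-of-stream marker
                        }
                        handler.accept(item.get()); // no ordering across workers
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool[i].start();
        }
        // Only this thread iterates the source, so the cursor is never shared between threads.
        for (T event : source) {
            queue.put(Optional.of(event));
        }
        // One empty Optional per worker tells each of them to stop; then wait for them all.
        for (int i = 0; i < workers; i++) {
            queue.put(Optional.empty());
        }
        for (Thread t : pool) {
            t.join();
        }
    }
}
```

Since a `ChangeStreamIterable` is also an `Iterable`, something like `FanOut.run(collection.watch(), 8, event -> process(event))` could plug in directly (`process` being your own, hypothetical handler). A change stream normally never ends, so the shutdown loop only runs once the cursor is closed. This helps only when processing is the slow part; it does not make the single cursor read any faster.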

Basically, I want multiple threads to receive different messages in parallel from a change stream (e.g. the change stream of a specific collection in MongoDB).
It seems there’s no way to do that.

My problem is that I have 10,000 updates/sec in MongoDB. In my client program, I want to receive these 10,000 messages via change streams (FULL_DOCUMENT, each document is around 100 KB) using the Java change streams API. I want to speed this process up by using multiple threads (assume that I don’t care about the ordering of messages).

Please review the documentation, because you can do the above.

Thanks, Steeve, for your reply.

Could you please give me the link to that documentation? I’ve been stuck on this problem for a few days and couldn’t find any helpful links.

  • Another thing: I’m not sure why I can’t edit my previous comment, so I want to correct my problem description a little here.

I have 10k changes/sec in MongoDB. In my client program, I want to receive these 10k changes (fullDocument, each doc is around 100 KB, so it’s around 1 GB of data/sec) using the Java change streams API. I want to speed this process up by using multiple threads (assume that I don’t care about the ordering of messages). Below is the usual way to work with change streams. How can I speed it up?

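        import com.mongodb.client.ChangeStreamIterable;
        import com.mongodb.client.MongoCollection;
        import com.mongodb.client.model.changestream.ChangeStreamDocument;
        import com.mongodb.client.model.changestream.FullDocument;
        import org.bson.Document;
        import java.util.function.Consumer;

        MongoCollection<Document> collection = db.getCollection("my_collection");
        // UPDATE_LOOKUP asks the server to attach the current full document to update events
        ChangeStreamIterable<Document> iterable = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP);

        iterable.forEach(new Consumer<ChangeStreamDocument<Document>>() {
            @Override
            public void accept(ChangeStreamDocument<Document> doc) {
                // do something with doc (runs on this single thread, one event at a time)
            }
        });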
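Spelling out the throughput estimate from the post above (decimal units, ignoring per-event overhead such as the resume token and `documentKey`):

```latex
10\,000\ \frac{\text{changes}}{\text{s}} \times 100\ \frac{\text{KB}}{\text{change}}
  = 10^{6}\ \frac{\text{KB}}{\text{s}} \approx 1\ \frac{\text{GB}}{\text{s}} \approx 8\ \frac{\text{Gbit}}{\text{s}}
```

At that rate the network between MongoDB and the client may cap throughput before the number of client threads does: a 1 Gbit/s link carries at most about 125 MB/s.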

Documentation can be found with a Google search.

https://www.google.com/search?q=mongodb+change+streams

You write

but you share the code

MongoCollection<Document> collection = db.getCollection("my_collection");
ChangeStreamIterable<Document> iterable = collection.watch();

which does exactly

Oh Steeve, maybe I explained it badly. What I want is “multiple threads receiving different messages in parallel from a change stream”. You can see the problem (10k changes/sec) that I described above.

The key point is that I want “multiple threads” and “parallel”.

I mean I want to receive messages in parallel from a change stream on something like a database, a single collection, or multiple collections. That’s why I wrote “ex: change streams from a specific collection”.

Of course, I know how to receive messages in a single thread, as in the code I shared. That’s easy.

As I originally wrote:

Yeah, so there’s no way to parallelize the following code:

        ChangeStreamIterable<Document> iterable = collection.watch();
        iterable.forEach(new Consumer<ChangeStreamDocument<Document>>() {
            @Override
            public void accept(ChangeStreamDocument<Document> doc) {
                // do something with doc
            }
        });

If we want to parallelize, we have to do the following, but it’s very tricky:

ChangeStreamIterable<Document> iterable1 = collection.watch(List.of(Aggregates.match(query1))); // query1 can be a filter on documentKey._id in a specific range
ChangeStreamIterable<Document> iterable2 = collection.watch(List.of(Aggregates.match(query2)));

I don’t understand this idea

The main problem is: how do I parallelize if the query is the same? How would I use a message queue here? My pain point is that if the query is the same, we can’t properly parallelize it without tricks. The bottleneck is that with the same query, reading from the change stream is pretty slow.

How is that tricky? The queries are different, so they will receive different change stream events.

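import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;
import java.util.List;

ChangeStreamThread thread_query_1 = new ChangeStreamThread( collection, query1 ) ;
ChangeStreamThread thread_query_2 = new ChangeStreamThread( collection, query2 ) ;
thread_query_1.start() ; // start() returns void, so it cannot be chained into the assignment
thread_query_2.start() ;
class ChangeStreamThread extends Thread
{
  private final MongoCollection<Document> m_Collection ;
  private final Document m_Query ;
  ChangeStreamThread( MongoCollection<Document> collection, Document query )
  {
    m_Collection = collection ;
    m_Query = query ;
  }
  @Override
  public void run()
  {
    // $match runs against change events, so m_Query filters on e.g. documentKey._id, not _id
    for ( ChangeStreamDocument<Document> event : m_Collection.watch( List.of( Aggregates.match( m_Query ) ) ) )
    {
      // Whatever code you have to process the events matching m_Query from the stream
    }
  }
}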

Like you wrote:

As for

See how to use message queues - Google Search.

If reading becomes the bottleneck, you will have to distribute the load among different machines rather than multiple threads.


Thanks, Steevej, for your response.

The problem is that it’s just one collection. Normally, we use a change stream for this collection as follows:

ChangeStreamIterable<Document> iterable = collection.watch();

The tricky part is that I have to split the change stream into multiple change streams by query, as I wrote (note that I only have 1 collection). So which queries should be chosen?

ChangeStreamIterable<Document> iterable1 = collection.watch(List.of(Aggregates.match(query1))); // query1 can be a filter on documentKey._id in a specific range
ChangeStreamIterable<Document> iterable2 = collection.watch(List.of(Aggregates.match(query2)));

Of course, I can use 2 threads here with these 2 queries. The tricky part is that I have to come up with queries that split one change stream into multiple change streams.
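One way to pick the queries, sketched under the hypothetical assumption that `_id` is numeric and updates are spread roughly evenly across it: split the key range into k equal slices and give each thread a `$match` on `documentKey._id`. Filtering on `documentKey` rather than `fullDocument` keeps delete events, which carry no full document. `RangeSplitter` and `matchFilters` are illustrative names, not a driver API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: split a numeric _id range [min, max) into k contiguous, non-overlapping ranges and
// emit one $match filter (as JSON text) per range, one per change-stream thread.
// Assumes numeric _id values (hypothetical) and that (max - min) * k fits in a long.
class RangeSplitter {
    static List<String> matchFilters(long min, long max, int k) {
        if (k <= 0 || max <= min) {
            throw new IllegalArgumentException("need k > 0 and max > min");
        }
        long span = max - min;
        List<String> filters = new ArrayList<>();
        for (int i = 0; i < k; i++) {
            // The upper bound of range i is the lower bound of range i + 1,
            // so the ranges tile [min, max) with no gaps and no overlaps.
            long lo = min + span * i / k;
            long hi = min + span * (i + 1) / k;
            // documentKey._id is present on insert, update, replace and delete events
            filters.add("{ \"documentKey._id\": { \"$gte\": " + lo + ", \"$lt\": " + hi + " } }");
        }
        return filters;
    }
}
```

Each string could be turned into a pipeline stage with `Aggregates.match(Document.parse(filter))` and passed to `collection.watch(...)` on its own thread. Equal-width ranges only balance the load if updates are spread evenly over `_id`; with skewed traffic, the boundaries would have to come from the observed key distribution, and ObjectId keys would need different boundary values.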

So I want to use the same query (no query), as follows:

ChangeStreamIterable<Document> iterable = collection.watch();

How can I parallelize/use multi threads here?

My question is HOW to use it in MY USE CASE.

There are 2 things here:

  1. My client program

  2. MongoDB

Which one are you suggesting I scale: the client, MongoDB, or both? Note that it’s just a single collection.
Maybe you’re suggesting distributing this collection across multiple MongoDB nodes (machines) and then using different clients (on different machines), right?

I don’t have much experience with MongoDB. Sorry if some of these are stupid questions.