I have a question. Is there any way to read a change stream with multiple threads?
For example, suppose the change stream contains 3 messages, [A, B, C],
and my client has 2 threads. I want these 2 threads to receive messages in parallel, e.g.:
thread 1: receive [A, C]
thread 2: receive [B]
Assume that I don’t care about message ordering (i.e., timestamps). I just want to read as fast as possible.
Basically, I want multiple threads to receive different messages in parallel from one change stream (ex: change streams from a specific collection in MongoDB).
It seems that there’s no way to do that.
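The [A, C] / [B] split in the example is a round-robin assignment: event i goes to thread i mod 2. Below is a minimal sketch of just that assignment step, assuming the events have already been read, in order, from a single cursor. `RoundRobinSplit` and `split` are hypothetical names for illustration, not part of the MongoDB driver.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: distributes already-read events across workers round-robin.
// Assumption: the events came, in order, from ONE change stream cursor; this only
// shows how they could be handed out to threads afterwards.
class RoundRobinSplit {

    // Returns one list per worker; event i goes to worker (i % workers).
    static List<List<String>> split(List<String> events, int workers) {
        List<List<String>> buckets = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            buckets.add(new ArrayList<>());
        }
        for (int i = 0; i < events.size(); i++) {
            buckets.get(i % workers).add(events.get(i));
        }
        return buckets;
    }
}
```

With `split(Arrays.asList("A", "B", "C"), 2)` this yields `[[A, C], [B]]`, matching the example. Note that the read itself still happens on one cursor; only the hand-off is spread across threads.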
My problem is that I have 10,000 updates/sec in MongoDB. In my client program, I want to receive these 10,000 messages via change streams (FULL_DOCUMENT, each document around 100KB) using the Java change streams API. I want to speed this process up by using multiple threads (assume that I don’t care about message ordering).
Could you please give me the link to that documentation? I’ve been stuck on this problem for a few days and couldn’t find any helpful link.
Another thing: I’m not sure why I can’t edit my previous comment. I want to restate my problem a little more precisely here.
I have 10k changes/sec in MongoDB. In my client program, I want to receive these 10k changes (fullDocument, each doc around 100KB, so around 1GB of data/sec) with the Java change streams API. I want to speed this process up by using multiple threads (assume that I don’t care about message ordering). Below is the usual way to work with change streams. How can I speed it up?
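```java
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;
import java.util.function.Consumer;

MongoCollection<Document> collection = db.getCollection("my_collection");
ChangeStreamIterable<Document> iterable = collection.watch();
// forEach blocks and calls accept() once per change event, all on this one thread
iterable.forEach(new Consumer<ChangeStreamDocument<Document>>() {
    @Override
    public void accept(ChangeStreamDocument<Document> doc) {
        // do something with doc
    }
});
```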
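For scale, the figures above work out as follows (taking decimal units, 1 GB = 10^6 KB):

$$
10\,000\ \frac{\text{changes}}{\text{s}} \times 100\ \frac{\text{KB}}{\text{change}} = 10^{6}\ \frac{\text{KB}}{\text{s}} = 1\ \frac{\text{GB}}{\text{s}}
$$

So a single cursor would have to deliver about 1 GB of `fullDocument` payload every second, before any client-side processing.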
Oh Steeve, maybe my explanation was unclear. What I want is “multiple threads receiving different messages in parallel from a change stream”. You can see the problem (10k changes/sec) that I described above.
The key points are “multiple threads” and “parallel”.
I mean I want to receive messages in parallel from a change stream on a database, a single collection, or multiple collections. That is why I wrote “ex: change streams from a specific collection”.
Of course, I know how to receive messages in a single thread, as in the code I shared. That’s easy.
Yeah, so there’s no way to parallelize the following code:
```java
ChangeStreamIterable<Document> iterable = collection.watch();
iterable.forEach(new Consumer<ChangeStreamDocument<Document>>() {
    @Override
    public void accept(ChangeStreamDocument<Document> doc) {
        // do something with doc
    }
});
```
If we want to parallelize, we have to do something like the following, but it’s very tricky:
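```java
// watch() takes an aggregation pipeline (List<Bson>), so query1/query2 are pipelines,
// e.g. a $match on ids in a specific range
ChangeStreamIterable<Document> iterable1 = collection.watch(query1);
ChangeStreamIterable<Document> iterable2 = collection.watch(query2);
```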
I don’t understand this idea.
The main question is how to parallelize when the query is the same. How would a message queue be used here? My pain point is that with the same query, we can’t properly parallelize without tricks. The bottleneck is that with a single query, reading from the change stream is pretty slow.
How is that tricky? The queries are different, so they will receive different change stream events.
```java
ChangeStreamThread threadQuery1 = new ChangeStreamThread(query1);
ChangeStreamThread threadQuery2 = new ChangeStreamThread(query2);
// Thread.start() returns void, so construct first, then start
threadQuery1.start();
threadQuery2.start();

class ChangeStreamThread extends Thread {
    private final Document m_Query;

    ChangeStreamThread(Document query) {
        m_Query = query;
    }

    @Override
    public void run() {
        // Whatever code you have to process the events matching m_Query from the stream
    }
}
```
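To see why one filtered stream per thread hands each event to exactly one thread, here is a JDK-only simulation with no MongoDB involved. `PartitionedWatchers` is a hypothetical name; the `events` list stands in for the change stream, and each `LongPredicate` stands in for the `$match` stage of one thread’s `watch()` pipeline. One difference from the real thing: here every thread still iterates over all events and skips the non-matching ones, whereas a server-side `$match` would not send them at all.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongPredicate;

// JDK-only simulation of "one watch() per query, one thread per watch()".
// Assumption: `events` stands in for the change stream and each LongPredicate
// stands in for the $match stage of one thread's pipeline.
class PartitionedWatchers {

    // Starts one thread per filter and returns how many times each event id was handled.
    static ConcurrentHashMap<Long, AtomicInteger> run(List<Long> events, List<LongPredicate> filters) {
        ConcurrentHashMap<Long, AtomicInteger> handled = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[filters.size()];
        for (int t = 0; t < threads.length; t++) {
            LongPredicate filter = filters.get(t);
            threads[t] = new Thread(() -> {
                for (long id : events) {
                    // A server-side $match would never send non-matching events;
                    // here each thread simply skips them.
                    if (filter.test(id)) {
                        handled.computeIfAbsent(id, k -> new AtomicInteger()).incrementAndGet();
                    }
                }
            });
            threads[t].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join(); // join() also makes the threads' updates visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for watcher threads", e);
        }
        return handled;
    }
}
```

With the filters `id -> id % 2 == 0` and `id -> id % 2 == 1`, every id is handled exactly once. If both threads use the same filter (for example `id -> true`, i.e. no query), every id is handled twice, which is the duplication pointed out later in this thread.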
The tricky part is that I have to split the change stream into multiple change streams by query, as I wrote (note that I have just 1 collection). So which queries should be chosen?
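```java
// watch() takes an aggregation pipeline (List<Bson>), so query1/query2 are pipelines,
// e.g. a $match on ids in a specific range
ChangeStreamIterable<Document> iterable1 = collection.watch(query1);
ChangeStreamIterable<Document> iterable2 = collection.watch(query2);
```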
Of course, I can use 2 threads here with these 2 queries. The tricky part is that I have to come up with queries that split one change stream into multiple change streams.
So I would rather use the same query (i.e., no query) for every thread.
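One hypothetical way to choose the queries is to cut the `_id` space into contiguous, non-overlapping ranges, one per thread. The sketch below assumes numeric `_id` values with known bounds (`IdRanges` is an illustrative name); each `[lo, hi)` pair would then become the `$match` filter on `documentKey._id` in one thread’s `watch()` pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits the numeric key space [minId, maxIdExclusive)
// into `parts` contiguous, non-overlapping half-open ranges [lo, hi).
// Assumption: _id values are numeric and roughly uniformly distributed.
// For very large spans, width * p could overflow a long; fine for a sketch.
class IdRanges {

    static List<long[]> split(long minId, long maxIdExclusive, int parts) {
        if (parts <= 0 || maxIdExclusive <= minId) {
            throw new IllegalArgumentException("need parts > 0 and a non-empty range");
        }
        long width = maxIdExclusive - minId;
        List<long[]> ranges = new ArrayList<>();
        for (int p = 0; p < parts; p++) {
            // Integer division spreads the remainder; the last hi is exactly maxIdExclusive.
            long lo = minId + width * p / parts;
            long hi = minId + width * (p + 1) / parts;
            ranges.add(new long[] {lo, hi});
        }
        return ranges;
    }
}
```

For example, `split(0, 10, 3)` gives `[0, 3)`, `[3, 6)`, `[6, 10)`. The catch is exactly the trickiness raised above: equal-width ranges only balance the load if ids are spread evenly, and any document whose `_id` falls outside `[minId, maxIdExclusive)` would match no thread’s filter and be silently missed. Default ObjectId `_id` values are not numeric, so they would need a different way of splitting.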
What did you suggest scaling: the client, MongoDB, or both? Note that it’s just a single collection.
Maybe you are suggesting distributing this collection across multiple MongoDB nodes (machines) and then using different clients (on different machines), right?
I don’t have much experience with MongoDB. Sorry if these are stupid questions.
I’m also experimenting with consuming change streams from multiple threads, and I guess it could be done like this:
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

MongoCollection<SomeModel> events = test.getCollection("some-model", SomeModel.class);
ChangeStreamIterable<SomeModel> watch = events.watch();
ExecutorService executorService = Executors.newSingleThreadExecutor(); // adjust the thread pool as preferred
executorService.execute(() -> {
    watch.forEach(csd -> {
        // runs on the executor thread that picked up this single task
        System.out.println("changeStreamDocument: " + csd);
    });
});
```
I’d like to correct what I wrote in the previous post:
To consume the change stream from more than one thread, it’s probably best to use one thread to read the stream and dispatch events to a queue, and then consume that queue with multiple consumers.
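A minimal sketch of that reader/queue/consumers layout, using only `java.util.concurrent`. `QueueFanOut` is a hypothetical name, and `source` is any `Iterator` so the sketch runs without a server; with the Java driver it would presumably be `collection.watch().iterator()`. The queue is bounded so the reader blocks instead of buffering without limit when consumers fall behind, and a poison-pill object stops each worker once a finite source is exhausted.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// One reader thread -> bounded queue -> several consumer threads.
// Assumption: `source` stands in for the change stream cursor; any Iterator works here.
class QueueFanOut {

    // Sentinel that tells a worker to stop; never passed to the handler.
    private static final Object POISON = new Object();

    // Reads `source` on the calling thread and lets `workers` threads run `handler`.
    // Returns after a finite source is drained and all workers have finished.
    static <T> void run(Iterator<T> source, int workers, int capacity, Consumer<T> handler) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(capacity);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Object item = queue.take();
                        if (item == POISON) {
                            return; // this worker is done
                        }
                        @SuppressWarnings("unchecked")
                        T event = (T) item; // only T values and POISON are ever queued
                        handler.accept(event); // processing runs in parallel across workers
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            // Only this thread touches the cursor.
            while (source.hasNext()) {
                queue.put(source.next()); // blocks while the queue is full
            }
            for (int w = 0; w < workers; w++) {
                queue.put(POISON); // one stop signal per worker
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
    }
}
```

This parallelizes the processing of events, not the network read: a single cursor still delivers every event, so if that read is itself the bottleneck, splitting into several filtered streams (as discussed above) may still be needed. Ordering across workers is lost, which the original question said was acceptable. A real change stream cursor does not end on its own (its `hasNext()` waits for the next event), so in practice the read loop runs until the cursor is closed.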
The main issue when attempting to directly consume the change stream from 2 threads is that, since the watch() query would be the same, both will receive the same changes.