Hi, we are using the MongoDB source connector in our environment. Some of our data is complex: for example, a document may contain an array (say, students) with 100k-150k ids, and when the source connector processes multiple such documents in a short time frame it fails with an out-of-memory error. We were able to reproduce the error with the steps below.
- Start with a document of slightly larger size (>8 MB) containing an array, say "studentids", with 100k-150k ids: "studentids": [NumberLong("906019125703444"), NumberLong("326026735808036"), ...] etc.
- In the connector configuration, set "change.stream.full.document": "updateLookup", since we need the full document for every update.
- Run a small utility that updates the document in a loop around 1000 times.
- The above exercise results in an OOM exception. Even though the error happens at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:192), we suspect the issue is with how the connector/MongoDB internals handle this type of data, especially huge arrays.
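For reference, a minimal sketch of the connector configuration used in the steps above. The connection URI, database, and collection names are placeholders; "change.stream.full.document" is the setting material to this report, and "poll.max.batch.size" is shown at its default of 1000:

```json
{
  "name": "mongo-source-1",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.source.MongoSourceConnector",
    "connection.uri": "mongodb://<host>:27017",
    "database": "<database>",
    "collection": "<collection>",
    "change.stream.full.document": "updateLookup",
    "poll.max.batch.size": "1000"
  }
}
```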
This results in the error below:
[2020-09-01 13:46:05,201] INFO WorkerSourceTask{id=mongo-source-1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-01 13:46:05,201] INFO WorkerSourceTask{id=mongo-source-1-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-01 13:46:05,201] DEBUG WorkerSourceTask{id=mongo-source-1-0} Finished offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-01 13:46:05,203] ERROR WorkerSourceTask{id=mongo-source-1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuffer.append(StringBuffer.java:270)
at java.io.StringWriter.write(StringWriter.java:101)
at org.bson.json.StrictCharacterStreamJsonWriter.write(StrictCharacterStreamJsonWriter.java:368)
at org.bson.json.StrictCharacterStreamJsonWriter.preWriteValue(StrictCharacterStreamJsonWriter.java:288)
at org.bson.json.StrictCharacterStreamJsonWriter.writeStartObject(StrictCharacterStreamJsonWriter.java:203)
at org.bson.json.ExtendedJsonInt64Converter.convert(ExtendedJsonInt64Converter.java:22)
at org.bson.json.ExtendedJsonInt64Converter.convert(ExtendedJsonInt64Converter.java:19)
at org.bson.json.JsonWriter.doWriteInt64(JsonWriter.java:174)
at org.bson.AbstractBsonWriter.writeInt64(AbstractBsonWriter.java:447)
at org.bson.codecs.BsonInt64Codec.encode(BsonInt64Codec.java:36)
at org.bson.codecs.BsonInt64Codec.encode(BsonInt64Codec.java:28)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonArrayCodec.encode(BsonArrayCodec.java:82)
at org.bson.codecs.BsonArrayCodec.encode(BsonArrayCodec.java:37)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
at org.bson.internal.LazyCodec.encode(LazyCodec.java:38)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
at org.bson.BsonDocument.toJson(BsonDocument.java:835)
at org.bson.BsonDocument.toJson(BsonDocument.java:825)
at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:192)
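The stack trace shows the full document being rendered as an extended-JSON string inside poll(). As a rough, self-contained sketch (in Python, purely for illustration; the connector does this in Java via BsonDocument.toJson) of how large that string alone gets for one of our documents:

```python
import json

NUM_IDS = 150_000  # upper end of the array size in our documents

# In extended JSON, every NumberLong is rendered as an object,
# e.g. {"$numberLong": "906019125703444"}, which matches the
# ExtendedJsonInt64Converter frames in the stack trace above.
ids = [{"$numberLong": str(906019125703444 + i)} for i in range(NUM_IDS)]
doc = {"studentids": ids}

rendered = json.dumps(doc)

# Each 15-digit id costs roughly 36 bytes of JSON text, so the array
# alone is over 5 MB of string data per change event, and updateLookup
# produces one such full document for every update in the loop.
print(f"{len(rendered) / 1_000_000:.1f} MB")  # prints 5.4 MB
```

With ~1000 updates queued, these strings accumulate faster than they are drained, which is consistent with the heap exhaustion we observe.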
We created a ticket with the MongoDB source connector team, but they advised us to follow up with support. This is the ticket: https://jira.mongodb.org/browse/KAFKA-151. We tried increasing the heap size and tuning the "poll.max.batch.size" configuration, but we still hit the issue. Please let us know if you need any additional information. Thanks in advance.