Out of Memory Issue with source connector in certain scenario

Hi, we are using the MongoDB source connector in our environment. Some of our data is complex: for example, a document can contain an array (say, students) holding 100K-150K ids. When the source connector processes multiple such documents in a short time frame, it fails with an out-of-memory error. We were able to reproduce the error by following the steps below.

  • I have a fairly large document (>8MB) containing an array, for example studentids, with 100K-150K ids: "studentids" : [NumberLong("906019125703444"), NumberLong("326026735808036"), …]
  • In the connector configuration I have "change.stream.full.document": "updateLookup", since we need the full document for every update.
  • I have a small utility that updates the document in a loop about 1000 times.
  • This exercise results in an OOM exception, which is attached. Even though the error is thrown at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:192), we suspect the underlying problem lies in how the connector or MongoDB internals handle this type of data, especially huge arrays.
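For anyone who wants to build a test document of the shape described in the first bullet, here is a minimal sketch using only the Java standard library. The class name, the `_id` value and the sequential ids are made up for illustration; only the `studentids` field name and the 15-digit NumberLong values come from the description above. It writes one Extended JSON document per line, which is the layout mongoimport reads by default.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class SampleDocGenerator {

    // Builds one Extended JSON document whose "studentids" array holds
    // `count` 64-bit ids. The ids are sequential 15-digit values
    // (hypothetical data, not the real ids from the report).
    static String buildDocument(int count) {
        StringBuilder sb = new StringBuilder(count * 40 + 64);
        sb.append("{\"_id\": 1, \"studentids\": [");
        for (int i = 0; i < count; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            long id = 100_000_000_000_000L + i;
            sb.append("{\"$numberLong\": \"").append(id).append("\"}");
        }
        sb.append("]}");
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        String doc = buildDocument(150_000);
        if (args.length > 0) {
            // Write the document as a single line to the given file.
            Files.write(Paths.get(args[0]), (doc + "\n").getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Generated document with " + doc.length() + " characters");
    }
}
```

Running it with a file name as the only argument writes the document to that file; without arguments it only reports the size. A single array of this kind may not reach the >8MB mentioned above, so further fields or arrays would be needed to match the real data more closely.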

This results in the error below:

[2020-09-01 13:46:05,201] INFO WorkerSourceTask{id=mongo-source-1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-01 13:46:05,201] INFO WorkerSourceTask{id=mongo-source-1-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-01 13:46:05,201] DEBUG WorkerSourceTask{id=mongo-source-1-0} Finished offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-09-01 13:46:05,203] ERROR WorkerSourceTask{id=mongo-source-1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuffer.append(StringBuffer.java:270)
at java.io.StringWriter.write(StringWriter.java:101)
at org.bson.json.StrictCharacterStreamJsonWriter.write(StrictCharacterStreamJsonWriter.java:368)
at org.bson.json.StrictCharacterStreamJsonWriter.preWriteValue(StrictCharacterStreamJsonWriter.java:288)
at org.bson.json.StrictCharacterStreamJsonWriter.writeStartObject(StrictCharacterStreamJsonWriter.java:203)
at org.bson.json.ExtendedJsonInt64Converter.convert(ExtendedJsonInt64Converter.java:22)
at org.bson.json.ExtendedJsonInt64Converter.convert(ExtendedJsonInt64Converter.java:19)
at org.bson.json.JsonWriter.doWriteInt64(JsonWriter.java:174)
at org.bson.AbstractBsonWriter.writeInt64(AbstractBsonWriter.java:447)
at org.bson.codecs.BsonInt64Codec.encode(BsonInt64Codec.java:36)
at org.bson.codecs.BsonInt64Codec.encode(BsonInt64Codec.java:28)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonArrayCodec.encode(BsonArrayCodec.java:82)
at org.bson.codecs.BsonArrayCodec.encode(BsonArrayCodec.java:37)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
at org.bson.internal.LazyCodec.encode(LazyCodec.java:38)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91)
at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:136)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:115)
at org.bson.BsonDocument.toJson(BsonDocument.java:835)
at org.bson.BsonDocument.toJson(BsonDocument.java:825)
at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:192)

We created a ticket with the MongoDB source connector project, but we were advised to follow up with support. This is the ticket: https://jira.mongodb.org/browse/KAFKA-151. We tried increasing the heap size and tuning the "poll.max.batch.size" configuration, but we still hit the issue. Please let us know if you need additional information. Thanks in advance.

  • Sounds like you have an infinite loop there or are seriously underestimating your heap requirements.
  • Hope you posted a source sample to support.
  • The Olde Programmer (me) intuits that your code is too complex and should be factored and the factored components thoroughly characterized and tested.

Also to add from the ticket:

With regard to the OOM error: the line in question converts the change stream document into a raw JSON string. The polling mechanism in source connectors batches up changes before publishing them to the topic. This can be configured by setting poll.max.batch.size, which by default will try to batch 1,000 source records and publish them to the topic. Reducing this max batch size should prevent OOM errors.
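To make the batching argument concrete, here is a rough back-of-the-envelope estimate in Java. It rests on several assumptions that are not confirmed in this thread: that each id is rendered as `{"$numberLong": "<digits>"}` followed by a `, ` separator (the ExtendedJsonInt64Converter frames in the stack trace suggest the object form), that the worker runs on Java 8 where a String uses two bytes per character, and that a full batch of JSON strings is held in memory at once. The class and method names are invented for this sketch.

```java
public class BatchHeapEstimate {

    // Characters for one array element rendered as {"$numberLong": "<digits>"}
    // plus the ", " separator (assumed output format).
    static long charsPerId(int digits) {
        return "{\"$numberLong\": \"".length() + digits + "\"}".length() + ", ".length();
    }

    // Approximate heap bytes for the JSON string of one document,
    // counting only the id array.
    static long bytesPerDocument(int idCount, int digits, int bytesPerChar) {
        return idCount * charsPerId(digits) * bytesPerChar;
    }

    // Approximate heap bytes if `batchSize` such strings are held at once.
    static long bytesPerBatch(int batchSize, int idCount, int digits, int bytesPerChar) {
        return batchSize * bytesPerDocument(idCount, digits, bytesPerChar);
    }

    public static void main(String[] args) {
        int idCount = 150_000; // upper end of the range reported above
        int digits = 15;       // length of the sample ids
        int bytesPerChar = 2;  // assumption: UTF-16 strings on Java 8
        for (int batch : new int[] {1000, 300, 100}) {
            long bytes = bytesPerBatch(batch, idCount, digits, bytesPerChar);
            System.out.printf("batch=%d -> ~%.1f GiB%n", batch, bytes / (1024.0 * 1024 * 1024));
        }
    }
}
```

Under these assumptions one JSON string is about 10.8 MB, so 1,000 of them come to roughly 10.8 GB and 300 to roughly 3.2 GB, before counting the BSON documents themselves or the temporary copies made while the StringBuffer grows. That would be consistent with a 4G or 5G heap still being exhausted at a batch size of 300, though whether a full batch actually accumulates depends on how quickly change events arrive.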

Without error logs, configuration examples and JVM configuration I can't provide more insight here.

What are your JVM settings? How much memory have you allocated to the heap? It sounds like you don't have enough. Lowering poll.max.batch.size will reduce the amount of data cached before polling. What did you try?


Thanks for your comments. Please see below the steps to reproduce the issue. This is not the exact data, but I was able to reproduce the issue by following these steps:

  1. Followed the Docker example at https://docs.mongodb.com/kafka-connector/master/kafka-docker-example/, except that I did not create any of the connectors the script creates (Confluent Datagen Connector, MongoDB Kafka Sink Connector and MongoDB Kafka Source Connector).
  2. Inserted the attached JSON data: docker exec -i mongo1 sh -c 'mongoimport -c investigate1 -d test' < sample_data.json . As you can see, the document has two huge arrays, studentids and groupids.
  3. Registered the source connector through the REST API using the following configuration.
  4. As you can see from the above, I need the full document for every update. I used the attached small Java utility to update the document 1000 times.

Regarding JVM settings: I allocated 4G by adding KAFKA_HEAP_OPTS: "-Xmx4G" to docker-compose. As mentioned, I also see the issue in our prod environment, where we use containers with 6G and an allocated heap size of 5G. We tried lowering the batch size to as low as 300 but still hit the issue in some scenarios.
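If it helps to double-check that a heap option such as -Xmx4G actually reaches a JVM, a small standalone check like the one below can be run with the same options (for example `java -Xmx4G HeapSettingsCheck`). This is only a sketch: it reports the settings of the JVM it runs in, not those of the Kafka Connect worker, and the class name is made up.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

public class HeapSettingsCheck {

    // Maximum heap the current JVM will attempt to use, in MiB.
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    // Arguments the current JVM was started with (e.g. -Xmx4G).
    static List<String> jvmArguments() {
        return ManagementFactory.getRuntimeMXBean().getInputArguments();
    }

    public static void main(String[] args) {
        System.out.println("Max heap (MiB): " + maxHeapMiB());
        System.out.println("JVM arguments: " + jvmArguments());
    }
}
```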

Please let me know if you need additional information or have questions about the steps.

The files mentioned in the comments above are attached to this ticket: https://jira.mongodb.org/browse/KAFKA-151. I was not able to add files to this topic.