Error running $unwind operation in mongo source connector pipeline

We are trying to split a MongoDB document into smaller chunks so that each one fits into a Kafka message. To do this we use the $unwind stage in the MongoSourceConnector's aggregation pipeline, but the connector fails with:

org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 20 (IllegalOperation): '$unwind is not permitted in a $changeStream pipeline' on server :27017. The full response is {"operationTime": {"$timestamp": {"t": 1614932863, "i": 4}}, "ok": 0.0, "errmsg": "$unwind is not permitted in a $changeStream pipeline", "code": 20, "codeName": "IllegalOperation", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1614932863, "i": 4}}, "signature": {"hash": {"$binary": {"base64": "x5sWtboaMhg5aSSMWLYNswP3zKE=", "subType": "00"}}, "keyId": 6880913173017264129}}}
at com.mongodb.kafka.connect.source.MongoSourceTask.setCachedResultAndResumeToken(MongoSourceTask.java:508)

Is this supported by the MongoSourceConnector, or is there a workaround for the above use case?

Hi @vinay_murarishetty,

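The source connector uses MongoDB change streams to produce its events, and MongoDB only supports certain aggregation stages in a change stream pipeline: $addFields, $match, $project, $replaceRoot, $replaceWith, $redact, $set and $unset ($replaceWith, $set and $unset require MongoDB 4.2 or later).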

See: https://docs.mongodb.com/manual/changeStreams/#modify-change-stream-output for more information.

Other pipeline stages, including $unwind, are not supported by MongoDB in a change stream, so they can't be used with the connector.
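As a pre-deployment sanity check, a connector's `pipeline` value can be compared against the list of stages above before the connector is started. The helper below, `disallowed_stages`, is hypothetical (it is not part of the connector) and only inspects top-level stage names; the `$fullDocument.items` field path in the usage example is likewise made up for illustration.

```python
import json

# Stages MongoDB accepts in a change stream pipeline, per the
# "Modify Change Stream Output" docs ($replaceWith, $set, $unset need 4.2+).
ALLOWED_CHANGE_STREAM_STAGES = {
    "$addFields", "$match", "$project", "$replaceRoot",
    "$replaceWith", "$redact", "$set", "$unset",
}


def disallowed_stages(pipeline_json):
    """Return the stage names in a connector `pipeline` string that a change stream rejects."""
    pipeline = json.loads(pipeline_json)
    bad = []
    for stage in pipeline:
        # Each aggregation stage is a single-key document such as {"$match": {...}}.
        for name in stage:
            if name not in ALLOWED_CHANGE_STREAM_STAGES:
                bad.append(name)
    return bad


# A pipeline like the one in the question: $match is fine, $unwind is rejected.
example = '[{"$match": {"operationType": "insert"}}, {"$unwind": "$fullDocument.items"}]'
print(disallowed_stages(example))  # ['$unwind']
```

Running the check catches the same problem that the server reports as error 20 (IllegalOperation), but before the connector task fails.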

Ross


Is there any alternative way to achieve the above use case if it can't be done with the pipeline?

Hi @vinay_murarishetty,

Unfortunately, if you are hitting the 16MB BSON document size limit, the only option is to reduce the amount of data the change stream cursor produces. Publishing both the fullDocument and the updateDescription for very large documents could be the cause.
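One way to reduce the data in each event, assuming the full document (looked up with `updateLookup`) is all you need downstream, is a `$project` stage that excludes `updateDescription`. `$project` is one of the permitted change stream stages. The sketch below builds such connector properties and uses a rough JSON-length proxy to compare event sizes. The sample event and the helpers `approx_size` and `drop_fields` are hypothetical, and real BSON sizes differ from JSON lengths.

```python
import json

# MongoDB's maximum BSON document size (16 MiB).
MAX_BSON_BYTES = 16 * 1024 * 1024


def approx_size(doc):
    """Rough size estimate: bytes in the compact JSON encoding (BSON differs)."""
    return len(json.dumps(doc, separators=(",", ":")).encode("utf-8"))


def drop_fields(event, fields):
    """Mimic a change stream {"$project": {field: 0}} exclusion on one event."""
    return {k: v for k, v in event.items() if k not in fields}


# Assumed connector settings: keep fullDocument, drop updateDescription.
# The event _id (resume token) must not be projected out.
connector_props = {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "change.stream.full.document": "updateLookup",
    "pipeline": json.dumps([{"$project": {"updateDescription": 0}}]),
}

# Hypothetical update event where the same large value appears twice.
payload = "x" * 1000
event = {
    "_id": {"_data": "resume-token"},
    "operationType": "update",
    "fullDocument": {"payload": payload},
    "updateDescription": {"updatedFields": {"payload": payload}, "removedFields": []},
}

reduced = drop_fields(event, {"updateDescription"})
print(approx_size(event), approx_size(reduced), approx_size(reduced) < MAX_BSON_BYTES)
```

When a large field is rewritten, it appears in both `fullDocument` and `updatedFields`, so excluding one of them roughly halves that part of the event.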

Ross

Thanks for the information.
