Error publishing update message to Kafka topic on Source Connector

I get this message when I launch the MongoDB source connector and perform an update on the source DB:

“WARN No topic set. Could not publish the message (…)”

Here is my config:

name=MongoDBSourceConnector
tasks.max=1
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
connection.uri=******
database=mydb
collection=mycollection
publish.full.document.only=true
copy.existing=false
topic.namespace.map={"mydb.mycollection": "reports"}
change.stream.full.document=updateLookup
offset.partition.name=reports-partition
errors.tolerance=all
pipeline=[{$match: {"fullDocument.vertical": "test-vertical"}},{$project:{"_id":1,"doc_number": { $ifNull: [ { $arrayElemAt: [ "$fullDocument.docfiles", 0 ] }, "" ] }, "doc_date": "$fullDocument.docDate", "product_name": "$fullDocument.product", "sold_date": "$fullDocument.order.paymentStatusUpdatedAt", "partner": "$fullDocument.utm.source", "vertical": "$fullDocument.vertical", "quoteId": "$fullDocument.quoteId"}}]

Before this error, I had launched the connector with this same config, only changing the settings for copying existing data, and then relaunched it with the config above. (For some reason, if I launch the connector with both “pipeline” and “copy.existing.pipeline” set at the same time, the existing data is not published to Kafka, but if I remove the “pipeline” config everything works fine; a rough sketch of that variant is below.)
So now I don’t understand what I’m doing wrong in the configuration when relaunching the connector, what is causing the “WARN No topic set. Could not publish the message (…)” warning, and why it results in no update messages being pushed to Kafka.
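
Just to illustrate, the variant with copying enabled differed from the config above roughly like this (the copy.existing.pipeline stage shown here is only an illustrative placeholder, not my exact stage):

copy.existing=true
copy.existing.pipeline=[{"$match": {"vertical": "test-vertical"}}]
(pipeline kept the same as above)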

Have you managed to fix this one so far? I’m getting the same issue and I’m at my wit’s end trying to track down the cause.

No, I didn’t. What I believe is that, for some reason, data reaches the pipeline stage with a different payload depending on whether it comes from copying existing data or from an update. My workaround was to set only the pipeline config and then trigger an update on the existing documents using data that I remove afterwards, like a field used as a flag.
Another important thing is to keep the “_id” field at the top level when using the “$project” operator. So, here is a working config for the pipeline:

pipeline=[{"$match":{"operationType":{"$in":["insert","update","replace"]}, "fullDocument.vertical": "some-vertical", "fullDocument.utm.source": "some-utm-source"}},{"$project":{"_id":1, "fullDocument.policy_number": { $ifNull: [ { $arrayElemAt: [ "$fullDocument.policies", 0 ] }, "" ] }, "fullDocument.policy_date": "$fullDocument.coverageStart", "fullDocument.product_name": "$fullDocument.planId", "fullDocument.quoteId": 1,"ns":1,"documentKey":1}}]

I did notice the same: the payload coming from copy.existing doesn’t respect the output formatter (in my case SimplifiedJson), so I’m getting a payload containing $oid, $date, etc., which shouldn’t be there.
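
To make the difference concrete, this is roughly what I mean (made-up document, values shortened):

What I expect with SimplifiedJson:
{"_id": "5f2b9c0e8b4e4a1d9c3e7f21", "createdAt": "2021-03-01T10:15:00Z", "vertical": "test-vertical"}

What the records coming from the copy phase look like (Extended JSON style):
{"_id": {"$oid": "5f2b9c0e8b4e4a1d9c3e7f21"}, "createdAt": {"$date": "2021-03-01T10:15:00Z"}, "vertical": "test-vertical"}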

I’m thinking this could be a bug with topic.namespace.map used alongside copy.existing, as the error states “No topic set”.
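
In case it helps anyone else chasing the “No topic set” warning: my understanding from the docs (please double-check against your connector version) is that topic.namespace.map accepts a "*" entry as a catch-all, so events whose namespace doesn’t match an explicit key still get routed to a topic, e.g.:

topic.namespace.map={"mydb.mycollection": "reports", "*": "fallback-topic"}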

Why is it important for the _id to be at the root level?

Hi all,

Sorry to hear there have been some issues using the connector. I hope I can point you in the right direction and if there are any bugs we can get them logged in Jira for fixing.

The copy.existing.pipeline was designed for users who wanted to limit the data being copied or to ensure the copying process uses the correct index. The pipeline configuration is still applied to the data copying process, and the two pipelines are combined to produce the final result.
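
As a rough sketch (illustrative stages only; the internal stages that reshape copied documents into change-event form are omitted), with

copy.existing.pipeline=[{"$match": {"vertical": "test-vertical"}}]
pipeline=[{"$project": {"_id": 1, "ns": 1, "documentKey": 1, "fullDocument.quoteId": 1}}]

the copy phase behaves roughly as if both sets of stages were appended to one aggregation, so a $project in pipeline also shapes the copied records and can drop fields the connector expects to find (ns, for example).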

Could that be the issue causing the topic mapping to fail as it produces unexpected output when copying?

That shouldn’t be the case, as the formatter is applied outside the copy data manager. I haven’t heard any reports of this before, but if you can reproduce the issue please log a Kafka ticket and I’ll investigate.

I haven’t had a report of this either. I think it could be the combining of pipelines causing the issue, but if not, please provide a minimal example and file a Kafka ticket so I can investigate.

The _id field in change stream events contains the resume token. The connector uses this to ensure it is resilient to Kafka outages (e.g. network issues / topology changes).

When copying data it’s important because the _id also contains a copying flag, so the connector knows the data comes from the copying process and not from a change stream. Again, this only matters if there is a Kafka outage during the copying phase.
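
For illustration (values shortened, and the exact internal field names are an implementation detail that may differ between connector versions): a normal change stream event _id is a resume token along the lines of

{"_data": "8262A1B2C3000000012B022C0100296E5A1004..."}

while during the copy phase the connector generates its own _id that wraps the source document’s key together with a flag marking it as copied data, conceptually something like

{"_id": {"_id": "5f2b9c0e8b4e4a1d9c3e7f21"}, "copyingData": true}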

I hope that helps.

Ross

Hi @Ross_Lawley, thanks so much for replying.

You were right, it was a pipeline problem. In my case, I was keeping only a few of the fields from the change stream to keep the object size small, but that was interfering with copy.existing. I solved my issue by not excluding those fields.
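
For anyone who hits this later, the change was essentially going from a $project that kept only my business fields to one that also keeps the event metadata (the field names below are illustrative, not my real ones):

before (broken): pipeline=[{"$project": {"fullDocument.quoteId": 1, "fullDocument.planId": 1}}]
after (working): pipeline=[{"$project": {"_id": 1, "ns": 1, "documentKey": 1, "fullDocument.quoteId": 1, "fullDocument.planId": 1}}]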

Thanks for explaining the copy.existing part too, very helpful!
