Kafka Connect mongo connector with CosmosDB Mongo API

Hi,
I’m trying to use the Kafka Connect MongoDB connector to sink data to CosmosDB with Mongo API.

When I create the connector, Connect returns an error immediately because the user doesn’t have the necessary privileges (insert, update, delete). However, it does have them, it’s just that CosmosDB seems to do something “peculiar” with Mongo RABC. For example, when running from Compass this:

db.runCommand({connectionStatus: 1, showPrivileges: true});

The result I’m getting is:

{
  authInfo: {
    authenticatedUsers: [],
    authenticatedUserRoles: [],
    authenticatedUserPrivileges: []
  },
  ok: 1
}

This is happening despite having activated the newly created (Role-based access control in Azure Cosmos DB API for MongoDB: Now in preview - Azure Cosmos DB Blog).

Does anybody have any idea or suggestion to make this work? I’m thinking if a small contribution to the mongo-kafka project adding a config setting that allowed disabling this check would be considered at all.

Thanks.

The CosmosDB API for MongoDB supports only a fraction of the available APIs that MongoDB does. Comparing Microsoft Cosmos DB And MongoDB | MongoDB. If you must use CosmosDB it might be better to ask the question on a Microsoft forum. If you’d like a self-hosted MongoDB database try MongoDB Atlas. https://www.mongodb.com/atlas/database. It works in Azure too.

1 Like

Hi Robert,

I’m aware of that post, and also the fact that it is not up to date; CosmosDB launched support for Mongo API 4.2 in Feb '22 and the post is based on the feature set in August '21. Also, the post doesn’t mention anything about RABC. In any case, I do need to use CosmosDB so it doesn’t really matter.

Regarding asking a Microsoft forum, AFAIK it is the MongoDB community building the Kafka Connect connector, not Microsoft. I’m used to address open-source developers directly in Github and/or specific chats instead of general forums. I assume tor developers might monitor this forum, since it’s linked in the Github readme.

Thanks.

You are correct in asking on this forum as the engineers for the connector are here however the connector is designed and tested against a MongoDB instance not CosmosDB or any other third party MongoDB API. Can you use the native CosmosDB connector for Kafka in your solution ?

2 Likes

Hi Robert,

We tried that connector first. As you mentioned, being a native connector made it the perfect candidate. However, it is a bit more immature than we were hoping.

For example, I personally fixed a bug where empty collections would cause it to fail: https://github.com/microsoft/kafka-connect-cosmosdb/pull/466

Considering how basic the scenario is, I didn’t feel me with confidence. I thought that MongoDB Connector would have seen more usage, be more polished and, assuming CosmosDB Mongo API lived up to its compatibility promise, a viable alternative.

@Javier_Holguera - Had no luck to connect to consmosDB with using above connector from Microsoft
released tag 1.4.0

receiving
PUT /connectors/cosmosdbpoc-<topicname>-mongo-sink-connector/config returned
        400 (Bad Request): Connector config {connector.class=null,
        connect.cosmos.connection.endpoint=https://<azurecomosdb>.documents.azure.com:443/,
        tasks.max=1, topics=<topic>,
        max.num.retries=1,
        connect.cosmos.containers.topicmap=<topic>#<collectionname>,
        input.data.format=PROTOBUF,
        value.converter.protoClassName=<protocolbuffer classname>,
        connect.cosmos.master.key=<primarykey>,
        key.converter.schemas.enable=false,
        value.converter.schemas.enable=false,
        name=cosmosdbpoc-<topicname>-mongo-sink-connector,
        connect.cosmos.databasename=<db_name>,
		max.batch.size=0,
        value.converter=com.blueapron.connect.protobuf.ProtobufConverter,
        retries.defer.timeout=10, rate.limiting.timeout=0,
        rate.limiting.every.n=0} contains no connector type
      reason: ConnectRestException

BadRequest shows the connector.class= null but have provided the class as
com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector

really appreciate if you could share the connection properties you have used ?

Hi @Javier_Holguera,

To help set your expectations correctly: the Cosmos DB API for MongoDB is an independent implementation emulating a subset of MongoDB features for the associated server version. Cosmos’ native interface is their SQL/Core API, and there are emulated APIs supporting wire protocols and approximate feature mapping for MongoDB, Cassandra, and Gremlin.

There are some differences in behaviour including Cosmos-specific Request Units (RUs), rate limiting, and error codes. Official MongoDB drivers and connectors are not currently tested against emulated APIs like Cosmos and there is quite a gap in core compatibility.

These compatibility caveats may be fine for your use case, but you should not expect full compatibility as these are different codebases and underlying implementations.

Regards,
Stennie

2 Likes

Hi @Javier_Holguera, have you solved this issue? I’m running into the exact same thing with the insert, update, delete privileges.

I found the answer in case anyone else comes across it. The problem is that the CosmosDB API for MongoDB does not support the connectionStatus command that Javier has shown: https://learn.microsoft.com/en-us/azure/cosmos-db/mongodb/feature-support-42

This same command is used by the Kafka connector sink code to validate the connection on startup. You can edit the connector to simply skip the user validation step if you want this to work. Change would go here → https://github.com/mongodb/mongo-kafka/blob/ee5edf317508da42a62917d32fb21c2d46660991/src/main/java/com/mongodb/kafka/connect/MongoSinkConnector.java#L105

I don’t know if the maintainers would be open to making the user validation step optional and exposing that in the configs?