DecodeException when creating mongo connector

Hi,
when I create a Mongo connector in OpenShift, I get a DecodeException:
Failed to decode:No content to map due to end-of-input
at [Source: (io.netty.buffer.ByteBufInputStream); line 1, column: 0]
reason: DecodeException

My connector jar file is located in /opt/kafka/plugins, and when I specify the connector class,
it is recognized as a valid connector type, yet I still get this error…
Has anyone encountered this issue?

I can make some guesses, but I am not completely sure. There are Kafka connector builds that include all the dependencies, such as Avro; I'm not sure whether you used one that bundles its dependencies. I'm also not sure which libraries are available by default on OpenShift out of the box.

I have tried using the newest Mongo plugin (mongo-kafka-1.5.1-all.jar),
and now the sink connector works properly, but the source connector still throws the same decoding exception…
Any idea why the sink works and the source does not?

Hi,
I haven't received a reply yet.
If you have any idea what could be causing my error (only in the source and not in the sink), that would be great.
Thanks

Hi @ori_iro,

Please post the full stack trace, as that may provide some insight into the cause.

Also what version of Kafka and what version of MongoDB are you using?

Ross

I am using Kafka version 2.5 and my MongoDB version is 4.4.0.
My connector plugin is version 1.5.1. My stack trace:

javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class com.mongodb.kafka.connect.source.MongoSourceConfig 
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:408) 
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) 
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:365) 
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:318) 
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) 
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:760) 
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:547) 
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) 
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1607) 
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) 
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1297) 
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) 
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) 
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1582) 
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) 
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) 
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) 
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) 
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:179) 
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) 
	at org.eclipse.jetty.server.Server.handle(Server.java:516) 
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383) 
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556) 
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375) 
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273) 
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) 
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) 
	at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:104) 
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336) 
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313) 
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171) 
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129) 
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:375) 
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:773) 
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:905) 
	at java.base/java.lang.Thread.run(Thread.java:834) 
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: Could not initialize class com.mongodb.kafka.connect.source.MongoSourceConfig 
	at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254) 
	at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236) 
	at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436) 
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261) 
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) 
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) 
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) 
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) 
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) 
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) 
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232) 
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680) 
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) 
	... 35 more 

I have tried adding the mongo-driver-sync jar file, and it didn't work.
My sink connector works, but the source connector does not…

I forgot to tag you @Ross_Lawley @Robert_Walters

@ori_iro, that's strange - there's nothing Kafka-related in the stack trace, which I'd expect to see.

The sink and source connectors share the same jar, so I don't know what is happening here to prevent it from being loaded for one and not the other.

There is a lot of Jetty/Jersey code - is the error coming from a web UI? Also, this error doesn't appear to be related to the initial decode error - has that been fixed?

Finally, in the 1.6.0 release the jar packages were updated, so mongo-kafka-connect-1.6.0-all.jar now contains all the dependencies needed for non-Confluent Kafka Connect implementations. It includes the Avro dependencies as well as the driver. Does updating to mongo-kafka-connect-1.6.0-all.jar work?

Ross

I'm working on OpenShift 4.5 with the Strimzi 0.18 operator.
The logs are taken from the Kafka Connect cluster (from a console).
The decode exception is reported on the connector (with state: Not Ready and reason DecodeException).
Something interesting happened when I tried to add the Avro and Mongo driver jar files to the same plugin path as the mongo-connect jar file:
suddenly the sink connector stopped working and showed the same decode exception in the connector status (both the sink and the source show the same exception).
What could cause the MongoSourceConfig initialization error?
I will also try the new connector.

OK, looks like multiple issues here.

I think the first step would be to clean up the classpath - remove any jars that were added (mongo-driver-sync, avro, etc.). Then add the 1.6.0-all jar and see what errors (if any) occur.

Ross

@Ross_Lawley @Robert_Walters I tried to use the 1.6.0 connector and the problem is the same - the sink connector works and the source connector does not, with the reason being DecodeException.
The logs are pretty much the same, with one additional error:
Caused by: java.lang.ClassNotFoundException: org.apache.avro.Schema
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
… 15 more

I added avro-1.3.2.jar to /opt/kafka/plugins (where the Mongo plugin is) and nothing changed. (I'm not sure whether Avro is needed and, if it is, whether the Avro version matters.)

In my Dockerfile I take the Kafka 2.5.0 image, create a plugins directory (/opt/kafka/plugins),
and copy the Mongo and Avro plugin jars into that directory.
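
Roughly, the Dockerfile looks like this (the base image name and exact jar names are approximate):

# Base image: the Strimzi-provided Kafka 2.5.0 image (exact name/tag may differ)
FROM strimzi/kafka:0.18.0-kafka-2.5.0

USER root:root
# Create the plugins directory and copy the connector jars into it
RUN mkdir -p /opt/kafka/plugins
COPY ./mongo-kafka-connect-1.6.0-all.jar /opt/kafka/plugins/
COPY ./avro-1.3.2.jar /opt/kafka/plugins/
USER 1001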

Hi @ori_iro,

Apologies, this seems to be harder to solve than it should be! All the errors seem to be classpath-related, which indicates that something is missing from the connector's classpath. Kafka uses per-connector class loaders, so I think that may be why just putting the dependency jars in /opt/kafka/plugins/ isn't working.

There are 3 different 1.6.0 connector jar files.

I hope you will just need mongo-kafka-connect-1.6.0-all.jar, as it contains all the non-Kafka dependencies needed for a non-Confluent Kafka install.

I think the classpath for connectors works as follows: /opt/kafka/plugins/<connector name>/lib/<connector's classpath>

So putting the jar file here will hopefully work: /opt/kafka/plugins/mongodb-kafka-connect/lib/mongo-kafka-connect-all.jar, as it will then have all the dependencies it needs within its own class loader.
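
For example, since you build the image with a Dockerfile, the copy step would become something like this (directory and jar names are just an illustration):

# Copy the all-in-one connector jar into its own per-connector lib directory
COPY ./mongo-kafka-connect-1.6.0-all.jar /opt/kafka/plugins/mongodb-kafka-connect/lib/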

Let me know if that solves the issue.

Ross

@Ross_Lawley The DecodeException no longer appears after I put the jar file under /opt/kafka/plugins/mongodb-kafka-connect-mongodb-1.6.0.
Now the sink connector works, the source connector status is Ready, and it appears to be working, but no data has been transferred to my topics…
This is my connector configuration:
database: myDatabase
collection: myCollection
connection.uri: *******
key.converter.schemas.enable: false
value.converter.schemas.enable: false
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
topic.prefix: myTopic
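
For context, this sits inside a Strimzi KafkaConnector resource, roughly like the following (the resource name and cluster label are placeholders, and the apiVersion depends on the Strimzi version):

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: mongodb-source-connector            # placeholder name
  labels:
    strimzi.io/cluster: my-connect-cluster   # placeholder: name of the Kafka Connect cluster
spec:
  class: com.mongodb.kafka.connect.MongoSourceConnector
  tasksMax: 1
  config:
    connection.uri: "*******"
    database: myDatabase
    collection: myCollection
    topic.prefix: myTopic
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false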

Hi @ori_iro,

Glad to hear you managed to get the jar loaded. Did it not work when added here: /opt/kafka/plugins/mongodb-kafka-connect/lib/mongo-kafka-connect-all.jar?

I'm not sure what your source configuration is, so it's impossible to tell why nothing is published to the topic. The next place to look would be the logs, to see if they report anything.

Ross

@Ross_Lawley It also works with the path you mentioned.
I misunderstood the topic.prefix field and thought it was a prefix for the topics I wanted the data to be transferred to, rather than the prefix of the prefix.db.collection topic name (so with my configuration the data is published to the topic myTopic.myDatabase.myCollection).
I appreciate the help, thank you very much!
