The example data below is a reading from a power meter sensor (a "thing") that calculates watts via an LED pulse count. I have one topic per thing and about 50 things at the moment, and I want to grow.
The data is written to the Kafka topic by a webthings.io adapter, so I don't have much control over the format, but I was contemplating asking the developer if he could add a toggle to emit JSON in the value.
I have tried every combination of converters, even figuring out how to populate a Kafka schema to satisfy the Avro converter, which resulted in an NPE.
Your second thought is interesting; I just have no clue how to implement it. Are these parameters for the properties file?
Given programming isn't anywhere near the top of my skills list, it would be good to guide the form of the document somehow. Although this is IoT, I was expecting an established pattern to be available. Perhaps that pattern is JSON in the value field showing the data type of the reading; if so, I'll put a request into the Git repo for the webthings Kafka adapter, as I have been assisting with testing the code.
I'm using kafka-connect-standalone, started with:
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSinkConnector.properties
The data in the topic looks like this:
~kafka/kafka/bin/kafka-console-consumer.sh --topic MySensors-81 --property print.key=true --property key.separator=: --from-beginning --bootstrap-server xxxxx.local:9092
81-0-17:1579
81-1-18:628.08
81-0-17:1650
81-1-18:628.09
81-0-17:1579
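For comparison, if the adapter did gain a JSON toggle, even a minimal payload would let the sink store a typed value. This is only a sketch, and the field name "watts" is my invention, not anything the adapter currently emits; with value.converter=org.apache.kafka.connect.json.JsonConverter and value.converter.schemas.enable=false, a record value shaped like:

```json
{"watts": 628.08}
```

would be parsed into a map and land in MongoDB as a document with a numeric field, instead of a single opaque string.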
The connect-standalone.properties are:
bootstrap.servers=xxxxx.local:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/kafka/plugins
The MongoSinkConnector.properties are:
name=mongo-sink
topics=MySensors-81
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
debug=true
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
connection.uri=mongodb://localhost/
database=Sensors
collection=sink
max.num.retries=1
retries.defer.timeout=5000
confluent.topic.security.protocol=PLAINTEXT
key.projection.type=none
key.projection.list=
value.projection.type=none
value.projection.list=
field.renamer.mapping=
field.renamer.regex=
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
# Write configuration
delete.on.null.values=false
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
max.batch.size=0
rate.limiting.timeout=0
rate.limiting.every.n=0
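In the meantime, since the values are plain numeric strings like 628.08, one code-free option might be Kafka Connect's built-in Single Message Transforms. A Cast transform can convert the whole string value to a number, and HoistField can then wrap it into a named field. This is a sketch I have not tested against the MongoDB sink, and the field name "reading" is my own choice; the lines would go in MongoSinkConnector.properties:

```properties
# Cast the entire record value ("628.08") from string to float64,
# then hoist the bare number into a struct field named "reading"
transforms=castValue,hoistValue
transforms.castValue.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castValue.spec=float64
transforms.hoistValue.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.hoistValue.field=reading
```

If it works, each sink document should contain a numeric "reading" field (plus the generated _id from the BsonOidStrategy) rather than the raw string, without any change to the webthings adapter.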