package org.sample.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Structured Streaming job that reads a stream from a MongoDB collection and
 * writes it to a Kafka topic using a continuous trigger.
 *
 * <p>All connection settings (Mongo URI, Kafka bootstrap servers, topic,
 * checkpoint directory) are hard-coded for a local setup.
 */
public class SparkJob implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkJob.class);

    // NOTE(review): the generic arguments of this field were lost in the original
    // text; <String, String> is an assumption - confirm against the intended
    // producer key/value types. The reference is not read or written in this class.
    private final AtomicReference<KafkaProducer<String, String>> kafkaProducerAtomicReference =
            new AtomicReference<>();

    /**
     * Builds a local Spark session, wires the MongoDB source to the Kafka sink
     * and blocks until the streaming query terminates.
     *
     * @throws TimeoutException        if starting the streaming query times out
     * @throws StreamingQueryException if the running query fails
     */
    public void run() throws TimeoutException, StreamingQueryException {
        // NOTE(review): "spark.streaming.backpressure.enabled" looks like a
        // DStream-era setting; verify it has any effect on Structured Streaming.
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("SparkConnectorTest")
                .set("spark.driver.host", "localhost")
                .set("spark.streaming.backpressure.enabled", "true");

        // Entry point for spark
        SparkSession sparkSession = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();

        Dataset<Row> streamData = getMongoStream(sparkSession);
        getQuery(streamData, sparkSession).awaitTermination();
    }

    /**
     * Starts a streaming query that writes {@code streamData} to the Kafka topic
     * {@code test-topic-2} on {@code localhost:9092}.
     *
     * @param streamData   streaming dataset to publish
     * @param sparkSession active session (currently unused by this method)
     * @return the started streaming query
     * @throws TimeoutException if starting the query times out
     */
    private StreamingQuery getQuery(Dataset<Row> streamData, SparkSession sparkSession)
            throws TimeoutException {
        // NOTE(review): Spark's Kafka sink expects a "value" column in the written
        // dataset; confirm the Mongo stream schema provides one or add a projection.
        return streamData
                .writeStream()
                .trigger(Trigger.Continuous("10 seconds")) // 10 seconds is the checkpoint interval.
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("checkpointLocation", "/tmp/test-job")
                .option("topic", "test-topic-2")
                .start();
    }

    /**
     * Creates a streaming dataset backed by the {@code test.test} MongoDB
     * collection at {@code mongodb://127.0.0.1:27017}.
     *
     * @param sparkSession session used to create the stream reader
     * @return the streaming dataset loaded through the {@code mongodb} format
     */
    private Dataset<Row> getMongoStream(SparkSession sparkSession) {
        // A plain HashMap instead of double-brace initialization: the anonymous
        // subclass would hold a hidden reference to this SparkJob instance.
        Map<String, String> mongoDbConfig = new HashMap<>();
        mongoDbConfig.put("spark.mongodb.connection.uri", "mongodb://127.0.0.1:27017");
        mongoDbConfig.put("spark.mongodb.database", "test");
        mongoDbConfig.put("spark.mongodb.collection", "test");

        return sparkSession
                .readStream()
                .options(mongoDbConfig)
                .option("spark.mongodb.change.stream.publish.full.document.only", true)
                .format("mongodb")
                .load();
    }
}