Stream to your Console from MongoDB

We wrote this using the new MongoDB Spark Connector (version 10) to stream from MongoDB. Everything appears to work, but the application terminates. We submit the job with:

bin/spark-submit

The code is:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;


import java.util.concurrent.TimeoutException;

public final class MongoStructuredStreaming {

  public static void main(final String[] args)  {
    /*
     * Create the SparkSession.
     * If config arguments are passed from the command line using --conf,
     * parse args for the values to set.
     */
    SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("read_example")
        .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/matching-engine.orders")
        .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/matching-engine.orders")
        .getOrCreate();

    // define a streaming query
    DataStreamWriter<Row> dataStreamWriter = spark.readStream()
        .format("mongodb")
        .load()
        // manipulate your streaming data
        .writeStream()
        .format("console")
        .trigger(Trigger.Continuous("1 second"))
        .outputMode("append");
    // run the query
    try {
      StreamingQuery query = dataStreamWriter.start();

    } catch (TimeoutException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

  }
}

No errors are reported, but the application terminates, which should not happen.

The console output looks like this:

23/03/09 08:23:52 INFO ContinuousExecution: Starting [id = 841c2f80-37f1-48ea-86f3-049166c84652, runId = d6a59ee6-44b4-4887-8c4a-b1d6f1b68019]. Use file:/C:/Users/joobin/AppData/Local/Temp/temporary-7336f1d4-9457-4c92-891b-f39ee54771b5 to store the query checkpoint.
23/03/09 08:23:52 INFO ContinuousExecution: Starting new streaming query.
23/03/09 08:23:52 INFO SparkContext: Invoking stop() from shutdown hook
23/03/09 08:23:52 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-KK0D0F9:4041
23/03/09 08:23:52 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/03/09 08:23:52 INFO MemoryStore: MemoryStore cleared
23/03/09 08:23:52 INFO BlockManager: BlockManager stopped
23/03/09 08:23:52 INFO BlockManagerMaster: BlockManagerMaster stopped
23/03/09 08:23:52 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/03/09 08:23:52 INFO SparkContext: Successfully stopped SparkContext
23/03/09 08:23:52 INFO ShutdownHookManager: Shutdown hook called

Reading data from MongoDB works fine, but the streaming query does not keep running.

Any ideas?

Also, two warnings were found:

23/03/09 08:23:47 INFO MongoTable: Creating MongoTable: mongo-spark-connector-10.1.1
23/03/09 08:23:50 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
23/03/09 08:23:50 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: C:\Users\joobin\AppData\Local\Temp\temporary-7336f1d4-9457-4c92-891b-f39ee54771b5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/09 08:23:51 INFO ResolveWriteToStream: Checkpoint root C:\Users\joobin\AppData\Local\Temp\temporary-7336f1d4-9457-4c92-891b-f39ee54771b5 resolved to file:/C:/Users/joobin/AppData/Local/Temp/temporary-7336f1d4-9457-4c92-891b-f39ee54771b5.
23/03/09 08:23:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/09 08:23:51 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/joobin/AppData/Local/Temp/temporary-7336f1d4-9457-4c92-891b-f39ee54771b5/metadata using temp file file:/C:/Users/joobin/AppData/Local/Temp/temporary-7336f1d4-9457-4c92-891b-f39ee54771b5/.metadata.3aea750b-05fb-4779-b401-70322f8d1b85.tmp
23/03/09 08:23:51 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/joobin/AppData/Local/Temp/temporary-7336f1d4-9457-4c92-891b-f39ee54771b5/.metadata.3aea750b-05fb-4779-b401-70322f8d1b85.tmp to file:/C:/Users/joobin/AppData/Local/Temp/temporary-7336f1d4-9457-4c92-891b-f39ee54771b5/metadata
23/03/09 08:23:51 INFO ContinuousExecution: Reading table [MongoTable{schema=StructType(StructField(_id,StringType,true),StructField(amount,DoubleType,true),StructField(fill_amount,DoubleType,true),StructField(price,DoubleType,true),StructField(side,StringType,true),StructField(status,StringType,true),StructField(symbol,StringType,true),StructField(trader_id,StringType,true),StructField(trades,ArrayType(StructType(StructField(_id,StringType,true),StructField(amount,DoubleType,true),StructField(price,DoubleType,true),StructField(side,StringType,true),StructField(symbol,StringType,true),StructField(type,StringType,true)),true),true),StructField(type,StringType,true)), partitioning=, mongoConfig=MongoConfig{options=, usageMode=NotSet}}] from DataSourceV2 named 'mongodb' [com.mongodb.spark.sql.connector.MongoTableProvider@102f3f05]

Hi kube_ctl,

Can you please verify/try the following:

  1. Is the DB connection valid? Are you able to use the mongo shell to connect to mongodb://127.0.0.1/matching-engine.orders successfully?
  2. Does the collection (matching-engine.orders) have data? The streaming query may not start if there is no data.
  3. Can you try changing the streaming configs (link)? For example, change the output mode from "append" to "complete" or "update", and change the query trigger duration to a longer period, like 5 seconds instead of 1 second.
    (link)
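As a sketch only (the checkpoint path is a placeholder, and Trigger.ProcessingTime is swapped in for the continuous trigger), suggestion 3 could look like this. One additional observation from your log: the line "Invoking stop() from shutdown hook" appears right after the query starts, which usually means the driver's main thread reached the end of main() and the JVM began shutting down. StreamingQuery.start() is asynchronous, so blocking on query.awaitTermination() keeps the driver alive:

```java
// Sketch of the suggested changes; assumes the enclosing method declares
// "throws TimeoutException, StreamingQueryException".
StreamingQuery query = spark.readStream()
    .format("mongodb")
    .load()
    .writeStream()
    .format("console")
    // Longer micro-batch interval instead of Trigger.Continuous("1 second")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    // Try "update" (or "complete" if the query aggregates) instead of "append"
    .outputMode("update")
    // Explicit checkpoint directory (placeholder path)
    .option("checkpointLocation", "/tmp/mongo-stream-checkpoint")
    .start();

// Block the main thread; without this, main() returns immediately,
// the shutdown hook fires, and Spark stops the query.
query.awaitTermination();
```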

Along with this, can you share any additional error logs that may come up?