Create a Data Pipeline for MongoDB Change Stream Using Pub/Sub BigQuery Subscription
On 1st October 2022, MongoDB and Google announced a set of open source Dataflow templates for moving data between MongoDB and BigQuery, so that you can run analyses on BigQuery using BQML and bring the inferences back to MongoDB. Three templates were introduced as part of this release, including the MongoDB to BigQuery CDC (change data capture) template.
This template requires users to run a change stream on MongoDB, which monitors inserts and updates on the collection. These changes are captured and pushed to a Pub/Sub topic. The CDC template creates a Dataflow job that reads the changes from the topic, applies the transformation, and writes the result to BigQuery. The transformations vary based on the user input provided when running the Dataflow job.
Alternatively, you can use a native Pub/Sub capability to set up a data pipeline between your MongoDB cluster and BigQuery. The Pub/Sub BigQuery subscription writes messages to an existing BigQuery table as they are received. Without the BigQuery subscription type, you need a pull or push subscription and a subscriber (such as Dataflow) that reads messages and writes them to BigQuery.
This article explains how to set up the BigQuery subscription to process data read from a MongoDB change stream. As a prerequisite, you’ll need a MongoDB Atlas cluster.
On Google Cloud, we will create a Pub/Sub topic, a BigQuery dataset, and a table before creating the BigQuery subscription.
Then, add a new table in your dataset. Define it with a name of your choice and the following schema:
Next, we’ll configure a Pub/Sub schema and topic to ingest the messages from our MongoDB change stream. Then, we’ll create a subscription to write the received messages to the BigQuery table we just created.
For this section, we’ll use the Google Cloud Pub/Sub API. Before proceeding, make sure you have enabled the API for your project.
Give your schema an appropriate identifier, such as “mdb-to-bq-schema.” Then, select “Avro” for the type. Finally, add the following definition to match the fields from your BigQuery table:
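As an illustration, a minimal Avro record definition might look like the sketch below. The record name and the field names (`id`, `source_data`, `Timestamp`) are assumptions made for this example; whichever names you choose, the BigQuery table needs a column with the same name and a compatible type for each field. The hypothetical helper `toBigQueryColumns` lists the columns the table would need, and it only handles the few primitive types shown.

```javascript
// Hypothetical Avro schema for the Pub/Sub topic. The record and field names
// are assumptions for illustration; use the ones that match your table.
const avroSchema = {
  type: 'record',
  name: 'MongoDBChange',
  fields: [
    { name: 'id', type: 'string' },
    { name: 'source_data', type: 'string' },
    { name: 'Timestamp', type: 'string' },
  ],
};

// Primitive Avro types covered by this sketch and their BigQuery column types.
const AVRO_TO_BIGQUERY = {
  string: 'STRING',
  long: 'INTEGER',
  double: 'FLOAT',
  boolean: 'BOOLEAN',
};

// Derives the BigQuery columns that the table needs for a given Avro schema.
function toBigQueryColumns(schema) {
  return schema.fields.map(({ name, type }) => {
    const bqType = AVRO_TO_BIGQUERY[type];
    if (!bqType) {
      throw new Error(`Unsupported Avro type in this sketch: ${JSON.stringify(type)}`);
    }
    return { name, type: bqType };
  });
}

// The JSON printed here is what would go into the schema definition box.
console.log(JSON.stringify(avroSchema, null, 2));
console.log(toBigQueryColumns(avroSchema));
```

Keeping every field a string is the simplest choice, because the whole MongoDB document can then travel as one JSON string and be parsed later inside BigQuery.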
From the sidebar, navigate to “Topics” and click on Create a topic.
Give your topic an identifier, such as “MongoDBCDC.” Enable the Use a schema field and select the schema that you just created. Leave the rest of the parameters at their defaults and click on Create Topic.
From inside the topic, click on Create new subscription. Configure your subscription in the following way:
- Provide a subscription ID — for example, “mdb-cdc.”
- Set the Delivery type to Write to BigQuery.
- Select your BigQuery dataset from the dropdown.
- Provide the name of the table you created in the BigQuery dataset.
- Enable Use topic schema.
Keep the other fields as default and click on Create subscription.
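The same subscription can also be created programmatically. The sketch below assumes the `@google-cloud/pubsub` Node.js client, whose `createSubscription` call accepts a `bigqueryConfig` with a `table` of the form `project.dataset.table` and a `useTopicSchema` flag. The helper names and all project, dataset, and table values are placeholders, and the client is passed in as a parameter rather than constructed here.

```javascript
// Builds the subscription options equivalent to the console settings above:
// delivery type "Write to BigQuery" with "Use topic schema" enabled.
function buildBigQuerySubscriptionOptions({ projectId, datasetId, tableId }) {
  for (const [key, value] of Object.entries({ projectId, datasetId, tableId })) {
    if (!value) throw new Error(`Missing ${key}`);
  }
  return {
    bigqueryConfig: {
      table: `${projectId}.${datasetId}.${tableId}`,
      useTopicSchema: true,
    },
  };
}

// Creates the subscription on an existing topic. `pubsub` is expected to be
// an already constructed PubSub client from @google-cloud/pubsub.
async function createBigQuerySubscription(pubsub, topicName, subscriptionId, target) {
  const options = buildBigQuerySubscriptionOptions(target);
  const [subscription] = await pubsub
    .topic(topicName)
    .createSubscription(subscriptionId, options);
  return subscription.name;
}
```

A call would look like `createBigQuerySubscription(new PubSub(), 'MongoDBCDC', 'mdb-cdc', { projectId, datasetId, tableId })`, with the three identifiers replaced by your own values.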
Finally, we’ll set up a change stream that listens for new documents inserted in our MongoDB cluster.
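In outline, such a listener might look like the following sketch. It assumes a collection object from the official `mongodb` Node.js driver, whose `watch()` method takes an aggregation pipeline and returns a change stream that emits `change` events. The name `watchInserts` and the callback parameter are hypothetical.

```javascript
// Aggregation pipeline that keeps only insert events from the change stream.
const insertOnlyPipeline = [{ $match: { operationType: 'insert' } }];

// Opens a change stream on the given collection and hands every newly
// inserted document to `onDocument`. Returns the change stream so that the
// caller can close it later.
function watchInserts(collection, onDocument) {
  const changeStream = collection.watch(insertOnlyPipeline);

  changeStream.on('change', async (change) => {
    try {
      // Insert events carry the complete new document in `fullDocument`.
      await onDocument(change.fullDocument);
    } catch (err) {
      console.error('Failed to process change event:', err);
    }
  });

  changeStream.on('error', (err) => {
    console.error('Change stream error:', err);
  });

  return changeStream;
}
```

Filtering in the pipeline rather than in the callback means that update and delete events are never sent to the application in the first place.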
First, set up a new Node.js project and install the following dependencies.
Then, add an Avro schema, matching the one we created in Google Cloud Pub/Sub:
Finally, we’ll define the publishDocumentAsMessage() function that will:
- Transform every MongoDB document received through the change stream.
- Convert it to a data buffer that follows the Avro schema.
- Publish it to the Pub/Sub topic in Google Cloud.
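The three steps above can be sketched as follows. This is a minimal version under stated assumptions: the message fields (`id`, `source_data`, `Timestamp`) are illustrative and must match whatever schema you registered, the Avro encoder is injected as an `encode` function instead of being tied to a specific library, and `topic` is expected to be a topic object from `@google-cloud/pubsub`, whose `publishMessage({ data })` resolves to the message ID.

```javascript
// Step 1: flatten a MongoDB document into the string fields of the
// illustrative schema. The field names are assumptions, not fixed by Pub/Sub.
function toMessage(document, now = new Date()) {
  return {
    id: String(document._id),
    source_data: JSON.stringify(document),
    Timestamp: now.toISOString(),
  };
}

// Steps 2 and 3: encode the message and publish it to the topic.
// `encode` must return a Buffer in the encoding configured on the topic.
async function publishDocumentAsMessage(document, { encode, topic }) {
  const message = toMessage(document);
  const data = encode(message);
  const messageId = await topic.publishMessage({ data });
  console.log(`Published message ${messageId} for document ${message.id}`);
  return messageId;
}
```

With an Avro library such as `avsc`, the encoder could be `(message) => avro.Type.forSchema(schema).toBuffer(message)`, assuming the topic uses binary encoding. If the topic is set to JSON encoding, `Buffer.from(JSON.stringify(message))` would be the equivalent.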
Run the file to start the change stream listener:
Insert a new document in your MongoDB collection to watch it go through the data pipeline and appear in your BigQuery table!
There are multiple ways to load change stream data from MongoDB into BigQuery, and we have shown how to do it with a Pub/Sub BigQuery subscription. The MongoDB change stream is monitored, and each captured change is published to a Pub/Sub topic using the Node.js libraries.
The BigQuery subscription then writes the data to BigQuery, with the Pub/Sub schema determining the fields and data types written to the table. In this way, change stream data can be captured and written to BigQuery using only the native BigQuery subscription capability of Pub/Sub, without a separate subscriber such as Dataflow.