Robert Walters


Streaming Time-Series Data Using Apache Kafka and MongoDB

There is one thing the world agrees on, and it is the concept of time. Many applications are heavily time-based. Consider solar field power generation, stock trading, and health monitoring. These are just a few of the plethora of applications that produce and use data with a critical time component. In general, time-series applications are heavy on inserts, rarely perform updates, and are even less likely to delete data. They generate a tremendous amount of data and need a robust data platform to effectively manage and query it. With MongoDB, you can easily:

- Pre-aggregate data using the MongoDB Query Language and window functions
- Optimally store large amounts of time-series data with MongoDB time-series collections
- Archive data to cost-effective storage using MongoDB Atlas Online Archive

Apache Kafka is often used as an ingestion point for data due to its scalability. Through the use of the MongoDB Connector for Apache Kafka and the Apache Kafka Connect service, it is easy to transfer data between Kafka topics and MongoDB clusters. Starting with the 1.6 release of the MongoDB Connector for Apache Kafka, you can configure Kafka topic data to be written directly into a time-series collection in MongoDB. This configuration happens in the sink.

Configuring time-series collections in the sink

With MongoDB, applications do not need to create the database and collection before they start writing data; these objects are created automatically upon first arrival of data into MongoDB. A time-series collection, however, needs to be created before you start writing data. To make it easy to ingest time-series data into MongoDB from Kafka, these collection options are exposed as sink parameters, and the time-series collection is created by the connector if it doesn't already exist. Some of the new parameters are defined as follows:

- timeseries.timefield: the name of the top-level field used for time.
- timeseries.expire.after.seconds: optional; determines how long the data remains in MongoDB before being automatically deleted. Omitting this field means data will not be deleted automatically. If you are familiar with TTL indexes in MongoDB, setting this field provides similar behavior.
- timeseries.timefield.auto.convert: optional; tells the connector to convert the data in the field into a BSON Date. Supported source formats include integer, long, and string.

For a complete list of the new time-series parameters, check out the MongoDB Sink connector online documentation.
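For reference, the collection the connector creates is roughly equivalent to what you would create yourself in mongosh with something like the following sketch (the collection name, time field, and expiry value are illustrative and mirror the sink parameters above):

    db.createCollection("StockDataMigrate", {
        timeseries: {
            timeField: "tx_time"      // corresponds to timeseries.timefield
        },
        expireAfterSeconds: 2592000   // optional; corresponds to timeseries.expire.after.seconds
    })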
When data is stored in time-series collections, MongoDB optimizes the storage and bucketization of your data behind the scenes. This saves a tremendous amount of storage space compared to the typical one-document-per-data-point structure in regular collections. You can also explore the many new time and window functions within the MongoDB Query Language. For example, consider this sample document structure:

    {
        tx_time: 2021-06-30T15:47:31.000Z,
        _id: '60dc921372f0f39e2cd6cba5',
        company_name: 'SILKY CORNERSTONE LLC',
        price: 94.0999984741211,
        company_symbol: 'SCL'
    }

You can use the new $setWindowFields pipeline stage to define the window of documents to operate on and then compute rankings, cumulative totals, and other analytics over complex time-series data. For example, using the data generated in the tutorial, let's determine a rolling average of the price as follows:

    db.StockDataTS.aggregate([
        { $match: { company_symbol: 'SCL' } },
        { $setWindowFields: {
            partitionBy: '$company_name',
            sortBy: { tx_time: 1 },
            output: {
                averagePrice: {
                    $avg: "$price",
                    window: { documents: [ "unbounded", "current" ] }
                }
            }
        } }
    ])

A sample of the result set is as follows:

    {
        tx_time: 2021-06-30T15:47:45.000Z,
        _id: '60dc922172f0f39e2cd6cbeb',
        company_name: 'SILKY CORNERSTONE LLC',
        price: 94.06999969482422,
        company_symbol: 'SCL',
        averagePrice: 94.1346669514974
    },
    {
        tx_time: 2021-06-30T15:47:47.000Z,
        _id: '60dc922372f0f39e2cd6cbf0',
        company_name: 'SILKY CORNERSTONE LLC',
        price: 94.1500015258789,
        company_symbol: 'SCL',
        averagePrice: 94.13562536239624
    },
    {
        tx_time: 2021-06-30T15:47:48.000Z,
        _id: '60dc922472f0f39e2cd6cbf5',
        company_name: 'SILKY CORNERSTONE LLC',
        price: 94.0999984741211,
        company_symbol: 'SCL',
        averagePrice: 94.13352966308594
    }

Notice the additional "averagePrice" field is now populated with a rolling average. For more information on time-series collections in MongoDB, check out the online documentation.

Migrating existing collections

To convert an existing MongoDB collection to a time-series collection, you can use the MongoDB Connector for Apache Kafka. Simply configure the source connection to your existing collection and configure the sink connector to write to a MongoDB time-series collection by using the "timeseries.timefield" parameter. You can configure the source connector to copy existing data by setting the "copy.existing" parameter to true. This creates insert events for all existing documents in the source; any documents inserted during the copying process will be inserted once the copying has finished. While not always possible, it is recommended to pause writes to the source data while the copy process is running. To see when it finishes, look in the logs for the message, "Finished copying existing data from the collection(s)."

For example, consider a source document that has this structure:

    {
        company_symbol: (STRING),
        company_name: (STRING),
        price: (DECIMAL),
        tx_time: (STRING)
    }

For the initial release of MongoDB time-series collections, the field that represents the time is required to be stored as a Date. In our example, we are using a string to showcase the connector's ability to automatically convert from a string to a Date. If you chose to perform the conversion outside of the connector, you could use a Single Message Transform (SMT) in Kafka Connect to convert the string into a Date at the sink. However, certain SMTs like TimestampConverter require schemas to be defined for the data in the Kafka topic in order to work, which may add some complexity to the configuration. Instead of using an SMT, you can convert strings into Dates automatically using the new timeseries.timefield.auto.convert and timeseries.timefield.auto.convert.date.format options.
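For comparison, the SMT approach mentioned above would look roughly like the following fragment added to the sink configuration (a sketch only; the transform alias convertTxTime is illustrative, and this assumes the topic messages carry schemas):

    "transforms": "convertTxTime",
    "transforms.convertTxTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertTxTime.field": "tx_time",
    "transforms.convertTxTime.target.type": "Timestamp",
    "transforms.convertTxTime.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"

The auto-convert options shown in the sink configuration below avoid both the extra transform configuration and the schema requirement.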
Here is a sample source configuration that will copy all the existing data from the StockData collection and then continue to push data changes to the stockdata.Stocks.StockData topic:

    {"name": "mongo-source-stockdata", "config": {
        "tasks.max": "1",
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "publish.full.document.only": true,
        "connection.uri": (MONGODB SOURCE CONNECTION STRING),
        "topic.prefix": "stockdata",
        "database": "Stocks",
        "collection": "StockData",
        "copy.existing": "true"
    }}

This is a sample configuration for the sink to write the data from the stockdata.Stocks.StockData topic to a MongoDB time-series collection:

    {"name": "mongo-sink-stockdata", "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "tasks.max": "1",
        "topics": "stockdata.Stocks.StockData",
        "connection.uri": (MONGODB SINK CONNECTION STRING),
        "database": "Stocks",
        "collection": "StockDataMigrate",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "timeseries.timefield": "tx_time",
        "timeseries.timefield.auto.convert": "true",
        "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
    }}

In this sink example, the connector converts the data in the "tx_time" field into a Date, parsing it with the expected string format yyyy-MM-dd'T'HH:mm:ss'Z' (e.g., '2021-07-06T12:25:45Z').

Note that in the initial version of time-series collections, only inserts into a time-series collection are supported: updating or deleting documents on the source will not propagate to the destination. You also cannot use the MongoDB CDC Handler in this scenario, because the handler uses ReplaceOne, which is a type of update command. These are limitations of the initial release of time-series collections in MongoDB and may no longer apply by the time you read this post; check the online documentation for the latest information.

The MongoDB Connector for Apache Kafka version 1.6 is available to download from GitHub. Look for it on the Confluent Hub later this week!

July 13, 2021

Exploring Data with MongoDB Atlas, Databricks, and Google Cloud

MongoDB Atlas supports Google Cloud (GC), enabling you to easily spin up managed MongoDB clusters within GC in minutes. We're excited to share that Databricks recently launched Databricks on GC, giving customers the freedom to move and analyze their data within GC and MongoDB Atlas. With the latest update to Databricks, it's now easier to get started with a cloud-first approach on GC that leverages MongoDB Atlas, with its flexible data model designed for modern applications, and Databricks for more advanced analytics use cases.

The following tutorial illustrates how to use MongoDB Atlas on GC and Databricks. We'll use sample sales data in MongoDB Atlas and calculate a rolling average using Databricks on GC. This tutorial covers the following:

- How to read data from MongoDB Atlas on GC into Spark
- How to run the MongoDB Connector for Spark as a library in Databricks
- How to use the PySpark libraries to perform rolling averages of sales data
- How to write these averages back to MongoDB so they are accessible to applications

Create Databricks Workspace

To provision a new Databricks workspace, you will need to have a GC project already created. If you do not already have a Databricks cluster deployed on GC, follow the online documentation to create one. Note: It is important to follow the documentation, because there are a few key settings you will need to make in your GC project, such as enabling the container.googleapis.com, storage.googleapis.com, and deploymentmanager.googleapis.com services and adjusting certain Google Cloud quotas before creating your Databricks cluster.

In this example, we have already created the Google Cloud project mongodb-supplysales and are ready to go to the Google Marketplace and add Databricks to our project. Within your Google project, click on "Marketplace" and enter "Databricks" in the search box. Click on the resulting tile and follow the instructions. Once your Databricks cluster is created, navigate to it with the URL provided and create a new workspace. Once you've created your workspace, you will be able to launch it from the URL provided and log in to the workspace welcome screen.

In this article, we will create a notebook to read data from MongoDB and use the PySpark libraries to perform the rolling average calculation. We can create our Databricks cluster by selecting the "+ Create Cluster" button from the Clusters menu. Note: For the purposes of this walkthrough, we chose only one worker and preemptible instances; in a production environment you would want to include more workers and autoscaling.

Before we create our cluster, we have the option under Advanced Options to provide Spark configuration variables. A common setting for the Spark config is to define spark.mongodb.output.uri and spark.mongodb.input.uri.
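In the Databricks cluster's Spark config box, those two entries end up looking roughly like the following (a sketch; the host, credentials, and database are placeholders that you will replace with your own Atlas connection string):

    spark.mongodb.input.uri mongodb+srv://<username>:<password>@<your-cluster>.mongodb.net/sample_supplies
    spark.mongodb.output.uri mongodb+srv://<username>:<password>@<your-cluster>.mongodb.net/sample_supplies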
First, we need to create the MongoDB Atlas cluster so we have an actual connection string to enter for these values. At this point, open a new browser tab and navigate to MongoDB Atlas.

Prepare a MongoDB Atlas Instance

Once in the MongoDB Atlas portal, you will need to do the following before you can use Atlas with Databricks:

- Create your MongoDB Atlas cluster
- Define user credentials for use in the Spark connector
- Define network access
- Add sample data (optional for this article)

Create Your MongoDB Atlas Cluster

If you already have a MongoDB Atlas account, log in and create a new Atlas cluster. If you do not have an account, you can set up a free cluster at the following URL: https://www.mongodb.com/cloud . Once your account is set up, you can create a new Atlas cluster by using the "+ New Cluster" dialog. MongoDB provides a free tier for Google Cloud. Once you provide a cluster name and click on "create," Atlas will take approximately five to seven minutes to create your Atlas cluster.

Define Database Access

By default there are no users created in an Atlas cluster. To create an identity for our Spark cluster to connect to MongoDB Atlas, launch the "Add New Database User" dialog from the Database Access menu item. Notice that there are three options for authentication to MongoDB Atlas: Password, Certificate, and AWS IAM authentication. Select "Password," and enter a username and password. Atlas provides granular access control: for example, you could restrict this user account to work only with a specific Atlas cluster, or define the account as temporary and have Atlas expire it after a specific time period.

Define Network Access

MongoDB Atlas does not allow any connection from the internet by default. You need to include MongoDB Atlas as part of a VPC peering or AWS PrivateLink configuration. If you do not have that set up with your cloud provider, you need to specify from which IP addresses Atlas can accept incoming connections. You can do this via the "Add IP Address" dialog in the Network Access menu. In this article, we will add "0.0.0.0," allowing access from anywhere, because we don't know specifically which IP our Databricks cluster will be running on. MongoDB Atlas can also make this IP access list entry temporary, which is useful for situations where you need to allow access from anywhere.

Add Sample Data

Now that we have added our user account and allowed network access to our Atlas cluster, we need to add some sample data. Atlas provides several sample collections that are accessible from the menu item on the cluster. In this example, we will use the sales collection within the sample_supplies database.

Update Spark Configuration with Atlas Connection String

Copy the MongoDB Atlas connection string by clicking on the Connect button and selecting "Connect your application." Copy the contents of the connection string and note the placeholders for username and password; you will have to change those to your own credentials. Return to your Databricks workspace. Under Advanced Options in your Databricks cluster configuration, paste the connection string for both the spark.mongodb.output.uri and spark.mongodb.input.uri variables. Note that you will need to update the credentials in the MongoDB Atlas connection string with those you defined previously. For simplicity in your PySpark code, change the default database in the connection string from MyFirstDatabase to sample_supplies. (This is optional, because you can always define the database name via Spark configuration options at runtime.)

Start the Databricks Cluster

Now that your Spark config is set, start the cluster. Note: If the cluster fails to start, check the event log and view the JSON tab. A typical error message at this stage appears if you forgot to increase the SSD storage quota.

Add MongoDB Spark Connector

Once the cluster is up and running, click on "Install New" from the Libraries menu. Here we have a variety of ways to create a library, including uploading a JAR file or downloading the Spark connector from Maven.
In this example, we will use Maven and specify org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 as the coordinates. Click on "Install" to add our MongoDB Spark Connector library to the cluster. Note: If you get the error message "Maven libraries are only supported on Databricks Runtime version 7.3 LTS, and versions >= 8.1," you can download the MongoDB Spark Connector JAR file from https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.1/ and then upload it to Databricks by using the Upload menu option.

Create a New Notebook

Click on the Databricks home icon from the menu and select "Create a blank notebook." Attach this new notebook to the cluster you created in the previous step. Because we defined our MongoDB connection string as part of the cluster's Spark config, your notebook already has the MongoDB Atlas connection context. In the first cell, paste the following:

    from pyspark.sql import SparkSession

    pipeline = "[{'$match': { 'items.name': 'printer paper' }}, \
    {'$unwind': { path: '$items' }}, \
    {'$addFields': { totalSale: { '$multiply': [ '$items.price', '$items.quantity' ] } }}, \
    {'$project': { saleDate: 1, totalSale: 1, _id: 0 }}]"

    salesDF = spark.read.format("mongo") \
        .option("collection", "sales") \
        .option("pipeline", pipeline) \
        .option("partitioner", "MongoSinglePartitioner") \
        .load()

Run the cell to make sure you can connect to the Atlas cluster. Note: If you get an error such as "MongoTimeoutException," make sure your MongoDB Atlas cluster has the appropriate network access configured.

The notebook displays a schema view of what the data looks like. Although we could have continued to transform the data in the Mongo pipeline before it reached Spark, let's use PySpark to transform it. Create a new cell and enter the following:

    from pyspark.sql.window import Window
    from pyspark.sql import functions as F

    salesAgg = salesDF.withColumn('saleDate', F.col('saleDate').cast('date')) \
        .groupBy("saleDate").sum("totalSale").orderBy("saleDate")

    w = Window.orderBy('saleDate').rowsBetween(-7, 0)
    df = salesAgg.withColumn('rolling_average', F.avg('sum(totalSale)').over(w))
    df.show(truncate=False)

In this cell we perform some additional transformation of the data, such as grouping the data by saleDate and summing totalSale per day. Once the data is in our desired format, we define a window covering each day and the seven preceding entries, then add a column to our data frame that is a rolling average of the total sales data. Once the code is executed, the notebook displays our new dataframe with the rolling_average column.

Once we have performed our analytics, we can write the data back to MongoDB for additional reporting, analytics, or archiving. In this scenario, we write the data back to a new collection called sales-averages:

    df.write.format("mongo").option("collection", "sales-averages").save()

You can see the data by using the Collections tab within the MongoDB Atlas cluster UI. With the data in MongoDB Atlas, you can now leverage many of the services available, including Atlas Online Archive, Atlas Search, and Atlas Data Lake.

Summary

The integration between MongoDB Atlas, Google Cloud, and Databricks enables you to gain deep insights into your data and gives you the freedom to move and analyze data as your needs evolve. Check out the resources below for more information:

- Getting started with MongoDB Atlas
- MongoDB Spark Connector
- MongoDB Atlas on Google Cloud

May 11, 2021

How to Get Started with MongoDB Atlas and Confluent Cloud

Every year, more and more applications are leveraging the public cloud and reaping the benefits of elastic scale and rapid provisioning. Forward-thinking companies such as MongoDB and Confluent have embraced this trend, building cloud-based solutions such as MongoDB Atlas and Confluent Cloud that work across all three major cloud providers.

Companies across many industries have been leveraging Confluent and MongoDB to drive their businesses forward for years. From insurance providers gaining a customer-360 view for a personalized experience to global retail chains optimizing logistics with a real-time supply chain application, the connected technologies have made it easier to build applications with event-driven data requirements. The latest iteration of this technology partnership simplifies getting started with a cloud-first approach, ultimately improving developers' productivity when building modern cloud-based applications with data in motion.

Today, the MongoDB Atlas source and sink connectors are generally available within Confluent Cloud. With Confluent's cloud-native service for Apache Kafka® and these fully managed connectors, setup of your MongoDB Atlas integration is simple. There is no need to install Kafka Connect or the MongoDB Connector for Apache Kafka, or to worry about scaling your deployment. All the infrastructure provisioning and management is taken care of for you, enabling you to focus on what brings you the most value: developing and releasing your applications rapidly.

Let's walk through a simple example of taking data from a MongoDB cluster in Virginia and writing it into a MongoDB cluster in Ireland. We will use a Python application to write fictitious data into our source cluster.

Step 1: Set Up Confluent Cloud

First, if you've not done so already, sign up for a free trial of Confluent Cloud. You can then use the Quick Start for Apache Kafka using Confluent Cloud tutorial to create a new Kafka cluster. Once the cluster is created, you need to enable egress IPs and copy the list of IP addresses. This list of IPs will be used as an IP access list in MongoDB Atlas. To locate this list, select "Cluster Settings" and then the "Networking" tab. Keep this tab open for future reference: you will need to copy these IP addresses into the Atlas cluster in Step 2.

Step 2: Set Up the Source MongoDB Atlas Cluster

For a detailed guide on creating your own MongoDB Atlas cluster, see the Getting Started with Atlas tutorial. For the purposes of this article, we have created an M10 MongoDB Atlas cluster using the AWS cloud in the us-east-1 (Virginia) data center to be used as the source, and an M10 MongoDB Atlas cluster using the AWS cloud in the eu-west-1 (Ireland) data center to be used as the sink. Once your clusters are created, you will need to configure two settings in order to make a connection: database access and network access.

Network Access

You have two options for allowing secure network access from Confluent Cloud to MongoDB Atlas: you can use AWS PrivateLink, or you can secure the connection by allowing only specific IP connections from Confluent Cloud to your Atlas cluster. In this article, we cover securing via IPs. For information on setting up using PrivateLink, read the article Using the Fully Managed MongoDB Atlas Connector in a Secure Environment. To accept external connections in MongoDB Atlas via specific IP addresses, launch the "IP Access List" entry dialog under the Network Access menu.
Here you add all the IP addresses that were listed in Confluent Cloud from Step 1. Once all the egress IPs from Confluent Cloud are added, you can configure the user account that will be used to connect from Confluent Cloud to MongoDB Atlas. Configure user authentication in the Database Access menu.

Database Access

You can authenticate to MongoDB Atlas using username/password, certificates, or AWS identity and access management (IAM) authentication methods. To create a username and password that will be used for connection from Confluent Cloud, select the "+ Add New Database User" option from the Database Access menu. Provide a username and password and make a note of this credential, because you will need it in Step 3 and Step 4 when you configure the MongoDB Atlas source and sink connectors in Confluent Cloud. Note: In this article we are creating one credential and using it for both the MongoDB Atlas source and MongoDB Atlas sink connectors, because both of the clusters used in this article are in the same Atlas project.

Now that the Atlas clusters are created, the Confluent Cloud egress IPs are added to the MongoDB Atlas access list, and the database access credentials are defined, you are ready to configure the MongoDB Atlas source and MongoDB Atlas sink connectors in Confluent Cloud.

Step 3: Configure the Atlas Source

Now that you have two clusters up and running, you can configure the MongoDB Atlas connectors in Confluent Cloud. To do this, select "Connectors" from the menu, and type "MongoDB Atlas" in the Filters textbox. Note: When configuring the MongoDB Atlas source and MongoDB Atlas sink, you will need the connection host name of your Atlas clusters. You can obtain this host name from the MongoDB connection string. An easy way to do this is by clicking on the "Connect" button for your cluster. This launches the Connect dialog, and you can choose any of the Connect options. For purposes of illustration, if you click on "Connect using MongoDB Compass," you will see the full connection string. The host name portion of that connection string is what you will use when configuring the source and sink connectors in Confluent Cloud.

Configuring the MongoDB Atlas Source Connector

Selecting "MongoDbAtlasSource" from the list of Confluent Cloud connectors presents you with several configuration options. The "Kafka Cluster credentials" choice is an API-based authentication that the connector will use for authentication with the Kafka broker. You can generate a new API key and secret by using the hyperlink. Recall that the connection host is obtained from the MongoDB connection string; details on how to find this are described at the beginning of this section. The "Copy existing data" choice tells the connector, upon initial startup, to copy all the existing data in the source collection into the desired topic. Any changes to the data that occur during the copy process are applied once the copy is completed. By default, messages from the MongoDB source are sent to the Kafka topic as strings. The connector supports outputting messages in formats such as JSON and AVRO. Recall that the MongoDB source connector reads change stream data as events, and change stream event metadata is wrapped in the message sent to the Kafka topic. If you want just the message contents, set the "Publish full document only" option to true.
Note: For source connectors, the number of tasks will always be "1"; otherwise, you run the risk of duplicate data being written to the topic, because multiple workers would effectively be reading from the same change stream. To scale the source, you could create multiple source connectors and define a pipeline that looks at only a portion of the collection. Currently, this capability for defining a pipeline is not yet available in Confluent Cloud.

Step 4: Generate Test Data

At this point, you can run your Python data generator application and start inserting data into the Stocks.StockData collection at your source. This will cause the connector to automatically create the topic "demo.Stocks.StockData." To use the generator, git-clone the stockgenmongo folder in the above-referenced repository and launch the data generation as follows:

    python stockgen.py -c "<MONGODB CONNECTION URL>"

where the MongoDB connection URL is the full connection string obtained from the Atlas source cluster. An example connection string is as follows:

    mongodb+srv://kafkauser:kafkapassword123@democluster.lkyil.mongodb.net

Note: You might need to pip-install pymongo and dnspython first. If you do not wish to use this data generator, you will need to create the Kafka topic before configuring the MongoDB Atlas sink. You can do this by using the Add a Topic dialog in the Topics tab of the Confluent Cloud administration portal.

Step 5: Configure the MongoDB Atlas Sink

Selecting "MongoDB Atlas Sink" from the list of Confluent Cloud connectors will present you with several configuration options. After you pick the topic to source data from Kafka, you will be presented with additional configuration options. Because you chose to write your data in the source by using JSON, you need to select "JSON" as the input message format. The Kafka API key is an API key and secret used for connector authentication with Confluent Cloud. Recall that you obtain the connection host from the MongoDB connection string; details on how to find this are described at the beginning of Step 3. The "Connection details" section allows you to define behavior such as creating a new document for every topic message or updating an existing document based upon a value in the message. These behaviors are known as document ID and write model strategies. For more information, check out the MongoDB Connector for Apache Kafka sink documentation. If the order of the data in the sink collection is not important, you could spin up multiple tasks to gain an increase in write performance.

Step 6: Verify Your Data Arrived at the Sink

You can verify the data has arrived at the sink via the Atlas web interface. Navigate to the collection data via the Collections button. Now that your data is in Atlas, you can leverage many of the Atlas platform capabilities, such as Atlas Search, Atlas Online Archive for easy data movement to low-cost storage, and MongoDB Charts for point-and-click data visualization. Here is a chart created in about one minute using the data generated from the sink cluster.
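The data in this walkthrough came from the stockgen.py generator in Step 4. If you would rather roll your own test data, a minimal stand-in written with PyMongo could look like the following (a sketch only, not the actual stockgen.py script; the database and collection names match Step 4, the field names are illustrative, and the connection string is a placeholder):

    import random
    import time
    from datetime import datetime, timezone

    from pymongo import MongoClient

    # Placeholder URI: replace with the connection string of your Atlas source cluster.
    client = MongoClient("mongodb+srv://<username>:<password>@<source-cluster>.mongodb.net")
    collection = client["Stocks"]["StockData"]

    # Insert one fictitious stock tick per second. The source connector picks up each
    # insert from the change stream and publishes it to the demo.Stocks.StockData topic.
    while True:
        collection.insert_one({
            "company_symbol": "DEMO",
            "company_name": "DEMO COMPANY LLC",
            "price": round(random.uniform(90, 95), 2),
            "tx_time": datetime.now(timezone.utc),
        })
        time.sleep(1)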
Summary

Apache Kafka and MongoDB help power many strategic business use cases, such as modernizing legacy monolithic systems, single views, batch processing, and event-driven architectures, to name a few. Today, Confluent Cloud and MongoDB Atlas provide fully managed solutions that enable you to focus on the business problem you are trying to solve rather than spinning your tires on infrastructure configuration and maintenance.

Register for our joint webinar to learn more!

May 6, 2021

MongoDB Connector for Apache Kafka 1.5 Available Now

Today, MongoDB has released version 1.5 of the MongoDB Connector for Apache Kafka! In addition to continuing to improve the overall quality and stability of the connector, this release adds several key features, highlighted in this article.

DeleteOne write model strategy

When messages arrive on Kafka topics, the MongoDB sink connector reads them and by default upserts them into the MongoDB cluster specified in the sink configuration. However, what if you didn't want to always upsert them? This is where write model strategies come in: they provide the flexibility to define what you want to do with each document. While the concept of write model strategies is not new to the connector, this release adds a new strategy called DeleteOneBusinessKeyStrategy. It is useful when a topic contains records identifying data that should be removed from a collection in the MongoDB sink.

Consider the following: You run an online store selling fashionable face masks. As part of your architecture, the website sends orders to a Kafka topic, "web-orders", which upon message arrival kicks off a series of actions, such as sending an email confirmation and inserting the order details into an "Orders" collection in a MongoDB cluster. A sample Orders document:

    {
        _id: ObjectId("6053684f2fe69a6ad3fed028"),
        'customer-id': 123,
        'order-id': 100,
        order: { lineitem: 1, SKU: 'FACE1', quantity: 1 }
    }

This process works great. However, when a customer cancels an order, we need another business process to update our inventory, send the cancellation email, and remove the order from our MongoDB sink. In this scenario, a cancellation message is sent to another Kafka topic; in this example, "FaceMaskWeb.OrderCancel". For messages in this topic, we don't just want to upsert them into a collection: we want to read each message from the topic and use a field within it to identify the documents to delete in the sink. For this example, let's use the order-id key field and define a sink connector using the DeleteOneBusinessKeyStrategy as follows:

    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "FaceMaskWeb.OrderCancel",
    "connection.uri": "mongodb://mdb1",
    "database": "FaceMaskWeb",
    "collection": "Orders",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "document.id.strategy.partial.value.projection.type": "AllowList",
    "document.id.strategy.partial.value.projection.list": "order-id",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "document.id.strategy.overwrite.existing": true

Now when messages arrive in the "FaceMaskWeb.OrderCancel" topic, the "order-id" field is used to delete documents in the Orders collection. For example, using the sample document above, if we put this value into the OrderCancel topic:

    { "order-id": 100 }

it would cause the document in the Orders collection with an order-id value of 100 to be deleted. For a complete list of write model strategies, check out the MongoDB Kafka Connector sink documentation.

Qlik Replicate

Qlik Replicate is recognized as an industry leader in data replication and ingestion. With this new release of the connector, you can now replicate and stream heterogeneous data from data sources like Oracle, MySQL, PostgreSQL, and others to MongoDB via Kafka and the Qlik Replicate CDC handler. To configure the MongoDB Connector for Apache Kafka to consume Qlik Replicate CDC events, use "com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler" as the value for the change data capture handler configuration parameter.
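In sink configuration terms, that is roughly the following fragment (a sketch; the topic, connection string, and namespace values are placeholders for your own environment):

    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "<topic fed by Qlik Replicate>",
    "connection.uri": "<MONGODB CONNECTION STRING>",
    "database": "<destination database>",
    "collection": "<destination collection>",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler"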
The handler supports insert, refresh, read, update, and delete events.

Errant Record Reporting

Kafka Connect, the service that manages connectors integrating with a Kafka deployment, has the ability to write records to a dead letter queue (DLQ) topic if those records could not be serialized or deserialized. Starting with Apache Kafka version 2.6, support was added for error reporting within sink connectors. This gives sink connectors the ability to send individual records to the DLQ if the connector deems them invalid or problematic: for example, if you are projecting fields in the sink that do not exist in the Kafka message, or if your sink expects a JSON document and the message arrives in a different format. In these cases, an error is written to the DLQ instead of failing the connector.

Various Improvements

As with every release of the connector, we are constantly improving the quality and functionality, and this release is no different. You'll also see pipeline errors now showing up in the Kafka Connect logs, and the sink connector can now be configured to write to the dead letter queue!

Next Steps

- Download the latest MongoDB Connector for Apache Kafka 1.5 from the Confluent Hub!
- Read the MongoDB Connector for Apache Kafka documentation.
- Questions? Need help with the connector? Ask the Community.
- Have a feature request? Provide feedback or file a JIRA.

April 7, 2021

MongoDB Connector for Apache Kafka 1.4 Available Now

As businesses continue to embrace event-driven architectures and tackle Big Data opportunities, companies are finding great success integrating Apache Kafka and MongoDB. These two complementary technologies provide the power and flexibility to solve these large-scale challenges. Today, MongoDB continues to invest in the MongoDB Connector for Apache Kafka, releasing version 1.4! Over the past few months, we've been collecting feedback and learning how to best help our customers integrate MongoDB within the Apache Kafka ecosystem. This article highlights some of the key features of this new release.

Selective Replication in MongoDB

Being able to track just the data that has changed is an important use case in many solutions. Change Data Capture (CDC) has been available on the sink since the original version of the connector. However, up until version 1.4, CDC events could only be sourced from MongoDB via the Debezium MongoDB Connector. With the latest release, you can specify the MongoDB Change Stream Handler on the sink to read and replay MongoDB events that were sourced from MongoDB using the MongoDB Connector for Apache Kafka. This feature enables you to record insert, update, and delete activities on a namespace in MongoDB and replay them on a destination MongoDB cluster. In effect, you have a lightweight way to perform basic replication of MongoDB data via Kafka.

Let's dive in and see what is happening under the hood. Recall that when the connector is used as a source to MongoDB, it starts a change stream on a specific namespace. Depending on how you configure the source connector, documents are written into a Kafka topic based on this namespace and the pipeline that matches your criteria. These documents are by default in the change stream event format. Here is a partial message in the Kafka topic that was generated from the following statement:

    db.Source.insert({proclaim: "Hello World!"});

    {
        "schema": { "type": "string", "optional": false },
        "payload": {
            "_id": { "_data": "82600B38...." },
            "operationType": "insert",
            "clusterTime": { "$timestamp": { "t": 1611348141, "i": 2 } },
            "fullDocument": {
                "_id": { "$oid": "600b38ad6011ef6265c3acd1" },
                "proclaim": "Hello World!"
            },
            "ns": { "db": "Tutorial3", "coll": "Source" },
            "documentKey": { "_id": { "$oid": "600b38ad6011ef6265c3acd1" } }
        }
    }

Now that our change stream message is in the Kafka topic, we can use the connector as a sink to read the stream of messages and replay them at the destination cluster. To set up the sink to consume these events, set the "change.data.capture.handler" property to the new com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler class.

Notice that one of the fields is "operationType". The sink connector only supports insert, update, and delete operations on the namespace and does not support actions like the creation of database objects such as users, namespaces, indexes, views, and other metadata that occurs in more traditional replication solutions. In addition, this capability is not intended as a replacement for a full-featured replication system, as it cannot guarantee transactional consistency between the two clusters. That said, if all you are looking to do is move data and can accept the lack of consistency, then you have a simple solution using the new ChangeStreamHandler. To work through a tutorial on this new feature, check out Tutorial 3 of the MongoDB Connector for Apache Kafka Tutorials in GitHub.
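Before heading to the tutorial, here is roughly what such a sink configuration looks like, patterned after the other configurations in this post (a sketch; the connector name, topic, connection string, and namespace values are placeholders):

    {"name": "mongo-replication-sink", "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "<topic written to by the source connector>",
        "connection.uri": "<DESTINATION MONGODB CONNECTION STRING>",
        "database": "<destination database>",
        "collection": "<destination collection>",
        "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
    }}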
Dynamic Namespace Mapping

When we use the MongoDB connector as a sink, we take data that resides on a Kafka topic and insert it into a collection. Prior to 1.4, once this mapping was defined it wasn't possible to route topic data to another collection. In this release, we added the ability to dynamically map a namespace to the contents of the Kafka topic message. For example, consider a Kafka topic "Customers.Orders" that contains the following messages:

    {"orderid": 1, "country": "ES"}
    {"orderid": 2, "country": "US"}

We would like these messages to be placed in their own collections based upon the country value. Thus, the message with an "orderid" value of 1 will be copied into a collection called "ES", and the message with an "orderid" value of 2 will be copied into a collection called "US". To see how we configure this scenario, we will define a sink using the new namespace.mapper property configured with a value of "com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper". Using this mapper, we can use a key or value field to determine the database and collection, respectively. In our example above, let's define our config using the value of the country field as the collection name to sink to:

    {"name": "mongo-dynamic-sink", "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "Customers.Orders",
        "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
        "database": "Orders",
        "collection": "Other",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "namespace.mapper": "com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper",
        "namespace.mapper.value.collection.field": "country"
    }}

Messages that do not have a country value will by default be written to the namespace defined in the configuration, just as they would have been without the mapping. However, if you want messages that do not conform to the map to generate an error, simply set the property namespace.mapper.error.if.invalid to true. This will raise an error and stop the connector when messages cannot be mapped to a namespace due to missing fields or fields that are not strings. If you'd like to have more control over the namespace, you can use the new "getNamespace" method of the interface com.mongodb.kafka.connect.sink.namespace.mapping.NamespaceMapper. Implementations of this method can apply more complex business rules and can access the SinkRecord or SinkDocument as part of the logic to determine the destination namespace.

Dynamic Topic Mapping

Once the source connector is configured, change stream events flow from the namespace defined in the connector to a Kafka topic. The name of the Kafka topic is made up of three configuration parameters: topic.prefix, database, and collection. For example, if you had the following as part of your source connector configuration:

    "topic.prefix": "Stocks",
    "database": "Customers",
    "collection": "Orders"

the Kafka topic that would be created would be "Stocks.Customers.Orders". However, what if you didn't always want the events in the Orders collection to go to this specific topic? What if you wanted to determine at run time which topic a specific message should be routed to? In 1.4 you can now specify a namespace map that defines which Kafka topic a namespace should be written to.
For example, consider the following map:

    {"Customers": "CustomerTopic", "Customers.Orders": "Orders"}

This will map all change stream documents from the Customers database to CustomerTopic.<collectionName>, apart from any documents from the Customers.Orders namespace, which map to the Orders topic. If you need to use complex business logic to determine the route, you can implement the getTopic method in the new TopicMapper class to handle this mapping logic.

Also note that 1.4 introduced a topic.suffix configuration property in addition to topic.prefix. Using our example above, you can configure:

    "topic.prefix": "Stocks",
    "database": "Customers",
    "collection": "Orders",
    "topic.suffix": "US"

This will define the topic to write to as "Stocks.Customers.Orders.US".

Next Steps

- Download the latest MongoDB Connector for Apache Kafka 1.4 from the Confluent Hub!
- Read the MongoDB Connector for Apache Kafka documentation.
- Questions? Need help with the connector? Ask the Community.
- Have a feature request? Provide feedback or file a JIRA.

February 9, 2021

Legacy Modernization with MongoDB and Confluent

In many organizations, crucial enterprise data is locked in dozens or hundreds of silos that may be controlled by different teams and stuck in systems that aren't able to serve new workloads or access patterns. This is a blocker for innovation and insight, ultimately hampering the business. For example, imagine building a new mobile app for your customers that enables them to view their account data in a single view. Designing the app could require months of time simply to navigate the internal processes necessary to gain access to the legacy systems, and even more time to figure out how to integrate them.

An Operational Data Layer, or ODL, can offer a "best of both worlds" approach, providing the benefits of modernization without the risk of a full rip and replace. Legacy systems are left intact – at least at first – meaning that existing applications can continue to work as usual without interruption. New or improved data consumers access the ODL rather than the legacy data stores, protecting those stores from new workloads that may strain their capacity and expose single points of failure. At the same time, building an ODL offers a chance to redesign the application's data model, allowing for new development and features that aren't possible with the rigid tabular structure of existing relational systems. With an ODL, it's possible to combine data from multiple legacy sources into a single repository where new applications, such as a customer single view or artificial intelligence processes, can access the entire corpus of data. Existing workloads can gradually shift to the ODL, delivering value at each step. Eventually, the ODL can be promoted to a system of record and legacy systems can be decommissioned. Read our blog covering DaaS with MongoDB and Confluent to learn more.

There's also a push today for applications and databases to be entirely cloud-based, but the reality is that current business applications are often too complex to be migrated easily or completely. Instead, many businesses are opting to move application data between on-premises and cloud deployments in an effort to leverage the full advantage of public cloud computing without having to undertake a complete, massive data lift-and-shift. Confluent can be used for both one-time and real-time data synchronization between legacy data sources and modern data platforms like MongoDB, whose fully managed global cloud database service, MongoDB Atlas, is supported across AWS, Google Cloud, and Azure. Confluent Platform can be self-managed in your own data center, while Confluent Cloud can be used on the public clouds. Whether leaving your application on-premises is a personal choice or a corporate mandate, there are many good reasons to integrate with MongoDB Atlas:
- Bring your data closer to your users in more than 70 regions with Atlas's global clusters
- Address your most intense workloads with one-click, automated sharding for scale-out and zero-downtime scale-up
- Quickly provision TBs of database storage, all on high-performance SSDs with dedicated I/O bandwidth
- Natively query and analyze data across AWS S3 and MongoDB Atlas with MongoDB Atlas Data Lake
- Perform full-text search queries with MongoDB Atlas Search
- Build native mobile applications that seamlessly synchronize data with MongoDB Realm
- Create powerful visualizations and dashboards of your MongoDB data with MongoDB Charts
- Off-load older data to cost-effective storage with MongoDB Atlas Online Archive

In this video, we show a one-time migration and real-time, continuous data synchronization from a relational system to MongoDB Atlas using Confluent Platform and the MongoDB Connector for Apache Kafka. We also talk about different ways to store and consume the data within MongoDB Atlas. The Git repository for the demo is here. Learn more about the MongoDB and Confluent partnership here and download the joint Reference Architecture here. Click here to learn more about modernizing to MongoDB.

January 7, 2021