MongoDB & Data Streaming – Implementing a MongoDB Kafka Consumer

Data Streaming

In today's data landscape, no single system can provide all of the required perspectives to deliver real insight. Deriving the full meaning from data requires mixing huge volumes of information from many sources.

At the same time, we're impatient to get answers instantly; if the time to insight exceeds tens of milliseconds, the value is lost – applications such as high-frequency trading, fraud detection, and recommendation engines can't afford to wait. This often means analyzing the inflow of data before it even makes it to the database of record. Add in zero tolerance for data loss and the challenge gets even more daunting.

Kafka and data streams are focused on ingesting the massive flow of data from multiple fire-hoses and then routing it to the systems that need it – filtering, aggregating, and analyzing en-route.

This blog introduces Apache Kafka and then illustrates how to use MongoDB as a source (producer) and destination (consumer) for the streamed data. A more complete study of this topic can be found in the Data Streaming with Kafka & MongoDB white paper.

Apache Kafka

Kafka provides a flexible, scalable, and reliable method to communicate streams of event data from one or more producers to one or more consumers. Examples of events include:

  • A periodic sensor reading such as the current temperature
  • A user adding an item to the shopping cart in an online store
  • A Tweet being sent with a specific hashtag

Streams of Kafka events are organized into topics. A producer chooses a topic to send a given event to, and consumers select which topics they pull events from. For example, a financial application could pull NYSE stock trades from one topic, and company financial announcements from another in order to look for trading opportunities.

In Kafka, topics are further divided into partitions to support scale out. Each Kafka node (broker) is responsible for receiving, storing, and passing on all of the events from one or more partitions for a given topic. In this way, the processing and storage for a topic can be linearly scaled across many brokers. Similarly, an application may scale out by using many consumers for a given topic, with each pulling events from a discrete set of partitions.

Figure 1: Kafka Producers, Consumers, Topics, and Partitions
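
To make the relationship between topics, partitions, and consumers more concrete, here is a minimal sketch in plain Java. It uses no Kafka classes; the class name, the method names, and the use of String.hashCode() to choose a partition are assumptions made for illustration, and Kafka's own partitioning and consumer assignment differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models how keyed events spread across partitions and how
// partitions are shared among consumers. No Kafka classes are used here.
class PartitionSketch {

    // Map an event key to a partition. String.hashCode() is a stand-in for
    // whatever hash function a real partitioner applies.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Give each consumer a discrete set of partitions (round-robin).
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        for (String key : new String[] {"sensor-1", "sensor-2", "sensor-3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
        // Three consumers sharing six partitions: [[0, 3], [1, 4], [2, 5]]
        System.out.println("Consumer assignment: " + assign(numPartitions, 3));
    }
}
```

Because the same key always maps to the same partition, all events for a given key are seen by the same consumer in this model.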

MongoDB As A Kafka Consumer – A Java Example

In order to use MongoDB as a Kafka consumer, the received events must be converted into BSON documents before they are stored in the database. In this example, the events are strings representing JSON documents. The strings are converted to Java objects so that they are easy for Java developers to work with; those objects are then transformed into BSON documents.

Complete source code, Maven configuration, and test data can be found further down, but here are some of the highlights, starting with the main loop for receiving and processing event messages from the Kafka topic:

MongoClient client = new MongoClient();
MongoDatabase db = client.getDatabase("clusterdb");
MongoCollection<Document> fishCollection = 
    db.getCollection("fish");
Gson gson = new Gson();
Type type = new TypeToken<Fish>() {}.getType();

// Receive and process all available messages 
// from the Kafka topic
for (MessageAndOffset messageAndOffset : 
    fetchResponse.messageSet(a_topic, 
        a_partition)) 
    {
    long currentOffset = 
        messageAndOffset.offset();
    ...
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = 
        messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    Fish incomingFish = gson.fromJson(new String(
        bytes, "UTF-8"), type);
    fishCollection.insertOne(
        incomingFish.getFishAsDocument());
    ...
}
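
The payload-decoding step in the loop above can be tried in isolation, without a broker. The following is a sketch rather than part of the original consumer: the class and method names are made up, and sizing the array with remaining() instead of limit() is a choice made here (the two values are the same whenever the buffer's position is zero).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: turns a message payload into the JSON string that is
// later handed to Gson.
class PayloadDecodeSketch {

    // Copy the unread bytes out of the buffer and decode them as UTF-8.
    static String decode(ByteBuffer payload) {
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String json = "{\"internationalFishId\": 93734195, "
                + "\"name\": \"Gerald\", \"breed\": \"Turbot\"}";
        // Stand-in for messageAndOffset.message().payload()
        ByteBuffer payload = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(payload));
    }
}
```

Passing StandardCharsets.UTF_8 rather than the string "UTF-8" avoids the checked UnsupportedEncodingException.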

The Fish class includes helper methods to hide how the objects are converted into BSON documents:

public String getBreedAsString() {
    String breedString;
    switch (breed) {
        case Cod:  breedString = "Cod";
            break;
        case Goldfish:  breedString = 
            "Goldfish";
            break;
        case Bass:  breedString = "Bass";
            break;
        case Billy:  breedString = "Billy";
            break;
        case Kipper:  breedString = 
            "Kipper";
            break;
        case Turbot:  breedString = 
            "Turbot";
            break;
        default: breedString = 
            "Unknown breed";
            break;
    }
    return breedString;
};
public void setBreed(Breed breed) {
    this.breed = breed;
}

public Document getFishAsDocument() {
    Document fishDocument = new Document(
        "_id", getInternationalFishId())
            .append("name", getName())
            .append("breed", 
                getBreedAsString());
    return fishDocument;
};
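
Since every string in the switch matches the name of its enum constant, the same mapping can also be sketched with Enum.name(). This is an alternative shown for illustration, not the code used in the example; the class name is made up, and the null check is an addition, because a switch on a null enum reference throws a NullPointerException rather than falling through to default.

```java
// Illustrative only: a compact equivalent of getBreedAsString().
class BreedNameSketch {

    enum Breed { Cod, Goldfish, Bass, Billy, Kipper, Turbot }

    // Return the constant's name, or a fallback when no breed has been set.
    static String breedAsString(Breed breed) {
        return breed == null ? "Unknown breed" : breed.name();
    }

    public static void main(String[] args) {
        System.out.println(breedAsString(Breed.Turbot)); // prints "Turbot"
        System.out.println(breedAsString(null));         // prints "Unknown breed"
    }
}
```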

In a real application, more would be done with the received messages – they could be combined with reference data read from MongoDB, acted on, and then passed along the pipeline by publishing to additional topics. In this example, the final step is to confirm from the mongo shell that the data has been added to the database:

db.fish.find()
{ "_id" : 93734195, "name" : "Gerald", 
    "breed" : "Turbot" }
{ "_id" : 71858458, "name" : "Douglas", 
    "breed" : "Turbot" }
{ "_id" : 25992945, "name" : "Louise", 
    "breed" : "Turbot" }
{ "_id" : 95476834, "name" : "Helen", 
    "breed" : "Kipper" }
{ "_id" : 27950146, "name" : "Ronald", 
    "breed" : "Goldfish" }
{ "_id" : 19155329, "name" : "Jerry", 
    "breed" : "Kipper" }
...

Full Java Code for MongoDB Kafka Consumer

Business Object – Fish.java

package com.clusterdb.kafka;

import org.bson.Document;

public class Fish {
    private int internationalFishId;
    private String name;
    private Breed breed;

    public enum Breed {
        Cod, Goldfish, Bass, Billy, Kipper, Turbot
    };

    public Fish(int internationalFishId, String name, Breed breed) {
        this.internationalFishId = internationalFishId;
        this.name = name;
        this.breed = breed;
    }

    public int getInternationalFishId() {
        return internationalFishId;
    }

    public void setInternationalFishId(int internationalFishId) {
        this.internationalFishId = internationalFishId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Breed getBreed() {
        return breed;
    }

    public String getBreedAsString() {
        String breedString;
        switch (breed) {
            case Cod:  breedString = "Cod";
                break;
            case Goldfish:  breedString = "Goldfish";
                break;
            case Bass:  breedString = "Bass";
                break;
            case Billy:  breedString = "Billy";
                break;
            case Kipper:  breedString = "Kipper";
                break;
            case Turbot:  breedString = "Turbot";
                break;
            default: breedString = "Unknown breed";
                break;
        }
        return breedString;
    };
    public void setBreed(Breed breed) {
        this.breed = breed;
    }

    public Document getFishAsDocument() {
        Document fishDocument = new Document("_id", getInternationalFishId())
                .append("name", getName())
                .append("breed", getBreedAsString());
        return fishDocument;
    };

    @Override
    public String toString() {
        return "Fish Object: {" +
                "internationalFishId=" + internationalFishId +
                ", name='" + name + '\'' +
                ", breed=" + breed +
                '}';
    }
}

Kafka Consumer for MongoDB – MongoDBSimpleConsumer.java

Note that this example consumer is written using the Kafka Simple Consumer API – there is also a Kafka High Level Consumer API which hides much of the complexity – including managing the offsets. The Simple API provides more control to the application but at the cost of writing extra code.
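
To show what "managing the offsets" involves, here is a small stand-alone sketch of the bookkeeping that the listing performs with readOffset. It uses no Kafka classes; the Message type and the class and method names are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a stand-in for the offset bookkeeping that the Simple
// Consumer API leaves to the application.
class OffsetTrackingSketch {

    // Hypothetical message type: an offset plus a payload.
    static final class Message {
        final long offset;
        final String payload;

        Message(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    private long readOffset;
    private final List<String> processed = new ArrayList<>();

    OffsetTrackingSketch(long startOffset) {
        this.readOffset = startOffset;
    }

    // Process one fetched batch, skipping anything older than readOffset and
    // remembering where the next fetch should start.
    void handleBatch(List<Message> batch) {
        for (Message m : batch) {
            if (m.offset < readOffset) {
                continue; // older than requested; ignore it
            }
            processed.add(m.payload);
            readOffset = m.offset + 1;
        }
    }

    long nextOffset() {
        return readOffset;
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        OffsetTrackingSketch tracker = new OffsetTrackingSketch(2);
        tracker.handleBatch(Arrays.asList(
                new Message(0, "a"), new Message(1, "b"),
                new Message(2, "c"), new Message(3, "d")));
        // prints "[c, d], next offset 4"
        System.out.println(tracker.processed() + ", next offset " + tracker.nextOffset());
    }
}
```

In this sketch the offset lives only in memory, so a restarted process has to choose its starting point again.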

package com.clusterdb.kafka;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.reflect.Type;

public class MongoDBSimpleConsumer {
    public static void main(String args[]) {
        MongoDBSimpleConsumer example = new MongoDBSimpleConsumer();
        //long maxReads = Long.parseLong(args[0]);
        long maxReads = 100;
        //String topic = args[1];
        String topic = "clusterdb-topic1";
        //int partition = Integer.parseInt(args[2]);
        int partition = 0;
        List<String> seeds = new ArrayList<String>();
        //seeds.add(args[3]);
        seeds.add("127.0.0.1");
        //int port = Integer.parseInt(args[4]);
        int port = 9092;
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public MongoDBSimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition,
                    List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
                100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer,a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port,
                        100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:"
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                    // We asked for an invalid offset. For simple case ask
                    // for the last element to reset
                    readOffset = getLastOffset(consumer,a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;

            MongoClient client = new MongoClient();
            MongoDatabase db = client.getDatabase("clusterdb");
            MongoCollection<Document> fishCollection = db.getCollection("fish");
            Gson gson = new Gson();
            Type type = new TypeToken<Fish>() {}.getType();

            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic,
                    a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                Fish incomingFish = gson.fromJson(new String(bytes, "UTF-8"), type);
                System.out.println(incomingFish);
                fishCollection.insertOne(incomingFish.getFishAsDocument());

                numRead++;
                a_maxReads--;
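            }
            // Close the client created for this fetch so that its connections are
            // not leaked. A long-running consumer would normally create a single
            // MongoClient before the loop and reuse it.
            client.close();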

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching Offset Data from the Broker. Reason: "
                    + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition,
                                 int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic,
                    a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // First time through, if the leader hasn't changed, give ZooKeeper
                // a second to recover. Second time, assume the broker did recover
                // before failover, or it was a non-Broker issue.
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port,
                                         String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for ["
                        + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

Maven Dependencies – pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.clusterdb</groupId>
  <artifactId>M101J</artifactId>
  <version>1.0-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <packaging>jar</packaging>

  <name>M101J</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>mongodb-driver</artifactId>
      <version>3.2.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.6.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.2</version>
    </dependency>
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-core</artifactId>
          <version>2.7.3</version>
      </dependency>

      <!-- Just the annotations; use this dependency if you want to attach annotations
           to classes without connecting them to the code. -->
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-annotations</artifactId>
          <version>2.7.3</version>
      </dependency>

      <!-- databinding; ObjectMapper, JsonNode and related classes are here -->
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.7.3</version>
      </dependency>

      <!-- smile (binary JSON). Other artifacts in this group do other formats. -->
      <dependency>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-smile</artifactId>
          <version>2.7.3</version>
      </dependency>
      <!-- JAX-RS provider -->
      <dependency>
          <groupId>com.fasterxml.jackson.jaxrs</groupId>
          <artifactId>jackson-jaxrs-json-provider</artifactId>
          <version>2.7.3</version>
      </dependency>
      <!-- Support for JAX-B annotations as additional configuration -->
      <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-jaxb-annotations</artifactId>
          <version>2.7.3</version>
      </dependency>
  </dependencies>
</project>

Test Data – Fish.json

A sample of the test data injected into Kafka is shown below:

{"internationalFishId": 93734195, "name": "Gerald", "breed": "Turbot"}
{"internationalFishId": 71858458, "name": "Douglas", "breed": "Turbot"}
{"internationalFishId": 25992945, "name": "Louise", "breed": "Turbot"}
{"internationalFishId": 95476834, "name": "Helen", "breed": "Kipper"}
{"internationalFishId": 27950146, "name": "Ronald", "breed": "Goldfish"}
{"internationalFishId": 19155329, "name": "Jerry", "breed": "Kipper"}
{"internationalFishId": 67784636, "name": "Benjamin", "breed": "Kipper"}
{"internationalFishId": 72704562, "name": "Stephanie", "breed": "Turbot"}
{"internationalFishId": 84804136, "name": "Evelyn", "breed": "Billy"}
{"internationalFishId": 49356570, "name": "Patrick", "breed": "Bass"}
{"internationalFishId": 22463391, "name": "Gerald", "breed": "Kipper"}
{"internationalFishId": 25494987, "name": "Jonathan", "breed": "Bass"}
{"internationalFishId": 25984696, "name": "Martin", "breed": "Turbot"}
{"internationalFishId": 89316196, "name": "Joe", "breed": "Kipper"}
{"internationalFishId": 93704506, "name": "Debra", "breed": "Bass"}
{"internationalFishId": 90875449, "name": "Susan", "breed": "Billy"}
{"internationalFishId": 3302594, "name": "Bruce", "breed": "Cod"}
{"internationalFishId": 23941776, "name": "Gerald", "breed": "Billy"}
{"internationalFishId": 14868491, "name": "Diane", "breed": "Bass"}
{"internationalFishId": 15475987, "name": "Joan", "breed": "Cod"}
{"internationalFishId": 82261217, "name": "Kathleen", "breed": "Billy"}
{"internationalFishId": 88362208, "name": "Nancy", "breed": "Cod"}
{"internationalFishId": 84881229, "name": "Aaron", "breed": "Cod"}
{"internationalFishId": 68008775, "name": "Randy", "breed": "Turbot"}
{"internationalFishId": 3246036, "name": "Larry", "breed": "Goldfish"}
{"internationalFishId": 25346448, "name": "Annie", "breed": "Billy"}
{"internationalFishId": 99978187, "name": "Wanda", "breed": "Goldfish"}
{"internationalFishId": 95566251, "name": "Susan", "breed": "Goldfish"}
{"internationalFishId": 3885361, "name": "Katherine", "breed": "Goldfish"}
{"internationalFishId": 12010058, "name": "Amy", "breed": "Kipper"}
{"internationalFishId": 81095784, "name": "Gerald", "breed": "Turbot"}
{"internationalFishId": 51150986, "name": "Laura", "breed": "Cod"}
{"internationalFishId": 62232475, "name": "Walter", "breed": "Kipper"}
{"internationalFishId": 58979946, "name": "Frances", "breed": "Goldfish"}
{"internationalFishId": 43801537, "name": "Carl", "breed": "Goldfish"}
{"internationalFishId": 23888593, "name": "Jason", "breed": "Cod"}
{"internationalFishId": 49527129, "name": "Shawn", "breed": "Billy"}
{"internationalFishId": 4168540, "name": "John", "breed": "Kipper"}
{"internationalFishId": 91915998, "name": "Amanda", "breed": "Cod"}
{"internationalFishId": 84999277, "name": "Ruth", "breed": "Cod"}
{"internationalFishId": 2823005, "name": "Phyllis", "breed": "Kipper"}
{"internationalFishId": 54375889, "name": "Patricia", "breed": "Billy"}
{"internationalFishId": 50481402, "name": "Joseph", "breed": "Cod"}
{"internationalFishId": 31330205, "name": "Patricia", "breed": "Turbot"}
{"internationalFishId": 68315446, "name": "Craig", "breed": "Billy"}
{"internationalFishId": 99509344, "name": "Peter", "breed": "Goldfish"}
{"internationalFishId": 99750447, "name": "Wayne", "breed": "Turbot"}
{"internationalFishId": 35470235, "name": "Rebecca", "breed": "Bass"}
{"internationalFishId": 22131676, "name": "David", "breed": "Kipper"}
{"internationalFishId": 14579895, "name": "Rachel", "breed": "Cod"}
{"internationalFishId": 59394346, "name": "Willie", "breed": "Cod"}
{"internationalFishId": 44310345, "name": "Anthony", "breed": "Bass"}
{"internationalFishId": 80822300, "name": "Willie", "breed": "Turbot"}
{"internationalFishId": 81693909, "name": "Laura", "breed": "Kipper"}
{"internationalFishId": 60343577, "name": "Harry", "breed": "Goldfish"}
{"internationalFishId": 28041557, "name": "Peter", "breed": "Kipper"}
{"internationalFishId": 61731232, "name": "Joshua", "breed": "Billy"}
{"internationalFishId": 67890018, "name": "Christina", "breed": "Goldfish"}
{"internationalFishId": 21402806, "name": "Shirley", "breed": "Bass"}
{"internationalFishId": 97849346, "name": "Carlos", "breed": "Goldfish"}
{"internationalFishId": 98194670, "name": "Thomas", "breed": "Bass"}
{"internationalFishId": 62769667, "name": "Richard", "breed": "Cod"}
{"internationalFishId": 26442193, "name": "Mark", "breed": "Cod"}
{"internationalFishId": 50659450, "name": "Tammy", "breed": "Kipper"}
{"internationalFishId": 73450338, "name": "Adam", "breed": "Goldfish"}
{"internationalFishId": 90544847, "name": "Margaret", "breed": "Bass"}
{"internationalFishId": 9763724, "name": "Harold", "breed": "Bass"}
{"internationalFishId": 49421808, "name": "Lawrence", "breed": "Cod"}
{"internationalFishId": 90370973, "name": "Kathleen", "breed": "Kipper"}
{"internationalFishId": 83311163, "name": "Billy", "breed": "Kipper"}
{"internationalFishId": 14814821, "name": "Ann", "breed": "Turbot"}
{"internationalFishId": 60732672, "name": "Mark", "breed": "Turbot"}
{"internationalFishId": 99621807, "name": "Donna", "breed": "Turbot"}
{"internationalFishId": 21966510, "name": "Jimmy", "breed": "Kipper"}
{"internationalFishId": 57278762, "name": "Benjamin", "breed": "Kipper"}
{"internationalFishId": 39839485, "name": "Eric", "breed": "Kipper"}
{"internationalFishId": 14616731, "name": "Julia", "breed": "Bass"}
{"internationalFishId": 68590077, "name": "Nicole", "breed": "Goldfish"}
{"internationalFishId": 16595863, "name": "Dennis", "breed": "Turbot"}
{"internationalFishId": 51139271, "name": "Brian", "breed": "Goldfish"}
{"internationalFishId": 29777274, "name": "Marie", "breed": "Turbot"}
{"internationalFishId": 13574988, "name": "Sandra", "breed": "Cod"}
{"internationalFishId": 19393831, "name": "Andrea", "breed": "Turbot"}
{"internationalFishId": 33182829, "name": "Jerry", "breed": "Kipper"}
{"internationalFishId": 37685705, "name": "William", "breed": "Kipper"}
{"internationalFishId": 88965921, "name": "Janice", "breed": "Turbot"}
{"internationalFishId": 98974498, "name": "Deborah", "breed": "Goldfish"}
{"internationalFishId": 20433714, "name": "Daniel", "breed": "Billy"}
{"internationalFishId": 2721376, "name": "Antonio", "breed": "Goldfish"}
{"internationalFishId": 93776684, "name": "Brandon", "breed": "Billy"}
{"internationalFishId": 15673501, "name": "Daniel", "breed": "Goldfish"}
{"internationalFishId": 10207475, "name": "Joseph", "breed": "Bass"}
{"internationalFishId": 51689664, "name": "Jack", "breed": "Cod"}
{"internationalFishId": 87980387, "name": "Brandon", "breed": "Goldfish"}
{"internationalFishId": 84577085, "name": "Steven", "breed": "Kipper"}
{"internationalFishId": 45082334, "name": "Tina", "breed": "Cod"}
{"internationalFishId": 81295703, "name": "Charles", "breed": "Turbot"}
{"internationalFishId": 84808531, "name": "Amanda", "breed": "Turbot"}
{"internationalFishId": 49809743, "name": "Adam", "breed": "Turbot"}
{"internationalFishId": 77388184, "name": "Benjamin", "breed": "Turbot"}

For simple testing, this data can be injected into the clusterdb-topic1 topic using the kafka-console-producer.sh command.
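
A file in the same one-JSON-document-per-line shape can be produced with a few lines of Java. This is only a sketch: the original does not say how Fish.json was generated, and the class name, the short name list, the ID range, and the seeded random generator are all assumptions.

```java
import java.util.Random;

// Illustrative only: prints fish records in the same shape as Fish.json.
class FishJsonSketch {
    static final String[] NAMES = {"Gerald", "Helen", "Ronald", "Louise"};
    static final String[] BREEDS = {"Cod", "Goldfish", "Bass", "Billy", "Kipper", "Turbot"};

    // Format one record as a single-line JSON document. Values are not escaped,
    // so this only suits simple names like the ones above.
    static String toJsonLine(int id, String name, String breed) {
        return "{\"internationalFishId\": " + id
                + ", \"name\": \"" + name
                + "\", \"breed\": \"" + breed + "\"}";
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed so runs are repeatable
        for (int i = 0; i < 5; i++) {
            int id = 1 + random.nextInt(99_999_999);
            String name = NAMES[random.nextInt(NAMES.length)];
            String breed = BREEDS[random.nextInt(BREEDS.length)];
            System.out.println(toJsonLine(id, name, breed));
        }
    }
}
```

Random IDs can repeat. Because the consumer stores the ID as _id, MongoDB would reject a repeated ID as a duplicate key.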

Next Steps

To learn much more about data streaming and how MongoDB fits in (including Apache Kafka and competing and complementary technologies) read the Data Streaming with Kafka & MongoDB white paper.


About the Author - Andrew Morgan

Andrew is a Principal Product Marketing Manager working for MongoDB. He joined at the start of last summer from Oracle, where he spent 6+ years in product management, focused on High Availability. He can be contacted at @andrewmorgan or through comments on his blog (clusterdb.com).