Este exemplo de uso demonstra como configurar um pipeline para personalizar os dados que seu conector de origem MongoDB Kafka consome. Um pipeline é uma aggregation pipeline do MongoDB composta de instruções para o banco de dados para filtrar ou transformar dados.
O MongoDB notifica o conector de alterações de dados que correspondem ao seu pipeline de agregação em um fluxo de alterações. Um change stream é uma sequência de eventos que descreve as alterações de dados que um cliente fez em um sistema do MongoDB em tempo real. Para obter mais informações, consulte a entrada manual do servidor MongoDB em Change Streams.
Exemplo
Suponha que você esteja coordenando um evento e queira coletar os nomes e horários de chegada de cada convidado em um evento específico . Sempre que um convidado faz check-in do evento, um aplicação insere um novo documento que contém os seguintes detalhes:
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
Você pode definir a configuração do connector pipeline
para instruir o change stream a filtrar as informações do evento de alteração da seguinte forma:
Crie eventos de alteração para operações de inserção e omita eventos para todos os outros tipos de operações.
Crie eventos de alteração somente para documentos que correspondam ao valor
fullDocument.eventId
"321" e omita todos os outros documentos.Omita os campos
_id
eeventId
do objetofullDocument
utilizando uma projeção.
Para aplicar essas transformações, atribua o seguinte pipeline de agregação à sua configuração pipeline
:
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
Importante
Certifique-se de que os resultados do pipeline contenham os campos de nível superior _id
e ns
do objeto payload
. O MongoDB usa id
como o valor do token de retomada e ns
para gerar o nome do tópico de saída Kafka.
Quando a aplicação insere o documento de amostra, o connector configurado publica o seguinte registro no tópico do Kafka:
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
Para obter mais informações sobre como gerenciar fluxos de alterações com o conector de origem, consulte a documentação do conector em Change Streams.