Visão geral
Este guia mostra como alterar a maneira como seu conector de pia MongoDB Kafka grava dados no MongoDB.
Você pode alterar como seu conector grava dados no MongoDB para casos de uso, incluindo o seguinte:
- Insira documentos em vez de atualizá-los 
- Substituir ou atualizar documentos que correspondam a um filtro diferente do campo - _id
- Excluir documentos que correspondam a um filtro 
Você pode configurar como seu conector grava dados no MongoDB especificando uma estratégia de modelo de gravação. Uma estratégia de modelo de gravação é uma classe que define como o conector do coletor deve gravar dados usando modelos de gravação. Um modelo de escrita é uma interface de driver MongoDB Java que define a estrutura de uma operação de escrita.
Para saber como modificar os registros de sink que seu conector recebe antes de gravá-los no MongoDB, leia o guia sobre Sink Connector Post Processors.
Para ver uma implementação de estratégia de modelo de gravação, consulte o código fonte da classeInsertOneDefaultStrategy.
Operações de gravação em massa
O conector de pia grava dados no MongoDB usando operações de gravação em massa. As gravações em massa agrupam várias operações de gravação, como inserções, atualizações ou exclusões.
Por padrão, o conector de pia executa gravações em massa ordenadas, o que garante a ordem das alterações de dados. Em uma gravação em massa ordenada, se alguma operação de gravação resultar em um erro, o conector pulará as gravações restantes nesse lote.
Se não precisar garantir a ordem das alterações de dados, você poderá definir a configuração bulk.write.ordered como false para que o conector execute gravações em massa não ordenadas. O conector coletor executa gravações em massa não ordenadas em paralelo, o que pode melhorar o desempenho.
Além disso, quando você habilita gravações em massa não ordenadas e define a configuração de errors.tolerance como all, mesmo que qualquer operação de gravação em sua gravação em massa falhe, o conector continua a executar as operações de gravação restantes no lote que não retornam erros.
Dica
Para saber mais sobre a configuração bulk.write.ordered, consulte as Propriedades de processamento de mensagens do conector.
Para saber mais sobre operações de escrita em massa, consulte a seguinte documentação:
Como especificar estratégias de modelo de escrita
Para especificar uma estratégia de modelo de gravação, use a seguinte configuração:
writemodel.strategy=<write model strategy classname> 
Para obter uma lista das estratégias de modelo de gravação pré-criadas incluídas no conector, consulte o guia sobre configurações de estratégia de modelo de gravação.
Especificar uma chave de negócios
Uma chave de negócios é um valor composto por um ou mais campos em seu registro coletor que o identifica como exclusivo. Por padrão, o conector do coletor usa o campo _id do registro do coletor para recuperar a chave de negócios. Para especificar uma chave de negócios diferente, configure o pós-processador do adicionador de ID de Documentos para usar um valor personalizado.
É possível configurar o Document Id Adder para definir o campo _id da chave de registro do coletor, conforme mostrado no exemplo de propriedades a seguir:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy document.id.strategy.partial.key.projection.list=<comma-separated field names> document.id.strategy.partial.key.projection.type=AllowList 
Como alternativa, é possível configurá-lo para definir o campo _id a partir do valor do registro do coletor, conforme mostrado no exemplo de propriedades a seguir:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=<comma-separated field names> document.id.strategy.partial.value.projection.type=AllowList 
Importante
Melhore o desempenho de gravação
Crie um índice exclusivo em sua coleção de destino que corresponda aos campos de sua chave comercial. Isso melhora o desempenho das operações de gravação do conector do coletor. Consulte o guia sobre índices exclusivos para obter mais informações.
As seguintes estratégias de modelo de gravação exigem uma chave de negócios:
- ReplaceOneBusinessKeyStrategy
- DeleteOneBusinessKeyStrategy
- UpdateOneBusinessKeyTimestampStrategy
Para obter mais informações sobre o pós-processador Document Id Adder, consulte Configurar o pós-processador Document Id Adder.
Exemplos
Esta seção mostra exemplos de configuração e saída das seguintes estratégias de modelo de gravação:
Atualizar estratégia de carimbos de data/hora
Você pode configurar a estratégia Update One Timestamps para adicionar e atualizar carimbos de data/hora ao gravar documentos no MongoDB. Esta estratégia executa as seguintes ações:
- Quando o conector insere um novo documento do MongoDB, ele define os campos - _insertedTSe- _modifiedTScomo a hora atual no servidor do conector.
- Quando o conector atualiza um documento MongoDB existente, ele atualiza o campo - _modifiedTSpara a hora atual no servidor do conector.
Suponha que você queira rastrear a posição de um trem ao longo de uma rota e seu conector de coletor receba mensagens com a seguinte estrutura:
{   "_id": "MN-1234",   "start": "Beacon",   "destination": "Grand Central"   "position": [ 40, -73 ] } 
Use o ProvidedInValueStrategy para especificar que seu conector deve usar o valor _id da mensagem para atribuir o campo _id em seu documento MongoDB. Especifique seu ID e escreva as propriedades da estratégia do modelo da seguinte forma:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy 
Depois que o conector do coletor processa o registro de exemplo anterior, ele insere um documento que contém os campos _insertedTS e _modifiedTS , conforme mostrado no documento a seguir:
{   "_id": "MN-1234",   "_insertedTS": ISODate("2021-09-20T15:08:000Z"),   "_modifiedTS": ISODate("2021-09-20T15:08:000Z"),   "start": "Beacon",   "destination": "Grand Central"   "position": [ 40, -73 ] } 
Depois de uma hora, o trem informa sua nova localização ao longo de sua rota com uma nova posição, conforme mostrado no registro a seguir:
{   "_id": "MN-1234",   "start": "Beacon",   "destination": "Grand Central"   "position": [ 42, -75 ] } 
Depois que o conector de pia processar o registro anterior, ele insere um documento que contém os seguintes dados:
{   "_id": "MN-1234",   "_insertedTS": ISODate("2021-09-20T15:08:000Z"),   "_modifiedTS": ISODate("2021-09-20T16:08:000Z"),   "start": "Beacon",   "destination": "Grand Central"   "position": [ 42, -75 ] } 
Para obter mais informações sobre ProvidedInValueStrategy, consulte a seção sobre como configurar o pós-processador do Document Id Adder.
Substituir uma estratégia-chave de negócio
Você pode configurar a estratégia Substituir uma chave de negócio para substituir documentos que correspondam ao valor da chave de negócios. Para definir uma chave comercial em vários campos de um registro e configurar o conector para substituir documentos que contenham chaves comerciais correspondentes, execute as seguintes tarefas:
- Crie um índice exclusivo em sua coleção que corresponda aos campos de chave de negócios. 
- Especifique a estratégia de ID do - PartialValueStrategypara identificar os campos que pertencem à chave de negócios na configuração do conector.
- Especifique a estratégia do modelo de gravação - ReplaceOneBusinessKeyStrategyna configuração do conector.
Suponha que você queira rastrear a capacidade do avião pelo número do voo e pela localização do aeroporto, representados por flight_no e airport_code, respectivamente. Uma mensagem de exemplo contém as seguintes informações:
{   "flight_no": "Z342",   "airport_code": "LAX",   "seats": {     "capacity": 180,     "occupied": 152   } } 
Para implementar a estratégia, usando flight_no e airport_code como chave de negócios, primeiro crie um índice exclusivo nesses campos no shell do MongoDB:
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true }) 
Em seguida, especifique os campos de estratégia e chave de negócios do PartialValueStrategy na lista de projeção. Especifique a id e escreva a configuração da estratégia do modelo da seguinte maneira:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=flight_no,airport_code document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy 
Os dados de amostra inseridos na collection contêm o seguinte:
{   "flight_no": "Z342"   "airport_code": "LAX",   "seats": {     "capacity": 180,     "occupied": 152   } } 
Quando o conector processa dados de coletor que correspondem à chave comercial do documento existente, ele substitui o documento pelos novos valores sem alterar os campos de chave comercial:
{   "flight_no": "Z342"   "airport_code": "LAX",   "status": "canceled" } 
Após o conector processar os dados da pia, ele substitui o documento de amostra original no MongoDB pelo anterior.
Excluir uma estratégia de chave de negócios
Você pode configurar o conector para remover um documento quando ele receber mensagens que correspondam a uma chave de negócio usando a estratégia Excluir uma chave de negócio. Para definir uma chave de negócios a partir de vários campos de um registro e configurar o conector para excluir um documento que contenha uma chave de negócios correspondente, execute as seguintes tarefas:
- Crie um índice exclusivo em sua coleção do MongoDB que corresponda aos seus campos de chave de negócios. 
- Especifique o - PartialValueStrategycomo a estratégia de id para identificar os campos que pertencem à chave de negócios na configuração do conector.
- Especifique a estratégia do modelo de gravação - DeleteOneBusinessKeyStrategyna configuração do conector.
Suponha que você queira excluir um evento de calendário de um ano específico de uma coleção que contenha um documento semelhante ao seguinte:
{   "year": 2005,   "month": 3,   "day": 15,   "event": "Dentist Appointment" } 
Para implementar a estratégia, usando year como chave de negócios, primeiro crie um índice exclusivo nesses campos no shell do MongoDB:
db.collection.createIndex({ "year": 1 }, { unique: true }) 
Em seguida, especifique sua chave de negócios e escreva a estratégia de modelo em sua configuração da seguinte maneira:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=year document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy 
Se o seu conector processar um registro de sink que contenha a chave de negócios year, ele excluirá o primeiro documento com um valor de campo correspondente retornado pelo MongoDB. Suponha que o conector processe um registro de afundamento que contenha os seguintes dados de valor:
{   "year": 2005,   ... } 
Quando o conector processa o registro anterior, ele exclui o primeiro documento da coleção que contém um campo de year com um valor de "2005", como o documento de amostra "Dentist Appointment".original.
Estratégias de modelos de gravação personalizados
Se nenhuma das estratégias de modelo de gravação incluídas com o conector se ajustar ao seu caso de uso, você poderá criar suas próprias.
Uma estratégia de modelo de escrita é uma classe Java que implementa a interface WriteModelStrategy e deve substituir o método createWriteModel().
Consulte o código fonte para a interface WriteModelStrategy para a assinatura de método exigida.
Exemplo de estratégia de modelo de gravação
A estratégia de modelo de gravação personalizada a seguir retorna uma operação de gravação que substitui um documento do MongoDB que corresponde ao campo _id do registro de coletor pelo valor do campo de fullDocument do registro de coletor:
/**  * Custom write model strategy  *  * This class reads the 'fullDocument' field from a change stream and  * returns a ReplaceOne operation.  */ public class CustomWriteModelStrategy implements WriteModelStrategy {   private static String ID = "_id";      public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {     BsonDocument changeStreamDocument = document.getValueDoc()         .orElseThrow(() -> new DataException("Missing value document"));     BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());     if (fullDocument.isEmpty()) {       return null; // Return null to indicate no op.     }     return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);   } } 
Para obter outro exemplo de estratégia de modelo de escrita personalizada, consulte a estratégia de exemplo UpsertAsPartOfDocumentStrategy no GitHub.
Como instalar sua estratégia
Para configurar seu conector de pia para usar uma estratégia de escrita personalizada, você deve concluir as seguintes ações:
- Compile a classe de estratégia de gravação personalizada a um arquivo JAR. 
- Adicione o JAR compilado ao caminho de classe/plugin para seus trabalhadores do Kafka. Para obter mais informações sobre caminhos de plugin, consulte a documentação confluente. - Observação- O Kafka Connect carrega plugins isoladamente. Quando você implanta uma estratégia de gravação personalizada, tanto o JAR do conector quanto o JAR da estratégia do modelo de gravação devem estar no mesmo caminho. Seus caminhos devem se assemelhar ao seguinte: - <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar- <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar- Para saber mais sobre plug-ins do Kafka Connect, consulte este guia do Confluent. 
- Especifique sua classe personalizada na configuração writemodel.strategy. 
Para saber como compilar uma classe para um arquivo JAR, consulte o Guia de sistema JAR a partir da documentação Java SE.