Este exemplo de uso demonstra como você pode configurar o conector de origem do MongoDB Kafka para aplicar um esquema personalizado aos seus dados. Um esquema é uma definição que especifica a estrutura e digita informações sobre dados em um tópico do Apache Kafka. Use um esquema quando precisar garantir que os dados sobre o tópico preenchidos pelo conector de origem tenham uma estrutura consistente.
Para saber mais sobre como usar esquemas com o conector, consulte o guia Aplicar esquemas.
Exemplo
Suponha que seu aplicação mantenha o controle dos dados do cliente em uma coleção MongoDB e você deseja publicar esses dados em um tópico do Kafka. Você deseja que os assinantes dos dados do cliente recebam dados formatados consistentemente. Você escolhe aplicar um esquema aos seus dados.
Suas necessidades e suas soluções são as seguintes:
Requerimento | várias plataformas |
|---|---|
Receber dados de clientes de uma coleção MongoDB | Configure um conector de origem MongoDB para receber atualizações de dados de um banco de dados e coleção específicos. Consulte |
Fornecer o esquema de dados do cliente | Especifique um esquema que corresponda à estrutura e aos tipos de dados dos dados do cliente . Consulte |
Omitir metadados do Kafka dos dados do cliente | Incluir somente os dados do |
Para o arquivo de configuração completo que atende aos requisitos acima, consulte Especificar a configuração.
Receber dados de uma coleção
Para configurar seu conector de origem para receber dados de uma coleção MongoDB, especifique o nome do banco de dados e da coleção. Para este exemplo, você pode configurar o conector para ler a partir da coleção purchases no banco de dados customers como segue:
database=customers collection=purchases
Criar um esquema personalizado
Um exemplo de documento de dados do cliente da sua coleção contém as seguintes informações:
{ "name": "Zola", "visits": [ { "$date": "2021-07-25T17:30:00.000Z" }, { "$date": "2021-10-03T14:06:00.000Z" } ], "goods_purchased": { "apples": 1, "bananas": 10 } }
A partir do documento de amostra, você decide que seu esquema deve apresentar os campos usando os seguintes tipos de dados:
Nome do campo | Tipos de dados | Descrição |
|---|---|---|
name | Nome do cliente | |
visitas | array of timestamps | Datas em que o cliente visitou |
bens_comprados | Nomes de mercadorias e quantidade de cada item que o cliente comprou |
Você pode descrever seus dados usando o formato de esquema Apache Avro, como mostrado no esquema de exemplo abaixo:
{ "type": "record", "name": "Customer", "fields": [{ "name": "name", "type": "string" },{ "name": "visits", "type": { "type": "array", "items": { "type": "long", "logicalType": "timestamp-millis" } } },{ "name": "goods_purchased", "type": { "type": "map", "values": "int" } } ] }
Importante
Conversores
Para enviar seus dados com codificação binário Avro através do Apache Kafka, é necessário utilizar um conversor Avro. Para obter mais informações, consulte o guia sobre Conversores.
Omitir metadados de registros publicados
O conector publica os documentos e metadados de dados do cliente que descrevem o documento em um tópico do Kafka. Você pode definir o conector para incluir somente os dados do documento contidos no campo fullDocument do registro usando a seguinte configuração:
publish.full.document.only=true
Para obter mais informações sobre o campo fullDocument , consulte o guia Change Streams .
Especifique a configuração
A configuração do conector do esquema personalizado deve se assemelhar ao seguinte:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your MongoDB connection URI> database=customers collection=purchases publish.full.document.only=true output.format.value=schema output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]} value.converter.schemas.enable=true value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter
Observação
Esquema incorporado
Na configuração anterior, o conversor de JSON schema do Kafka Connect incorpora o esquema personalizado em suas mensagens. Para saber mais sobre o conversor de JSON schema, consulte o guia Conversores.
Para mais informações sobre como especificar esquemas, consulte o guia Aplicar Esquemas.