对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

所有 Sink 连接器配置属性

在此页面上,您可以查看 MongoDB Kafka connector 的所有可用属性。此页面重复其他接收器连接器配置属性页面的内容。

要查看所有connector配置属性页面的列表,请参阅connector配置属性页面。

使用以下配置设置指定MongoDB Kafka Sink connector如何与MongoDB cluster连接并通信。

要仅查看与配置 MongoDB 连接相关的选项,请参阅MongoDB 连接配置属性页面。

名称
说明

connection.uri

必需

类型:字符串


描述:用于连接到MongoDB实例或集群的MongoDB连接 URI
字符串。有关更多信息,请参阅连接到MongoDB指南

重要提示:为避免在 connection.uri 设置中暴露身份验证凭证,请使用ConfigProvider并设立适当的配置参数。

默认值:mongodb://localhost:27017
接受的值: MongoDB连接 URI 字符串

server.api.version

类型:string

说明:
要与 MongoDB 服务器一起使用的 Stable API 版本。有关 Stable API 和支持它的服务器版本的更多信息,请参阅 Stable API MongoDB 服务器手册指南。

默认""
接受的值:空 string 或有效的 Stable API 版本。

server.api.deprecationErrors

类型:布尔值 描述:设立为


true时,如果Connector在MongoDB实例上调用已声明 Stable API版本中已弃用的命令,则会引发异常。您可以使用

配置选项设立API版本。有关server.api.version Stable API的更多信息,请参阅MongoDB手册中有关 Stable API 的条目。默认值:

false
接受值:truefalse

server.api.strict

类型:布尔值 描述:设立为


true时,如果Connector在MongoDB实例上调用已声明的 Stable API版本中未涵盖的命令,则会引发异常。您可以使用

配置选项设立API版本。有关server.api.version Stable API的更多信息,请参阅MongoDB手册中有关 Stable API 的条目。默认值:

false
接受值:truefalse

使用以下配置设置指定 MongoDB Kafka connector 将数据写入哪个 MongoDB database 和 collection。您可以使用默认的DefaultNamespaceMapper或指定自定义类。

要仅查看与指定connector写入数据的位置相关的选项,请参阅MongoDB 命名空间映射配置属性页面。

名称
说明

namespace.mapper

类型:字符串


描述:指定将数据汇入哪个数据库或集合的类的完全限定类名。默认DefaultNamespaceMapper 使用 和 属性中指定的值。Connector包括一个用于指定数据库和集合的替代类,名为 。有关更多信息,请参阅database collection


FieldPathNamespaceMapper
FieldPathNamespaceMapper 设置。默认值:


com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper

接受值:实现NamespaceMapper接口的类的完全限定 Java 类名称。

database

必需

类型:字符串


描述:接收器Connector写入的MongoDB 数据库的名称。接受值:

MongoDB 数据库名称

集合

类型:字符串


描述:SinkConnector写入的MongoDB集合的名称。如果 SinkConnector关注多个主题,则这是任何未另行指定的写入操作的默认集合。

默认:主题名称。

接受值:MongoDB collection名称

如果将接收器连接器配置为使用FieldPathNamespaceMapper ,则可以根据数据的字段值指定用于接收文档的数据库和集合。

要启用此映射行为,请将接收器connectornamespace.mapper配置属性设置为完全限定的类名称,如下所示:

namespace.mapper=com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper

FieldPathNamespaceMapper要求您指定以下设置:

  • 将属性映射到数据库和集合的一个或两个

  • 到数据库的keyvalue映射之一

  • 到collection的keyvalue映射之一

您可以使用以下设置自定义FieldPathNamespaceMapper的行为:

名称
说明

namespace.mapper.key.database.field

类型:字符串

描述:关键文档字段的名称,用于指定要写入
的数据库的名称。

namespace.mapper.key.collection.field

类型:字符串

描述:关键文档字段的名称,用于指定要在其中写入
的集合的名称。

namespace.mapper.value.database.field

类型:字符串 描述:值文档字段的名称,该字段指定要写入


的数据库的名称。

namespace.mapper.value.collection.field

类型:字符串 描述:值文档字段的名称,该字段指定要在其中写入


的集合的名称。

namespace.mapper.error.if.invalid

类型:布尔值 描述:当文档缺少映射字段或其BSON类型无效时是否引发异常。设立为


时,Connector不会进程缺少映射字段或包含无效BSON类型的文档。根据相关的错误处理配置设置,Connector可能会停止或跳过处理。设立为

true

false时,如果文档缺少映射字段或其BSON类型无效,则Connector默认为写入指定的databasecollection 设置。默认值:

false
接受值:truefalse

使用以下配置设置指定 MongoDB Kafka 接收器连接器应监视哪些 Kafka 主题的数据。

要仅查看与指定 Kafka 主题相关的选项,请参阅 Sink Connector 的 Kafka 主题属性 页面。

名称
说明

话题

必需

类型:列表


描述:接收器Connector监视的Kafka主题列表。

您可以定义topicstopics.regex设置,但不能同时定义两者。

接受值:以逗号分隔的有效 Kafka 主题列表

topic.regex

必需

类型:字符串


描述:与接收器Connector监视的Kafka主题匹配的正则表达式。

示例,以下正则表达式匹配主题名称,例如“action.landing.clicks” 和“活动.支持.点击”。 它与主题名称“action.landing.views”不匹配 和“Activity.Clicks”。

topics.regex=activity\\.\\w+\\.clicks$

您可以定义topicstopics.regex设置,但不能同时定义两者。

接受值:使用java.util.regex.Pattern的有效正则表达式模式。

使用此页面上的设置可配置 MongoDB Kafka connector 的消息处理行为,包括:

  • 消息批量大小

  • 速率限制

  • 并行任务数

要仅查看与变更数据捕获处理程序相关的选项,请参阅connector消息处理Properties页面。

名称
说明

max.batch.size

类型:int




描述:同时处理的批处理记录的最大数量。考虑包含以下记录的批处理:

[ 1, 2, 3, 4, 5 ]

设立为0 时,Connector会对整个批处理执行单次批量写入。设立为

1时,Connector会对批处理中的每条记录执行一次批量写入,总共执行五次批量写入,如以下示例所示:

[1], [2], [3], [4], [5]

默认值:0
接受值:整数

bulk.write.ordered

类型:布尔值


描述:Connector是否会将批处理记录写入有序或无序的批量写入操作。当设立为默认值 时,Connector会将批处理记录写入为有序的批量写入操作。要学习;了解有关批量写入操作的更多信息,请参阅批量写入操作。默认值:true



true
接受值:truefalse

rate.limiting.every.n

类型:int


描述:接收器Connector器为触发速率限制超时而处理的记录批次数。值为 表示没有速率限制。默认值:0

0
接受的值:整数

rate.limiting.timeout

类型:int


描述:接收器Connector在达到速率限制阈值后应恢复处理之前的等待时间(以毫秒为单位)。默认值:

0
接受值:整数

tasks.max

类型:int


描述:为此Connector创建的最大任务数。如果Connector无法处理您指定的并行度,则它创建的任务数可能会少于指定的最大任务数。重要提示:如果指定大于

1的值,Connector将启用任务的并行处理。如果您的主题有多个分区日志(这使Connector能够并行从主题读取),则任务可能会无序进程消息。默认值:

1
接受的值:整数

使用以下配置设置指定 MongoDB Kafka connector 如何处理错误并配置死信队列(DLQ)。

要仅查看与处理错误相关的选项,请参阅connector错误处理属性页面。

名称
说明

mongo.errors.tolerance

类型:字符串


描述:Connector出现错误时是否继续处理消息。允许Connector覆盖errors.tolerance Kafka集群设置。当设立为 时,Connector会报告任何错误并阻止进一步处理其余消息。设立为 时,Connector会忽略任何有问题的消息。设立为

none

all时,Connector仅允许数据错误,并在出现所有其他错误时失败。要学习;了解有关错误处理策略的更多信息,请参阅处理错误页面。此属性会覆盖

data




Connect 框架的errors.tolerance属性。默认值:继承

errors.tolerance
设置中的值。接受值:"none""all"

mongo.errors.log.enable

类型:布尔值


描述:Connector是否应将包括失败操作在内的错误详细信息写入日志文件。Connector使用 或 设置将错误分类为“可容忍”或“不可容忍”。设立为 时,Connector会记录“允许”和“不允许”的错误。设立为 时,Connector会记录“不允许”错误。此属性会覆盖错误。日志。errors.tolerance mongo.errors.tolerance

true
false


Connect 框架的启用属性。默认值:

false
接受值:truefalse

errors.log.include.messages

类型:布尔值


描述:Connector在记录错误时是否应包含无效消息。无效消息包括记录键、值和标头等数据。默认值:

false
接受值:truefalse

errors.deadletterqueue.topic.name

类型:字符串


描述:用作死信队列(DLQ)的主题名称。如果为空,则Connector不会向死信队列(DLQ)发送任何无效消息。要学习;了解有关死信队列(DLQ)的更多信息,请参阅死信队列配置示例。默认值:



""
接受的值:有效的Kafka主题名称

errors.deadletterqueue.context.headers.enable

类型:布尔值


描述:Connector在将消息写入死信队列(DLQ)时是否应包含上下文标头。要学习;了解有关死信队列(DLQ)的更多信息,请参阅死信队列配置示例。要学习;了解Connector通过上下文标头定义和报告的异常,请参阅批量写入异常。默认值:





false
接受值:truefalse

errors.deadletterqueue.topic.replication.factor

类型:整数


描述:要复制死信队列(DLQ)主题的节点数。如果您运行的是单节点Kafka集群,则必须将其设立为 。要学习;了解有关死信队列(DLQ)的更多信息,请参阅死信队列配置示例。默认值:1



3
接受的值:有效的节点数

使用以下配置设置指定 MongoDB Kafka connector 在将 Kafka 数据插入 MongoDB 之前应如何转换该数据。

要仅查看与帖子处理器相关的选项,请参阅Sink connector 帖子处理器属性页面。

名称
说明

post.processor.chain

类型:列表


描述:Connector在将数据保存到MongoDB之前应应用以进程数据的后处理器类的列表。要学习;了解有关后处理器的更多信息并查看其用法示例,请参阅



Sink Connector 后处理器。默认值:

com.mongodb.kafka.connect.sink.processor.DocumentIdAdder

接受的值:以逗号分隔的完全限定 Java 类名称列表

field.renamer.mapping

类型:字符串


描述:键字段和值字段的字段名称映射列表。按以下格式在内联JSON大量中定义映射:

[ { "oldName":"key.fieldA", "newName":"field1" }, { "oldName":"value.xyz", "newName":"abc" } ]

默认值:[]
接受的值:有效的JSON大量

field.renamer.regexp

类型:字符串


描述:使用正则表达式的键和值字段的字段名称映射列表。按以下格式在内联JSON大量中定义映射:

[ {"regexp":"^key\\\\..*my.*$", "pattern":"my", "replace":""}, {"regexp":"^value\\\\..*$", "pattern":"\\\\.", "replace":"_"} ]

默认值:[]
接受的值:有效的JSON大量

key.projection.list

类型:字符串


描述:Connector应包含在键投影中的字段名称列表。默认值:

""
接受的值:以逗号分隔的字段名称列表

key.projection.type

类型:字符串


描述:Connector应使用的键投影类型。默认值:

none
接受的值:noneBlockListAllowList (已弃用:黑名单、白名单)

value.projection.list

类型:字符串


描述:Connector应包含在值投影中的字段名称列表。默认值:

""
接受的值:以逗号分隔的字段名称列表

value.projection.type

类型:字符串


描述:Connector应使用的值投影类型。默认值:

none
接受的值:noneBlockListAllowList (已弃用:黑名单、白名单)

writemodel.strategy

类型:字符串 描述:该类指定Connector应用于批量写入的


WriteModelStrategy。要学习;了解有关如何创建自己的策略的更多信息,请参阅自定义写入模型策略。默认值:




com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy

接受值:完全限定的 Java 类名称

使用以下配置设置,指定 MongoDB Kafka Sink 连接器应如何确定写入 MongoDB 的每个文档的 _id 值。

要仅查看与确定文档_id字段相关的选项,请参阅“connector ID 策略属性”页面。

名称
说明

document.id.strategy

类型:字符串

描述:Connector用于生成唯一
_id字段的类。默认值:

com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy

接受的值:空字符串或完全限定的 Java 类名

document.id.strategy.overwrite.existing

类型:布尔值 描述:Connector在应用


属性定义的策略时是否应覆盖_id document.id.strategy字段中的现有值。默认值:

false
接受值:truefalse

document.id.strategy.uuid.format

类型:字符串 描述:Connector应以字符串格式还是


_idBsonBinary 格式在 字段中输出 UUID。默认值:

string
接受值:stringbinary

delete.on.null.values

类型:布尔值


描述:当键值与MongoDB中的文档匹配且值字段为 null 时,Connector是否应删除文档。当您指定对密钥文档进行操作的

ID 生成策略(例如FullKeyStrategy 、 和 )时,此设置适用。默认值:PartialKeyStrategy ProvidedInKeyStrategy

false
接受值:truefalse

您可以设置配置属性来指定MongoDB Kafka Sink connector如何将数据写入MongoDB 。 以下部分介绍了可以设置的配置属性,可自定义此行为。

writemodel.strategy 配置属性设置为 ,以指定接收器connector在接收接收器记录时如何写入数据。

您可以将writemodel.strategy的值设置为本页“策略”部分中描述的写入模型策略的任何完全限定类名称。 您可以通过设置以下配置来指定策略:

writemodel.strategy=<a write model strategy>

delete.writemodel.strategy 配置属性设置为 ,以指定 Sink connector在收到 tombstone 事件时如何写入数据。 逻辑删除事件是指包含键但不包含值的记录,这表示记录已删除。

您可以将delete.writemodel.strategy的值设置为本页“策略”部分中描述的写入模型策略的任何完全限定类名称。 您可以通过设置以下配置来指定策略:

delete.writemodel.strategy=<a write model strategy>

要仅查看与写入模型策略相关的选项,请参阅接收器连接器写入模型策略页面。

名称
说明

DefaultWriteModelStrategy

描述:
此策略默认使用ReplaceOneDefaultStrategy InsertOneDefaultStrategytimeseries.timefield,如果设立了 选项,则使用 。这是

writemodel.strategy配置属性的默认值。

InsertOneDefaultStrategy

描述:
将每条 Sink记录作为文档插入MongoDB 。要指定此策略,设立属性设置为以下类名:

com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy

ReplaceOneDefaultStrategy

描述:
在MongoDB中最多替换一个由_id 字段匹配 Sink记录的文档。如果没有匹配的文档,Connector会将
Sink记录作为新文档插入。要指定此策略,设立属性设置为以下类名:

com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy

替换一个业务键策略

描述:
最多替换一个与指定业务键 Sink记录匹配的文档。如果没有匹配的文档,Connector会将 Sink记录作为新文档插入。要指定此策略,设立属性设置为以下类名:

com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

要查看如何使用此策略的示例,请参阅我们的 写模型策略 指南

DeleteOneDefaultStrategy

描述:仅当文档包含 null 值结构时,才最多删除一个通过
字段与接收器连接器的键结构匹配的文档。这是 配置属性的默认值。当您设立_id

delete.writemodel.strategy

时,此策略将被设立为 属性的默认值。要指定此策略,设立属性设置为以下类名:writemodel.strategydelete.on.null.values=true

com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy

删除一个业务键策略

描述:
最多删除一个通过业务键与 Sink记录匹配的MongoDB文档。此策略需要使用valueDoc 来生成密钥。要利用PartialKeyStrategy 生成密钥,请使用 。要指定此策略,设立属性设置为以下类名:DeleteOneTombstoneBusinessKeyStrategy

com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

要查看如何使用此策略的示例,请参阅我们的 写模型策略 指南

DeleteOneTombstone BusinessKeyStrategy

描述:
最多删除一个通过业务键与 Sink记录匹配的MongoDB文档。此策略利用 创建用于删除的密钥。要指定此策略,设立属性设置为以下类名:PartialKeyStrategy

com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneTombstoneBusinessKeyStrategy

UpdateOneDefaultStrategy

描述:
在MongoDB中最多更新一个通过_id 字段匹配 Sink记录的文档。如果没有匹配的文档,Connector会将
Sink记录作为新文档插入。要指定此策略,设立属性设置为以下类名:

com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneDefaultStrategy

更新一个时间戳策略

描述:
_insertedTS将 (插入时间戳)和_modifiedTS (修改时间戳)字段添加到文档中。要指定此策略,设立属性设置为以下类名:

com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

要查看如何使用此策略的示例,请参阅我们的 写模型策略 指南

UpdateOneBusinessKeyTimestampStrategy

描述:
_insertedTS将 (插入时间戳)和_modifiedTS (修改时间戳)字段添加到与业务键匹配的文档中。要指定此策略,设立属性设置为以下类名:

com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy

使用以下 MongoDB Kafka 接收器连接器配置设置来覆盖特定主题的全局或默认属性设置。

要仅查看与覆盖主题设置相关的选项,请参阅主题覆盖属性页面。

名称
说明

topic.override.<topicName>.<propertyName>

类型:字符串 描述:指定主题和属性名称以覆盖相应的全局或默认属性设置。示例,




topic.override.foo.collection=bar
设置指示 SinkConnector将来自 主题的数据存储在 集合中。您可以在 段中按主题指定任何有效的配置设置,但foo bar



<propertyName>
connection.uri和 除外。默认值:topics

""
接受值:特定于覆盖属性的接受值

使用以下配置设置指定MongoDB Kafka接收器connector用于处理变更数据捕获 (CDC) 事件的类。

有关使用内置ChangeStreamHandler和Debezium及Qlik Replicate事件生成器的处理程序的示例,请参阅变更数据捕获处理程序指南

要仅查看与变更数据捕获处理程序相关的选项,请参阅变更数据捕获属性页面。

名称
说明

change.data.capture.handler

类型:字符串

描述:用于将更改转换为事件流的
CDC 处理程序的类名称。有关 CDC 处理程序的列表,请参阅可用的 CDC 处理程序。默认值:

""
接受的值:空字符串或完全限定的Java类名

使用以下配置设置指定 MongoDB Kafka 接收器连接器应如何将数据接收到 MongoDB 时间序列集合。

要仅查看与time-series collection相关的选项,请参阅Kafka time-series 属性页面。

名称
说明

timeseries.timefield

类型:字符串

描述:源数据中包含要与时间序列集合中的新文档关联的时间信息的顶级字段的名称。默认值:


""
接受的值:空字符串或包含BSONDateTime 值的字段名称

timeseries.timefield.auto.convert.date.format

类型:字符串


描述:Connector应使用该日期格式模式来转换timeseries.timefield

设置指定的字段中包含的源数据。Connector将日期格式模式传递给Java DateTimeFormatter.ofPattern(模式, 区域设置) 方法,以对时间字段执行日期和时间转换。如果源数据中的日期值仅包含日期信息,Connector会将时间信息设置为指定日期的开始时间。如果日期值不包含时区偏移量,Connector会将偏移量设置为



UTC。默认值:

yyyy-MM-dd[['T'][ ]][HH:mm:ss[[.][SSSSSS][SSS]][ ]VV[ ]'['VV']'][HH:mm:ss[[.][SSSSSS][SSS]][ ]X][HH:mm:ss[[.][SSSSSS][SSS]]]

接受的值:有效的DateTimeFormatter格式

timeseries.timefield.auto.convert

类型:boolean


描述:是否将字段中的数据转换为BSONDate 格式。设立为

true时,如果值为数字,Connector将使用纪元之后的毫秒数并丢弃小数部分。如果值为字符串,Connector将使用以下配置中的设置来解析日期:

timeseries.timefield.auto.convert.date.format

如果Connector无法转换该值,则会将原始值发送到时间序列集合。默认值:

false
接受值:truefalse

timeseries.timefield.auto.convert.locale. language.tag

类型:string

说明:
要与日期格式模式一起使用的 DateTimeFormatter 区域设置语言标签(例如:"en-US")。

要了解有关区域设置的更多信息,请参阅 Java SE 文档中的 Locale

默认值ROOT
可接受值:有效的 Locale 语言标签格式

timeseries.metafield

类型:字符串


描述:从源数据中读取哪个顶级字段来描述一群组相关的时间序列文档。重要提示:该字段不能是 字段,也不能是您在 设置中指定的字段。默认值:

_idtimeseries.timefield

""
接受的值:空字符串或包含除 之外的任何BSON类型的字段名称。BsonArray

timeseries.expire.after.seconds

类型:int

描述:
MongoDB在自动删除时间序列集合数据之前应等待的秒数。当设置值小于1

时,Connector会禁用定时过期。要学习;了解更多信息,请参阅MongoDB手册中的设置时间序列集合的自动删除。默认值:

0
接受的值:整数

timeseries.granularity

类型:字符串




描述:源数据后续测量之间的预期间隔。要学习;了解更多信息,请参阅MongoDB手册中的“设置时间序列数据的粒度”。可选默认值:


""
接受值:"" "seconds""minutes" 、 、"hours"

有关如何将现有集合转换为时间序列集合的示例,请参阅如何将现有集合迁移到时间序列集合的教程。