对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

所有源连接器配置属性

在本页上,您可以查看 MongoDB Kafka 源连接器的所有可用配置属性。本页重复了其他源连接器配置属性页的内容。

要查看所有 Source 连接器配置属性页面的列表,请参阅 Source 连接器配置属性页面。

使用以下配置设置指定 MongoDB Kafka Source 连接器如何建立连接并与 MongoDB 集群进行通信。

要仅查看与 MongoDB 连接相关的选项,请参阅 MongoDB 源连接属性页面。

名称
说明

connection.uri

必需

类型:字符串


描述:用于连接MongoDB实例或集群的URI连接字符串。要学习;了解更多信息,请参阅

连接到MongoDB。

重要提示:为避免在 connection.uri 设置中暴露身份验证凭证,请使用ConfigProvider并设立适当的配置参数。

默认值:mongodb://localhost:27017,localhost:27018,localhost:27019
接受的值: MongoDB URI连接字符串

database

类型:字符串


描述:监视其变更的数据库名称。如果未设立,Connector会监视所有数据库的更改。默认值:

""
接受的值:单个数据库名称

集合

类型:字符串


描述:数据库中用于监视其更改的集合的名称。如果未设立,Connector会监视所有集合的更改。重要提示:如果您的

database配置设立为"" ,Connector将忽略 设置。默认值:collection

""
接受的值:单个集合名称

server.api.version

类型:字符串


描述:要与MongoDB 集群一起使用的 Stable API版本。有关 Stable API和支持该 API 的MongoDB 服务器版本的更多信息,请参阅 Stable API指南。默认值:

""
接受的值:空字符串或有效的 Stable API版本。

server.api.deprecationErrors

类型:布尔值 描述:设立为


true时,如果Connector在MongoDB实例上调用已声明 Stable API版本中已弃用的命令,则会引发异常。您可以使用

配置选项设立API版本。有关server.api.version Stable API的更多信息,请参阅MongoDB手册中有关 Stable API 的条目。默认值:

false
接受值:truefalse

server.api.strict

类型:布尔值 描述:设立为


true时,如果Connector在MongoDB实例上调用已声明的 Stable API版本中未涵盖的命令,则会引发异常。您可以使用

配置选项设立API版本。有关server.api.version Stable API的更多信息,请参阅MongoDB手册中有关 Stable API 的条目。默认值:

false
接受值:truefalse

使用以下配置设置指定 MongoDB Kafka 源连接器应将数据发布到哪些 Kafka 主题。

要仅查看与Kafka主题相关的选项,请参阅源连接器的Kafka主题属性页面。

名称
说明

topic.prefix

类型:字符串


描述:指定Connector变更流事件发布到的目标Kafka主题名称的第一部分。目标主题名称由topic.prefix 值组成,后跟数据库和集合名称,并用 属性中指定的值分隔。要学习;了解更多信息,请参阅主题命名前缀中的示例。默认值:topic.separator



""
接受的值:由 ASCII 字母数字字符组成的字符串,包括“.”、“-”和“_”

topic.suffix

类型:字符串


描述:指定Connector变更流事件发布到的目标Kafka主题名称的最后一部分。目标主题名称由数据库和集合名称组成,后跟topic.suffix 值,并用 属性中指定的值分隔。要学习;了解更多信息,请参阅主题命名后缀中的示例。默认值:topic.separator



""
接受的值:由 ASCII 字母数字字符组成的字符串,包括“.”、“-”和“_”

topic.namespace.map

类型:字符串


描述:指定变更流文档命名空间和主题名称之间的JSON映射。您可以使用

topic.namespace.map属性指定复杂的映射。此属性支持正则表达式和通配符匹配。要学习;了解有关这些行为的更多信息并查看示例,请参阅主题命名空间映射。默认值:



""
接受的值:有效的JSON对象

topic.separator

类型:字符串


描述:指定Connector用于连接用于创建主题名称的值的字符串。Connector将记录发布到主题,其名称是通过按以下顺序连接以下字段的值形成的:

  1. topic.prefix

  2. database

  3. collection

  4. topic.suffix

示例,以下配置指示Connector将变更流文档从 db数据库的 coll集合发布到 prefix-db-coll主题:

topic.prefix=prefix
database=db
collection=coll
topic.separator=-

重要提示:请注意,使用 topic.separator属性时,它不会影响您定义topic.namespace.map 属性的方式。topic.namespace.map 属性使用MongoDB命名空间,您必须始终使用 字符指定该命名空间,以分隔数据库和集合名称。默认值:.

"."
接受的值:字符串

topic.mapper

类型:字符串


描述:定义自定义主题映射逻辑的Java类。默认值:

com.mongodb.kafka.connect.source.topic.mapping.DefaultTopicMapper
接受的值:TopicMapper 类实施的有效完整类名。

使用 MongoDB Kafka 源连接器时,使用以下配置设置指定变更流的聚合管道和变更流游标的读取偏好。

要仅查看与变更流相关的选项,请参阅变更流属性页面。

名称
说明

管道

类型:字符串


描述:要在变更流中运行的聚合管道大量。您必须为变更流事件文档而不是 字段配置此设置。示例:fullDocument

[{"$match": { "$and": [{"operationType": "insert"}, {"fullDocument.eventId": 1404 }] } }]

有关更多示例,请参阅:

"[]"
默认值: 接受的值:有效的聚合管道阶段

change.stream.full.document

类型:字符串 描述:确定变更流在更新操作中返回的值。默认设置返回原始文档和更新文档之间的差异。


设置返回原始文档和更新文档之间的差异以及更新后某个时间点的整个更新文档的副本。 设置会返回更新的文档(如果可用)。

设置会返回更新后的文档,如果该文档不可用,则会引发错误。有关此变更流选项如何工作的更多信息,请参阅MongoDB手册中的查找更新操作的完整文档。默认值:

updateLookup

whenAvailable

required



""
接受值:"""updateLookup""whenAvailable""required"

更改。 流.show.expanded.events

类型:布尔值 描述:确定变更流是否通知


DDL 事件,例如
createIndexes
dropIndexes
事件。要学习;了解更多信息,请参阅 showExpandedEvents。此设置需要在更新事件中显示


updateDescription.disambiguatedPaths
,这会澄清涉及不明确字段的更改。要学习;了解更多信息,请参阅
disambiguatedPaths。默认值:

false
接受值:truefalse

change.stream.full.document.before.change

类型:字符串






描述:配置变更流在更新操作中返回的文档前像。前像对于复制现有数据时发布的源记录不可用,并且前像配置对复制没有影响。要学习;了解如何配置集合以启用前像,请参阅MongoDB手册中的具有文档前像和后像的变更流。默认设置会抑制文档前像。

whenAvailable设置会返回替换、更新或删除之前的文档前像(如果可用)。

required设置返回文档前像,如果不可用,则引发错误。默认值:

""
接受值:"""whenAvailable""required"

publish.full.document.only

类型:布尔值 描述:是否仅返回任何更新事件生成的变更流事件文档中的


fullDocument字段。fullDocument 字段包含最新版本的更新文档。要学习;了解有关 字段的更多信息,请参阅服务器手册中的更新事件。当设立为fullDocument

true时,Connector会覆盖change.stream.full.document 设置并将其设置为 ,以便updateLookup fullDocument

false
字段包含更新的文档。默认值: 接受值:truefalse

publish.full.document.only.tombstone.on.delete

类型:布尔值


描述:删除文档时是否返回逻辑删除事件。逻辑删除事件包含具有null 值的已删除文档的密钥。此设置仅当 为 时适用。默认值:publish.full.document.only true

false
接受值:truefalse

change.stream.document.key.as.key

类型:布尔值


描述:如果文档键存在,是否使用文档键作为源记录键。当设立为

true时,Connector会将已删除文档的密钥添加到 tombstone 事件中。当设立为false 时,Connector使用恢复令牌作为 tombstone 事件的源密钥。默认值:

true
接受值:truefalse

排序规则

类型:字符串


描述:一份JSON排序规则文档,用于指定MongoDB应用于变更流返回的文档的特定于语言的排序规则。默认值:

""
接受值:有效的排序规则JSON 文档

batch.size

类型:int


描述:变更流游标批处理大小。默认值:

0
接受值:整数

poll.await.time.ms

类型:long


描述:服务器在返回空批处理之前等待新数据更改报告给变更流游标的最长时间(以毫秒为单位)。默认值:

5000
接受值:整数

poll.max.batch.size

类型:int


描述:轮询变更流游标以获取新数据时,单个批处理中读取的最大文档数。您可以使用此设置来限制Connector内部缓冲的数据量。默认值:

1000
接受值:整数

使用以下配置设置指定 MongoDB Kafka Source 连接器发布到 Kafka 主题的数据格式。

要只查看与输出格式有关的选项,请参阅输出格式属性页面。

名称
说明

output.format.key

类型:字符串


描述:指定源Connector输出密钥文档的数据格式。默认值:

json
接受的值:bsonjsonschema

output.format.value

类型:字符串


描述:指定源Connector输出值文档的数据格式。Connector支持

Protobuf 作为输出数据格式。您可以通过指定schema 值并安装和配置Kafka Connect Protobuf 转换器启用此格式。默认值:

json
接受的值:bsonjsonschema

output.json.formatter

类型:字符串


描述:Connector用于输出数据的JSON格式化程序的类名称。默认值:

com.mongodb.kafka.connect.source.json.formatter.DefaultJson


接受的值:您的自定义JSON格式化程序完整类名称或以下内置格式化程序类名称之一:

com.mongodb.kafka.connect.source.json.formatter.DefaultJson
com.mongodb.kafka.connect.source.json.formatter.ExtendedJson
com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson

要了解有关这些输出格式的更多信息,请参阅 JSON 格式化程序

output.schema.key

类型:字符串

描述:为
SourceRecord 的密钥文档指定 Avro模式定义。要学习;了解有关

Avro模式的更多信息,请参阅数据格式指南中的 Avro。默认值:

{
"type": "record",
"name": "keySchema",
"fields" : [ { "name": "_id", "type": "string" } ]"
}

接受值:有效的 Avro模式

output.schema.value

类型:字符串

描述:为
SourceRecord 的值文档指定 Avro模式定义。要学习;了解有关

Avro模式的更多信息,请参阅数据格式指南中的 Avro。默认值:

{
"name": "ChangeStream",
"type": "record",
"fields": [
{ "name": "_id", "type": "string" },
{ "name": "operationType", "type": ["string", "null"] },
{ "name": "fullDocument", "type": ["string", "null"] },
{ "name": "ns",
"type": [{"name": "ns", "type": "record", "fields": [
{"name": "db", "type": "string"},
{"name": "coll", "type": ["string", "null"] } ]
}, "null" ] },
{ "name": "to",
"type": [{"name": "to", "type": "record", "fields": [
{"name": "db", "type": "string"},
{"name": "coll", "type": ["string", "null"] } ]
}, "null" ] },
{ "name": "documentKey", "type": ["string", "null"] },
{ "name": "updateDescription",
"type": [{"name": "updateDescription", "type": "record", "fields": [
{"name": "updatedFields", "type": ["string", "null"]},
{"name": "removedFields",
"type": [{"type": "array", "items": "string"}, "null"]
}] }, "null"] },
{ "name": "clusterTime", "type": ["string", "null"] },
{ "name": "txnNumber", "type": ["long", "null"]},
{ "name": "lsid", "type": [{"name": "lsid", "type": "record",
"fields": [ {"name": "id", "type": "string"},
{"name": "uid", "type": "string"}] }, "null"] }
]
}

接受值:有效的 JSON schema

output.schema.infer.value

类型:布尔值 描述:Connector是否应推断


SourceRecord 的值文档的模式。由于Connector隔离性处理每个文档,因此Connector可能会生成许多模式。重要提示:仅当您设立

output.format.value设置为 时,Connector才会读取此设置。默认值:schema

false
接受的值:truefalse

使用以下配置设置来配置 MongoDB Kafka Source 连接器的启动,以将 MongoDB 集合转换为变更流事件。

要仅查看与启动相关的选项,请参阅启动属性页面。

名称
说明

startup.mode

类型:字符串


描述:指定当没有可用的源偏移时,Connector应如何启动。恢复变更流需要恢复令牌,Connector将从源偏移获取该令牌。如果没有可用的源偏移,则Connector可能会忽略全部或部分现有源数据,或者可能会先复制所有现有源数据,然后继续处理新数据。如果为

startup.mode=latest

startup.mode=timestampstartup.mode.timestamp.*,则Connector将忽略所有现有源数据。如果为timestamp latest

startup.mode=copy_existing,Connector将激活 属性。如果未配置任何属性,则 相当于 。如果为 ,Connector会将所有现有源数据复制到变更流事件。此设置相当于已弃用的设置copy.existing=true

如果任何系统在源Connector从数据库转换现有数据时更改数据库中的数据, MongoDB可能会生成重复的变更流事件以反映最新更改。由于数据复制所依赖的变更流事件是幂等的,因此复制的数据最终是一致的,符合“至少一次”交付保证。

默认值:latest
接受值:latesttimestampcopy_existing

startup.mode.timestamp.start.at.operation.time

类型:字符串

描述:仅在
startup.mode=timestamp

时才启动。指定变更流的点。要学习;了解有关变更流参数的更多信息,请参阅MongoDB手册中的 $changeStream(聚合)。默认值:

""
接受的值:

  • 自纪元以来的整数秒数(十进制格式)(例如,30

  • ISO-8601 格式的瞬间,精度为一秒(例如,1970-01-01T00:00:30Z

  • 规范扩展 JSON (v2) 格式的 BSON 时间戳(例如 {"$timestamp": {"t": 30, "i": 0}}

startup.mode.copy.existing.namespace.regex

类型:字符串


描述:Connector用于匹配要从中复制数据的命名空间的正则表达式。命名空间描述由句点分隔的MongoDB 数据库名称和集合(示例 )。示例,以下正则表达式设置匹配databaseName.collectionName

stats数据库中以“page”开头的集合:

startup.mode.copy.existing.namespace.regex=stats\.page.*

\上述示例中的 字符对正则表达式中其后的. 字符进行转义。有关如何构建正则表达式的更多信息,请参阅Java API文档中的模式。默认值:

""
接受的值:有效的正则表达式

startup.mode.copy.existing.pipeline

类型:字符串


描述:Connector在复制现有数据时运行的管道操作的内联大量。您可以使用此设置来过滤源集合,并改进复制进程中索引的使用。示例,以下设置使用

$match聚合操作符符指示Connector仅复制包含值为 closedfalse 字段的文档。

startup.mode.copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]

""
默认值: 接受值:有效聚合管道阶段

startup.mode.copy.existing.max.threads

类型:int

描述:Connector可用于复制数据的最大线程数。默认值:环境中可用的处理器数量



接受的值:整数

startup.mode.copy.existing.queue.size

类型:int


描述:Connector在复制数据时可以使用的队列大小。默认值:

16000
接受值:整数

startup.mode.copy.existing.allow.disk.use

类型:布尔值 描述:设立为


true时,Connector使用临时磁盘存储来复制现有聚合。默认值:

true
接受值:truefalse

使用以下配置设置指定 MongoDB Kafka Source 连接器在遇到错误时的行为方式,并指定与恢复中断读取相关的设置。

要仅查看与处理错误相关的选项,请参阅错误处理和从中断属性恢复页面。

名称
说明

mongo.errors.tolerance

类型:字符串


描述:Connector出现错误时是否继续处理消息。如果您希望Connector在遇到错误时停止处理消息并报告问题,请将此项设置为

"none"

"all"。如果您希望Connector继续处理消息并忽略遇到的任何错误,请将此项设置为 。重要提示:该属性会覆盖

Connect Framework

的errors.tolerance属性。默认值:"none"
接受的值:"none""all"

mongo.errors.log.enable

类型:布尔值


描述:Connector是否应在日志文件中报告错误。将其设置为 以日志Connector遇到的所有错误。将其设置为

true

false以日志Connector不允许的错误。您可以使用errors.tolerancemongo.errors.tolerance

设置指定Connector应允许哪些错误。重要提示:此属性会覆盖错误。日志。启用Connect Framework属性。默认值:

false
接受的值:truefalse

mongo.errors.deadletterqueue.topic.name

类型:字符串


描述:用作死信队列(DLQ)的主题名称。如果指定一个值,Connector会将无效消息作为扩展JSON字符串写入死信队列(DLQ)主题。如果将此设置留空,Connector不会将无效消息写入任何主题。



重要提示:您必须设立 errors.tolerancemongo.errors.tolerance 设置设为"all" 才能启用此属性。默认值:

""
接受的值:有效的Kafka主题名称

offset.partition.name

类型:字符串


描述:要使用的自定义偏移分区名称。当现有偏移包含无效的恢复令牌时,您可以使用此选项指示Connector启动新的变更流。如果将此设置留空,Connector将根据连接详细信息使用默认分区名称。要查看偏移分区的命名策略,请参阅重置已存储的偏移。默认值:





""
接受的值:字符串。要学习;了解有关命名分区的更多信息,请参阅Apache Kafka API文档中的 SourceRecord。

heartbeat.interval.ms

类型:long


描述:Connector在发送心跳消息之间等待的毫秒数。当源记录未在指定时间间隔内发布时,Connector会发送心跳消息。此机制提高了Connector针对低容量命名空间的可恢复性。心跳消息包含

postBatchResumeToken数据字段。该字段的值包含Connector上次从变更流中读取的MongoDB服务器oplog条目。将其设置为

0以禁用心跳消息。要学习;了解更多信息,请参阅“无效恢复令牌”页面中的“预防”。默认值:



0
接受的值:整数

heartbeat.topic.name

类型:字符串


描述:Connector应在其上发布心跳消息的主题的名称。您必须在heartbeat.interval.ms 设置中提供正值才能启用此功能。默认值:

__mongodb_heartbeats
接受的值:有效的Kafka主题名称