托管流处理器
在此页面上
Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于流数据。 Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的流处理实例中使用给定的流处理器。Atlas Stream Processing 最多支持每个工作线程4流处理器。对于超过此限制的其他处理器,Atlas Stream Processing 会分配新资源。
先决条件
要创建和管理流处理器,您必须具备:
mongosh
2.0或更高版本具有
atlasAdmin
角色的数据库用户,用于创建和运行流处理器Atlas 集群
Considerations
许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( -
) 或句点 ( .
),则必须将名称用方括号 ( []
) 和双引号 ( ""
) 括在方法调用,如sp.["special-name-stream"].stats()
中。
以交互方式创建流处理器
您可以使用sp.process()
方法以交互方式创建流处理器。您以交互方式创建的流处理器表现出以下行为:
将输出和死信队列文档写入shell
创建后立即开始运行
运行10分钟或直到用户停止
停止后不要继续
您以交互方式创建的流处理器用于原型设计。要创建持久流处理器,请参阅创建流处理器。
sp.process()
通过以下语法实现:
sp.process(<pipeline>)
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
pipeline | 阵列 | 必需 | 要应用于流媒体数据的管道。 |
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing mongosh
,通过 进行连接。
例子
以下命令使用 SCRAM-SHA-256 身份验证以名为streamOwner
的用户身份连接到Atlas Stream Processing实例:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
根据提示提供用户密码。
定义管道。
在mongosh
提示中,将包含要应用于名为pipeline
的变量的聚合阶段的数组赋值。
以下示例使用连接注册表中myKafka
连接中的stuff
主题作为$source
,匹配temperature
字段值为46
的记录,并将已处理的消息发出到output
连接注册表中mySink
连接的主题:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
创建流处理器
要使用mongosh
创建新的流处理器,请使用sp.createStreamProcessor()
方法。 它具有以下语法:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | 类型 | 必要性 | 说明 |
---|---|---|---|
name | 字符串 | 必需 | 流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。 |
pipeline | 阵列 | 必需 | 要应用于流媒体数据的管道。 |
options | 对象 | Optional | 为流处理器定义各种可选设置的对象。 |
options.dlq | 对象 | 可选的 | 为Atlas Stream Processing实例分配死信队列(DLQ)的对象。如果您定义 options 字段,则此字段是必需的。 |
options.dlq.connectionName | 字符串 | 可选的 | 人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义 options.dlq 字段,则此字段是必需的。 |
options.dlq.db | 字符串 | 可选的 | options.dlq.connectionName 中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq 字段,则此字段是必需的。 |
options.dlq.coll | 字符串 | 可选的 | options.dlq.db 中指定的数据库中的集合名称。 如果您定义options.dlq 字段,则此字段是必需的。 |
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing mongosh
,通过 进行连接。
例子
以下命令使用 SCRAM-SHA-256 身份验证以名为streamOwner
的用户身份连接到Atlas Stream Processing实例:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
根据提示提供用户密码。
定义管道。
在mongosh
提示中,将包含要应用于名为pipeline
的变量的聚合阶段的数组赋值。
以下示例使用连接注册表中myKafka
连接中的stuff
主题作为$source
,匹配temperature
字段值为46
的记录,并将已处理的消息发出到output
连接注册表中mySink
连接的主题:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
启动流处理器
要使用mongosh
启动现有的流处理器,请使用sp.<streamprocessor>.start()
方法。 <streamprocessor>
必须是为当前流处理实例定义的流处理器名称。
例如,要启动名为proc01
的流处理器,请运行以下命令:
sp.proc01.start()
此方法返回:
true
如果流处理器存在且当前未运行。false
如果您尝试启动不存在或存在但当前正在运行的流处理器。
停止流处理器
要使用mongosh
停止现有的流处理器,请使用sp.<streamprocessor>.stop()
方法。 <streamprocessor>
必须是为当前流处理实例定义的当前正在运行的流处理器的名称。
例如,要停止名为proc01
的流处理器,请运行以下命令:
sp.proc01.stop()
此方法返回:
true
如果流处理器存在且当前正在运行。false
如果流处理器不存在,或者流处理器当前未运行。
删除流处理器
要使用mongosh
删除现有的流处理器,请使用sp.<streamprocessor>.drop()
方法。 <streamprocessor>
必须是为当前流处理实例定义的流处理器名称。
例如,要删除名为proc01
的流处理器,请运行以下命令:
sp.proc01.drop()
此方法返回:
true
如果流处理器存在。false
如果流处理器不存在。
删除流处理器时,Atlas Stream Processing 为其预配的所有资源以及所有已保存的状态都将被销毁。
列出可用的流处理器
要使用mongosh
列出当前流处理实例上的所有可用流处理器,请使用sp.listStreamProcessors()
方法。它返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它具有以下语法:
sp.listStreamProcessors(<filter>)
<filter>
是一个文档,指定要按哪些字段筛选列表。
例子
以下示例显示了未经筛选的请求的返回值:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state"
为"running"
的情况,您将看到以下输出:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
来自流处理器的样本
要使用mongosh
从现有流处理器返回采样结果数组给STDOUT
,请使用sp.<streamprocessor>.sample()
方法。 <streamprocessor>
必须是为当前流处理实例定义的当前正在运行的流处理器的名称。例如,以下命令将从名为proc01
的流处理器中进行采样。
sp.proc01.sample()
此命令会持续运行,直到您使用CTRL-C
取消它,或者直到返回的样本大小累计达到40 MB。流处理器以以下形式的_dlqMessage
文档报告样本中的无效文档:
{ _dlqMessage: { _stream_meta: { source: { type: "<type>" } }, errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>' } }
您可以使用这些消息来诊断数据卫生问题,而无需定义死信队列集合。
查看流处理器的统计信息
要使用mongosh
返回总结现有流处理器当前状态的文档,请使用sp.<streamprocessor>.stats()
方法。 streamprocessor
必须是为当前流处理实例定义的当前正在运行的流处理器的名称。它具有以下语法:
sp.<streamprocessor>.stats({options: {<options>}})
其中options
是具有以下字段的可选文档:
字段 | 类型 | 说明 |
---|---|---|
scale | 整型 | 用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定 scale 为1024 。 |
verbose | 布尔 | 指定输出文档详细程度的标志。 如果设置为 true ,则输出文档包含一个子文档,其中报告管道中每个操作符的统计信息。 默认为false 。 |
输出文档包含以下字段:
字段 | 类型 | 说明 |
---|---|---|
ns | 字符串 | 定义流处理器的命名空间。 |
stats | 对象 | 描述流处理器操作状态的文档。 |
stats.name | 字符串 | 流处理器的名称。 |
stats.status | 字符串 | 流处理器的状态。 此字段可为以下值:
|
stats.scaleFactor | 整型 | 大小字段的显示比例。如果设置为 1 ,大小以字节为单位显示。 如果设置为1024 ,则大小以千字节为单位显示。 |
stats.inputMessageCount | 整型 | 发布到流的文档数量。 文档在通过 $source 阶段(而不是通过整个管道时)才被视为已“发布”到流。 |
stats.inputMessageSize | 整型 | 发布到流的字节数或千字节数。 字节在经过 $source 阶段(而不是经过整个管道时)被视为已“发布”到流。 |
stats.outputMessageCount | 整型 | 该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。 |
stats.outputMessageSize | 整型 | 流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。 |
stats.dlqMessageCount | 整型 | |
stats.dlqMessageSize | 整型 | |
stats.changeStreamTimeDifferenceSecs | 整型 | 最新变更流恢复令牌表示的事件时间与oplog 中的最新事件之间的差值(以秒为单位)。 |
stats.changeStreamState | token | 最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。 |
stats.stateSize | 整型 | Windows 用于存储处理器状态的字节数。 |
stats.watermark | 整型 | 当前水印的时间戳。 |
stats.operatorStats | 阵列 | 处理器管道中每个操作符的统计信息。 仅当您传入
此外,
|
stats.operatorStats.maxMemoryUsage | 整型 | 操作符的最大内存使用量(以字节或千字节为单位)。 |
stats.operatorStats.executionTime | 整型 | 操作符的总执行时间(以秒为单位)。 |
stats.kafkaPartitions | 阵列 | |
stats.kafkaPartitions.partition | 整型 | Apache Kafka 主题分区号。 |
stats.kafkaPartitions.currentOffset | 整型 | 流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上 1 。 |
stats.kafkaPartitions.checkpointOffset | 整型 | 流处理器上次提交给 Apache Kafka 的偏移量 代理和指定分区的检查点。通过此偏移量的所有消息都记录在最后一个检查点中。 |
例如,以下显示了名为inst01
的Atlas Stream Processing实例上名为proc01
的流处理器的状态,其中项大小以 KB 为单位:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }