Docs 菜单
Docs 主页
/
MongoDB Atlas
/

托管流处理器

在此页面上

Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于流数据。 Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的流处理实例中使用给定的流处理器。Atlas Stream Processing 最多支持每个工作线程4流处理器。对于超过此限制的其他处理器,Atlas Stream Processing 会分配新资源。

要创建和管理流处理器,您必须具备:

许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( - ) 或句点 ( . ),则必须将名称用方括号 ( [] ) 和双引号 ( "" ) 括在方法调用,如sp.["special-name-stream"].stats()中。

您可以使用sp.process()方法以交互方式创建流处理器。您以交互方式创建的流处理器表现出以下行为:

  • 将输出和死信队列文档写入shell

  • 创建后立即开始运行

  • 运行10分钟或直到用户停止

  • 停止后不要继续

您以交互方式创建的流处理器用于原型设计。要创建持久流处理器,请参阅创建流处理器。

sp.process() 通过以下语法实现:

sp.process(<pipeline>)
字段
类型
必要性
说明
pipeline
阵列
必需
要应用于流媒体数据的管道
1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 SCRAM-SHA-256 身份验证以名为streamOwner的用户身份连接到Atlas Stream Processing实例:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh提示中,将包含要应用于名为pipeline的变量的聚合阶段的数组赋值。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

以下命令创建一个流处理器,该处理器应用pipeline中定义的逻辑。

sp.process(pipeline)

要使用mongosh创建新的流处理器,请使用sp.createStreamProcessor()方法。 它具有以下语法:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
类型
必要性
说明
name
字符串
必需
流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。
pipeline
阵列
必需
要应用于流媒体数据的管道
options
对象
Optional
为流处理器定义各种可选设置的对象。
options.dlq
对象
可选的
为Atlas Stream Processing实例分配死信队列(DLQ)的对象。如果您定义options字段,则此字段是必需的。
options.dlq.connectionName
字符串
可选的
人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义options.dlq字段,则此字段是必需的。
options.dlq.db
字符串
可选的
options.dlq.connectionName中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq字段,则此字段是必需的。
options.dlq.coll
字符串
可选的
options.dlq.db中指定的数据库中的集合名称。 如果您定义options.dlq字段,则此字段是必需的。
1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 SCRAM-SHA-256 身份验证以名为streamOwner的用户身份连接到Atlas Stream Processing实例:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh提示中,将包含要应用于名为pipeline的变量的聚合阶段的数组赋值。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongosh提示符中,分配一个包含以下 DLQ 属性的对象:

  • connectionName

  • 数据库名称

  • 集合名称

以下示例通过metadata.dlq collection 中的cluster01连接定义了一个 DLQ。

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

以下命令创建一个名为proc01的流处理器,它应用pipeline中定义的逻辑。 在处理中引发错误的文档将写入deadLetter中定义的 DLQ。

sp.createStreamProcessor("proc01", pipeline, deadLetter)

要使用mongosh启动现有的流处理器,请使用sp.<streamprocessor>.start()方法。 <streamprocessor>必须是为当前流处理实例定义的流处理器名称。

例如,要启动名为proc01的流处理器,请运行以下命令:

sp.proc01.start()

此方法返回:

  • true 如果流处理器存在且当前未运行。

  • false 如果您尝试启动不存在或存在但当前正在运行的流处理器。

要使用mongosh停止现有的流处理器,请使用sp.<streamprocessor>.stop()方法。 <streamprocessor>必须是为当前流处理实例定义的当前正在运行的流处理器的名称。

例如,要停止名为proc01的流处理器,请运行以下命令:

sp.proc01.stop()

此方法返回:

  • true 如果流处理器存在且当前正在运行。

  • false 如果流处理器不存在,或者流处理器当前未运行。

要使用mongosh删除现有的流处理器,请使用sp.<streamprocessor>.drop()方法。 <streamprocessor>必须是为当前流处理实例定义的流处理器名称。

例如,要删除名为proc01的流处理器,请运行以下命令:

sp.proc01.drop()

此方法返回:

  • true 如果流处理器存在。

  • false 如果流处理器不存在。

删除流处理器时,Atlas Stream Processing 为其预配的所有资源以及所有已保存的状态都将被销毁。

要使用mongosh列出当前流处理实例上的所有可用流处理器,请使用sp.listStreamProcessors()方法。它返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它具有以下语法:

sp.listStreamProcessors(<filter>)

<filter> 是一个文档,指定要按哪些字段筛选列表。

例子

以下示例显示了未经筛选的请求的返回值:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state""running"的情况,您将看到以下输出:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

要使用mongosh从现有流处理器返回采样结果数组给STDOUT ,请使用sp.<streamprocessor>.sample()方法。 <streamprocessor>必须是为当前流处理实例定义的当前正在运行的流处理器的名称。例如,以下命令将从名为proc01的流处理器中进行采样。

sp.proc01.sample()

此命令会持续运行,直到您使用CTRL-C取消它,或者直到返回的样本大小累计达到40 MB。流处理器以以下形式的_dlqMessage文档报告样本中的无效文档:

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>'
}
}

您可以使用这些消息来诊断数据卫生问题,而无需定义死信队列集合。

要使用mongosh返回总结现有流处理器当前状态的文档,请使用sp.<streamprocessor>.stats()方法。 streamprocessor必须是为当前流处理实例定义的当前正在运行的流处理器的名称。它具有以下语法:

sp.<streamprocessor>.stats({options: {<options>}})

其中options是具有以下字段的可选文档:

字段
类型
说明
scale
整型
用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定scale1024
verbose
布尔
指定输出文档详细程度的标志。 如果设置为true ,则输出文档包含一个子文档,其中报告管道中每个操作符的统计信息。 默认为false

输出文档包含以下字段:

字段
类型
说明
ns
字符串
定义流处理器的命名空间。
stats
对象
描述流处理器操作状态的文档。
stats.name
字符串
流处理器的名称。
stats.status
字符串

流处理器的状态。 此字段可为以下值:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor
整型
大小字段的显示比例。如果设置为1 ,大小以字节为单位显示。 如果设置为1024 ,则大小以千字节为单位显示。
stats.inputMessageCount
整型
发布到流的文档数量。 文档在通过$source阶段(而不是通过整个管道时)才被视为已“发布”到流。
stats.inputMessageSize
整型
发布到流的字节数或千字节数。 字节在经过$source阶段(而不是经过整个管道时)被视为已“发布”到流。
stats.outputMessageCount
整型
该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。
stats.outputMessageSize
整型
流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。
stats.dlqMessageCount
整型
stats.dlqMessageSize
整型
stats.changeStreamTimeDifferenceSecs
整型
stats.changeStreamState
token
最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。
stats.stateSize
整型
Windows 用于存储处理器状态的字节数。
stats.watermark
整型
当前水印的时间戳。
stats.operatorStats
阵列

处理器管道中每个操作符的统计信息。 仅当您传入verbose选项时,Atlas Stream Processing 才会返回此字段。

stats.operatorStats 提供许多核心stats字段的每个操作符版本:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

此外, stats.operatorStats还包括以下唯一字段:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats.maxMemoryUsage
整型
操作符的最大内存使用量(以字节或千字节为单位)。
stats.operatorStats.executionTime
整型
操作符的总执行时间(以秒为单位)。
stats.kafkaPartitions
阵列
stats.kafkaPartitions.partition
整型
Apache Kafka 主题分区号。
stats.kafkaPartitions.currentOffset
整型
流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上1
stats.kafkaPartitions.checkpointOffset
整型
流处理器上次提交给 Apache Kafka 的偏移量 代理和指定分区的检查点。通过此偏移量的所有消息都记录在最后一个检查点中。

例如,以下显示了名为inst01的Atlas Stream Processing实例上名为proc01的流处理器的状态,其中项大小以 KB 为单位:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

后退

管理连接