Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于您的流数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的流处理实例中使用给定的流处理器。Atlas Stream Processing 支持每个工作线程多达 4 个流处理器。对于超出此限制的额外处理器,Atlas Stream Processing 将分配新的资源。
先决条件
要创建和管理流处理器,您必须具备:
mongosh
2.0或更高版本具有
atlasAdmin
角色的数据库用户,用于创建和运行流处理器Atlas 集群
Considerations
许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( -
) 或句点 ( .
),则必须将名称用方括号 ( []
) 和双引号 ( ""
) 括在方法调用,如sp.["special-name-stream"].stats()
中。
以交互方式创建流处理器
您可以使用sp.process()
中的mongosh
方法以交互方式创建流处理器。您以交互方式创建的流处理器表现出以下行为:
将输出和死信队列文档写入shell
创建后立即开始运行
运行 10 分钟或直到用户停止它们
停止后不要继续
您以交互方式创建的流处理器用于原型设计。要创建持久的流处理器,请参阅创建流处理器。
sp.process()
通过以下语法实现:
sp.process(<pipeline>)
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
| 阵列 | 必需 | 要应用于流媒体数据的管道。 |
要以交互方式创建流处理器:
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing mongosh
,通过 进行连接。
例子
以下命令使用 x.059 身份验证,以名为 streamOwner
的用户身份连接到流处理实例:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
根据提示提供用户密码。
定义管道。
在 mongosh
提示符,分配一个数组,其中包含要应用于名为 pipeline
的变量的聚合阶段。
以下示例使用连接注册表中myKafka
连接中的stuff
主题作为$source
,匹配temperature
字段值为46
的记录,并将已处理的消息发出到output
连接注册表中mySink
连接的主题:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
创建流处理器
要创建一个持续存在直到删除的流处理器,请执行以下操作:
Atlas Administration API 提供了一个用于创建流处理器的端点。
要在Atlas用户界面中创建流处理器,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure。
您可以选择使用 Visual Builder 或 JSON编辑器来配置流处理器:
添加源连接。
在 Source字段中,从 Connection 下拉列表中选择一个连接,用作流处理器的源。
这将打开一个JSON文本框,您可以在其中配置流处理器的 source
阶段。要学习;了解有关 source
阶段语法的更多信息,请参阅 $source
。
例子
以下 source
阶段对来自预配置 sample_stream_solar
连接的实时数据进行操作:
{ "$source": { "connectionName": "sample_stream_solar" } }
(可选)配置其他聚合阶段。
要向管道添加其他聚合阶段,请单击管道中最后一个阶段下方的 + Add stage below 按钮,然后选择要添加的聚合阶段,或单击 Custom stage 以定义其他支持的聚合阶段。这将打开一个文本框,您可以在其中以JSON格式配置新阶段。
添加接收器连接。
在 Sink字段中,从 Connection 下拉列表中选择目标连接。
在 Sink字段中,从 Connection 下拉列表中选择一个连接,以将处理后的数据写入其中。
这将打开一个JSON文本框,您可以在其中配置流处理器的 merge
阶段。要学习;了解有关 merge
阶段语法的更多信息,请参阅 $merge
。
例子
以下 sink
阶段写入名为 demoConnection
连接的连接中的 demoDb.demoColl
集合:
{ "$merge": { "into": { "connectionName": "demoConnection", "db": "demoDb", "coll": "demoColl" } } }
定义流处理器。
在JSON编辑器文本框中指定流处理器的JSON定义。此定义必须包含流处理器的名称以及以 $source
阶段开始并以 $merge
阶段结束的聚合管道。您可以在 $source
和 $merge
阶段之间包含任意数量的附加聚合阶段。
要学习;了解有关流处理聚合阶段及其语法的更多信息,请参阅聚合管道阶段。
例子
以下 JSON solarDemo
$tumblingWindow
$group
sample_stream_solar
定义创建一个名为10 mongodb1
的流处理器,该处理器使用带有嵌套 阶段的 阶段来聚合 秒时间间隔内来自预配置 连接的实时数据,并且将处理后的数据写入名为 的连接中的集合。
{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] }
要使用 mongosh
创建新的流处理器,请使用 sp.createStreamProcessor()
方法。它具有以下语法:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | 类型 | 必要性 | 说明 |
---|---|---|---|
| 字符串 | 必需 | 流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。 |
| 阵列 | 必需 | 要应用于流媒体数据的管道。 |
| 对象 | Optional | 为流处理器定义各种可选设置的对象。 |
| 对象 | 可选的 | |
| 字符串 | 可选的 | 人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义 |
| 字符串 | 可选的 |
|
| 字符串 | 可选的 |
|
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing mongosh
,通过 进行连接。
在Atlas Stream Processing实例的窗格中,单击 Connect。
在 Connect to your instance 对话框中,选择 Shell标签页。
复制对话框中显示的连接字符串。 它采用以下格式,其中
<atlas-stream-processing-url>
是流处理实例的URL ,<username>
是具有atlasAdmin
角色的数据库用户的用户名:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 将连接字符串粘贴到终端中,并将
<password>
占位符替换为用户的凭证。 按 Enter 键运行它以连接到流处理实例。
例子
以下命令使用 x.059 身份验证,以名为 streamOwner
的用户身份连接到流处理实例。
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
根据提示提供用户密码。
定义管道。
在 mongosh
提示符,分配一个数组,其中包含要应用于名为 pipeline
的变量的聚合阶段。
以下示例管道使用连接注册表中 myKafka
连接中的 stuff
主题作为$source
,匹配 temperature
字段值为 46
的记录,并将已处理的消息发出到 output
连接注册表中 mySink
连接的主题:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
启动流处理器
要启动流处理器:
Atlas Administration API 提供了一个用于启动流处理器的端点。
要在Atlas用户界面中启动流处理器,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure,查看为其定义的流处理器列表。
然后,单击流处理器的 Start 图标。
要使用 mongosh
启动现有的流处理器,请使用 sp.processor.start()
方法。
例如,要启动名为proc01
的流处理器,请运行以下命令:
sp.proc01.start()
{ "ok" : 1 }
如果流处理器存在且当前未运行,则此方法返回 { "ok": 1 }
。如果您为不是 STOPPED
的流处理器调用 sp.processor.start()
,mongosh
将返回错误。
停止流处理器
要停止流处理器:
Atlas Administration API 提供了一个用于停止流处理器的端点。
要在Atlas用户界面中暂停流处理器,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure,查看为其定义的流处理器列表。
然后,单击流处理器的 Pause 图标。
要使用 mongosh
停止现有的流处理器,请使用 sp.processor.stop()
方法。
例如,要停止名为proc01
的流处理器,请运行以下命令:
sp.proc01.stop()
{ "ok" : 1 }
如果流处理器存在且当前正在运行,则此方法返回 { "ok": 1 }
。如果您为不是 running
的流处理器调用 sp.processor.stop()
,mongosh
将返回错误。
修改流处理器
您可以修改现有流处理器的以下元素:
要修改流处理器,请执行以下步骤:
默认情况下,修改后的处理器会从最后一个检查点恢复。或者,您可以设置 resumeFromCheckpoint=false
,在这种情况下,处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时,这些窗口将在更新的管道上被完全重新计算。
注意
限制
启用默认设置 resumeFromCheckpoint=true
后,将应用以下限制:
您无法修改
$source
阶段。您无法修改窗口的间隔时间。
您无法删除窗口。
只有当窗口的内部管道中包含
$group
或$sort
阶段时,您才能修改带有窗口的管道。您无法更改现有窗口类型。例如,您无法将
$tumblingWindow
更改为$hoppingWindow
,反之亦然。附带窗口的处理器可能会在重新计算这些窗口时重新处理某些数据。
要修改流处理器:
Atlas Administration API 提供了一个用于修改流处理器的终结点。
需要 mongosh
v2.3.4+。
使用 sp.<streamprocessor>.modify()
命令来修改现有的流处理器。<streamprocessor>
必须是为当前流处理实例定义的已停止流处理器的名称。
例如,要修改名为 proc01
的流处理器,请运行以下命令:
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional }})
向现有管道添加一个阶段
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
修改流处理器的输入源
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
从流处理器中删除死信队列
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
通过窗口修改流处理器
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
删除流处理器
要删除流处理器:
Atlas Administration API 提供了一个用于删除流处理器的端点。
要在Atlas用户界面中删除流处理器,GoAtlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure,查看为其定义的流处理器列表。
然后,单击流处理器的 Delete () 图标。在出现的确认对话框中,键入流处理器的名称 (solarDemo
) 以确认要将其删除,然后单击 Delete。
要使用 mongosh
删除现有的流处理器,请使用 sp.processor.drop()
方法。
例如,要删除名为proc01
的流处理器,请运行以下命令:
sp.proc01.drop()
此方法返回:
true
如果流处理器存在。false
如果流处理器不存在。
删除流处理器时,Atlas Stream Processing 为其预配的所有资源以及所有已保存的状态都将被销毁。
列出可用的流处理器
列出所有可用的流处理器:
Atlas Administration API 提供了一个用于列出所有可用流处理器的端点。
要在Atlas用户界面中查看为流处理实例定义的流处理器列表,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure。
显示流处理器及其状态的列表。
要使用 mongosh
列出当前流处理实例上的所有可用流处理器,请使用 sp.listStreamProcessors()
方法。这将返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它具有以下语法:
sp.listStreamProcessors(<filter>)
<filter>
是一个文档,指定要按哪些字段筛选列表。
例子
以下示例显示了未经筛选的请求的返回值:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state"
为"running"
的情况,您将看到以下输出:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
来自流处理器的样本
要使用 mongosh
从现有流处理器返回采样结果大量给 STDOUT
,请使用 sp.processor.sample()
方法。示例,以下命令从名为 proc01
的流处理器中进行采样。
sp.proc01.sample()
此命令将持续运行,直到您使用 CTRL-C
取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage
文档中报告示例中的无效文档:
{ _dlqMessage: { _stream_meta: { source: { type: "<type>" } }, errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', instanceName: '<instanceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。
查看流处理器的统计信息
要查看流处理器的统计信息:
Atlas Administration API 提供了一个端点,用于查看流处理器的统计信息。
要查看流处理器的监控,GoAtlas项目的 Stream Processing 页面并打开 Monitoring标签页。 然后,从页面左上角的 Stream processor(流处理器)下拉列表中选择您的流处理器。
要使用 mongosh
返回总结现有流处理器当前状态的文档,请使用 sp.processor.stats()
方法。它具有以下语法:
sp.<streamprocessor>.stats({options: {<options>}})
其中options
是具有以下字段的可选文档:
字段 | 类型 | 说明 |
---|---|---|
| 整型 | 用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定 |
| 布尔 | 指定输出文档详细程度的标志。 如果设置为 |
输出文档包含以下字段:
字段 | 类型 | 说明 |
---|---|---|
| 字符串 | 定义流处理器的命名空间。 |
| 对象 | 描述流处理器操作状态的文档。 |
| 字符串 | 流处理器的名称。 |
| 字符串 | 流处理器的状态。 此字段可为以下值:
|
| 整型 | 大小字段的显示比例。如果设置为 |
| 整型 | 发布到流的文档数量。 文档在通过 |
| 整型 | 发布到流的字节数或千字节数。 字节在经过 |
| 整型 | 该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。 |
| 整型 | 流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。 |
| 整型 | |
| 整型 | |
| 整型 | |
| token | 最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。 |
| 文档 | 整个流处理器的延迟统计信息。仅当您传入 |
| 整型 | 过去 30 秒内处理的所有文档的估计第 50 个百分位数延迟。如果您的管道包括窗口阶段,延迟测量包括窗口的时间间隔。 示例,如果您的 |
| 整型 | 过去 30 秒内处理的所有文档的估计第 99 个百分位数延迟。如果您的管道包括窗口阶段,延迟测量包括窗口的时间间隔。 示例,如果您的 |
| datetime | 最近 30 秒测量窗口开始的总时间。 |
| datetime | 最近 30 秒测量窗口结束的总时间。 |
| 字符串 | 计算延迟的时间单位。此值始终为 |
| 整型 | 流处理器在最近 30 秒测量窗口中处理的文档数。 |
| 整型 | 在最近的 30 秒测量窗口中进行的所有单独延迟测量的总和(以微秒为单位)。 |
| 整型 | Windows 用于存储处理器状态的字节数。 |
| 整型 | 当前水印的时间戳。 |
| 阵列 | 处理器管道中每个操作符的统计信息。 仅当您传入
|
| 整型 | 操作符的最大内存使用量(以字节或千字节为单位)。 |
| 整型 | 操作符的总执行时间(以秒为单位)。 |
| 日期 | 最小打开窗口的开始时间。此值是可选的。 |
| 日期 | 最大开放窗口的开始时间。此值是可选的。 |
| 阵列 | Apache Kafka 代理分区的偏移信息。 |
| 整型 | Apache Kafka主题分区号。 |
| 整型 | 流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上 |
| 整型 | 流处理器上次提交给Apache Kafka代理的偏移量以及指定分区的检查点。通过此偏移量的所有消息都记录在最后检查点中。 |
| 布尔 | 用于指示该分区是否空闲的标志。此值默认为 |
例如,以下显示了名为inst01
的Atlas Stream Processing实例上名为proc01
的流处理器的状态,其中项大小以 KB 为单位:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }