Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs 菜单
Docs 主页
/
Atlas
/

托管流处理器

Atlas Stream Processing 流处理器将唯一命名的流聚合管道的逻辑应用于您的流数据。Atlas Stream Processing 将每个流处理器定义保存到持久存储中,以便重复使用。您只能在存储其定义的流处理实例中使用给定的流处理器。Atlas Stream Processing 支持每个工作线程多达 4 个流处理器。对于超出此限制的额外处理器,Atlas Stream Processing 将分配新的资源。

要创建和管理流处理器,您必须具备:

许多流处理器命令要求您在方法调用中指定相关流处理器的名称。 以下部分中描述的语法假定严格为字母数字名称。 如果流处理器的名称包含非字母数字字符,例如连字符 ( - ) 或句点 ( . ),则必须将名称用方括号 ( [] ) 和双引号 ( "" ) 括在方法调用,如sp.["special-name-stream"].stats()中。

您可以使用sp.process() 中的mongosh 方法以交互方式创建流处理器。您以交互方式创建的流处理器表现出以下行为:

  • 将输出和死信队列文档写入shell

  • 创建后立即开始运行

  • 运行 10 分钟或直到用户停止它们

  • 停止后不要继续

您以交互方式创建的流处理器用于原型设计。要创建持久的流处理器,请参阅创建流处理器。

sp.process() 通过以下语法实现:

sp.process(<pipeline>)
字段
类型
必要性
说明

pipeline

阵列

必需

要应用于流媒体数据的管道

要以交互方式创建流处理器:

1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

例子

以下命令使用 x.059 身份验证,以名为 streamOwner 的用户身份连接到流处理实例:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh 提示符,分配一个数组,其中包含要应用于名为 pipeline 的变量的聚合阶段。

以下示例使用连接注册表中myKafka连接中的stuff主题作为$source ,匹配temperature字段值为46的记录,并将已处理的消息发出到output连接注册表中mySink连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

以下命令创建一个流处理器,该处理器应用pipeline中定义的逻辑。

sp.process(pipeline)

要创建一个持续存在直到删除的流处理器,请执行以下操作:

Atlas Administration API 提供了一个用于创建流处理器的端点。

创建一个流处理器

要在Atlas用户界面中创建流处理器,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure

您可以选择使用 Visual Builder 或 JSON编辑器来配置流处理器:

1

如果您的流处理实例中已有流处理器,请单击 + Create stream processor 按钮,然后从下拉选项中选择 Visual Builder

可视化构建器将打开并显示一个表单,您可以在其中配置流处理器。

2
3

Source字段中,从 Connection 下拉列表中选择一个连接,用作流处理器的源。

这将打开一个JSON文本框,您可以在其中配置流处理器的 source 阶段。要学习;了解有关 source 阶段语法的更多信息,请参阅 $source

例子

以下 source 阶段对来自预配置 sample_stream_solar 连接的实时数据进行操作:

{
"$source": {
"connectionName": "sample_stream_solar"
}
}
4

Start building your pipeline 窗格中,单击要添加到管道的聚合阶段的按钮。这将打开一个文本框,您可以在其中以JSON格式配置所选的聚合阶段。

如果您的聚合阶段未列出,请单击+ Custom stage 以JSON格式定义支持的聚合阶段。要学习;了解有关流处理聚合阶段及其语法的更多信息,请参阅聚合管道阶段。

例子

以下 $match 阶段匹配预配置的 sample_stream_solar流中的所有文档,其中 obs.watts字段大于 300

{
"$match": {
"obs.watts": { "$gt": 300 }
}
}
5

要向管道添加其他聚合阶段,请单击管道中最后一个阶段下方的 + Add stage below 按钮,然后选择要添加的聚合阶段,或单击 Custom stage 以定义其他支持的聚合阶段。这将打开一个文本框,您可以在其中以JSON格式配置新阶段。

6

Sink字段中,从 Connection 下拉列表中选择目标连接。

Sink字段中,从 Connection 下拉列表中选择一个连接,以将处理后的数据写入其中。

这将打开一个JSON文本框,您可以在其中配置流处理器的 merge 阶段。要学习;了解有关 merge 阶段语法的更多信息,请参阅 $merge

例子

以下 sink 阶段写入名为 demoConnection 连接的连接中的 demoDb.demoColl集合:

{
"$merge": {
"into": {
"connectionName": "demoConnection",
"db": "demoDb",
"coll": "demoColl"
}
}
}
7

流处理器已创建并列在 Stream Processing 页面的 Stream Processors标签页上。

1

如果您的流处理实例中已有流处理器,请单击 + Create stream processor 按钮,然后从下拉选项中选择 Visual Builder

JSON编辑器将打开,并显示一个文本框,您可以在其中以JSON格式配置流处理器。

2

在JSON编辑器文本框中指定流处理器的JSON定义。此定义必须包含流处理器的名称以及以 $source 阶段开始并以 $merge 阶段结束的聚合管道。您可以在 $source$merge 阶段之间包含任意数量的附加聚合阶段。

要学习;了解有关流处理聚合阶段及其语法的更多信息,请参阅聚合管道阶段。

例子

以下 JSON solarDemo$tumblingWindow$groupsample_stream_solar定义创建一个名为10 mongodb1的流处理器,该处理器使用带有嵌套 阶段的 阶段来聚合 秒时间间隔内来自预配置 连接的实时数据,并且将处理后的数据写入名为 的连接中的集合。

{
"name": "solarDemo",
"pipeline": [
{
"$source": {
"connectionName": "sample_stream_solar"
}
},
{
"$tumblingWindow": {
"interval": {
"size": 10,
"unit": "second"
},
"pipeline": [
{
"$group": {
"_id": "$group_id",
"max_watts": { "$max": "$obs.watts" },
"min_watts": { "$min": "$obs.watts" }
}
}
]
}
},
{
"$merge": {
"into": {
"connectionName": "mongodb1",
"db": "solarDb",
"coll": "solarColl"
}
}
}
]
}

要使用 mongosh 创建新的流处理器,请使用 sp.createStreamProcessor() 方法。它具有以下语法:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
类型
必要性
说明

name

字符串

必需

流处理器的逻辑名称。它在流处理实例中必须是唯一的。此名称应仅包含字母数字字符。

pipeline

阵列

必需

要应用于流媒体数据的管道

options

对象

Optional

为流处理器定义各种可选设置的对象。

options.dlq

对象

可选的

为您的 实例分配 死信队列Atlas Stream Processing 的对象。如果您定义options字段,则此字段是必需的。

options.dlq.connectionName

字符串

可选的

人类可读标签,用于标识连接注册表中的连接。 此连接必须引用 Atlas 集群。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.db

字符串

可选的

options.dlq.connectionName中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.coll

字符串

可选的

options.dlq.db中指定的数据库中的集合名称。 如果您定义options.dlq字段,则此字段是必需的。

1

使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

  1. 在Atlas Stream Processing实例的窗格中,单击 Connect

  2. Connect to your instance 对话框中,选择 Shell标签页。

  3. 复制对话框中显示的连接字符串。 它采用以下格式,其中 <atlas-stream-processing-url> 是流处理实例的URL ,<username> 是具有 atlasAdmin 角色的数据库用户的用户名:

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>
    --password <password>
  4. 将连接字符串粘贴到终端中,并将 <password> 占位符替换为用户的凭证。 按 Enter 键运行它以连接到流处理实例。

例子

以下命令使用 x.059 身份验证,以名为 streamOwner 的用户身份连接到流处理实例。

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

根据提示提供用户密码。

2

mongosh 提示符,分配一个数组,其中包含要应用于名为 pipeline 的变量的聚合阶段。

以下示例管道使用连接注册表中 myKafka 连接中的 stuff 主题作为$source,匹配 temperature 字段值为 46 的记录,并将已处理的消息发出到 output 连接注册表中 mySink 连接的主题:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongosh提示符中,分配一个包含以下 DLQ 属性的对象:

  • connectionName

  • 数据库名称

  • 集合名称

以下示例通过metadata.dlq collection 中的cluster01连接定义了一个 DLQ。

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

以下命令创建一个名为proc01的流处理器,它应用pipeline中定义的逻辑。 在处理中引发错误的文档将写入deadLetter中定义的 DLQ。

sp.createStreamProcessor("proc01", pipeline, deadLetter)

注意

Atlas Stream Processing会丢弃处于 stopped 状态且持续 45 天或更长时间的流处理器的内部状态。 当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要启动流处理器:

Atlas Administration API 提供了一个用于启动流处理器的端点。

启动一个流处理器

要在Atlas用户界面中启动流处理器,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure,查看为其定义的流处理器列表。

然后,单击流处理器的 Start 图标。

要使用 mongosh 启动现有的流处理器,请使用 sp.processor.start() 方法。

例如,要启动名为proc01的流处理器,请运行以下命令:

sp.proc01.start()
{ "ok" : 1 }

如果流处理器存在且当前未运行,则此方法返回 { "ok": 1 }。如果您为不是 STOPPED 的流处理器调用 sp.processor.start()mongosh 将返回错误。

注意

Atlas Stream Processing会丢弃处于 stopped 状态且持续 45 天或更长时间的流处理器的内部状态。 当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要停止流处理器:

Atlas Administration API 提供了一个用于停止流处理器的端点。

停止一个流处理器

要在Atlas用户界面中暂停流处理器,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure,查看为其定义的流处理器列表。

然后,单击流处理器的 Pause 图标。

要使用 mongosh 停止现有的流处理器,请使用 sp.processor.stop() 方法。

例如,要停止名为proc01的流处理器,请运行以下命令:

sp.proc01.stop()
{ "ok" : 1 }

如果流处理器存在且当前正在运行,则此方法返回 { "ok": 1 }。如果您为不是 running 的流处理器调用 sp.processor.stop()mongosh 将返回错误。

您可以修改现有流处理器的以下元素:

要修改流处理器,请执行以下步骤:

  1. 停止流处理器。

  2. 修改流处理器。

  3. 重新启动流处理器。

默认情况下,修改后的处理器会从最后一个检查点恢复。或者,您可以设置 resumeFromCheckpoint=false,在这种情况下,处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时,这些窗口将在更新的管道上被完全重新计算。

注意

如果您通过使用包含 iscontains 等匹配器表达式的 Operator 更改了已为其配置流处理器状态为失败警报的流处理器的名称,则当匹配器表达式与新名称不匹配时,Atlas 不会为重命名后的流处理器触发警报。要监控重命名后的流处理器,请重新配置该警报。

启用默认设置 resumeFromCheckpoint=true 后,将应用以下限制:

  • 您无法修改 $source 阶段。

  • 您无法修改窗口的间隔时间。

  • 您无法删除窗口。

  • 只有当窗口的内部管道中包含 $group$sort 阶段时,您才能修改带有窗口的管道。

  • 您无法更改现有窗口类型。例如,您无法将 $tumblingWindow 更改为 $hoppingWindow,反之亦然。

  • 附带窗口的处理器可能会在重新计算这些窗口时重新处理某些数据。

Atlas Administration API 提供了一个用于修改流处理器的终结点。

修改一个流处理器

需要 mongosh v2.3.4+。

使用 sp.<streamprocessor>.modify() 命令来修改现有的流处理器。<streamprocessor> 必须是为当前流处理实例定义的已停止流处理器的名称。

例如,要修改名为 proc01 的流处理器,请运行以下命令:

sp.proc1.modify(<pipeline>, {
resumeFromCheckpoint: bool, // optional
name: string, // optional
dlq: string, // optional
}})
sp.createStreamProcessor("foo", [
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
]);
sp.foo.start();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test",
config: {
startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
}
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$replaceRoot: {newRoot: "$fullDocument"}},
{$match: {cost: {$gt: 500}}},
{$tumblingWindow: {
interval: {unit: "day", size: 1},
pipeline: [
{$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
]
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.start();

要删除流处理器:

Atlas Administration API 提供了一个用于删除流处理器的端点。

删除一个流处理器

要在Atlas用户界面中删除流处理器,GoAtlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure,查看为其定义的流处理器列表。

然后,单击流处理器的 Delete () 图标。在出现的确认对话框中,键入流处理器的名称 (solarDemo) 以确认要将其删除,然后单击 Delete

要使用 mongosh 删除现有的流处理器,请使用 sp.processor.drop() 方法。

例如,要删除名为proc01的流处理器,请运行以下命令:

sp.proc01.drop()

此方法返回:

  • true 如果流处理器存在。

  • false 如果流处理器不存在。

删除流处理器时,Atlas Stream Processing 为其预配的所有资源以及所有已保存的状态都将被销毁。

列出所有可用的流处理器:

Atlas Administration API 提供了一个用于列出所有可用流处理器的端点。

列出流处理器

要在Atlas用户界面中查看为流处理实例定义的流处理器列表,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure

显示流处理器及其状态的列表。

要使用 mongosh 列出当前流处理实例上的所有可用流处理器,请使用 sp.listStreamProcessors() 方法。这将返回一个文档列表,其中包含与每个流处理器关联的名称、开始时间、当前状态和管道。它具有以下语法:

sp.listStreamProcessors(<filter>)

<filter> 是一个文档,指定要按哪些字段筛选列表。

例子

以下示例显示了未经筛选的请求的返回值:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

如果您在同一Atlas Stream Processing实例上再次运行该命令,并筛选"state""running"的情况,您将看到以下输出:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

要使用 mongosh 从现有流处理器返回采样结果大量给 STDOUT,请使用 sp.processor.sample() 方法。示例,以下命令从名为 proc01 的流处理器中进行采样。

sp.proc01.sample()

此命令将持续运行,直到您使用 CTRL-C 取消它,或者直到返回的示例累积大小达到 40 MB。流处理器在以下格式的 _dlqMessage 文档中报告示例中的无效文档:

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
instanceName: '<instanceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

您可以使用这些信息诊断数据卫生状况问题,而无需定义死信队列集合。

注意

Atlas Stream Processing会丢弃处于 stopped 状态且持续 45 天或更长时间的流处理器的内部状态。 当您启动此类处理器时,它的操作和报告统计信息与其初始运行相同。

要查看流处理器的统计信息:

Atlas Administration API 提供了一个端点,用于查看流处理器的统计信息。

获取一个流处理器

要查看流处理器的监控,GoAtlas项目的 Stream Processing 页面并打开 Monitoring标签页。 然后,从页面左上角的 Stream processor(流处理器)下拉列表中选择您的流处理器。

要使用 mongosh 返回总结现有流处理器当前状态的文档,请使用 sp.processor.stats() 方法。它具有以下语法:

sp.<streamprocessor>.stats({options: {<options>}})

其中options是具有以下字段的可选文档:

字段
类型
说明

scale

整型

用于输出中项目大小的单位。 默认情况下,Atlas Stream Processing 显示项目大小(以字节为单位)。 要以 KB 为单位显示,请指定scale1024

verbose

布尔

指定输出文档详细程度的标志。 如果设置为true ,则输出文档包含一个子文档,其中报告管道中每个操作符的统计信息。 默认为false

输出文档包含以下字段:

字段
类型
说明

ns

字符串

定义流处理器的命名空间。

stats

对象

描述流处理器操作状态的文档。

stats.name

字符串

流处理器的名称。

stats.status

字符串

流处理器的状态。 此字段可为以下值:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor

整型

大小字段的显示比例。如果设置为1 ,大小以字节为单位显示。 如果设置为1024 ,则大小以千字节为单位显示。

stats.inputMessageCount

整型

发布到流的文档数量。 文档在通过$source阶段(而不是通过整个管道时)才被视为已“发布”到流。

stats.inputMessageSize

整型

发布到流的字节数或千字节数。 字节在经过$source阶段(而不是经过整个管道时)被视为已“发布”到流。

stats.outputMessageCount

整型

该流处理的文档数量。 文档一旦通过整个管道,就被视为已被流“处理”。

stats.outputMessageSize

整型

流处理的字节数或千字节数。 字节一旦通过整个管道,就被视为已被流“处理”。

stats.dlqMessageCount

整型

stats.dlqMessageSize

整型

stats.changeStreamTimeDifferenceSecs

整型

最新变更流恢复令牌所代表的事件时间与 oplog 中最新事件之间的时间差(以秒为单位)。

stats.changeStreamState

token

最新的变更流恢复令牌。仅适用于具有变更流源的流处理器。

stats.latency

文档

整个流处理器的延迟统计信息。仅当您传入 verbose 选项时, Atlas Stream Processing才会返回此字段。

stats.latency.p50

整型

过去 30 秒内处理的所有文档的估计第 50 个百分位数延迟。如果您的管道包括窗口阶段,延迟测量包括窗口的时间间隔。

示例,如果您的 $tumblingWindow 阶段的间隔为 5 分钟,则延迟测量值将包括这 5 分钟。

stats.latency.p99

整型

过去 30 秒内处理的所有文档的估计第 99 个百分位数延迟。如果您的管道包括窗口阶段,延迟测量包括窗口的时间间隔。

示例,如果您的 $tumblingWindow 阶段的间隔为 5 分钟,则延迟测量值将包括这 5 分钟。

stats.latency.start

datetime

最近 30 秒测量窗口开始的总时间。

stats.latency.end

datetime

最近 30 秒测量窗口结束的总时间。

stats.latency.unit

字符串

计算延迟的时间单位。此值始终为 microseconds

stats.latency.count

整型

流处理器在最近 30 秒测量窗口中处理的文档数。

stats.latency.sum

整型

在最近的 30 秒测量窗口中进行的所有单独延迟测量的总和(以微秒为单位)。

stats.stateSize

整型

Windows 用于存储处理器状态的字节数。

stats.watermark

整型

当前水印的时间戳。

stats.operatorStats

阵列

处理器管道中每个操作符的统计信息。 仅当您传入verbose选项时,Atlas Stream Processing 才会返回此字段。

stats.operatorStats 提供许多核心stats字段的每个操作符版本:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats 包括以下独特字段:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats 还包括以下字段,前提是您已传入 verbose 选项,并且您的处理器包含一个窗口阶段:

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats.maxMemoryUsage

整型

操作符的最大内存使用量(以字节或千字节为单位)。

stats.operatorStats.executionTimeSecs

整型

操作符的总执行时间(以秒为单位)。

stats.minOpenWindowStartTime

日期

最小打开窗口的开始时间。此值是可选的。

stats.maxOpenWindowStartTime

日期

最大开放窗口的开始时间。此值是可选的。

stats.kafkaPartitions

阵列

stats.kafkaPartitions.partition

整型

Apache Kafka主题分区号。

stats.kafkaPartitions.currentOffset

整型

流处理器在指定分区上的偏移量。 该值等于流处理器处理的上一个偏移量加上1

stats.kafkaPartitions.checkpointOffset

整型

流处理器上次提交给Apache Kafka代理的偏移量以及指定分区的检查点。通过此偏移量的所有消息都记录在最后检查点中。

stats.kafkaPartitions.isIdle

布尔

用于指示该分区是否空闲的标志。此值默认为 false

例如,以下显示了名为inst01的Atlas Stream Processing实例上名为proc01的流处理器的状态,其中项大小以 KB 为单位:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

后退

管理VPC连接

在此页面上