Atlas Stream Processing 支持与Apache Kafka 的源和目标连接。
添加 Kafka 连接
如要将 Kafka 连接添加到您的流处理实例的连接注册表:
要使用 Atlas CLI 为指定的 Atlas Stream Processing 实例创建一个连接,请运行以下命令:
atlas streams connections create [connectionName] [options]
要了解有关命令语法和参数的更多信息,请参阅 Atlas Streams 连接创建的 Atlas CLI 文档。
AtlasGoStream Processing在Atlas中,Go项目的 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击 Services 标题下的 Stream Processing。
此时将显示 Stream Processing 页面。
添加新连接。
选择 Kafka 连接。
提供 Connection Name。每个连接名称在流处理实例中必须是唯一的。这是用于引用Atlas Stream Processing 聚合中的连接的名称。
选择 Network Access 类型。Atlas Stream Processing 支持 Public IP或VPC Peering 连接。
点击 Public IP 按钮。此网络访问类型无需进一步配置。
单击 VPC Peering 按钮。
将 Enable VPC Peering(启用 VPC 对等互连)打开。Atlas Stream Processing 会自动从您配置的连接中选择适当的 VPC 对等连接。
如果您没有 VPC 对等连接,请配置 Atlas 网络对等连接。
为 Apache Kafka 系统指定一个或多个引导服务器的 IP 地址。
从下拉菜单中选择 Security Protocol Method。
Atlas Stream Processing支持
SASL_PLAINTEXT
、SASL_SSL
和SSL
。SASL_PLAINTEXT
与VPC对等互联不兼容。要使用VPC对等互连,您必须选择SASL_SSL
或SSL
方法。从下拉菜单中选择 SASL Mechanism。
Atlas Stream Processing 支持:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
提供 Username 用于身份验证。
提供用于身份验证的密码。
单击 Add connection(连接)。
从下拉菜单中选择 SASL Mechanism。
Atlas Stream Processing 支持:
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
单击 Upload 上传您的 Certificate Authority PEM file。
提供 Username 用于身份验证。
提供用于身份验证的密码。
单击 Add connection(连接)。
(可选)如果您使用的是默认Apache Kafka CA 以外的证书颁发机构,请单击 Upload 上传您的 Certificate Authority PEM file。
单击 Upload 上传您的 Client SSL Certificate。
单击 Upload 上传您的 Client SSL Keyfile。
(可选)如果您的客户端 SSL 密钥文件受密码保护,请在 Client key password字段中输入您的密码。
单击 Add connection(连接)。
Atlas Administration API 提供了一个端点,用于将连接添加到连接注册表。
重要
将 Apache Kafka 集群等外部连接添加到连接注册表后,您必须将 Atlas IP 地址添加到该外部连接的访问列表中。有关更多信息,请参阅支持访问 Atlas 控制面或从 Atlas 控制面进行访问。
添加 Kafka 专用链接连接
Atlas Stream Processing 目前支持创建 AWS Private Link 连接至以下:
AWS Confluent 集群
AWS MSK 集群
Microsoft Azure EventHub
AWS Confluent 连接
要创建用于Atlas Stream Processing项目的 AWS Confluent 连接,请执行以下操作:
重要
配置 Confluence集群。
您必须配置 Confluence集群以接受来自Atlas项目的传入连接。
重要
Confluent 仅接受来自Amazon Web Services 的传入连接。要使用 Confluent Private Link 连接,您必须在Amazon Web Services上托管流处理实例。
调用返回群组和地区的Atlas Administration API端点的帐户ID和VPC ID 。请注意
awsAccountId
的值,您将在后续步骤中用到它。在您的 Confluence 帐户中,导航到要连接的集群。 在集群网络界面中,导航到集群网络详细信息。
对于 Confluence专用集群,请提供您选择的名称。 对于Amazon Web Services帐号,提供您之前记下的 awsAccountId
字段的值。
注意
Confluence 无无服务器集群不需要此步骤。
请求连接到您的云提供商。
Atlas Administration API提供一个端点,用于请求为Atlas Stream Processing配置的 Private Link 连接。
提示
或者,您可以使用Atlas CLI请求专用链接连接。
对于使用Atlas CLI进行流处理器连接的专用链接端点,运行以下命令:
atlas streams privateLinks create [options]
要学习;了解有关命令语法和参数的更多信息,请参阅 atlas streams privateLinks create 的Atlas CLI文档。
对于 AWS Confluent 专用链接连接,您必须设置以下键值对:
键 | 值 |
---|---|
| Confluent 集群的 VPC Endpoint service name。 |
| Confluent集群上引导服务器的完全限定域名。 |
| 如果您的集群不使用子域,则必须将其设立为空大量 |
您可以在 Confluence 集群的网络详细信息中找到这些值。
以下示例命令请求连接到 Confluence集群并说明了典型的响应:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \ --digest \ --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \ --header 'Content-Type: application/json' \ --header 'Accept: application/vnd.atlas.2023-02-01+json' \ --data '{ "vendor": "Confluent", "provider": "AWS", "region": "us_east_1", "serviceEndpointId": "com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9", "dnsDomain": "sample.us-east-1.aws.confluent.cloud", "dnsSubDomain: [ "use1-az1.sample.us-east-1.aws.confluent.cloud", "use1-az2.sample.us-east-1.aws.confluent.cloud", "use1-az4.sample.us-east-1.aws.confluent.cloud" ] }'
{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"sample.us-east-1.aws.confluent.cloud.","vendor":"Confluent","provider":"AWS","region":"us_east_1","serviceEndpointId":"com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"}
发送请求后,请记下响应正文中 _id
字段的值。 您将在后续步骤中用到它。
创建 Atlas 端连接。
重要提示:此示例向您展示如何添加使用 SASL_SSL
进行保护的连接。您还可以使用双向 SSL
保护连接。
有关此API用于 SSL
身份验证的支持字段的详细信息,请参阅Atlas Admin API文档。
键 | 值 |
---|---|
| 云提供商的Kafka引导服务器的IP解决。 |
|
|
|
|
| |
| 与您的 Confluent API 密钥关联的用户名 |
|
|
|
|
|
|
根据需要设置所有其他值。
以下示例命令在Atlas中创建Apache Kafka连接:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \ --digest \ --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \ --header 'Content-Type: application/json' \ --header 'Accept: application/vnd.atlas.2023-02-01+json' \ --data '{ "name": "confluent_demo", "bootstrapServers": "slr-ntgrbn.sample.us-east-1.aws.confluent.cloud:9092", "security": { "protocol": "SASL_SSL" }, "authentication": { "mechanism": "PLAIN", "password": "apiSecretDemo", "username": "apiUserDemo" }, "type": "Kafka", "networking": { "access": { "type": "PRIVATE_LINK", "connectionId": "38972b0cbe9c2aa40a30a246" } } }'
AWS MSK 专用链接连接
要创建 AWS Private Link 连接以用于您的 Atlas Stream Processing 项目:
配置您的 AWS MSK 集群。
您必须将 AWS MSK 集群配置为接受来自 Atlas 项目的传入连接。
重要
AWS MSK 仅接受来自 AWS 的传入连接。要使用 AWS MSK 专用链接连接,您必须在 AWS 上托管流处理实例。
使用获取帐户详细信息终结点来检索 AWS 主体身份。您将需要此值用于 AWS MSK 集群策略。
登录 AWS 管理控制台,并导航到 AWS MSK 控制台。确保要连接的集群已启用
multi-VPC connectivity
。单击Properties 、 Security Settings和Edit cluster policy 。
提供您之前检索到的主体身份的完整 ARN 形式作为
Statement.Principal.Aws.[]
的值,并确保策略采用以下形式:{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": [ "arn:aws:iam:123456789012:root" ] }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeCluster", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/testing/de8982fa-8222-4e87-8b20-9bf3cdfa1521-2" } ] }
请求连接到您的云提供商。
Atlas Administration API提供一个端点,用于请求为Atlas Stream Processing配置的 Private Link 连接。
提示
或者,您可以使用Atlas CLI请求专用链接连接。
对于使用Atlas CLI进行流处理器连接的专用链接端点,运行以下命令:
atlas streams privateLinks create [options]
要学习;了解有关命令语法和参数的更多信息,请参阅 atlas streams privateLinks create 的Atlas CLI文档。
对于 AWS MSK 专用链接连接,您必须设置以下键值对:
键 | 值 |
---|---|
| 必须设置为 |
| 必须设置为 |
| 表示 AWS MSK 集群的 Amazon 资源编号的字符串。 |
您可以在 AWS MSK 集群的网络详细信息中找到 ARN。
以下示例命令请求连接到您的 AWS MSK 集群,并展示了一个典型的响应:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \ --digest \ --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \ --header 'Content-Type: application/json' \ --header 'Accept: application/vnd.atlas.2023-02-01+json' \ --data '{ "vendor": "msk", "provider": "AWS", "arn": "1235711"}'
{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"scram.sample.us-east-1.amazonaws.com","vendor":"msk","provider":"AWS","region":"us_east_1","serviceEndpointId":"com.amazonaws.vpce.us-east-1.vpce-svc-93da685022ee702a9"}
发送请求后,请记下响应正文中 _id
字段的值。 您将在后续步骤中用到它。
创建 Atlas 端连接。
重要提示:此示例向您展示如何添加使用 SASL_SSL
进行保护的连接。您还可以使用双向 SSL
保护连接。
有关此API用于 SSL
身份验证的支持字段的详细信息,请参阅Atlas Admin API文档。
键 | 值 |
---|---|
| 云提供商的Kafka引导服务器的IP解决。 |
|
|
|
|
| 与集群关联的 SCRAM 密码。您必须定义配对的 SCRAM 用户和密码,并使用 AWS Secrets Manager 将其与 AWS MSK 集群关联。 |
| 与您的集群关联的 SCRAM 用户。您必须定义配对的 SCRAM 用户和密码,并使用 AWS Secrets Manager 将其与 AWS MSK 集群关联。 |
|
|
|
|
|
|
根据需要设置所有其他值。
以下示例命令在Atlas中创建Apache Kafka连接:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \ --digest \ --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \ --header 'Content-Type: application/json' \ --header 'Accept: application/vnd.atlas.2023-02-01+json' \ --data '{ "name": "msk_demo", "bootstrapServers": "slr-ntgrbn.sample.us-east-1.amazonaws.com:9092", "security": { "protocol": "SASL_SSL" }, "authentication": { "mechanism": "PLAIN", "password": "scramSecretDemo", "username": "scramUserDemo" }, "type": "Kafka", "networking": { "access": { "type": "PRIVATE_LINK", "connectionId": "38972b0cbe9c2aa40a30a246" } } }'
Atlas生成一个私有端点ID。现在,您可以通过单击相应行中的 按钮,在 界面的 标签页下查看 AWS MSKNetwork Access Atlas Stream ProcessingView私有端点的详细信息。
Microsoft Azure EventHub Private Link 连接
要创建用于Atlas Stream Processing项目的Microsoft Azure EventHub Private Link 连接,请执行以下操作:
请提供您的 Azure EventHub 终结点详细信息。
请提供您的Azure 服务终结点 ID。
选择您的Endpoint region 。
选择您的Host name 。
单击 Next, generate endpoint ID
现在,您可以在 Network Access 界面的 Atlas Stream Processing 标签页中,通过单击其行中的 View 按钮,查看 Azure EventHub 私有端点。
请求连接到您的云提供商。
Atlas Administration API提供一个端点,用于请求为Atlas Stream Processing配置的 Private Link 连接。
对于Azure Private Link连接,必须设立以下键值对:
键 | 值 |
---|---|
| 您的 EventHub 命名空间端点。请注意,此值必须是事件中心命名空间的Azure资源经理(ARM) ID ,而不是单个事件中心的 ARM ID 。 |
| 您的 Azure Event Hub 命名空间中引导服务器的完全限定域名及其端口号。该域名符合此处描述的格式。 |
以下示例命令请求连接到Azure事件中心并说明了典型的响应:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \ --digest \ --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \ --header 'Content-Type: application/json' \ --header 'Accept: application/vnd.atlas.2023-02-01+json' \ --data '{ "provider": "AZURE", "region": "US_EAST_2", "serviceEndpointId": "/subscriptions/b82d6aa0-0b0a-ffa3-7c22-e167dc44f5b0/resourceGroups/asp/providers/Microsoft.EventHub/namespaces/sample", "dnsDomain": "sample.servicebus.windows.net" }'
{"_id":"6aa12e7ccd660d4b2380b1c1","dnsDomain":"sample.servicebus.windows.net","provider":"AZURE","region":"US_EAST_2","serviceEndpointId":"/subscriptions/b82d6aa0-0b0a-ffa3-7c22-e167dc44f5b0/resourceGroups/asp/providers/Microsoft.EventHub/namespaces/sample"}
发送请求后,请记下响应正文中 _id
字段的值。 您将在后续步骤中用到它。
在您的云提供商帐户中接受请求的连接。
对于到Azure 的Private Link 连接,请导航到事件中心网络页面并选择Private endpoint connections 标签页。在连接表中,确定新请求的连接并予以批准。
创建 Atlas 端连接。
重要提示:此示例向您展示如何添加使用 SASL_SSL
进行保护的连接。您还可以使用双向 SSL
保护连接。
有关此API用于 SSL
身份验证的支持字段的详细信息,请参阅Atlas Admin API文档。
键 | 值 |
---|---|
| 云提供商的Kafka引导服务器的IP解决。 |
|
|
|
|
| |
|
|
|
|
|
|
|
|
根据需要设置所有其他值。
以下示例命令在Atlas中创建Apache Kafka连接:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \ --digest \ --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \ --header 'Content-Type: application/json' \ --header 'Accept: application/vnd.atlas.2023-02-01+json' \ --data '{ "name": "eventhubpl33333", "bootstrapServers": "sample.servicebus.windows.net:9093", "security": { "protocol": "SASL_SSL" }, "authentication": { "mechanism": "PLAIN", "password": "Endpoint=sb://sample.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=Irlo3OoRkc27T3ZoGOlbhEOqXQRXzb12+Q2hNXm0lc=", "username": "$ConnectionString" }, "type": "Kafka", "networking": { "access": { "type": "PRIVATE_LINK", "connectionId": "38972b0cbe9c2aa40a30a246" } } }'
Microsoft Azure Confluent 专用链接连接
要创建用于Atlas Stream Processing项目的Microsoft Azure Confluent Private Link 连接,请执行以下操作:
配置 Confluent 集群。
调用streams/accountDetails
终结点以获取您的 Atlas 项目的 Azure 订阅 ID:
curl --location 'http://cloud.mongodb.com/api/atlas/v2/groups/<project_id>/streams/accountDetails?cloudProvider=azure®ionName=<region>' \ --header 'Accept: application/vnd.atlas.2024-11-13+json' { "azureSubscriptionId": "f1a2b3c4-d5e6-87a8-a9b0-c1d2e3f4a5b6", "cidrBlock": "192.168.123.0/21", "virtualNetworkName": "vnet_a1b2c3d4e5f6a7b8c9d0e1f2_xyz987ab", "cloudProvider": "azure" }
添加 PrivateLink 访问权限。
按照Confluent 文档中提供的步骤添加 PrivateLink 访问权限。
注意
您需要提供您的 azureSubscriptionId
。
请求连接到您的云提供商。
键 | 值 |
---|---|
区域 | Confluent集群的区域 |
dnsDomain | 集群网络的 DNS 域。例如: |
azureResourceIds | 您集群的网络使用的每个可用区 (AZ) 中的 Confluent 云专用链接服务终结点的资源 ID。
|
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/privateLinkConnections' \ --digest \ --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \ --header 'Content-Type: application/json' \ --header 'Accept: application/vnd.atlas.2024-11-13+json' \ --data '{ "vendor": "Confluent", "provider": "Azure", "region": "US_EAST_2", "dnsDomain": "abcxyz12345.eastus2.azure.confluent.cloud", "azureResourceIds: [ "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-1", "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-2", "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-3" ] }'
{ "_id": "65f8a3b4c5d6e7f8a9b0c1d2", "azureResourceIds": [ "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-1", "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-2", "/subscriptions/a1b2c3d4-e5f6-7890-abcd-ef1234567890/resourceGroups/d-xyz98/providers/Microsoft.Network/privateLinkServices/d-xyz98-privatelink-3" ], "dnsDomain": "abcxyz12345.eastus2.azure.confluent.cloud", "provider": "Azure", "region": "US_EAST_2", "vendor": "Confluent" }
创建 Atlas 端连接。
重要提示:此示例向您展示如何添加使用 SASL_SSL
进行保护的连接。您还可以使用双向 SSL
保护连接。
有关此API用于 SSL
身份验证的支持字段的详细信息,请参阅Atlas Admin API文档。
键 | 值 |
---|---|
| 云提供商的Kafka引导服务器的IP解决。 |
|
|
|
|
| |
| 与您的 Confluent API 密钥关联的用户名 |
|
|
|
|
|
|
根据需要设置所有其他值。
以下示例命令在 Atlas 中创建一个 Apache Kafka 连接:
curl --location 'https://cloud.mongodb.com/api/atlas/v2/groups/8358217d3abb5c76c3434648/streams/spinstance/connections' \ --digest \ --user "slrntglrbn:933fb118-ac62-4991-db05-ee67a3481fde" \ --header 'Content-Type: application/json' \ --header 'Accept: application/vnd.atlas.2023-02-01+json' \ --data '{ "name": "confluent_demo", "bootstrapServers": "slr-ntgrbn.sample.us-east-1.aws.confluent.cloud:9092", "security": { "protocol": "SASL_SSL" }, "authentication": { "mechanism": "PLAIN", "password": "apiSecretDemo", "username": "apiUserDemo" }, "type": "Kafka", "networking": { "access": { "type": "PRIVATE_LINK", "connectionId": "38972b0cbe9c2aa40a30a246" } } }'
配置
用于创建 Kafka 连接的每个界面都允许您为 Kafka 集群提供配置参数。这些配置采用键值对的形式,并与以下内容之一相对应:
Atlas Stream Processing 仅将这些参数传递给您的 Kafka 集群。如果您声明任何未明确允许的参数,Atlas Stream Processing 将忽略它们。
接口 | 配置机制 |
---|---|
Atlas CLI | 请以 |
Atlas Administration API | 以 |
Atlas UI | 在 Add Connection 页的 Configuration File 字段中提供配置。 |
Atlas Stream Processing 支持以下配置: