Overview
カスタム認証プロバイダを追加するには、  com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProviderインターフェースを実装します。 カスタム クラス JAR ファイルは、 Kafka Connect 配置のlibフォルダーに配置する必要があります。
認証プロバイダを設定するには、次の認証プロパティを設定します。
- mongo.custom.auth.mechanism.enable: に設定- true
- mongo.custom.auth.mechanism.providerClass: 実装クラスの修飾クラス名に設定
- (任意) - mongodbaws.auth.mechanism.roleArn: Amazon リソース名(ARN)に設定
Amazon Web Services IAM 認証の例
この例では、 Amazon Web Services IAM をサポートするカスタム認証プロバイダを提供します。 次のコードは、カスタム認証プロバイダーの JAR ファイルを示しています。
package com.mongodb; import java.util.Map; import java.util.function.Supplier; import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder; import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.util.StringUtils; public class SampleAssumeRoleCredential implements CustomCredentialProvider {   public SampleAssumeRoleCredential() {}      public MongoCredential getCustomCredential(Map<?, ?> map) {     AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();     Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {       AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()           .withCredentials(provider)           .withRegion("us-east-1")           .build();       AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)           .withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn"))           .withRoleSessionName("Test_Session");       AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);       Credentials creds = assumeRoleResult.getCredentials();       // Add your code to fetch new credentials       return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken());     };     return MongoCredential.createAwsCredential(null, null)         .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);   }      // Validates presence of an ARN      public void validate(Map<?, ?> map) {     String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");     if (StringUtils.isNullOrEmpty(roleArn)) {       throw new RuntimeException("Invalid value set for customProperty");     }   }   // Initializes the custom provider      public void init(Map<?, ?> map) {   } } 
JAR ファイルをコンパイルし、配置のlibフォルダーに配置します。
注意
実装クラスを含む完全な JAR を構築できる pom.xmlファイルの例については、 Kafka コネクタ GithubリポジトリREADMEファイル を参照してください。
次に、カスタム認証方法を含めるようにソース コネクタまたは Sink Connector を構成します。 以下の設定プロパティは、 IAM 認証を使用して を に接続する Sink Connector を定義します。Kafka ConnectorMongoDB AtlasAmazon Web Services
{    "name": "mongo-tutorial-sink",    "config": {      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",      "topics": "<topic>",      "connection.uri": "<connection string>?authSource=%24external&authMechanism=MONGODB-AWS&retryWrites=true&w=majority",      "key.converter": "org.apache.kafka.connect.storage.StringConverter",      "value.converter": "org.apache.kafka.connect.json.JsonConverter",      "value.converter.schemas.enable": false,      "database": "<db>",      "collection": "<collection>",      "mongo.custom.auth.mechanism.enable": "true",      "mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential",      "mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>"    } } 
この例では、 roleArn値は MongoDB Atlas へのアクセス権を持つユーザーグループの IAM ロールです。 Amazon Web Services IAM コンソールでは、 Kafka Connect を実行している IAM アカウントはAtlas userグループに対する AssumeRole 権限を持っています。