Visão geral
Você pode adicionar um provedor de autenticação personalizado implementando a interface com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider . Você deve colocar seu arquivo JAR de classe personalizada na pasta lib no seu sistema do Kafka Connect.
Defina as seguintes propriedades de autenticação para configurar o provedor de autenticação:
- mongo.custom.auth.mechanism.enable: definido como- true
- mongo.custom.auth.mechanism.providerClass: defina para o nome da classe qualificada da classe de implementação
- (Opcional) - mongodbaws.auth.mechanism.roleArn: defina como um nome de recurso da Amazon (ARN)
Exemplo de autenticação IAM Amazon Web Services
Este exemplo fornece um provedor de autenticação personalizado compatível com Amazon Web Services IAM. O código a seguir mostra o arquivo JAR do provedor de autenticação personalizada:
package com.mongodb; import java.util.Map; import java.util.function.Supplier; import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder; import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.util.StringUtils; public class SampleAssumeRoleCredential implements CustomCredentialProvider {   public SampleAssumeRoleCredential() {}      public MongoCredential getCustomCredential(Map<?, ?> map) {     AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();     Supplier<AwsCredential> awsFreshCredentialSupplier = () -> {       AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard()           .withCredentials(provider)           .withRegion("us-east-1")           .build();       AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600)           .withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn"))           .withRoleSessionName("Test_Session");       AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest);       Credentials creds = assumeRoleResult.getCredentials();       // Add your code to fetch new credentials       return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken());     };     return MongoCredential.createAwsCredential(null, null)         .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);   }      // Validates presence of an ARN      public void validate(Map<?, ?> map) {     String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn");     if (StringUtils.isNullOrEmpty(roleArn)) {       throw new RuntimeException("Invalid value set for customProperty");     }   }   // Initializes the custom provider      public void init(Map<?, ?> map) {   } } 
Compile o arquivo JAR e coloque-o na pasta lib em seu sistema.
Observação
Para visualizar um exemplo de um pom.xml arquivo que pode construir o JAR completo contendo a classe de implementação, consulte o arquivo README do repositório do Github do Conector Kafka.
Em seguida, configure seu conector de origem ou coletor para incluir o método de autenticação personalizado. As seguintes propriedades de configuração definem um conector de coletor que conecta o Kafka Connector ao MongoDB Atlas usando a autenticação Amazon Web Services IAM:
{    "name": "mongo-tutorial-sink",    "config": {      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",      "topics": "<topic>",      "connection.uri": "<connection string>?authSource=%24external&authMechanism=MONGODB-AWS&retryWrites=true&w=majority",      "key.converter": "org.apache.kafka.connect.storage.StringConverter",      "value.converter": "org.apache.kafka.connect.json.JsonConverter",      "value.converter.schemas.enable": false,      "database": "<db>",      "collection": "<collection>",      "mongo.custom.auth.mechanism.enable": "true",      "mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential",      "mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>"    } } 
Neste exemplo, o valor roleArn é a função IAM do grupo de usuários que tem acesso ao MongoDB Atlas. No console do Amazon Web Services IAM, a conta do IAM que está executando o Kafka Connect tem AssumeRole permissões para o grupo de Atlas user .