Overview
Puedes añadir un proveedor de autenticación personalizado implementando la interfaz com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider. Debes colocar el archivo JAR de tu clase personalizada en la carpeta lib en tu implementación de Kafka Connect.
Establece las siguientes propiedades de autenticación para configurar el proveedor de autenticación:
mongo.custom.auth.mechanism.enableconfigurado entruemongo.custom.auth.mechanism.providerClassconfigurado con el nombre de clase calificado de la clase de implementación(Opcional)
mongodbaws.auth.mechanism.roleArnconfigurado en un Nombre de recurso de Amazon (ARN)
Ejemplo de autenticación de AWS IAM
Este ejemplo proporciona un proveedor de autenticación personalizado que es compatible con AWS IAM. El siguiente código muestra el archivo JAR del proveedor de autenticación personalizado:
package com.mongodb; import java.util.Map; import java.util.function.Supplier; import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder; import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.util.StringUtils; public class SampleAssumeRoleCredential implements CustomCredentialProvider { public SampleAssumeRoleCredential() {} public MongoCredential getCustomCredential(Map<?, ?> map) { AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain(); Supplier<AwsCredential> awsFreshCredentialSupplier = () -> { AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard() .withCredentials(provider) .withRegion("us-east-1") .build(); AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600) .withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn")) .withRoleSessionName("Test_Session"); AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest); Credentials creds = assumeRoleResult.getCredentials(); // Add your code to fetch new credentials return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken()); }; return MongoCredential.createAwsCredential(null, null) .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier); } // Validates presence of an ARN public void validate(Map<?, ?> map) { String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn"); if (StringUtils.isNullOrEmpty(roleArn)) { throw new RuntimeException("Invalid value set for customProperty"); } } // Initializes the custom provider public void init(Map<?, ?> map) { } }
Compila el archivo JAR y colócalo en la carpeta lib de tu implementación.
Nota
Para ver un ejemplo de un archivo pom.xml que pueda compilar el JAR completo que contenga la clase de implementación, consulte el archivo LEAME del repositorio de GitHub del Conector Kafka.
A continuación, siguiente, configura tu conector de origen o destino para incluir el método de autenticación personalizado. Las siguientes propiedades de configuración definen un conector de sink que conecta el Kafka Connector a MongoDB Atlas mediante la autenticación AWS IAM:
{ "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "<topic>", "connection.uri": "<connection string>?authSource=%24external&authMechanism=MONGODB-AWS&retryWrites=true&w=majority", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "<db>", "collection": "<collection>", "mongo.custom.auth.mechanism.enable": "true", "mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential", "mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>" } }
En este ejemplo, el valor de roleArn es el Rol de IAM del grupo de usuarios que tiene acceso a MongoDB Atlas. En la consola de IAM de AWS, la cuenta de IAM que ejecuta Kafka Connect tiene permisos AssumeRole para el Grupo de Usuarios de Atlas.