Overview
Puede agregar un proveedor de autenticación personalizado implementando el
com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider
Interfaz. Debe colocar su archivo JAR de clase personalizado en la carpeta lib en su implementación de Kafka Connect.
Establezca las siguientes propiedades de autenticación para configurar el proveedor de autenticación:
mongo.custom.auth.mechanism.enable: establecido entruemongo.custom.auth.mechanism.providerClassconfigurado con el nombre de clase calificado de la clase de implementación(Opcional)
mongodbaws.auth.mechanism.roleArn: establecido en un nombre de recurso de Amazon (ARN)
Ejemplo de autenticación de AWS IAM
Este ejemplo proporciona un proveedor de autenticación personalizado que es compatible con AWS IAM. El siguiente código muestra el archivo JAR del proveedor de autenticación personalizado:
package com.mongodb; import java.util.Map; import java.util.function.Supplier; import com.mongodb.kafka.connect.util.custom.credentials.CustomCredentialProvider; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.securitytoken.AWSSecurityTokenService; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceAsyncClientBuilder; import com.amazonaws.services.securitytoken.model.AssumeRoleRequest; import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.util.StringUtils; public class SampleAssumeRoleCredential implements CustomCredentialProvider { public SampleAssumeRoleCredential() {} public MongoCredential getCustomCredential(Map<?, ?> map) { AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain(); Supplier<AwsCredential> awsFreshCredentialSupplier = () -> { AWSSecurityTokenService stsClient = AWSSecurityTokenServiceAsyncClientBuilder.standard() .withCredentials(provider) .withRegion("us-east-1") .build(); AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withDurationSeconds(3600) .withRoleArn((String)map.get("mongodbaws.auth.mechanism.roleArn")) .withRoleSessionName("Test_Session"); AssumeRoleResult assumeRoleResult = stsClient.assumeRole(assumeRoleRequest); Credentials creds = assumeRoleResult.getCredentials(); // Add your code to fetch new credentials return new AwsCredential(creds.getAccessKeyId(), creds.getSecretAccessKey(), creds.getSessionToken()); }; return MongoCredential.createAwsCredential(null, null) .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier); } // Validates presence of an ARN public void validate(Map<?, ?> map) { String roleArn = (String) map.get("mongodbaws.auth.mechanism.roleArn"); if (StringUtils.isNullOrEmpty(roleArn)) { throw new RuntimeException("Invalid value set for customProperty"); } } // Initializes the custom provider public void init(Map<?, ?> map) { } }
Compila el archivo JAR y colócalo en la carpeta lib de tu implementación.
Nota
Para ver un ejemplo de un archivo pom.xml que puede compilar el JAR completo que contiene la clase de implementación, consulta el Archivo README del repositorio de GitHub del conector de Kafka.
A continuación, siguiente, configura tu conector de origen o destino para incluir el método de autenticación personalizado. Las siguientes propiedades de configuración definen un conector de sink que conecta el Kafka Connector a MongoDB Atlas mediante la autenticación AWS IAM:
{ "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "<topic>", "connection.uri": "<connection string>?authSource=%24external&authMechanism=MONGODB-AWS&retryWrites=true&w=majority", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "<db>", "collection": "<collection>", "mongo.custom.auth.mechanism.enable": "true", "mongo.custom.auth.mechanism.providerClass": "com.mongodb.SampleAssumeRoleCredential", "mongodbaws.auth.mechanism.roleArn": "<AWS IAM ARN>" } }
En este ejemplo, el valor de roleArn es el Rol de IAM del grupo de usuarios que tiene acceso a MongoDB Atlas. En la consola de IAM de AWS, la cuenta de IAM que ejecuta Kafka Connect tiene permisos AssumeRole para el Grupo de Usuarios de Atlas.