El pipeline de agregación es un marco para la agregación de datos, modelado en el concepto de pipelines de procesamiento de datos.
Para aprender más sobre la agregación, consulte Pipeline de agregación en el manual del servidor.
Requisitos previos
Debe configurar los siguientes componentes para ejecutar los ejemplos de código en esta guía:
A
test.restaurantscolección poblada con documentos del archivorestaurants.jsonen la documentación activos GitHub.Las siguientes instrucciones de importación:
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Accumulators; import com.mongodb.client.model.Projections; import com.mongodb.client.model.Filters; import org.bson.Document;
Importante
Esta guía utiliza implementaciones personalizadas de Subscriber, que se describen en la Guía Muestra de implementaciones personalizadas de suscriptor.
Conecta a una implementación de MongoDB
En primer lugar, conéctate a una implementación de MongoDB y, a continuación, declara y define las instancias MongoDatabase y MongoCollection.
El código siguiente se conecta a una implementación autónoma de MongoDB que se ejecuta en localhost en el puerto 27017. Luego, define la variable database para referirse a la base de datos test y la variable collection para referirse a la colección restaurants:
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
Para aprender más sobre cómo conectar implementaciones de MongoDB, consulta el tutorial Conectar a MongoDB.
Realizar agregación
Para realizar la agregación, pasa una lista de etapas de agregación al método MongoCollection.aggregate(). El driver proporciona la clase asistente Aggregates que contiene desarrolladores para etapas de agregación.
En este ejemplo, la pipeline de agregación realiza las siguientes tareas:
Utiliza una etapa
$matchpara filtrar los documentos en los que el campo de arreglocategoriescontiene el elemento"Bakery". El ejemplo utilizaAggregates.match()para construir la etapa$match.
Utiliza una etapa
$grouppara agrupar los documentos coincidentes por el campostars, acumulando un conteo de documentos para cada valor distinto destars. El ejemplo utilizaAggregates.group()para compilar la etapa$groupyAccumulators.sum()para compilar la expresión acumuladora. Para las expresiones de acumulador que se utilizarán dentro de la etapa$group, el driver proporciona la clase asistenteAccumulators.
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)) ) ).subscribe(new PrintDocumentSubscriber());
Utiliza expresiones de agregación
Para las expresiones de acumulador $group, el driver proporciona la clase asistente Accumulators. Para otras expresiones de agregación, se puede compilar manualmente la expresión utilizando la clase Document.
En el siguiente ejemplo, el pipeline de agregación utiliza una etapa $project para devolver solo el campo name y el campo calculado firstCategory, cuyo valor es el primer elemento en el arreglo categories. El ejemplo usa Aggregates.project() y varios métodos de clase Projections para compilar la etapa $project:
collection.aggregate( Arrays.asList( Aggregates.project( Projections.fields( Projections.excludeId(), Projections.include("name"), Projections.computed( "firstCategory", new Document("$arrayElemAt", Arrays.asList("$categories", 0)) ) ) ) ) ).subscribe(new PrintDocumentSubscriber());
Explica una agregación
Para $explain una pipeline de agregación, llamar el método AggregatePublisher.explain():
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)))) .explain() .subscribe(new PrintDocumentSubscriber());
MongoDB búsqueda
Puedes realizar una consulta de búsqueda de MongoDB creando y ejecutando un pipeline de agregación que contenga una de las siguientes etapas de pipeline:
$search$searchMeta
El controlador de Java Reactive Streams proporciona las funciones Aggregates.search() y Aggregates.searchMeta() métodos para realizar consultas de MongoDB Search.
Para aprender más sobre las etapas del pipeline de agregación de MongoDB Search, consulta Choose the Aggregation Pipeline Stage en la documentación de Atlas.
Nota
Atlas y requisitos de versión de Community Edition
El operador $search pipeline de agregación está disponible únicamente para colecciones alojadas en clústeres de MongoDB Atlas que ejecutan MongoDB v4.2 o posterior, o en clústeres de MongoDB Community Edition que ejecutan MongoDB v8.2 o posterior. Las colecciones deben estar cubiertas por un índice de búsqueda MongoDB. Para obtener más información sobre la configuración necesaria y la funcionalidad de este operador, consulta la documentación de MongoDB Search.
Crear etapas de búsqueda en pipeline
Puedes crear los criterios de búsqueda en la etapa de tu pipeline de MongoDB Search (Búsqueda de MongoDB) utilizando los operadores de búsqueda.
El driver de Java Reactive Streams proporciona métodos asistente para los siguientes operadores:
Operador | Descripción |
|---|---|
Realiza una búsqueda de una palabra o frase que contenga una secuencia de caracteres de una string de entrada incompleta. | |
Combina dos o más operadores en una única query. | |
Verifica si un campo coincide con un valor que especificaste. Se asigna a los métodos | |
Verifica si existe una ruta hacia un nombre de campo indexado especificado en un documento. | |
Realiza una búsqueda de un arreglo de valores de número, fecha, booleano, objectId, uuid o string BSON en la ruta determinada y retorna documentos donde el valor del campo coincide con cualquier valor en el arreglo especificado. | |
Devuelve documentos similares a los documentos de entrada. | |
Permite consultar y puntuar valores numéricos, fechas y valores de puntos GeoJSON. | |
Realiza una búsqueda de documentos que contienen una secuencia ordenada de términos utilizando el analizador especificado en la configuración del índice. | |
Brinda soporte a las queries de una combinación de campos indexados y valores. | |
Permite la consulta y la calificación de valores numéricos, de fecha y de string. Se asigna a los métodos | |
Interpreta el campo de query como una expresión regular. | |
Realiza una búsqueda de texto completo utilizando el analizador que se especifique en la configuración del índice. | |
Permite consultas que utilizan caracteres especiales en la string de búsqueda que pueden coincidir con cualquier carácter. |
Ejemplo de etapa de búsqueda en el pipeline
Antes de que puedas ejecutar este ejemplo, debes crear un índice de búsqueda de MongoDB en la colección movies que tenga la siguiente definición:
{ "mappings": { "dynamic": true, "fields": { "title": { "analyzer": "lucene.keyword", "type": "string" }, "genres": { "normalizer": "lowercase", "type": "token" } } } }
Para obtener más información sobre cómo crear índices de Búsqueda de MongoDB, consulte la sección Gestión del índice de búsqueda de MongoDB de la guía de índices.
El siguiente código crea un escenario $search que tiene las siguientes especificaciones:
Comprueba que el arreglo
genresincluya"Comedy"Busca en el campo
fullplotla frase"new york"Coincide con
yearvalores entre1950y2000, ambos inclusiveBusca valores de
titleque comiencen con el término"Love"
Bson searchStageFilters = Aggregates.search( SearchOperator.compound() .filter( List.of( SearchOperator.in(fieldPath("genres"), List.of("Comedy")), SearchOperator.phrase(fieldPath("fullplot"), "new york"), SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000), SearchOperator.wildcard(fieldPath("title"), "Love *") ))); Bson projection = Aggregates.project(Projections.fields( Projections.include("title", "year", "genres") )); List<Bson> aggregateStages = List.of(searchStageFilters, projection); Publisher<Document> publisher = movies.aggregate(aggregateStages); publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber()); Mono.from(publisher).block();
{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979} {"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
Para obtener más información sobre los métodos asistente de MongoDB Search, consulta la referencia de la interfaz SearchOperator en la documentación de Core API del driver.
Información Adicional
Para obtener una lista completa de las etapas de agregación, consulta Etapas de agregación en el manual de MongoDB Server.
Para aprender sobre cómo ensamblar una pipeline de agregación y ver ejemplos, consulte Pipeline de agregación en el manual del MongoDB Server.
Para aprender más sobre cómo explicar las operaciones de MongoDB, consulta Salida de explicación y Planes de consultas en el manual de MongoDB Server.
Documentación de la API
Para obtener más información sobre las clases y métodos mencionados en esta guía, consulta la siguiente documentación API: