O pipeline de agregação é uma estrutura para agregação de dados, modelada sobre o conceito de pipelines de processamento de dados.
Para saber mais sobre agregação, consulte Aggregation Pipeline no manual do servidor.
Pré-requisitos
Você deve configurar os seguintes componentes para executar os exemplos de código neste guia:
Uma coleção
test.restaurantspreenchida com documentos do arquivorestaurants.jsonnos ativos de documentação do Github.As seguintes declarações de importação:
import org.mongodb.scala._ import org.mongodb.scala.model.Aggregates._ import org.mongodb.scala.model.Accumulators._ import org.mongodb.scala.model.Filters._ import org.mongodb.scala.model.Projections._ 
Observação
Este guia usa as implicações do Observable como abordadas no Quick Start Primary.
Conecte-se a um MongoDB deployment
Primeiro, conecte a um MongoDB deployment e, em seguida, declare e defina as instâncias MongoDatabase e MongoCollection .
O código a seguir se conecta a uma MongoDB deployment standalone em execução em localhost na porta 27017. Em seguida, define a variável database para fazer referência ao banco de dados test e a variável collection para fazer referência à coleção restaurants :
val mongoClient: MongoClient = MongoClient() val database: MongoDatabase = mongoClient.getDatabase("test") val collection: MongoCollection[Document] = database.getCollection("restaurants") 
Para saber mais sobre como se conectar a sistemas do MongoDB, consulte o tutorial Conectar ao MongoDB .
Fazer aggregation
Para executar a agregação, passe uma lista de estágios de agregação para o método MongoCollection.aggregate() . O driver fornece a classe assistente Aggregates que contém construtores para estágios de agregação .
Neste exemplo, o aggregation pipeline executa as seguintes tarefas:
Usa um estágio
$matchpara filtrar documentos nos quais o campo de arraycategoriescontém o elemento"Bakery". O exemplo utilizaAggregates.filter()para construir o estágio$match.
Utiliza um estágio
$grouppara agrupar os documentos correspondentes pelo campostars, acumulando uma contagem de documentos para cada valor distinto destars. O exemplo utilizaAggregates.group()para construir o estágio$groupeAccumulators.sum()para construir a expressão acumulador . Para as expressões acumulador para uso no estágio$group, o driver forneceAccumulatorsclasse assistente .
collection.aggregate(Seq(     Aggregates.filter(Filters.equal("categories", "Bakery")),     Aggregates.group("$stars", Accumulators.sum("count", 1)) )).printResults() 
Usar expressões de agregação
Para expressões de acumulador do $group , o driver fornece a classe assistente do Accumulators . Para outras expressões de agregação , construa manualmente a expressão utilizando a classe Document .
No exemplo a seguir, o aggregation pipeline usa um estágio $project para retornar somente o campo name e o campo calculado firstCategory cujo valor é o primeiro elemento na array categories . O exemplo utiliza Aggregates.project() e vários métodos de classe Projections para construir o estágio $project :
collection.aggregate(   Seq(     Aggregates.project(       Projections.fields(         Projections.excludeId(),         Projections.include("name"),         Projections.computed(           "firstCategory",           Document("$arrayElemAt"-> Seq("$categories", 0))         )       )     )   ) ).printResults() 
Explicar uma agregação
Para $explain um pipeline de agregação , chame o método AggregatePublisher.explain() :
collection.aggregate(   Seq(Aggregates.filter(Filters.eq("categories", "Bakery")),       Aggregates.group("$stars", Accumulators.sum("count", 1))) ).explain().printResults()