MongoDB Server 3.6版本引入了 $changeStream聚合管道操作符。
变更流提供了一种监视集合中文档变更的方法。 为了提高此新阶段的可用性, MongoCollection类型包含watch()方法。 ChangeStreamObservable实例会设置变更流,并在遇到可能可恢复的错误时自动尝试恢复。
先决条件
您必须设置以下组件才能运行本指南中的代码示例:
test.restaurants集合,其中填充了文档资产restaurants.jsonGithub 中 文件中的文档。以下 import 语句:
import java.util.concurrent.CountDownLatch import org.mongodb.scala._ import org.mongodb.scala.model.Aggregates._ import org.mongodb.scala.model.Filters._ import org.mongodb.scala.model.changestream._ 
注意
本指南使用快速入门入门中所述的Observable隐式。
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义MongoDatabase和MongoCollection实例。
以下代码连接到在端口27017上的localhost上运行的独立 MongoDB 部署。 然后,定义database变量以引用test数据库,并collection变量以引用restaurants集合:
val mongoClient: MongoClient = MongoClient() val database: MongoDatabase = mongoClient.getDatabase("test") val collection: MongoCollection[Document] = database.getCollection("restaurants") 
要了解有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB教程。
观察集合中的所有更改
要创建变更流,请使用MongoCollection.watch()方法之一。
在以下示例中,变更流会打印出其观察到的所有更改:
case class LatchedObserver() extends Observer[ChangeStreamDocument[Document]] {   val latch = new CountDownLatch(1)   override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data   override def onNext(changeDocument: ChangeStreamDocument[Document]): Unit = println(changeDocument)   override def onError(throwable: Throwable): Unit = {       println(s"Error: '$throwable")       latch.countDown()   }   override def onComplete(): Unit = latch.countDown()   def await(): Unit = latch.await() } val observer = LatchedObserver() collection.watch().subscribe(observer) observer.await() // Block waiting for the latch 
监视数据库更改
应用程序可以打开单个变更流来监视数据库的所有非系统集合。 要创建此类变更流,请使用MongoDatabase.watch()方法之一。
在以下示例中,变更流会打印出它在给定数据库上观察到的所有变更:
val observer = LatchedObserver() database.watch().subscribe(observer) observer.await() // Block waiting for the latch 
监视所有数据库的更改
应用程序可以打开单个变更流来监视 MongoDB 部署中所有数据库的所有非系统集合。 要创建此类变更流,请使用MongoClient.watch()方法之一。
在以下示例中,变更流打印出它在MongoClient连接的部署中观察到的所有变更:
val observer = LatchedObserver() client.watch().subscribe(observer) observer.await() // Block waiting for the latch 
过滤内容
您可以向watch()方法传递聚合阶段列表,以修改$changeStream操作符返回的数据。
注意
并非所有聚合操作符都受支持。 请参阅Change Streams MongoDB Server手册中的 以了解更多信息。
在以下示例中,变更流会打印出其观察到的与insert 、 update 、 replace和delete操作相对应的所有更改。
首先,管道包括一个$match阶段,用于过滤operationType为insert 、 update 、 replace或delete的文档。 然后,它将fullDocument设置为FullDocument.UPDATE_LOOKUP ,以便更新后的文档包含在结果中:
val observer = LatchedObserver() collection.watch(Seq(Aggregates.filter(Filters.in("operationType", Seq("insert", "update", "replace", "delete")))))         .fullDocument(FullDocument.UPDATE_LOOKUP).subscribe(observer) observer.await() // Block waiting for the latch