Hi I need to test change streams on Cosmos db. I have this code
private Flux<ChangeStreamEvent<T>> startChangeStream(DbChangeOffset offset) {
List<String> operationTypeFilter = List.of("insert", "update", "replace");
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.match(Criteria.where("operationType").in(operationTypeFilter)),
Aggregation.project("_id", "fullDocument", "ns", "documentKey"));
var optionsBuilder = ChangeStreamOptions.builder()
.filter(aggregation)
.fullDocumentLookup(FullDocument.UPDATE_LOOKUP);
if (offset.getResumeToken() == null) {
optionsBuilder.resumeAt(bsonTimestamp(offset));
} else {
optionsBuilder.startAfter(new BsonDocument().append("_data", new BsonString(offset.getResumeToken())));
}
return mongoTemplate.changeStream(collectionName, optionsBuilder.build(), collectionType)
.doOnNext(event -> log.debug("Raw change stream event: {}", event.getRaw()))
.doOnNext(event -> {
if (event.getRaw().getOperationType() == null) {
log.warn("Missing operationType in ChangeStreamEvent: {}", event);
} else {
log.debug("Processed ChangeStreamEvent with operationType={}", event.getRaw().getOperationType());
}
})
.doOnError(e -> log.error("Error in change stream for collection: {}", collectionName, e));
}
``` But I am getting this error Missing operationType in ChangeStreamEvent: ChangeStreamEvent {raw=ChangeStreamDocument{ operationType=null, resumeToken={"_data": {"$binary":etc.. Does someon know why my operationType is null?