MongoSpark not passing on aggregation pipeline

We are using MongoSpark 2.4.1 to query a large collection (~30M documents). We have created indexes in specific places to make querying much faster, and I am now trying to get MongoSpark to use those indexes. The indexes are sparse indexes on fields that may or may not exist (the one I need in particular is matched by about 75k documents that have the field). The naive approach we tried first was something like:


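A minimal sketch of what that naive version amounted to (a reconstruction, since the original snippet is missing; it assumes a SparkSession named spark configured with spark.mongodb.input.uri, and uses "indexed.field" as a placeholder field name):

```scala
import com.mongodb.spark.MongoSpark

// Load the whole collection as a DataFrame...
val df = MongoSpark.load(spark)
// ...then filter Spark-side on the nested field's presence.
val profiles = df.filter("indexed.field IS NOT NULL")
```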
This, however, results in MongoSpark fetching ALL documents and then filtering them in Spark. I read that this could be because the connector does not push down predicates on nested fields, and that I should use pipeline/withPipeline instead. I tried .pipeline on a DataFrame and .withPipeline on a MongoRDD, but in both cases MongoSpark still scans the entire collection (I set the log level to DEBUG to see the actual commands sent to Mongo).
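For reference, the command-level DEBUG output shown below can be enabled with a log4j 1.x setting along these lines (the logger name is the MongoDB Java driver's command protocol logger):

```
log4j.logger.org.mongodb.driver.protocol.command=DEBUG
```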

What I tried:
import com.mongodb.spark.MongoSpark
import org.bson.{BsonBoolean, BsonDocument}

val pipeline = new BsonDocument()
pipeline.put("indexed.field", new BsonDocument("$exists", new BsonBoolean(true)))

// .pipeline is a method on MongoSpark.Builder
val profiles = MongoSpark.builder()
  .sparkSession(spark)
  .pipeline(Seq(new BsonDocument("$match", pipeline)))
  .build()
  .toDF()

as well as:

import com.mongodb.spark.MongoSpark
import org.bson.Document

val rdd = MongoSpark.load(spark.sparkContext)
val query = Seq(Document.parse("""{ $match: { "indexed.field": { $exists: true } } }"""))
val aggregatedRdd = rdd.withPipeline(query)

But in both cases, with the log level set to DEBUG, I see (truncated for brevity):

20/07/23 08:53:43 DEBUG command: Sending command '{"aggregate": "profile", "pipeline": [{"$match": {}}, {"$sample": {"size": 46866}}, {"$project": {"_id": 1}}, {"$sort": {"_id": 1}}], "cursor": {}, "allowDiskUse": true, "$db":........

How can I force MongoSpark to send my query to Mongo? (Which would be: [{$match: {"indexed.field": {$exists: true}}}])