MongoSpark Connector Aggregation (groupby) Issue


We are facing one issue in while using aggregation query with MongoSpark connector.

We are using groupby on certain fields while reading data from MongoDB (via MongoSpark API) in our Spark Job but we discovered that MongoSpatk connector is partitioning the data before executing the groupby query. This leads to incorrect result.

For eg, in below query, we expected the aggregation query to run on complete data in collection (based upon customer_id field) and then partitioned. However, the actual output shows groupby query ran on already partitioned data.

    	Map<String, String> readOverrides = new HashMap<String, String>();

		readOverrides.put("database", "mydb");
		readOverrides.put("collection", "myCollection");
		readOverrides.put("", "secondaryPreferred");
Document groupFields = new Document("customer_id", "$customer_id");
		JavaMongoRDD<Document> myRDD = MongoSpark.load(jsc, readConfig);
		Dataset<Row> dsr = myRDD
				.withPipeline(Arrays.asList(new Document("$match", new Document("date", date)),
						new Document("$group", new Document("_id", "$customer_id").append("field1", new Document("$first","$field1")).
								.append("total", new Document("$sum","$field2"))
						, new Document("$skip",0)
						, new Document("$limit", 1000)

We are using these versions - Spark v2.4.6 MongoSpark Connector v2.4.2.

Looking for any suggestions regarding this. Thanks