Assigning a sequential bucket id after $bucketAuto in a pipeline

Hi all, below is my scenario.
I have a collection with 4 million documents.
I am using $bucketAuto to divide the documents equally into 20 buckets in a pipeline and pushing them to a new collection using the $out stage.
I need to do further processing on each of these documents, and I am planning to assign one Java instance to process the documents from each of those 20 buckets, which means I will have 20 instances of the Java app running in the cloud. I need help assigning a sequential bucket id like 1, 2, 3, … 20 so that each Java process knows its assigned bucket.
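For illustration, the $bucketAuto + $out step I am describing would look roughly like the following with the Java sync driver (the connection string, database/collection names, and the field I group on are placeholders, not my real values):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

import java.util.Arrays;

public class BucketMyCollection {

    public static void main(String[] args) {
        // Rough sketch only: URI, database/collection names and the groupBy field are placeholders.
        try (MongoClient mongoClient = MongoClients.create("mongodb+srv://<user>:<password>@<cluster>/")) {
            MongoDatabase database = mongoClient.getDatabase("mydb");
            MongoCollection<Document> source = database.getCollection("sourceCollection");

            // $bucketAuto splits the documents into 20 roughly equal buckets,
            // and $out writes the resulting bucket documents to a new collection.
            source.aggregate(Arrays.asList(
                    new Document("$bucketAuto",
                            new Document("groupBy", "$someNumericField").append("buckets", 20)),
                    new Document("$out", "buckets")
            )).toCollection(); // toCollection() is needed to actually run a pipeline ending in $out
        }
    }
}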

Please suggest.

Hi @Madhav_Lonari and welcome to the MongoDB Community Forums!

Based on the above, if I understand correctly, you are looking to assign a new field with a sequential value to the collection created by the $out stage.
Since there is no direct MongoDB aggregation operator for this, let's assume the main collection looks like:

Atlas atlas-xp4gev-shard-0 [primary] test> db.bucketID.find()
[
  { _id: ObjectId("656ddec4b279b09ce724ef0e"), id: 0 },
  { _id: ObjectId("656dded602a3648fa14f3f87"), id: 2 },
  { _id: ObjectId("656dded602a3648fa14f3f88"), id: 1 },
  { _id: ObjectId("656dded602a3648fa14f3f89"), id: 4 },
  { _id: ObjectId("656dded602a3648fa14f3f8a"), id: 5 },
  { _id: ObjectId("656dded602a3648fa14f3f8b"), id: 0 },
  { _id: ObjectId("656dded602a3648fa14f3f8c"), id: 3 },
  { _id: ObjectId("656dded602a3648fa14f3f8d"), id: 8 },
  { _id: ObjectId("656dded602a3648fa14f3f8e"), id: 14 },
  { _id: ObjectId("656dded602a3648fa14f3f8f"), id: 15 },
.....
]
and the bucket output is:

{"_id": {"min": 0, "max": 49}, "count": 50}
{"_id": {"min": 49, "max": 99}, "count": 50}
{"_id": {"min": 99, "max": 149}, "count": 50}
{"_id": {"min": 149, "max": 199}, "count": 50}
{"_id": {"min": 199, "max": 249}, "count": 50}
{"_id": {"min": 249, "max": 299}, "count": 50}
{"_id": {"min": 299, "max": 349}, "count": 50}
{"_id": {"min": 349, "max": 399}, "count": 50}
{"_id": {"min": 399, "max": 449}, "count": 50}
{"_id": {"min": 449, "max": 499}, "count": 51}

You can try the code below at the application end to assign a sequenceID to the buckets created:


package com.tour;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

public class QuickTour {

    public static void main(final String[] args) {

        // Silence the driver logging for cleaner output
        Logger root = (Logger) LoggerFactory.getLogger("org.mongodb.driver");
        root.setLevel(Level.OFF);

        String mongoURI = "mongodb+srv://test:test@sandbox.bl9vn.mongodb.net/?retryWrites=true&w=majority";
        MongoClient mongoClient = MongoClients.create(mongoURI);

        MongoDatabase database = mongoClient.getDatabase("test");
        MongoCollection<Document> collection = database.getCollection("bucketID");

        // Run $bucketAuto and write the bucket documents to a new collection with $out.
        // toCollection() is required to actually execute a pipeline that ends with $out.
        collection.aggregate(Arrays.asList(
                new Document("$bucketAuto",
                        new Document("groupBy", "$id")
                                .append("buckets", 10L)
                                .append("granularity", "R20")),
                new Document("$out", "bucketsCreated")))
                .toCollection();

        MongoCollection<Document> collectionNew = database.getCollection("bucketsCreated");

        // Iterate over each bucket document and add a sequenceID field
        int sequenceID = 1;
        try (MongoCursor<Document> cursorNew = collectionNew.find().iterator()) {
            while (cursorNew.hasNext()) {
                Document doc = cursorNew.next();

                // Add sequenceID to the local copy for printing
                doc.put("sequenceID", sequenceID);

                // Persist the sequenceID on the bucket document
                collectionNew.updateOne(
                        new Document("_id", doc.get("_id")),
                        new Document("$set", new Document("sequenceID", sequenceID)));

                // Print the updated document
                System.out.println(doc.toJson());

                // Increment the sequenceID for the next bucket
                sequenceID++;
            }
        }

        mongoClient.close();
    }
}

which gives the output as:

{"_id": {"min": 0, "max": 49}, "count": 50, "sequenceID": 1}
{"_id": {"min": 49, "max": 99}, "count": 50, "sequenceID": 2}
{"_id": {"min": 99, "max": 149}, "count": 50, "sequenceID": 3}
{"_id": {"min": 149, "max": 199}, "count": 50, "sequenceID": 4}
{"_id": {"min": 199, "max": 249}, "count": 50, "sequenceID": 5}
{"_id": {"min": 249, "max": 299}, "count": 50, "sequenceID": 6}
{"_id": {"min": 299, "max": 349}, "count": 50, "sequenceID": 7}
{"_id": {"min": 349, "max": 399}, "count": 50, "sequenceID": 8}
{"_id": {"min": 399, "max": 449}, "count": 50, "sequenceID": 9}
{"_id": {"min": 449, "max": 499}, "count": 51, "sequenceID": 10}

Let us know if the above code fulfils the requirement. If not, could you share a few sample documents and the aggregation pipeline that you are currently using?
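Alternatively, if your deployment is on MongoDB 5.0 or later, the sequence number could be assigned inside the pipeline itself by adding a $setWindowFields stage with the $documentNumber window operator between $bucketAuto and $out, so no second pass from the application would be needed. Below is a rough, untested sketch along the same lines as the code above (placeholder connection string):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

import java.util.Arrays;

public class BucketWithSequence {

    public static void main(String[] args) {
        // Untested sketch: requires MongoDB 5.0+ for $setWindowFields / $documentNumber.
        try (MongoClient mongoClient = MongoClients.create("mongodb+srv://<user>:<password>@<cluster>/")) {
            MongoDatabase database = mongoClient.getDatabase("test");
            MongoCollection<Document> collection = database.getCollection("bucketID");

            collection.aggregate(Arrays.asList(
                    // Split the documents into buckets as before.
                    new Document("$bucketAuto",
                            new Document("groupBy", "$id").append("buckets", 10L)),
                    // Number the bucket documents 1, 2, 3, ... ordered by their lower boundary.
                    new Document("$setWindowFields",
                            new Document("sortBy", new Document("_id.min", 1))
                                    .append("output", new Document("sequenceID",
                                            new Document("$documentNumber", new Document())))),
                    new Document("$out", "bucketsCreated")
            )).toCollection(); // toCollection() runs the $out pipeline
        }
    }
}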

P.S. The above has been tested against only a few sample documents, so the recommendation would be to test in a staging environment before implementing it in production.

Warm Regards
Aasawari

Thanks for the reply Aasawari. I was trying to do it through the pipeline, but this helps.
