Multi-document transaction migration falls over in production

I’m using migrate-mongo to run migrations against our MongoDB Atlas database. It’s a simple wrapper that lets us roll changes forward and back, and makes it easy to wrap those updates in transactions. A migration typically looks like this:

module.exports = {
  async up(db, client) {
    const session = client.startSession();

    try {
      await session.withTransaction(async () => {
        await db.collection('solutions').updateMany({ ...filter }, { ...changes }, { session });
      });
    } finally {
      session.endSession();
    }
  },

  async down(db, client) {
    // roll back changes in `up` method
  }
}

We use transactions in all cases so that if a migration fails for whatever reason, I don’t have to restore the database from a backup, which is fairly time-consuming and results in undesirable downtime.

I want to embed a currently referenced resource in a collection, and I’m struggling to do this inside a transaction. We have a collection of solutions, each of which has an array of report IDs. I’d like to update this so that each solution has a single embedded report object (built from the last ID in that array), and then drop the reports collection.
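
In other words, each solution document should go from roughly the first shape below to the second (shapes simplified; field names other than reports/report are hypothetical):

// before: each solution references its reports by ID
{ _id: ObjectId('...'), name: 'Some solution', reports: [ObjectId('...'), ObjectId('...')] }

// after: the last referenced report is embedded and the ID array is gone
{ _id: ObjectId('...'), name: 'Some solution', report: { title: 'Some report', createdAt: ISODate('...') } }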

This was my first naive attempt:

const session = client.startSession();

try {
  await session.withTransaction(async () => {
    const solutions = await db.collection('solutions').aggregate([
      { $addFields: { report: { $last: '$reports' } } },
      {
        $lookup: {
          from: 'reports',
          let: { reportId: '$report' },
          pipeline: [
            { $match: { $expr: { $eq: ['$_id', '$$reportId'] } } },
            { $project: { _id: 0 } },
          ],
          as: 'report',
        },
      },
      { $unwind: '$report' },
      { $project: { reports: 0 } },
    ]).toArray();

    const changes = solutions.map((solution) => {
      return {
        updateOne: { 
          filter: { _id: solution._id },
          update: { $set: { report: solution.report }, $unset: { reports: true } }
        }
      }
    });

    await db.collection('solutions').bulkWrite(changes, { session });
  });
} finally {
  await db.collection('reports').drop();
  session.endSession();
}

This runs fine on a smaller dataset, but on our production database of ~180k solution records it falls over, either running out of memory or failing with the following error:

WriteConflict error: this operation conflicted with another operation. Please retry your operation or multi-document transaction

Copying records into memory and updating an entire collection with bulkWrite seems like a bad idea, because there’s always the potential to hit a memory limit, and you can’t be sure how many records the collection will have grown to by the time the migration runs.

This was my second attempt:

const session = client.startSession();
const MAX_RECORDS = 10000;
let lastCount = MAX_RECORDS;
let i = 0;

try {
  await session.withTransaction(async () => {
    while(lastCount === MAX_RECORDS) {
      const solutions = await db.collection('solutions').aggregate([
        { $skip: MAX_RECORDS * i },
        { $limit: MAX_RECORDS },
        // ... rest of pipeline
      ]).toArray();

      const changes = solutions.map((solution) => {
        // ... removed for brevity
      });

      await db.collection('solutions').bulkWrite(changes, { session });

      lastCount = solutions.length;
      i++;
    }
  });
} finally {
  await db.collection('reports').drop();
  session.endSession();
}

My hope was that we could run the migration in chunks, but after some time it didn’t seem like the migration was running at all. I added logging, but nothing was printed to the console, so I abandoned this approach.
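
(The logging was nothing more than a progress line roughly like this at the end of each loop iteration:)

console.log(`processed batch ${i}: ${solutions.length} solution records`);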

Finally, I decided to do the whole update in a single aggregation pipeline with a $merge stage:

await db.collection('solutions').aggregate([
  // ... existing pipeline
  {
    $merge: {
      into: 'solutions',
      on: '_id',
      whenMatched: 'replace',
      whenNotMatched: 'discard',
    },
  }
]).toArray();

await db.collection('reports').drop();

This works as expected, and I’ve tested it against a full dataset on a staging database. However, transactions cannot be used with aggregations that include a $merge stage, presumably because the data is being written back to the collection in place as the pipeline streams through it.
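
For reference, verifying the result on staging comes down to a check along these lines (a rough, hypothetical snippet using the same field names as the pipeline above):

// No solution should still have a `reports` array, and every solution
// should now carry an embedded `report` object.
const unmigrated = await db.collection('solutions').countDocuments({
  $or: [
    { reports: { $exists: true } },
    { report: { $exists: false } },
  ],
});
console.log(`solutions not migrated: ${unmigrated}`);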

So my questions are as follows:

  1. Is there an approach to embedding records from another collection that works inside a transaction with 180k+ records?
  2. Is loading large numbers of records into memory during a migration considered bad practice?
  3. Is there a preferred method to chunking updates / migrations when using a transaction?
  4. Am I worrying too much about running a $merge aggregation on a production database, given that a) there will be a backup available and b) we’ll be in maintenance mode when the migration is run?

Thanks for your time!