Correctly exploiting causal consistency and snapshot reads

I am trying to implement the following functionality in an app, but I am struggling to put all the pieces together:

I want to implement a migration-like process that consists of executing a set of transactions.
Dependencies between these transactions are modeled by a directed acyclic graph (DAG): each transaction should read the writes of the transactions preceding it in the DAG. Some transactions consist of scanning an entire collection, sometimes updating documents inside that collection, but the scan should behave as if those updates were not happening. Some non-scan reads may happen during such a transaction, and those should read the updates. Note that all reads, including scan reads, must read the updates of preceding transactions. Some transactions may run in parallel because they do not act on the same parts of the database. Hence, if I want the entire process to execute swiftly, I cannot just linearize the DAG and use one big transaction. Here is a set of questions I could not quite find the answer to:

  • From “Tunable Consistency in MongoDB - YouTube” and the Mongo.startSession docs I get that causalConsistency is true by default for sessions. Is this correct?
  • From “Path to Transactions - Local Snapshot Reads” I get that {readConcern: {level: 'snapshot'}} should be used for the find operation of the collection scan.
  • From https://docs.mongodb.com/v4.4/core/read-isolation-consistency-recency/#std-label-causal-consistency it seems only majority reads guarantee causal consistency. Is a snapshot read also a majority read? Does that mean snapshot reads succeeding writes see those writes? In that case, the only difference between multi-document snapshot and majority reads is that majority reads can see writes occurring after the cursor initialization? Since the default writeConcern is {w: 1} and the default readConcern is local, does that mean we have to specify majority as default writeConcern and readConcern for each session?
  • The second code sample at https://docs.mongodb.com/v4.4/core/read-isolation-consistency-recency/#examples shows how to guarantee causal consistency across sessions. Is this pattern necessary and sufficient in my use case (each transaction advances its session time to at least the completion time of each of its preceding transaction’s session)?
  • In addition to the initial snapshot collection scan, can all other multi-document reads use a snapshot readConcern while maintaining causal consistency? If that’s the case, I assume the only penalty is potential increased memory usage and execution time?
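The DAG-ordered execution described above can be sketched as a small scheduler that starts each transaction as soon as all of its predecessors have finished, so that independent branches run in parallel. This is only a minimal sketch under assumptions: `runDag`, `tasks` and `deps` are hypothetical names that do not come from the driver, and each task is assumed to wrap its own transaction.

```javascript
// Minimal sketch of a DAG scheduler (hypothetical helper, not driver code).
// `tasks` maps a name to an async function; `deps` maps a name to the names
// of the tasks that must complete before it starts.
const runDag = async (tasks, deps = {}) => {
	const started = new Map(); // name -> Promise of that task's result

	const run = (name, path = []) => {
		if (path.includes(name)) {
			throw new Error(`Cycle detected: ${[...path, name].join(' -> ')}`);
		}
		if (!started.has(name)) {
			const predecessors = (deps[name] ?? []).map((dep) =>
				run(dep, [...path, name]),
			);
			// A task starts only once all of its predecessors have resolved. It
			// receives their results, e.g. to advance its own session time.
			started.set(
				name,
				Promise.all(predecessors).then((results) => tasks[name](results)),
			);
		}
		return started.get(name);
	};

	const names = Object.keys(tasks);
	const results = await Promise.all(names.map((name) => run(name)));
	return Object.fromEntries(names.map((name, i) => [name, results[i]]));
};
```

With `deps = {c: ['a', 'b'], d: ['c']}`, tasks `a` and `b` start immediately and concurrently, `c` starts after both have finished, and `d` starts after `c`.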

Could you help me sort these out? I find the current documentation about this subject to be both terse and scattered.

Does anybody have an idea on the subject?

Hi @Josh_Hamlet, welcome!

Since the documentation links that you provided point to v4.4, I’d assume that you are using MongoDB v4.4.

MongoDB v3.6+ enables causal consistency in client sessions. Client sessions only guarantee causal consistency for read operations with read concern majority and write operations with write concern majority. In addition, please ensure that the application only has one thread at a time executing those operations in the client session. Please see also Client Session and Causal Consistency Guarantees.

Read concern snapshot is only available for transactions. If a transaction with read concern snapshot is part of a causally consistent session, upon transaction commit with write concern majority, the transaction operations are guaranteed to have read from a snapshot of majority-committed data that provides causal consistency with the operation immediately preceding the transaction start.

I’d recommend to be explicit about the level of write/read concern on sessions as intended.
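As an illustration of being explicit, the sketch below opens a causally consistent session whose transactions default to snapshot reads and majority writes. It assumes `client` is a connected `MongoClient` from the Node.js driver; `startExplicitSession` is a hypothetical helper name, not something from the thread or the driver.

```javascript
// Sketch: a session that states its consistency settings explicitly instead
// of relying on defaults. `client` is assumed to be a connected MongoClient.
const startExplicitSession = (client, overrides = {}) =>
	client.startSession({
		// true is already the default, but stating it documents the intent.
		causalConsistency: true,
		// Defaults applied to every transaction started on this session.
		defaultTransactionOptions: {
			readConcern: {level: 'snapshot'},
			writeConcern: {w: 'majority'},
		},
		...overrides,
	});
```

Options passed directly to an individual transaction take precedence over these session-level defaults.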

It’s difficult to say without more details of the use case, but for that one sentence above it would be yes.

Please note that the code example you referred to, does not use transactions. Operations within a causally consistent session are not isolated from operations outside the session. If a concurrent write operation interleaves between the session’s write and read operations, the session’s read operation may return results that reflect a write operation that occurred after the session’s write operation.
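The cross-session pattern from the documentation example (advance a session's cluster time and operation time past those of the sessions it depends on) could be sketched as below. It assumes `ClientSession` objects from the Node.js driver, which expose `clusterTime`, `operationTime`, `advanceClusterTime()` and `advanceOperationTime()`. The helper names `captureSessionTime` and `advanceSessionPast` are hypothetical. As noted above, this orders operations causally but does not by itself isolate them from concurrent writes.

```javascript
// Sketch: record a session's causal position before the session is ended.
const captureSessionTime = (session) => ({
	clusterTime: session.clusterTime,
	operationTime: session.operationTime,
});

// Sketch: make `session` causally after every captured predecessor time.
const advanceSessionPast = (session, predecessorTimes) => {
	for (const {clusterTime, operationTime} of predecessorTimes) {
		// Both values stay undefined until a session has run an operation.
		if (clusterTime) session.advanceClusterTime(clusterTime);
		if (operationTime) session.advanceOperationTime(operationTime);
	}
	return session;
};
```

A transaction in the DAG would call `advanceSessionPast` on its fresh session with the times captured from each preceding transaction's session before issuing its first operation.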

Read concern snapshot is only available for multi-document transactions (and, starting in MongoDB v5.0, for certain read operations outside of multi-document transactions).

Distributed, cluster-wide transactions can be complex; I hope the answers above help you.

It is quite challenging to answer some of these questions without more context, as the answer may differ depending on the requirements and the deployment configuration, e.g. a sharded cluster.

If you have additional questions it would be helpful to provide a specific use case and concern/issue that you’re facing.

Regards,
Wan.

Thanks for your help. Unfortunately, I think I still do not understand how to achieve what I want. Maybe this pseudocode will help you understand part of what I want to achieve:

  1. There is an ordered sequence of transactions:
await transaction1();
await transaction2();
await transaction3();
await transaction4();
  2. Each transaction_i() looks like this:
async function transaction3() {
  await wrapTransaction(async (session) => {
    for await (const x of scanCollectionAtSnapshotAfterTransaction2(session)) {
       await doCausallyConsistentReadsAndWrites3(session);
    }
  });
}

The missing piece in this example is how to implement the subroutines so that the semantics of their names are respected:

  • The reads and writes inside wrapTransaction are atomic and isolated.
  • scanCollectionAtSnapshotAfterTransaction2 should not read any writes happening in calls to doCausallyConsistentReadsAndWrites3
  • doCausallyConsistentReadsAndWrites3 should read their writes and the writes of all previous calls of doCausallyConsistentReadsAndWrites3

Does this help you answer the question?

Hi @Josh_Hamlet ,

Thank you, the pseudocode helps clarify your use case.

As long as all of the operations within wrapTransaction are within a single transaction, then yes, e.g. by using session.withTransaction().

You cannot have a transaction inside of a transaction (or, in this case, triple nested). If you have a transaction at wrapTransaction, then every operation within it is in the same transaction.

Since the scanCollectionAtSnapshotAfterTransaction2 loop is within the same transaction as doCausallyConsistentReadsAndWrites3, the scan will read any interleaving writes; it is essentially reading your own writes.

Also, keep in mind that if the loop is a long running loop, by default, a transaction must have a runtime of less than one minute.

However, you can modify this limit using the transactionLifetimeLimitSeconds parameter on the mongod instances, although this may further increase cache pressure.
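For illustration, the limit can be changed at runtime with the `setParameter` admin command. The sketch below assumes `client` is a `MongoClient` connected to a mongod with sufficient privileges; `setTransactionLifetimeLimit` is a hypothetical helper name.

```javascript
// Sketch: raise (or lower) the transaction lifetime limit at runtime.
// `client` is assumed to be connected to a mongod as a privileged user.
const setTransactionLifetimeLimit = async (client, seconds) => {
	if (!Number.isInteger(seconds) || seconds < 1) {
		throw new RangeError('seconds must be an integer >= 1');
	}
	return client.db('admin').command({
		setParameter: 1,
		transactionLifetimeLimitSeconds: seconds,
	});
};
```

A value set this way does not survive a restart; for a permanent change, the parameter would have to be set in the mongod startup configuration instead.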

Regards,
Wan.

Thanks for your answer @wan. Unfortunately, I think it is still not clear whether it is possible to achieve what I described. Here are some additional details.

I do indeed intend to use session.withTransaction() as part of wrapTransaction. What is not so clear to me is what options to pass to session.withTransaction() and database CRUD operations (other than the session object) to have the code I sent you behave as I have described.

I do not need nested transactions. Whenever I compose subroutines, I just pass the relevant transaction session object around.

I thought, see for instance this video, that it is possible to pass {readConcern: {level: 'snapshot'}} to individual CRUD operations.

Indeed, thanks for the information. I have not run into that problem yet, though, and I would modify transactionLifetimeLimitSeconds if needed. I am interested in correctness at reasonable cost, and I do not expect cache pressure to be a problem in my particular case.

In the hope of making more progress on the original question, here is what I have come up with so far. Could you tell me where it would fail to achieve what I intend it to do?

const wrapTransaction = async ({
	client,
	transactionOptions,
	sessionOptions,
	transaction,
}) => {
	// NOTE causalConsistency: true is the default but better be explicit
	const session = client.startSession({
		causalConsistency: true,
		...sessionOptions,
	});
	let result;
	try {
		await session.withTransaction(async () => {
			result = await transaction(session);
		}, transactionOptions);
	} catch (error) {
		const message = error instanceof Error ? error.message : 'unknown error';
		console.debug(message);
		console.debug({error});
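		// NOTE The second argument of Error is an options object, so the
		// original message must be embedded in the first argument.
		throw new Error(`Database Transaction Failed: ${message}`, {cause: error});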
	} finally {
		// NOTE No need to await this Promise, this is just used to free-up
		// resources.
		session.endSession();
	}

	return result;
};

const forEachAsync = async ({client, collection, selector, cb}) =>
	wrapTransaction({
		client,
		sessionOptions: undefined,
		transactionOptions: undefined,
		transaction: async (session) => {
			// NOTE This needs to read from a snapshot, it should not read writes
			// happening in `cb(session, item)`
			const cursor = collection.find(selector, {
				session,
				readConcern: {level: 'snapshot'}
			}).hint({$natural: 1});
			for (;;) {
				const item = await cursor.next();
				if (item === null) break;
				await cb(session, item);
			}
		}
	});

// NOTE example usage, for illustration only, do not try to simplify in order
// to circumvent the problem: replaces each item counter with the sum of other
// items counter
await forEachAsync({
	client: SomeClient,
	collection: SomeCOllection,
	selector: {}, // NOTE Everything
	cb: async (session, {_id}) => {
		// NOTE awaited so that the delete completes before the following read
		await SomeCOllection.deleteOne({_id}, {session});
		// NOTE this should read the most up to date value for counter
		const sameKeyButNotSelf = await SomeCOllection.find({}, {session}).toArray();
		const total = sameKeyButNotSelf.reduce((
			previous, {counter}
		) => previous + counter, 0);
		// NOTE this new item should not be read by the `find` cursor loop of
		// `forEachAsync` but should be read by the `find` of succeeding calls
		// to `cb` inside that loop.
		await SomeCOllection.insertOne({counter: total}, {session});
	},
});

Kind regards,
Josh

Hi @Josh_Hamlet,

As long as you only do one transaction per session, that should be correct.

Operations in a transaction use the transaction-level read concern. You can set the transaction-level read concern at the start of the transaction. If left unset, it would default to the session-level read concern.
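A sketch of setting the read concern at the transaction level rather than on individual operations follows. It assumes `session` is a `ClientSession` from the Node.js driver; `runSnapshotTransaction` is a hypothetical helper name.

```javascript
// Sketch: set read and write concern once, at the transaction level.
const runSnapshotTransaction = async (session, work) => {
	let result;
	await session.withTransaction(
		async () => {
			// Operations inside `work` should pass only `{session}`; they use the
			// transaction-level read concern given below.
			result = await work(session);
		},
		{
			readConcern: {level: 'snapshot'},
			writeConcern: {w: 'majority'},
		},
	);
	return result;
};
```

The result is captured in a closure variable because the return value of `withTransaction` has differed between driver versions.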

I don’t think the video above mentions that you can specify a read concern per individual operation within a transaction.

As you are looping through the cursor (for which you cannot specify a read concern at that level), you will read your own writes.

Regards,
Wan.

Hi @wan,

So the example code does not work. How could it be modified so that it behaves the way I want it to?

Perhaps using two sessions in parallel? One for the loop, and one for operations inside the loop?
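One possible reading of the two-session idea is sketched below; the thread does not confirm that it meets all the requirements. It relies on the fact that writes made inside a transaction are not visible outside it until commit, so a cursor opened on a separate session outside the transaction cannot observe them. The trade-offs are assumptions to verify: the scan is no longer part of the atomic unit, and without a snapshot read it may observe writes committed concurrently by other clients. `scanOutsideTransaction` is a hypothetical name.

```javascript
// Sketch (unverified against the thread's requirements): scan on one session
// outside the transaction, write on another session inside a transaction.
const scanOutsideTransaction = async ({client, collection, selector, cb}) => {
	const scanSession = client.startSession({causalConsistency: true});
	const writeSession = client.startSession({causalConsistency: true});
	try {
		await writeSession.withTransaction(
			async () => {
				// This cursor is NOT part of the transaction, so it cannot see the
				// transaction's uncommitted writes.
				const cursor = collection.find(selector, {
					session: scanSession,
					readConcern: {level: 'majority'},
				});
				for await (const item of cursor) {
					// `cb` must pass `writeSession` to every operation it performs.
					await cb(writeSession, item);
				}
			},
			{readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}},
		);
	} finally {
		await Promise.all([scanSession.endSession(), writeSession.endSession()]);
	}
};
```

To make the scan read the writes of preceding transactions, `scanSession` would additionally need its cluster time and operation time advanced past those transactions before the cursor is opened.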

Kind regards,
Josh

Hi @Josh_Hamlet,

Let’s go back to the original question.

I want to implement a migration-like process consisting in executing a set of transactions.

Depending on the use case (how big the data is, other write operations, etc.), if you would like to perform some sort of migration, would you be able to migrate to another collection instead? That is, read from collection A and insert/update in collection B.
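The read-from-A, write-to-B idea could be sketched as follows. Because all writes go to the target collection, the scan of the source never observes them. The sketch assumes `source` and `target` are Node.js driver `Collection` objects; `migrateToCollection` and `transform` are hypothetical names.

```javascript
// Sketch: scan `source` and write the transformed documents into `target`.
const migrateToCollection = async ({source, target, selector = {}, transform}) => {
	let migrated = 0;
	for await (const doc of source.find(selector)) {
		const next = await transform(doc);
		// Upserting by _id keeps the migration idempotent if it is re-run.
		await target.replaceOne({_id: next._id}, next, {upsert: true});
		migrated += 1;
	}
	return migrated;
};
```

Whether this fits depends on the constraints Wan lists, for example whether other writers may modify collection A while the scan is running.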

Regards,
Wan.