Is this a sound way to construct a "live" query?

I’m trying to implement the so-called transactional outbox pattern using MongoDB as the underlying data store.
I have a _outbox collection where messages that are supposed to be sent to a message broker (e.g. Kafka/RabbitMQ) are first stored; I then “watch” any inserts into this collection and in a background process and do the sending of the messages.

Here’s the solution I’ve come up with (C#):

On startup, I basically read any pending messages in the outbox collection and send those, and then begin watching for inserts in real-time. I’m wondering if my implementation is robust enough, am I missing anything?

public class OutboxMessagePublisher(
	IMongoDatabase db,
	IMessageBroker broker,
	IClock clock,
	ILogger<OutboxMessagePublisher> logger
) : BackgroundService
{
	private const int BatchSize = 100;

	protected override async Task ExecuteAsync(CancellationToken ct)
	{
		var outboxCollection = db.GetCollection<MessageEnvelope>(OutboxCollectionName);
		await foreach (var batchOfMessages in outboxCollection.LiveQuery(BatchSize, clock, ct))
		{
			logger.LogInformation("Retrieved {Count} messages from the outbox", batchOfMessages.Count);
			await broker.Publish(batchOfMessages);
			logger.LogInformation("Published {Count} messages from the outbox to the broker", batchOfMessages.Count);
			var publishedMessageIds = batchOfMessages.Select(m => m.Id).ToList();
			await outboxCollection.DeleteManyAsync(m => publishedMessageIds.Contains(m.Id), ct);
			logger.LogInformation("Cleaned up {Count} published messages from the outbox", batchOfMessages.Count);
		}
	}
}

public static class MongoExtensions
{
	public static async IAsyncEnumerable<IReadOnlyList<T>> LiveQuery<T>(
		this IMongoCollection<T> collection,
		int batchSize,
		IClock clock,
		[EnumeratorCancellation] CancellationToken ct
	)
	{
		Instant lastRetrieval;
		// NOTE: Without an explicit sort clause specified, MongoDB returns documents based on what's called "natural order", which is the order in which the documents are stored on disk, this often happens to be equivalent to insertion order, but in the case clustered collections (assuming that the `_id` is time-sortable), this order will basically be guaranteed to be the same as insertion order. See https://www.mongodb.com/docs/manual/reference/glossary/#std-term-natural-order in conjunction with https://www.mongodb.com/docs/manual/core/clustered-collections/#:~:text=The%20clustered%20index%20specifies%20the%20order%20in%20which%20documents%20are%20stored.
		// todo: does this keep working if the documents are deleted?
		using (var cursor = await collection.FindAsync(
			_ => true,
			new() { BatchSize = batchSize },
			ct
		))
		{
			while (await MoveNextAsync())
			{
				var batch = cursor.Current.ToList();
				if (batch is not []) // NOTE: If the collection is empty, we get a single empty batch, which we don't need to yield return.
					yield return batch;
			}

			// NOTE: There is a window of time between when the last batch of existing documents are retrieved (and subsequently processed by the caller) and when the change stream is begun; documents inserted within that window of time would be missed.
			Task<bool> MoveNextAsync()
			{
				lastRetrieval = clock.GetCurrentInstant();
				return cursor.MoveNextAsync(ct);
			}
		}

		var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<T>>()
			// NOTE: We can't do `change.FullDocument.CreatedAt > existingMessages.Last().CreatedAt` here to get potentially "past" data because by default change stream only consider things that happen after they're created.
			.Match(change =>
				change.OperationType == ChangeStreamOperationType.Insert
			);
		using var watchCursor = await collection.WatchAsync(pipeline, new()
		{
			StartAtOperationTime = lastRetrieval.ToBsonTimestamp(),
			BatchSize = batchSize, // NOTE: See https://mongodb.github.io/mongo-csharp-driver/2.18/reference/driver/change_streams/#:~:text=BatchSize%20determines%20the%20maximum%20number%20of%20change%20events%20the%20server%20will%20return%20at%20one%20time.%20The%20server%20might%20return%20fewer.
		}, ct);
		while (await watchCursor.MoveNextAsync(ct))
		{
			var batch = watchCursor.Current.Select(c => c.FullDocument).ToList();
			// NOTE: For some reason, the task that `MoveNextAsync` returns resolves periodically (but with an empty `watchCursor.Current`); we need to skip those iterations.
			if (batch is not [])
				yield return batch;
		}
	}
}