Force-syncing realm and showing the progress meanwhile

Let me try to explain what we are trying to achieve first.
Whenever our users launch the application for the first time, there are two possible cases:

  1. They have previously used the app and have data in the Realm cloud
  2. They are logging into the system for the first time ever

If the user is logging in for the first time ever, we need to bootstrap their database with basic data which is required for our business application to work.
If they have previously used the system, the basic data is already there, and there’s no need to generate it.
I know the ideal solution would be to do this in the backend, but for now we do not need that complexity, so we decided to do the bootstrapping on the client side.

Thus, whenever we launch the app, we show a special bootstrap screen, which first tries to download any data the user might have in the cloud. At this step we need to force-sync the data before moving forward; otherwise, we risk duplicating the initial bootstrap data. Until recently we did this with the Realm.GetInstanceAsync(syncConfiguration) method, which waits for all of the data to be synced when it is used with a SyncConfiguration.

However, we now also need to show the progress of the initial synchronization, because it may take some time for large databases and we want to give the user some visibility into how far the process has gone. Unfortunately, Realm.GetInstanceAsync can’t be used in that case; one needs to use the Realm.GetInstance method and then monitor the progress, as explained in this StackOverflow post.

If we do so, we need some way to wait for the synchronization to complete without using the Realm.GetInstanceAsync method. The logical pick would have been the RefreshAsync method. According to the docs:

Asynchronously wait for the Realm instance and outstanding objects to get updated
to point to the most recent persisted version.

The behaviour that I see when invoking this method, is that it completes immediately without downloading the user’s current cloud data. The doc is a bit misleading here, but I believe what happens is that this method only synchronizes the uplink changes and won’t download the existing data. If it was meant to actually do a full 2-way synchronization, then I guess I have found a bug that should be fixed.

I couldn’t find any other API that would force a 2-way synchronization on a background thread without blocking the progress notifications. I tried to do it using the upload/download progress: basically, I subscribe to the upload and download progress messages and wait until both are complete.

var session = realm.GetSession();
await session.GetProgressObservable(ProgressDirection.Download, ProgressMode.ForCurrentlyOutstandingWork)
       .CombineLatest(session.GetProgressObservable(ProgressDirection.Upload, ProgressMode.ForCurrentlyOutstandingWork))
       //First is the download progress, Second is the upload progress; wait until both have caught up
       .Where(progress => progress.First.TransferredBytes >= progress.First.TransferableBytes
                       && progress.Second.TransferredBytes >= progress.Second.TransferableBytes)
       .FirstAsync();

However, this is brute force: it reacts to the consequence of the action rather than to its source.

So, what would be the best way to force-sync realm without blocking the progress notifications?

Hey, thanks for the detailed post. I’ll try to address a few points here:

  1. To get progress notifications you can set the OnProgress property on your sync configuration. It’s a callback that gets invoked as data is being downloaded similarly to the session.GetProgressObservable code you have:
    var config = new SyncConfiguration(user, "my-partition")
    {
        OnProgress = progress =>
        {
            // Convert the reported bytes to a 0-100% range.
            var progressPercentage = 100.0 * progress.TransferredBytes / progress.TransferableBytes;
            this.ProgressIndicator.Progress = progressPercentage;
        }
    };
    
  2. RefreshAsync does not do any synchronization with the server, but rather updates your local database with changes that may have happened on background threads. This happens automatically when you’re on the main thread and the thread is idle, but you can use RefreshAsync to force update it. An example of this would be, if you download some data via a REST call, insert it on a background thread to avoid blocking the main thread with a large write, then want to lookup the newly inserted objects on the main thread. Calling await RefreshAsync will ensure that the newly inserted objects are available without introducing race conditions.
  3. If you want to ensure the Realm is synchronized, you can use Session.WaitForDownloadAsync. It will resolve once the download has completed. It can be used independently or in combination with GetProgressObservable. Generally speaking, WaitForDownloadAsync is more efficient than tracking progress, so if you just want to ensure the Realm has been synchronized without displaying progress bars, this is the recommended approach.
  4. The IObservable returned by GetProgressObservable with ProgressMode.ForCurrentlyOutstandingWork will complete once the upload/download has completed. So rather than checking for transferred >= transferable, you can provide an OnCompleted callback when subscribing.
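
Points 3 and 4 can be combined roughly as follows. This is only a minimal sketch, assuming the Realm .NET version discussed in this thread (where SyncProgress exposes TransferredBytes and TransferableBytes) and System.Reactive for the Subscribe overloads. InitialSync, OpenAndDownloadAsync and reportPercent are hypothetical names introduced for illustration:

```csharp
using System;
using System.Threading.Tasks;
using Realms;
using Realms.Sync;

public static class InitialSync
{
    // Hypothetical helper: opens the synced realm without waiting, reports
    // download progress, and returns once the download has completed.
    public static async Task<Realm> OpenAndDownloadAsync(
        RealmConfigurationBase config, Action<double> reportPercent)
    {
        // GetInstance returns immediately; sync continues in the background.
        var realm = Realm.GetInstance(config);
        var session = realm.SyncSession;

        // ForCurrentlyOutstandingWork completes when the download is done,
        // so the second lambda (onCompleted) fires at the end.
        using var subscription = session
            .GetProgressObservable(ProgressDirection.Download, ProgressMode.ForCurrentlyOutstandingWork)
            .Subscribe(
                progress =>
                {
                    // Nothing to report while there is nothing to transfer.
                    if (progress.TransferableBytes > 0)
                    {
                        reportPercent(100.0 * progress.TransferredBytes / progress.TransferableBytes);
                    }
                },
                () => reportPercent(100.0));

        // Resolves once the server data has been downloaded.
        await session.WaitForDownloadAsync();
        return realm;
    }
}
```

Realm instances are thread-confined, so this is meant to be called from a thread with a synchronization context (for example the UI thread), so that the code after the await continues on the thread that opened the realm.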

I hope that clarifies the questions you’ve had. Finally, I’d like to point you to the warnings on this docs page as those are in effect both when using GetProgressObservable and when using SyncConfiguration.OnProgress.


@nirinchev Thanks a lot for the detailed reply.
We have been playing with these APIs for the past few days, and I wanted to put together a complete report before replying, so sorry for the slightly delayed reply.

First of all, the WaitForDownloadAsync and WaitForUploadAsync methods were exactly the ones we needed! I didn’t know they existed, and they worked like a charm.
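
For anyone with the same bootstrap scenario, the flow could look roughly like the sketch below. It is not our exact code: Bootstrapper and EnsureBootstrappedAsync are hypothetical names, and hasBootstrapData and seed are hypothetical callbacks standing in for the app-specific check and the app-specific initial data.

```csharp
using System;
using System.Threading.Tasks;
using Realms;
using Realms.Sync;

public static class Bootstrapper
{
    // Hypothetical helper: seeds the initial data only if the cloud copy
    // of the realm does not already contain it.
    public static async Task EnsureBootstrappedAsync(
        Realm realm, Func<Realm, bool> hasBootstrapData, Action<Realm> seed)
    {
        var session = realm.SyncSession;
        if (session == null)
        {
            // Assumption: a null session means a local-only realm, nothing to wait for.
            if (!hasBootstrapData(realm))
            {
                realm.Write(() => seed(realm));
            }
            return;
        }

        // Pull down whatever the user already has in the cloud first.
        await session.WaitForDownloadAsync();

        // Make sure this instance sees the freshly downloaded data.
        realm.Refresh();

        if (!hasBootstrapData(realm))
        {
            realm.Write(() => seed(realm));

            // Push the bootstrap data so another device will not seed it again.
            await session.WaitForUploadAsync();
        }
    }
}
```

The important part is the ordering: the existence check runs only after the download has completed, which is what prevents duplicated bootstrap data.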

However, there are problems with the GetProgressObservable method. No matter whether we track the upload or the download progress, and whether we use ProgressMode.ForCurrentlyOutstandingWork or ProgressMode.ReportIndefinitely, TransferredBytes and TransferableBytes are cumulative. What do I mean by that? Let’s take the upload as an example. Here are some real numbers:

  1. I launch the app and make some changes. After all of the changes are synced, here’s what we get as the final upload progress - [12:53:58 INF] Upload progress: 2121568/2121568
  2. I close the application, reopen it, and make more changes. Here is the first progress message I get - [12:55:22 INF] Upload progress: 2121568/2134356

Notice how the TransferredBytes were recovered from the previous session and the TransferableBytes increase with the new changes, which is logical.

The same behavior is present in the download progress. Is this by design? I wouldn’t think so. I would assume that when I use ProgressMode.ForCurrentlyOutstandingWork, then as soon as I subscribe to this observable it should tell me how many bytes need to be synced at that moment and how many of those have been synced, and definitely not how many bytes have been synced in total over the lifetime of the application.

The behaviour that we want to achieve is this: whenever data changes in the realm, we subscribe to the progress observable, monitor the progress of the currently pending changes, and show a progress bar in the app. To make sure we don’t do that for every single change, we debounce the changes using the reactive Throttle operator. Here is the reactive chain that we’ve built:

//Monitor all realm data changes for the synced realm instance
Observable.FromEventPattern<Realm.RealmChangedEventHandler, EventArgs>(handler => syncedRealm.RealmChanged += handler,
                                                                       handler => syncedRealm.RealmChanged -= handler)
          .Select(_ => syncedRealm)
          //Throttle debounces: emit only after 10 seconds have passed without further changes
          .Throttle(TimeSpan.FromSeconds(10))
          //Get the progress observable
          .Select(realm => new SyncProgressObservable(_logger, realm.GetSession().GetProgressObservable(ProgressDirection.Upload, ProgressMode.ReportIndefinitely)))
          //Now we have observable of sync observables. By using the Switch operator we dispose the previous subscription after we subscribe to the new one
          .Switch();

I think I could add some more reactive operators that would essentially track the delta progress: as soon as we create a new subscription, read the current accumulated progress and subtract it, so that every batch starts from zero. However, before doing so, I want to confirm with you whether the behaviour I see is by design or a bug. If it’s by design, I would really like to understand the rationale behind this kind of implementation.
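
To make the subtraction concrete, here is a small sketch of the arithmetic only, with no Realm dependency. RelativeProgress and FromBaseline are hypothetical names, not part of the Realm SDK, and clamping to the batch size is just one possible design choice:

```csharp
using System;

public static class RelativeProgress
{
    // Hypothetical helper: converts cumulative byte counts into progress
    // relative to a baseline taken when the subscription was created.
    public static (ulong Transferred, ulong Transferable) FromBaseline(
        ulong baselineTransferred,
        ulong baselineTransferable,
        ulong currentTransferred)
    {
        // ulong subtraction wraps around on underflow, so guard each difference.
        ulong transferable = baselineTransferable > baselineTransferred
            ? baselineTransferable - baselineTransferred
            : 0UL;
        ulong transferred = currentTransferred > baselineTransferred
            ? currentTransferred - baselineTransferred
            : 0UL;

        // Never report more than the size of the batch being tracked.
        return (Math.Min(transferred, transferable), transferable);
    }
}
```

With the numbers above, a baseline of 2121568/2134356 gives a batch of 12788 bytes: FromBaseline(2121568, 2134356, 2121568) yields 0/12788, and once the cumulative counter reaches 2134356 it yields 12788/12788.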

This doesn’t sound like the expected behavior. I’ll need to ask the cloud team to understand why progress doesn’t start from zero every time you create a new observable.


@nirinchev thanks. That’s good to know. For the time being, with a bit more complexity, I was able to achieve what I wanted by adding some more operators to the reactive chain that combine the first progress event with the current progress event and do the subtraction. Here’s the final pipeline we got. We will test more to see if we observe any other weird behaviors.

//Monitor all realm data changes for the synced realm instance
Observable.FromEventPattern<Realm.RealmChangedEventHandler, EventArgs>(handler => syncedRealm.RealmChanged += handler,
                                                                       handler => syncedRealm.RealmChanged -= handler)
          .Select(_ => syncedRealm)
          //Throttle debounces: emit only after 10 seconds have passed without further changes
          .Throttle(TimeSpan.FromSeconds(10))
          //Get the progress observable
          .Select(realm => realm.GetSession().GetProgressObservable(ProgressDirection.Upload, ProgressMode.ReportIndefinitely))
          //Combine the current progress with the first event in the progress chain
          //and subtract the initial values from the current progress
          .Select(syncProgress => syncProgress.CombineLatest(syncProgress.FirstAsync(), (progress, first) =>
                                                                                        {
                                                                                            var transferred = progress.TransferredBytes - first.TransferredBytes;
                                                                                            var transferable = first.TransferableBytes - first.TransferredBytes;
                                                                                            return new SyncProgress(transferred, transferable);
                                                                                        }))
          //Now we have observable of sync observables. By using the Switch operator we dispose the previous subscription after we subscribe to the new one
          .Switch();

I want to add an update. We played with the sync progress some more and polished the solution further, because there were some lifecycle issues with the previous implementation. Here’s the final implementation:

private IObservable<SyncProgress> GetProgress(ProgressDirection progressDirection)
{
    //Our app supports multiple databases. When the current database changes, this observable will fire and
    //we will start a new sequence of progress observable
    IObservable<SyncProgress> syncProgress = _databaseManager.CurrentDatabaseObservable
                                                                //Get the realm for the current database asynchronously
                                                                //the realm factory here is a simple wrapper which returns the instance of the Realm having the database
                                                                //under the hood it will either initialize an offline database or a synced database using SyncConfiguration
                                                                .Where(database => database != null)
                                                                .Select(database => Observable.FromAsync(_ => _realmFactory.GetRealmAsync(database)))
                                                                .Switch()
                                                                //Bind the realm lifecycle to the observable sequence, so that it's disposed when the observable subscription is disposed
                                                                .Select(realm => Observable.Using(() => realm, r => GetProgressForRealm(progressDirection, r)))
                                                                .Switch();

    return syncProgress;
}

private IObservable<SyncProgress> GetProgressForRealm(ProgressDirection progressDirection, Realm realm)
{
    return Observable.Return(realm.SyncSession)
                        //We only want to monitor the progress for synced databases, which means SyncSession should not be null
                        .Where(session => session != null)
                        .Select(session => _sessionService.GetProgress(session, progressDirection, ProgressMode.ReportIndefinitely)
                                                        //Raise the progress events on the thread pool not to block the UI with a lot of progress events
                                                        .ObserveOn(_schedulerProvider.ThreadPool))
                        //Monitor the session progress and create a new progress observable when Transferable Bytes change
                        //That is, whenever there are new changes and the total bytes that need to be transferred change
                        //then we need to create a new progress
                        .Select(syncProgress => syncProgress.CombineLatest(syncProgress.DistinctUntilChanged(progress => progress.TransferableBytes),
                                                                        (progress, first) =>
                                                                        {
                                                                            ulong transferred = progress.TransferredBytes - first.TransferredBytes;
                                                                            ulong transferable = first.TransferableBytes - first.TransferredBytes;
                                                                            return new SyncProgress(transferred, transferable);
                                                                        }))
                        .Switch()
                        //We only want to report a progress when it actually changes
                        //A tuple key avoids the ambiguity of concatenated digits (e.g. 1|23 vs 12|3)
                        .DistinctUntilChanged(progress => (progress.TransferredBytes, progress.TransferableBytes));
}

I hope this will help someone who wants to implement sync progress. It took a lot of effort to come up with the final solution, and we didn’t anticipate that it would be so hard to implement. I really hope that the Realm team will fix the progress handling so that we won’t have to deal with this much complexity.
