Observe realm on background thread using Combine

I’m trying to setup following background service:

  • realm db to store reports
  • observe realm for closed reports and send them to server
  • this should all be done on bg thread (e.g. DispatchQueue(qos: .background))
func sendReports() {
     observe(RealmReport.self, predicate: .init(format: "state == %@", .closed)
            .receive(on: DispatchQueue(label: "report-sender")
            .sink { report in
                self.send(report: report) // makes API call
            }
            .store(in: &subscriptions) // keep observing as long as app is alive
}

    func observe(_ type: Object.Type, predicate: NSPredicate) -> AnyPublisher<[Report], Error> {
        Just((type, predicate))
            .receive(on: DispatchQueue.main) // should be on bg queue, but realm publisher needs runloop, I guess?
            .flatMap { objectType, predicate in
                self.realm().objects(objectType) // Realm instance is kept alive til app is killed
                    .filter(predicate)
                    .collectionPublisher
//                  .threadSafeReference() // send live objects, heavy
//                    .freeze() // lightweight; snapshot of live objects to pass to other threads, but version is cached in file until Realm instance is gone?
                    .map(\.elements)
            }
            .tryMap { $0.map { Report(from: $0) } }
            .mapError { _ in
                Error.dbError
            }
            .eraseToAnyPublisher()
    }

I found some problems:

  • .freeze() is preferred to be used with Combine,
    but is it increasing active versions?
    I saw:
  • assert due to max number of active versions
  • realm accessed from wrong thread
    sometimes

BTW: this is a iOS app with local Realm db.

You could simplify this all to something like:

    struct Response {
        // dummy struct to mock a response from a server
    }

    func doNetworkTask(_ results: Results<Movie>) -> AnyPublisher<Result<Response, Error>, Never> {
        // do network things and return a publisher
    }


   let queue = DispatchQueue(label: "my-queue")
   let objects = try! Realm().objects(Movie.self).filter("name == 'Foo'")
   objects
       .collectionPublisher
       .receive(on: queue)
       .assertNoFailure()
       .flatMap(doNetworkTask)
       .sink { response in
            print(response)
        }.store(in: &cancellable)

Your example is causing issues because you have a Realm creating a publisher inside a publisher. This would cause a lot of issues with threading. The Realm should be accessed from outside the publisher, and then access .collectionPublisher from there.

1 Like

Thanks, got it.
I used the Just().receive(on:) just to make sure that Realm() is called on main.
but I could change it and always call func observe(..) on .main.

It’s correct that it needs to be on .main, right? since it’s the only Queue with runloop, else the Realm does not auto refresh, and publisher would not send any updates. Right?

You will still get changes on a serial queue with Realm. You shouldn’t have to receive on the main queue unless you are updating the UI.

1 Like

Observations:

             try Realm(configuration: configuration) // when called from any other queue than .main, I get this exception after some writes: 'Expected failure: Number of active versions () in the Realm exceeded the limit of  (RLMException)'
                    .objects(objectType)
                    .filter(predicate)
                    .collectionPublisher
                    .subscribe(on: DispatchQueue(label: "") // Exception	NSException *	"Can only add notification blocks from within runloops."	
                    .receive(on: /*some queue*/) // calls .threadSafeReference() under the hood

To prevent that downstream publishers use queues other than main, I need to:

                   ...
                   .collectionPublisher
                    .subscribe(on: DispatchQueu.main)

and call try Realm() from .main