Observe Realm for closed reports and send them to the server.
All of this should run on a background thread (e.g. DispatchQueue.global(qos: .background)).
func sendReports() {
    observe(RealmReport.self, predicate: .init(format: "state == %@", .closed))
        .receive(on: DispatchQueue(label: "report-sender"))
        .sink { report in
            self.send(report: report) // makes API call
        }
        .store(in: &subscriptions) // keep observing as long as app is alive
}
func observe(_ type: Object.Type, predicate: NSPredicate) -> AnyPublisher<[Report], Error> {
    Just((type, predicate))
        .receive(on: DispatchQueue.main) // should be on bg queue, but realm publisher needs runloop, I guess?
        .flatMap { objectType, predicate in
            self.realm().objects(objectType) // Realm instance is kept alive until app is killed
                .filter(predicate)
                .collectionPublisher
                // .threadSafeReference() // send live objects, heavy
                // .freeze() // lightweight; snapshot of live objects to pass to other threads, but version is cached in file until Realm instance is gone?
                .map(\.elements)
        }
        .tryMap { $0.map { Report(from: $0) } }
        .mapError { _ in
            Error.dbError
        }
        .eraseToAnyPublisher()
}
I ran into a problem:
.freeze() is the preferred way to use Realm with Combine,
but does it increase the number of active versions?
I saw this example:
struct Response {
    // dummy struct to mock a response from a server
}

func doNetworkTask(_ results: Results<Movie>) -> AnyPublisher<Result<Response, Error>, Never> {
    // do network things and return a publisher
}

let queue = DispatchQueue(label: "my-queue")
let objects = try! Realm().objects(Movie.self).filter("name == 'Foo'")
objects
    .collectionPublisher
    .receive(on: queue)
    .assertNoFailure()
    .flatMap(doNetworkTask)
    .sink { response in
        print(response)
    }.store(in: &cancellable)
Your example is causing issues because you have a Realm creating a publisher inside a publisher. This would cause a lot of issues with threading. Open the Realm outside the publisher chain, then access .collectionPublisher on its results from there.
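For anyone reading along, here is a minimal sketch of that restructuring, not a confirmed fix for the active-versions exception. It assumes the Realm is opened once on the main thread and kept alive by its owner. The RealmReport fields, the Report struct, and the ReportObserver class are hypothetical stand-ins for the types in the question. The results are frozen before hopping queues and copied into plain structs right away, so the frozen snapshot does not have to be held for long.

```swift
import Combine
import Foundation
import RealmSwift

// Hypothetical model standing in for the RealmReport type from the question.
final class RealmReport: Object {
    @Persisted(primaryKey: true) var id: String
    @Persisted var state: String
}

// Hypothetical plain value type; safe to hand to any thread.
struct Report {
    let id: String
    let state: String

    init(from object: RealmReport) {
        id = object.id
        state = object.state
    }
}

final class ReportObserver {
    // The Realm is opened outside of any publisher and kept alive here.
    private let realm: Realm

    // Assumption: this initializer is called on the main thread, which has a run loop.
    init(configuration: Realm.Configuration = .defaultConfiguration) throws {
        realm = try Realm(configuration: configuration)
    }

    func observe(predicate: NSPredicate, on queue: DispatchQueue) -> AnyPublisher<[Report], Error> {
        realm.objects(RealmReport.self)
            .filter(predicate)
            .collectionPublisher
            .freeze()                 // immutable snapshot that may cross threads
            .receive(on: queue)       // hop to the worker queue
            .map { results -> [Report] in
                // Copy into plain structs so the frozen snapshot can be released.
                results.map { Report(from: $0) }
            }
            .eraseToAnyPublisher()
    }
}
```

The caller would then subscribe with sink(receiveCompletion:receiveValue:) and store the cancellable, as in the original sendReports().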
Thanks, got it.
I used Just().receive(on:) only to make sure that Realm() is called on main,
but I could drop it and always call func observe(..) on .main instead.
It’s correct that it needs to be on .main, right? It’s the only queue with a run loop; without one the Realm does not auto-refresh, and the publisher would not send any updates. Right?
try Realm(configuration: configuration) // when called from any other queue than .main, I get this exception after some writes: 'Expected failure: Number of active versions () in the Realm exceeded the limit of (RLMException)'
    .objects(objectType)
    .filter(predicate)
    .collectionPublisher
    .subscribe(on: DispatchQueue(label: "")) // Exception NSException * "Can only add notification blocks from within runloops."
    .receive(on: /*some queue*/) // calls .threadSafeReference() under the hood
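On the "Can only add notification blocks from within runloops." exception: threads backing a plain DispatchQueue do not run a run loop, but a dedicated Thread can. Below is a rough sketch using only Foundation; RunLoopThread, startAndWait() and enqueue(_:) are names I made up for illustration. Whether opening the Realm and subscribing inside such a thread is the right approach for this case is an assumption on my part, not something confirmed in this thread.

```swift
import Foundation

// A long-lived background thread that keeps its own run loop spinning.
final class RunLoopThread: Thread {
    private let ready = DispatchSemaphore(value: 0)
    private var threadRunLoop: RunLoop?

    override func main() {
        threadRunLoop = RunLoop.current
        // A run loop without input sources returns immediately,
        // so attach a port to keep it alive.
        RunLoop.current.add(Port(), forMode: .default)
        ready.signal()
        // Runs for the lifetime of the app; this sketch has no shutdown path.
        RunLoop.current.run()
    }

    // Starts the thread and blocks until its run loop is available.
    func startAndWait() {
        start()
        ready.wait()
    }

    // Schedules a block on the thread's run loop.
    // Does nothing if startAndWait() has not been called yet.
    func enqueue(_ block: @escaping () -> Void) {
        threadRunLoop?.perform(block)
    }
}

let worker = RunLoopThread()
worker.startAndWait()
worker.enqueue {
    // Assumption: the Realm would be opened and observed here,
    // so that notification blocks are added from within a run loop.
    print("running on a run loop thread:", !Thread.isMainThread)
}
```

The design choice here is to trade a dispatch queue for one dedicated thread, since the observation is meant to stay alive for as long as the app does.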
To prevent downstream publishers from using queues other than main, I need to: