Customize handling of asynchronous events by combining event-processing operators using Combine.

Posts under Combine tag

16 Posts
Post

Replies

Boosts

Views

Activity

Issue with Property Wrapper Publisher Being Deallocated Prematurely When Not Stored as a Property
Hello everyone, I've built a @CurrentValue property wrapper that mimics the behavior of @Published, allowing a property to publish values on "did set". I've also created my own assign(to:) implementation that works with @CurrentValue properties, allowing values to be assigned from a publisher to a @CurrentValue property. However, I'm running into an issue. When I use this property wrapper with two classes and the source class (providing the publisher) is not stored as a property, the subscription is deallocated, and values are no longer forwarded. Here's the property wrapper code: @propertyWrapper public struct CurrentValue<Value> { /// A publisher for properties marked with the `@CurrentValue` attribute. public struct Publisher: Combine.Publisher { public typealias Output = Value public typealias Failure = Never /// A subscription that forwards the values from the CurrentValueSubject to the downstream subscriber private class CurrentValueSubscription<S>: Subscription where S: Subscriber, S.Input == Output, S.Failure == Failure { private var subscriber: S? private var currentValueSubject: CurrentValueSubject<S.Input, S.Failure>? private var cancellable: AnyCancellable? 
init(subscriber: S, publisher: CurrentValue<Value>.Publisher) { self.subscriber = subscriber self.currentValueSubject = publisher.subject } func request(_ demand: Subscribers.Demand) { var demand = demand cancellable = currentValueSubject?.sink { [weak self] value in // We'll continue to emit new values as long as there's demand if let subscriber = self?.subscriber, demand > 0 { demand -= 1 demand += subscriber.receive(value) } else { // If we have no demand, we'll cancel our subscription: self?.subscriber?.receive(completion: .finished) self?.cancel() } } } func cancel() { cancellable = nil subscriber = nil currentValueSubject = nil } } /// A subscription store that holds a reference to all the assign subscribers so we can cancel them when self is deallocated fileprivate final class AssignSubscriptionStore { fileprivate var cancellables: Set<AnyCancellable> = [] } fileprivate let subject: CurrentValueSubject<Value, Never> fileprivate let assignSubscriptionStore: AssignSubscriptionStore = .init() fileprivate var value: Value { get { subject.value } nonmutating set { subject.value = newValue } } init(_ initialValue: Output) { self.subject = .init(initialValue) } public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input { let subscription = CurrentValueSubscription(subscriber: subscriber, publisher: self) subscriber.receive(subscription: subscription) } } public var wrappedValue: Value { get { publisher.value } nonmutating set { publisher.value = newValue } } public var projectedValue: Publisher { get { publisher } mutating set { publisher = newValue } } private var publisher: Publisher public init(wrappedValue: Value) { publisher = .init(wrappedValue) } } /// A subscriber that receives values from an upstream publisher and assigns them to a downstream CurrentValue property. 
private final class AssignSubscriber<Input>: Subscriber, Cancellable { typealias Failure = Never private var receivingSubject: CurrentValueSubject<Input, Never>? private weak var assignSubscriberStore: CurrentValue<Input>.Publisher.AssignSubscriptionStore? init(currentValue: CurrentValue<Input>.Publisher) { self.receivingSubject = currentValue.subject self.assignSubscriberStore = currentValue.assignSubscriptionStore } func receive(subscription: Subscription) { // Hold a reference to the subscription in the downstream publisher // so when it deallocates, the subscription is automatically cancelled assignSubscriberStore?.cancellables.insert(AnyCancellable(subscription)) subscription.request(.unlimited) } func receive(_ input: Input) -> Subscribers.Demand { receivingSubject?.value = input return .none } func receive(completion: Subscribers.Completion<Never>) { // Nothing to do here } public func cancel() { receivingSubject = nil assignSubscriberStore = nil } } public extension Publisher where Self.Failure == Never { /// Assigns the output of the upstream publisher to a downstream CurrentValue property /// - Parameter currentValue: The CurrentValue property to assign the values to func assign(to currentValue: inout CurrentValue<Self.Output>.Publisher) { let subscriber = AssignSubscriber(currentValue: currentValue) self.subscribe(subscriber) } } Here’s an example demonstrating the issue, where two classes are used: Source, which owns the @CurrentValue property, and Forwarder1, which subscribes to updates from Source: final class Source { @CurrentValue public private(set) var value: Int = 1 func update(value: Int) { self.value = value } } final class Forwarder1 { @CurrentValue public private(set) var value: Int init(source: Source) { self.value = source.value source.$value.dropFirst().assign(to: &$value) // The source is not stored as a property, so the subscription deallocates } func update(value: Int) { self.value = value } } With this setup, if source isn’t retained
as a property in Forwarder1, the subscription is deallocated prematurely, and value in Forwarder1 stops receiving updates from Source. However, this doesn’t happen with @Published properties in Combine. Even if source isn’t retained, @Published subscriptions seem to stay active, propagating values as expected. My Questions: What does Combine do internally with @Published properties that prevents the subscription from being deallocated prematurely, even if the publisher source isn’t retained as a property? Is there a recommended approach to address this in my custom property wrapper to achieve similar behavior, ensuring the subscription isn’t lost? Any insights into Combine’s internals or suggestions on how to resolve this would be greatly appreciated. Thank you!
0
3
90
1w
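For comparison with the post above, here is a minimal sketch of the built-in behavior it refers to: `@Published` with Combine's own `assign(to:)`, whose documentation says the subscription is cancelled when the receiving `Published` instance deinitializes. The class names `PublishedSource` and `PublishedForwarder` are hypothetical stand-ins for the post's `Source` and `Forwarder1`, and this says nothing about how Combine implements it internally.

```swift
import Combine

// Hypothetical counterpart of the post's Source, using @Published.
final class PublishedSource {
    @Published private(set) var value = 1

    func update(value: Int) {
        self.value = value
    }
}

// Hypothetical counterpart of Forwarder1. The source is not stored;
// assign(to:) ties the subscription to this object's Published storage.
final class PublishedForwarder {
    @Published private(set) var value: Int

    init(source: PublishedSource) {
        value = source.value
        source.$value
            .dropFirst()            // skip the value already copied above
            .assign(to: &$value)    // requires iOS 14 / macOS 11 or later
    }
}
```

As long as the caller keeps the source alive, calling `update(value:)` on it is reflected in the forwarder; a custom wrapper would need an equivalent ownership chain from the receiving property to the subscription.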
Async/Await and updating state
When using conformance to ObservableObject and then doing async work in a Task, you will get a warning courtesy of Combine if you then update an @Published or @State var from anywhere but the main thread. However, if you are using @Observable there is no such warning. Also, Thread.current is unavailable in asynchronous contexts, so says the warning. And I have read that in a sense you simply aren't concerned with what thread an async task is on. So for me, that raises a question. Is the lack of a warning, which when using Combine is rather important as ignoring it could lead to crashes, a pretty major bug that Apple seemingly should have addressed long ago? Or is it just not an issue to update state from another thread, because Xcode is doing that work for us behind the scenes too, just as it manages what thread the async task is running on when we don't specify? I see a lot of posts about this from around the initial release of Async/Await talking about using await MainActor.run {} at the point the state variable is updated, usually also complaining about the lack of a warning. But now, years later, there is still no warning and I have to wonder if this is actually a non-issue. In some ways this is similar to the fact that many of the early posts I have seen related to @Observable have examples of an @Observable ViewModel instantiated in the view as an @State variable, but in fact this is not needed as that is addressed behind the scenes for all properties of an @Observable type. At least, that is my understanding now, but I am learning Swift coming from a PowerShell background so I question my understanding a lot.
3
0
244
1w
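One way to sidestep the question of which thread a Task resumes on is to isolate the model to the main actor, so the compiler enforces where state is mutated. The following is a minimal sketch under that assumption; `ProfileModel`, `fetchName()`, and the returned string are hypothetical and not taken from the post.

```swift
import Combine
import Foundation

// Hypothetical model isolated to the main actor.
@MainActor
final class ProfileModel: ObservableObject {
    @Published private(set) var name = ""

    // nonisolated, so this work is not tied to the main actor.
    nonisolated static func fetchName() async -> String {
        try? await Task.sleep(nanoseconds: 10_000_000) // stand-in for real work
        return "Ada"
    }

    func load() async {
        let fetched = await Self.fetchName() // suspends while the work runs
        name = fetched                        // load() is main-actor isolated
    }
}
```

With this shape there is no explicit `MainActor.run {}` at the assignment; the isolation of `load()` is what places the mutation on the main actor.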
iOS 18 .onChange(of: scenePhase) is constantly triggered
Hey all, I am facing a new issue on iOS 18 with ScenePhase and the .onChange modifier. Here's roughly the code: .onChange(of: scenePhase, initial: true) { old, new in if new == .active { doStuff() } } On iOS 17 this worked as expected and was triggered when, e.g., the app was brought back from the background or the user navigated back to the view. On iOS 18, though, this code gets triggered constantly; in my case, when I, e.g., navigate to another screen on top of the one with that .onChange. After the navigation happens, the onChange is triggered continuously and does not stop, which causes doStuff() to be called multiple times.
1
0
436
Sep ’24
App freezes when built with Xcode 16 on iOS 18, but not on iOS 17 or lower, or with Xcode 15 on iOS 18
I'm experiencing an issue where my app freezes when built with Xcode 16 on iOS 18. Additional Information: The issue does not occur when building the app with Xcode 16 on iOS 17 or lower. The issue does not occur when building the app with Xcode 15 on iOS 18. The CPU usage spikes to 100% when the app freezes. The app specifically freezes after the code runs into .sink(receiveValue:). Here is the relevant code snippet: @Published var selectedCardData: CardData? @Published var selectedRootTab: RootViewTab = .statement override func load() { state = .loading $selectedCardData.ignoreNil() .removeDuplicates() .map { [unowned self] cardData in $selectedRootTab.filter { $0 == .statement } .first() .map { _ in cardData } } .switchToLatest() .sink(receiveValue: { value in print(value) // value not nil print("Execution reaches this point and the app freezes (CPU 100%).") }) .store(in: &cancellables) } Are there any known changes in iOS 18 or Xcode 16 that might affect this code?
2
2
434
Sep ’24
Can I omit `ObservableObject` conformance?
Apple's documentation pretty much only says this about ObservableObject: "A type of object with a publisher that emits before the object has changed. By default an ObservableObject synthesizes an objectWillChange publisher that emits the changed value before any of its @Published properties changes.". And this sample seems to behave the same way, with or without conformance to the protocol by Contact: import UIKit import Combine class ViewController: UIViewController { let john = Contact(name: "John Appleseed", age: 24) private var cancellables: Set<AnyCancellable> = [] override func viewDidLoad() { super.viewDidLoad() // Do any additional setup after loading the view. john.$age.sink { age in print("View controller's john's age is now \(age)") } .store(in: &cancellables) print(john.haveBirthday()) } } class Contact { @Published var name: String @Published var age: Int init(name: String, age: Int) { self.name = name self.age = age } func haveBirthday() -> Int { age += 1 return age } } Can I therefore omit conformance to ObservableObject every time I don't need the objectWillChange publisher?
2
0
313
Jul ’24
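The difference the post asks about can be seen in a small sketch: `@Published` gives each property its own `$`-prefixed publisher with or without the conformance, while `objectWillChange` is only synthesized for `ObservableObject` types. The names `PlainContact`, `ObservableContact`, and `observeBirthdays()` are hypothetical.

```swift
import Combine

// Same shape as the post's Contact: no protocol conformance.
final class PlainContact {
    @Published var age = 24
}

// Conforming adds the synthesized objectWillChange publisher.
final class ObservableContact: ObservableObject {
    @Published var age = 24
}

// Hypothetical helper that records what each publisher emits.
func observeBirthdays() -> (ages: [Int], willChangeCount: Int) {
    var cancellables = Set<AnyCancellable>()
    var ages: [Int] = []
    var willChangeCount = 0

    // The per-property publisher works without ObservableObject.
    let plain = PlainContact()
    plain.$age.sink { ages.append($0) }.store(in: &cancellables)
    plain.age += 1

    // objectWillChange exists only on the conforming type.
    let observable = ObservableContact()
    observable.objectWillChange
        .sink { _ in willChangeCount += 1 }
        .store(in: &cancellables)
    observable.age += 1

    return (ages, willChangeCount)
}
```

SwiftUI wrappers such as `@StateObject` and `@ObservedObject` require the conformance, so omitting it only makes sense for types observed purely through their `$` publishers.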
How can I subscribe to changes to an @AppStorage var...
From what I've read, @AppStorage vars should be @Published, however the following code generates a compile error at extended's .sink call: Cannot call value of non-function type 'Binding<Subject>' class LanguageManager: ObservableObject { @Published var fred = "Fred" @AppStorage("extended") var extended: Bool = true private var subscriptions = Set<AnyCancellable>() init() { $fred .sink(receiveValue: {value in print("value: \(value)") }) .store(in: &subscriptions) $extended .sink(receiveValue: {value in print("value: \(value)") }) .store(in: &subscriptions) } } Does anyone know of a way to listen for (subscribe to) changes in @AppStorage values? didSet works for a specific subset of value changes, but this is not sufficient for my intended use.
2
0
846
Jun ’24
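Since `$extended` on an `@AppStorage` property is a `Binding` rather than a publisher, one alternative is to observe the underlying `UserDefaults` key through key-value observing. The sketch below assumes the key is `"extended"` as in the post and that the `@objc dynamic` property name matches the key exactly; the `defaults` parameter and the mirrored `@Published` property are illustrative choices, not the poster's design.

```swift
import Combine
import Foundation

extension UserDefaults {
    // The property name must equal the defaults key for KVO to report changes.
    @objc dynamic var extended: Bool {
        bool(forKey: "extended")
    }
}

final class LanguageManager: ObservableObject {
    // Mirrors the stored default so Combine and SwiftUI can observe it.
    @Published private(set) var extended: Bool
    private var subscriptions = Set<AnyCancellable>()

    init(defaults: UserDefaults = .standard) {
        extended = defaults.extended
        defaults.publisher(for: \.extended)   // KVO publisher, emits the initial value too
            .removeDuplicates()
            .sink { [weak self] newValue in
                self?.extended = newValue
            }
            .store(in: &subscriptions)
    }
}
```

Writes made elsewhere through `@AppStorage("extended")` or `UserDefaults.set(_:forKey:)` on the same suite should then reach the sink, which is where `didSet`-style logic can live.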
How might I get didSet behaviour on an AppStorage var?
I've defined a value stored in UserDefaults. In a view struct I have code that can successfully update the stored value. I've also added an @AppStorage var in an instance of a class, that can read this value and run business logic that depends on the current stored value. But what I really want to do is have code in my class that gets automatically called when the value stored in UserDefaults gets updated. Basically I want to do this: @AppStorage("languageChoice") var languageChoice: LanguageChoice = .all { didSet { print("hello") } } Unfortunately didSet closures in @AppStorage vars do not appear to get called :-( My clumsy attempts to use Combine have all ended in tears from the compiler. Any/all suggestions are greatly appreciated. thanks, Mike
3
0
691
Jun ’24
Async publisher for AVPlayerItem.tracks doesn't produce values.
I'm just putting this here for visibility, I already submitted FB13688825. If you say this: Task { for await tracks in avPlayerItem.publisher(for: \.tracks, options: [.initial]).values { print("*** fired with: \(tracks.description)") } } ...it fires once with: "*** fired with: []" If you say this: avPlayerItem.publisher(for: \.tracks).sink { [weak self] tracks in print("*** fired with: \(tracks.description)") }.store(in: &subscriptions) ...you get, as expected, multiple fires, most with data in them such as: *** fired with: [<AVPlayerItemTrack: 0x10a9869a0, assetTrack = <AVAssetTrack: 0x10a9869f0... I think it's a bug but I'm just going to go back to the "old way" for now. No emergency.
2
0
665
Mar ’24
Process() object and async operation
I am maintaining a macOS app, a GUI on top of a command line tool. A Process() object is used to kick off the command line tool with arguments. And completion handlers are triggered for post actions when the command line tool is completed. My question is: I want to refactor the process to use async and await, and not use completion handlers. func execute(command: String, arguments: [String]) async { let task = Process() task.launchPath = command task.arguments = arguments ... do { try task.run() } catch let e { let error = e propogateerror(error: error) } ... } ... and like this in calling the process await execute(..) Combine is used to monitor the ProcessTermination: NotificationCenter.default.publisher( for: Process.didTerminateNotification) .debounce(for: .milliseconds(500), scheduler: DispatchQueue.main) .sink { _ in ..... // Release Combine subscribers self.subscriptons.removeAll() }.store(in: &subscriptons) Combine works fine with completion handlers, but not after refactoring to async. What is the best way to refactor the function? I know there is a task.waitUntilExit(), but is this 100% bulletproof? Will it always wait until the external task is completed?
2
0
697
Mar ’24
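One possible shape for the refactor asked about above is to bridge `Process.terminationHandler` into async/await with a checked continuation, which avoids both the notification subscription and a blocking `waitUntilExit()`. This is a sketch, not the poster's code: returning the exit status and throwing on launch failure are assumptions about what the caller wants.

```swift
import Foundation

// Launches the tool and suspends until it terminates.
// Returns the exit status; throws if the process could not be launched.
func execute(command: String, arguments: [String]) async throws -> Int32 {
    try await withCheckedThrowingContinuation { continuation in
        let process = Process()
        process.executableURL = URL(fileURLWithPath: command)
        process.arguments = arguments

        // Called once, after the process has exited.
        process.terminationHandler = { finished in
            continuation.resume(returning: finished.terminationStatus)
        }

        do {
            try process.run()
        } catch {
            // The process never started, so the handler above will not fire.
            continuation.resume(throwing: error)
        }
    }
}
```

A call site would look like `let status = try await execute(command: "/bin/ls", arguments: ["-l"])`, with the post-processing that used to live in the completion handler placed after the `await`.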
How to watch changes on fields of `@Published FamilyActivitySelection`?
class MyModel: ObservableObject { @Published var selection = FamilyActivitySelection() init() { $selection.sink { newSelection in print(newSelection) } } } class MyView: View { @StateObject var model = MyModel() // some body // .... // my method func removeToken(token: ApplicationToken) { model.selection.applicationTokens.remove(token) } } I am using the above code. When I call removeToken, the callback from the sink (which is registered in init() of MyModel) is called without any changes. newSelection still contains the token that I removed. Currently, I am using the additional code below to work around the problem. .onChange(of: model.selection.applicationTokens) { newSet in model.selection.applicationTokens = newSet } Should I use the workaround solution, or am I missing something?
1
0
658
Jan ’24
I want to move a CoreImage task to the background...
It feels like this should be easy, but I'm having conceptual problems about how to do this. Any help would be appreciated. I have a sample app below that works exactly as expected. I'm able to use the Slider and Stepper to generate inputs to a function that uses CoreImage filters to manipulate my input image. This all works as expected, but it's doing some O(n) CI work on the main thread, and I want to move it to a background thread. I'm pretty sure this can be done using Combine; here is the pseudocode I imagine would work for me: func doStuff() { // subscribe to options changes // .receive on background thread // .debounce // .map { model.inputImage.combine(options: $0) // return an object on the main thread. // update an @Published property? } Below is the POC code for my project. Any guidance as to where I should use Combine to do this would be greatly appreciated. (Also, please let me know if you think Combine is not the best way to tackle this. I'd be open to alternative implementations.) struct ContentView: View { @State private var options = CombineOptions.basic @ObservedObject var model = Model() var body: some View { VStack { Image(uiImage: enhancedImage) .resizable() .aspectRatio(contentMode: .fit) Slider(value: $options.scale) Stepper(value: $options.numberOfImages, label: { Text("\(options.numberOfImages)")}) } } private var enhancedImage: UIImage { return model.inputImage.combine(options: options) } } class Model: ObservableObject { let inputImage: UIImage = UIImage.init(named: "IMG_4097")! } struct CombineOptions: Codable, Equatable { static let basic: CombineOptions = .init(scale: 0.3, numberOfImages: 10) var scale: Double var numberOfImages: Int }
1
0
758
Jan ’24
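The pseudocode in the post above maps fairly directly onto a Combine pipeline. The sketch below keeps it platform-neutral: the output type is generic so a `String` can stand in for `UIImage`, and `RenderModel`, the `render` closure, the queue label, and the 200 ms debounce interval are all hypothetical choices. It assumes the options live in the model rather than in `@State`.

```swift
import Combine
import Foundation

struct CombineOptions: Codable, Equatable {
    static let basic = CombineOptions(scale: 0.3, numberOfImages: 10)
    var scale: Double
    var numberOfImages: Int
}

// Hypothetical model: Output would be UIImage in the post's app.
final class RenderModel<Output>: ObservableObject {
    @Published var options = CombineOptions.basic
    @Published private(set) var rendered: Output?

    private let workQueue = DispatchQueue(label: "render.work", qos: .userInitiated)

    init(render: @escaping (CombineOptions) -> Output) {
        $options
            .removeDuplicates()
            // Debouncing on workQueue also moves downstream work onto it.
            .debounce(for: .milliseconds(200), scheduler: workQueue)
            .map { Optional(render($0)) }       // expensive work, off the main thread
            .receive(on: DispatchQueue.main)    // hop back before touching UI state
            .assign(to: &$rendered)
    }
}
```

The view would then bind the Slider and Stepper to `$model.options` and display `model.rendered`, so the expensive call no longer runs inside `body`.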
AVFoundation player.publisher().assign() for @Observable/@ObservationTracked?
Following this Apple article, I copied their code over for observePlayingState(). The only difference is that I am using @Observable instead of ObservableObject and @Published for var isPlaying. We get a bit more insight after removing the $ symbol, leading to a more telling error of: Cannot convert value of type 'Bool' to expected argument type 'Published.Publisher' Is there any way to get this working with @Observable?
2
0
833
Nov ’23
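Because an `@Observable` property has no `$isPlaying` publisher to hand to `assign(to:)`, one option is to subscribe with `sink` and write the property directly. The sketch below takes any `Bool` publisher so it stays self-contained; in the post's case that publisher would presumably be derived from the player (for example by mapping its time-control status), which is an assumption here. `PlayerModel` and the `playingState` parameter are hypothetical names, and the Observation framework requires iOS 17 / macOS 14 or later.

```swift
import Combine
import Observation

@Observable
final class PlayerModel {
    private(set) var isPlaying = false

    // Kept out of observation tracking; it only holds the subscription.
    @ObservationIgnored private var cancellable: AnyCancellable?

    init<P: Publisher>(playingState: P) where P.Output == Bool, P.Failure == Never {
        cancellable = playingState.sink { [weak self] playing in
            // Plain assignment replaces assign(to: &$isPlaying).
            self?.isPlaying = playing
        }
    }
}
```

If the upstream publisher can emit off the main thread, adding `.receive(on: DispatchQueue.main)` before `sink` keeps the mutation where SwiftUI expects it.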
Why aren't changes to @Published variables automatically published on the main thread?
Given that SwiftUI and modern programming idioms promote asynchronous activity, and observing a data model and reacting to changes, I wonder why it's so cumbersome in Swift at this point. Like many, I have run up against the problem where you perform an asynchronous task (like fetching data from the network) and store the result in a published variable in an observed object. This would appear to be an extremely common scenario at this point, and indeed it's exactly the one posed in question after question you find online about this resulting error: Publishing changes from background threads is not allowed Then why is it done? Why aren't the changes simply published on the main thread automatically? Because it isn't, people suggest a bunch of workarounds, like making the enclosing object a MainActor. This just creates a cascade of errors in my application; but also (and I may not be interpreting the documentation correctly) I don't want the owning object to do everything on the main thread. So the go-to workaround appears to be wrapping every potentially problematic setting of a variable in a call to DispatchQueue.main. Talk about tedious and error-prone. Not to mention unmaintainable, since I or some future maintainer may be calling a function a level or two or three above where a published variable is actually set. And what if you decide to publish a variable that wasn't before, and now you have to run around checking every potential change to it? Is this not a mess?
9
0
2.4k
Oct ’24
AsyncPublisher of KeyValueObservingPublisher doesn't work
Hi, I'm trying to use async/await for KVO and it seems something is broken. For some reason, execution doesn't enter the body of the for-await loop when I change the observed property. import Foundation import PlaygroundSupport class TestObj: NSObject { @objc dynamic var count = 0 } let obj = TestObj() Task { for await value in obj.publisher(for: \.count).values { print(value) } } Task.detached { try? await Task.sleep(for: .microseconds(100)) obj.count += 1 } Task.detached { try? await Task.sleep(for: .microseconds(200)) obj.count += 1 } PlaygroundPage.current.needsIndefiniteExecution = true Expected result: 0, 1, 2 Actual result: 0 Does anyone know what is wrong here?
2
1
2.4k
Feb ’24
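If the cause is that values emitted while the loop has no outstanding demand are dropped (an assumption; the post does not confirm it), one workaround is to subscribe with `sink`, which requests unlimited demand, and buffer into an `AsyncStream`. The extension name `bufferedValues()` is hypothetical, and the sketch is written against the Swift 5 language mode, where capturing the cancellable in `onTermination` is accepted.

```swift
import Combine

extension Publisher where Failure == Never {
    // Bridges the publisher into an AsyncStream that buffers every value.
    func bufferedValues() -> AsyncStream<Output> {
        AsyncStream { continuation in
            // sink requests unlimited demand, so upstream never has to drop.
            let cancellable = self.sink(
                receiveCompletion: { _ in continuation.finish() },
                receiveValue: { value in continuation.yield(value) }
            )
            // Cancel the subscription when the consumer stops iterating.
            continuation.onTermination = { _ in
                cancellable.cancel()
            }
        }
    }
}
```

In the post's example the loop would become `for await value in obj.publisher(for: \.count).bufferedValues()`. The default `AsyncStream` buffering policy is unbounded, so a fast producer paired with a slow consumer will grow memory.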
@Published properties and the main thread
I am working on a library, a Swift package. We have quite a few properties on various classes that can change and we think the @Published property wrapper is a good way to annotate these properties as it offers a built-in way to work with SwiftUI and also Combine. Many of our properties can change on background threads and we've noticed that we get a purple runtime issue when setting the value from a background thread. This is a bit problematic for us because the state did change on a background thread and we need to update it at that time. If we dispatch it to the main queue and update it on the next iteration, then our property state doesn't match what the user expects. Say they "load" or "start" something asynchronously, and that finishes, the status should report "loaded" or "started", but that's not the case if we dispatch it to the main queue because that property doesn't update until the next iteration of the run loop. There also isn't any information in the documentation for @Published that suggests that you must update it on the main thread. I understand why SwiftUI wants it on the main thread, but this property wrapper is in the Combine framework. Also it seems like SwiftUI internally could ask to receive the published updates on the main queue and @Published shouldn't enforce a specific thread. One thing we are thinking about doing is writing our own property wrapper, but that doesn't seem to be ideal for SwiftUI integration and it's one more property wrapper that users of our package would need to be educated about. Any thoughts on direction? Is there any way to break @Published from the main thread?
3
1
4.8k
Jan ’24