Skip to content

withLatestFrom doesn't seem to be thread safe #163

Open
@fbarbat

Description

@fbarbat

withLatestFrom doesn't seem to be thread safe when subscribing to two different publishers that emit elements from different threads.
This is part of the current implementation of withLatestFrom. Check the comments in uppercase.

private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) {
      var gotInitialValue = false

      let subscriber = AnySubscriber<Other.Output, Other.Failure>(
        receiveSubscription: { [weak self] subscription in
            self?.otherSubscription = subscription
            subscription.request(.unlimited)
        },
        receiveValue: { [weak self] value in
            guard let self = self else { return .none }

            // THIS IS CALLED FROM THREAD B
            self.latestValue = value

            if !gotInitialValue {
                // When getting initial value, start pulling values
                // from upstream in the main sink
                self.sink = Sink(upstream: self.upstream,
                                 downstream: self.downstream,
                                 transformOutput: { [weak self] value in
                                    guard let self = self,
                                          // THIS IS CALLED FROM THREAD A, ACCESSING THE SAME VAR BUT IT IS NOT SYNCHRONIZED
                                          let other = self.latestValue else { return nil }

                                    return self.resultSelector(value, other)
                                 },
                                 transformFailure: { $0 })

                // Signal initial value to start fulfilling downstream demand
                gotInitialValue = true
                onInitialValue()
            }

            return .unlimited
        },
        receiveCompletion: nil)

      self.second.subscribe(subscriber)
    }

Should a lock over latestValue be added to ensure thread safety? Are there reasons not to do it (for example, performance)? Should CombineExt at least update the Swift docs saying it is not supposed to be used from multiple threads?

Thank you.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions