[RxSwift] Subscribe의 동작 파헤치기

2019-12-16

Subscribe의 일반적인 사용

RxSwift를 사용한다면 예외 없이 Subscribe를 사용하고 계실 것입니다.

보통 다음같이 사용하죠.

[Subscribe의 사용]

let numbersObservable = Observable.of(1, 2, 3, 4).debug()

numbersObservable
.subscribe()
.disposed(by: disposeBag)

[코드 실행결과]

-> subscribed**
-> Event next(1)**
-> Event next(2)**
-> Event next(3)**
-> Event next(4)**
-> Event completed**
-> isDisposed**

익숙하다구요? 당연하다구요?

그렇다면 한가지 질문을 드리겠습니다.

Observable은 보이는데 Observer는 어딨을까요?

보통 SubjectObservable이면서 Observer라고 얘기하는데, 여기서 말하는 Observer를 한번이라도 본적 있으신가요? 전 없는데…

Observer 는 어딨는 걸까요? 🤔

Subscribe의 정의

공식 문서를 한번 살펴보겠습니다1. 설명의 처음 한 문단만 보겠습니다.

Subscribe
operate upon the emissions and notifications from an Observable
The Subscribe operator is the glue that connects an observer to an Observable. In order for an observer to see the items being emitted by an Observable, or to receive error or completed notifications from the Observable, it must first subscribe to that Observable with this operator.


Subscribe는 Observable의 알림과 방출에 따라 동작한다.
Subscribe 연산자는 Observable에 Observer를 연결하는 접착제다. Observable에서 아이템이 방출되거나 에러 혹은 완료 알림을 받기 위해서는 반드시 처음에 이 연산자를 Observable에 사용해 구독해야만 가능하다.

Wha~~t???

역시 공식문서느님은 말해주고 계셨네요. 문서를 읽으며 알 수 있는 사실은 두가지 입니다.

하나는 SubscribeObservableObserver를 연결해준다는 것! SubscribeObserver같이 느껴졌는데 SubscribeObserver는 아니였습니다.

다른 하나는 SubscribeOperator였다는 사실입니다. 😮 헐…

Observable을 합치거나 발행된 아이템을 변형시키거나 걸러내는 행위를 하는 것만 Operator인줄 알았는데 Subscribe도 연산자였던 것입니다. 하는 일은 정의와 같이 ObservableObserver를 붙여주는 일이죠.

오호! 지식이 늘었네요! 😙 (지식이 +1 늘었다.)

SubscribeObservableObserver를 붙여준다는 사실은 확인했는데, 아직까지 Observer가 어디있는지는 발견하지 못했습니다.

이제 뭘해야할까요? Subscribe 안의 코드를 한번 살펴보죠.

Subscribe의 내부

[ObservableType+Extensions.swift]

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

저희가 위 코드에서 살펴볼 부분은 두 부분입니다. 하나는 여기

let observer = AnonymousObserver<Element> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }

보시다시피 Subscribe안에서 Observer를 생성하고 있네요! Observerevent를 종류별로 방출하는 역할을 하고 있네요. 말그대로 이벤트를 관찰하고 거기에 맞는 행위를 하는 Observer역할을 하는 진짜 Observer입니다.

그다음에 살펴볼 코드는 다음 코드입니다.

return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )

Subscribe를 하면 반환되는 것이 Disposable인데 Subscribe안에서 self.asObservable().subscribe(observer) 요런 인자를 만들어서 Disposable을 생성하고 있네요.

self.asObservable()을 좀 더 뜯어보면

extension ObservableType {
    
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<Element> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        return Observable.create { o in
            return self.subscribe(o)
        }
    }
}

Observable을 생성하고 subscribe를 한 구독체를 반환하고 그 Observable.subscribe(observer)Observer를 붙인 구독체를 최종적으로 반환하는 것을 확인할 수 있습니다.

내부에서 subscribe를 두번하는 것에 대한 얘기는 여기서는 생략하고(확실히 잘 몰라서 😓) Subscribe를 했을때 발생하는 일을 정리해 보겠습니다.

Subscribe가 하는 일

ObservableSubscribe하면 Subscribe내부에서

  1. Observer를 생성하고 생성한 그 Observer
  2. 내부에서 생성한 Observable에 붙이고
  3. 붙인 그 구독체를 반환한다.

이렇게 정리할 수 있습니다.

Subscribe가 하는 일을 기억해두세요.

사실 이번 포스트는 다음 포스트인 ObservableShareShare(replay)를 설명하기 위한 사전 포스트거든요. 😊

다음 포스트에서 또 만나요~