티스토리 뷰

카테고리 없음

RxJava2의 5가지 Ovservable

박스여우 2019.04.15 18:27

ReactiveX(RxJava, RxJs, Rx*)은 비동기 작업 및 이벤트 기반 프로그램을 작성하기 위한 라이브러리 입니다. 최근(?) 안드로이드부터 시작해서 프론트엔드, 백엔드까지 다양한 부분에 rx가 도입되고 있습니다. 필자 또한 rx에 관심이 생겨서 수개월간 사용해왔지만 이제서야 rx에 대한 기초를 정리해보려고 합니다.

 Reactive Programming


Rx에 대해 알아보기 전에 rx의 기본 개념인 Reactive Programming에 대해 알고 넘어가야 할 필요가 있습니다.

Reactive Programming은 비동기 데이터 스트림을 이용한 프로그래밍으로 OOP, FP와 같이 프로그래밍 패러다임이라고 볼 수 있습니다. 좀더 쉽게 설명하자면, 데이터를 작은 데이터의 연속(스트림)으로 다루는 프로그래밍 기법이라고 할 수 있습니다. 사실 Reactive Programming은 지금까지 해왔던 프로그래밍과 크게 다르거나 새로운 개념이 아닙니다.

시간에 따른 버튼 이벤트 발생흐름

위의 다이어그램은 기존 UI 프로그램의 버튼 클릭 이벤트의 흐름입니다. 시간에 따라 사용자가 버튼을 클릭하면 클릭 이벤트가 발생하며 이러한 이벤트를 감지하여 처리하고 있습니다. 이처럼 기존의 버튼 클릭 이벤트, UI의 이벤트는 비동기 이벤트 스트림이며 이미 Reactive Programming을 하고 있었다고 할 수 있습니다.

Click 이벤트를 통해 HTTP 요청을 하여 response를 받고 이를 list로 가공하는 흐름

단순한 버튼 클릭 이벤트뿐만 아니라 버튼을 클릭하고 해당 이벤트를 통해 HTTP request를 하고 response를 가공하는 일련의 복잡한 과정 역시 스트림으로 다룰 수 있습니다. 아래 예제는 다이얼로그의 흐름을 코드로 구현한 것 입니다.

# 실제 동작하는 코드가 아닌 예시입니다.
let button = document.querySelector('#button');
Observable.fromEvent(button, 'click')
  .map(() => requestApi.call())
  .map((response) => responseToArray(response))
  .subscribe(res => {
    console.log(`click result ${res.length}`);
  });

기존의 명령형 프로그래밍 코드라면 훨씬 복잡하고 긴 코드를 짜야했습니다. 하지만 Reactive Programming은 데이터를 하나의 흐름, 스트림으로 다루어 코드가 간결해지고 유연해지며 코드만 봐도 데이터의 흐름이 명시적으로 보인다는 장점이 있습니다. 이처럼 Reactive Programming은 비동기 데이터를 다룰 때 기존의 명령형 프로그래밍 대신 스트림으로 처리하는 패러다임입니다.

 ReactiveX


ReactiveX - Rx는 Reactive Programming자체가 아니라 Reactive Programming을 위한 API입니다. Rx에서는 Reactive Programming의 비동기 데이터 스트림을 Observable이라고 정의합니다. 영어 단어 그대로 관측가능한 대상 이라고 생각하면 됩니다. 프로그래밍 패턴중 옵저버 패턴을 생각하시면 이해하기 쉬운데요, 어떠한 데이터를 배출하는 Ovservable이라는 대상을 subscribe하여 데이터를 스트림 형태로 처리하는것이 큰 그림이라고 보시면 됩니다.

 

Rx에서는 Observable을 관찰하고 단순한 동작을 수행하는 것에 그치지않고 filter, debounce, throttle등 범용적으로 사용되는 편리한 여러가지 함수(Operator)들을 제공해주며 사용자가 직접 필요한 Operator를 정의할 수도 있으며 이 외에도 다양한 장점덕분에 Reactive Programming 의 대표적인 API로 자리잡았습니다. 하지만 rx에서 제공하는 수많은 operator로 인해 러닝커브가 다소 높을 수 있다는 점도 존재합니다.

 

이번 포스팅에서는 Operator에 대해서는 다루지 않으며 rx의 기본적인 개념인 5가지 Observable의 종류들에 대해 알아보도록 하겠습니다.

 

Observable

rx의 기본적인 단위는 Observable입니다. Observable로부터 발생되는 이벤트는 next, error, complete 세가지 입니다. next는 데이터가 발생되었을 때 호출되며 error는 스트림 처리, 또는 데이터 발생 과정 중 에러가 발생했을때, complete는 Observable에 대한 작업이 끝나고 스트림이 에러없이 정상적으로 닫혔을 때에 호출됩니다.

 

아래 예제는 JavaScript에서 input 입력 이벤트를 Observable로 다루는 예제입니다.

const node = document.querySelector('input[type=text]');

const inputObservable = Rx.Observable.fromEvent(node, 'input');

inputObservable.subscribe({
  next: event => console.log(`typed : ${event.target.value}!`),
  error: err => console.log(`error : ${err}`),
  complete: () => console.log(`Complete!`),
});

input element에서 발생되는 이벤트를 Observable로 만들고 subscribe하여 지속적으로 발생하는 이벤트를 감지합니다. input element에 텍스트가 입력되어 이벤트가 발생하면 subscribe에 넘겨준 next가 호출됩니다. 이벤트를 감지하는 중 error가 발생하거나 구독(subsribe)이 완료되면 error에 대한 핸들러와 complete 핸들러가 호출됩니다.

# input 이벤트를 스트림 형태로 처리하는 예제
Rx.Observable.fromEvent(node, 'input')
  .map(event => event.target.value)
  .filter(value => value.length >= 2)
  .debounce(500)
  .subscribe(value => {
    console.log(value)
  });

Observable은 위와같이 지속적으로 발생하는 이벤트뿐만 아니라 비동기적인 네트워크 요청, 리스트의 스트림 처리 등 다양한 상황에 활용할 수 있습니다. 하지만 다양한 스트림을 처리하기에 Observable하나만으로는 부족하다는 의견이 있어 Rx2에서는 다양한 데이터 스트림에 따라 사용할 수 있는 Observable들이 늘어나게 되었습니다.

 

Flowable

Rx2에서는 Flowable이 새롭게 생겼습니다. Observable을 사용하다보면 데이터를 생산하는 속도를 subscribe하여 소비하는 속도를 따라잡지 못하는 경우가 있습니다. 이런 경우에 발생한 데이터가 누락되거나 메모리 부족이 발생합니다. 그래서 기존에는 Observable에 Backpressure Buffer를 두엇고 이 버퍼에 넘치는 데이터를 보관하고 버퍼가 가득찼을 경우 새로운 데이터를 publish 하지 않았습니다.

 

하지만 이러한 Backpressure Buffer가 rx를 잘 모르는 초보자들에게는 의도하지 않은 동작이 일어날 수 있다고 생각되어 Observable에서 Backpressure를 없에고 이 대신 Flowable를 추가하였습니다.

 

Flowable에서는 아래의 5 가지의 BackpressureStrategy를 통해 배압 문제를 다룰 수 있습니다.

 

  • BUFFER : 처리할 수 없어서 넘치는 데이터를 별도의 버퍼에 저장
  • DROP : 처리할 수 없어서 넘치는 데이터를 무시(소비자에게 전달x)
  • LATEST : 넘치는 데이터를 버퍼에 저장하지만 버퍼가 찰 경우 오래된 데이터를 무시하고 최신의 데이터만 유지
  • ERROR : 넘치는 데이터가 버퍼 크기를 초과하면 MissingBackPressureException에러를 통지
  • NONE : 특정 처리를 수행하지 않는다.
// RxJava1
Observable.from(...)
    .onBackpressureBuffer()
    .subscribe({...})

// RxJava2
Flowable.fromIterable(...)
    .onBackpressureBuffer()
    .subscribe({...})

// RxJava2 Observable.toFlowable
Observable.fromIterable(...)
    .toFlowable(BackpressureStrategy.BUFFER)
    .subscribe({...})

 

Flowable에서도 배압관련 문제는 여전히 다루기 까다롭기 때문에 Flowable을 사용하기 전에 Observable에서 debounce, throttle등 operator를 통해 데이터의 흐름을 최대한 제어해 보고 그럼에도 불가피한 상황이거나, 꼭 필요한 경우에만 Flowable을 사용하는 것을 추천드립니다. 자세한 내용은 Observable과 Flowable을 선택하는 기준 을 통해 확인할 수 있습니다.

 

 Single

Observable은 0...N개의 데이터를 발생시킵니다. 하지만 대부분의 복잡하지 않은 비동기 작업들은 보통 1개의 데이터만 발생시키는 경우가 많습니다. 이를 좀더 편리하게 다루기 위해서 RxJava2에서는 Single과 Completable이 등장하게 되었습니다.

 

Single은 Observable의 한 종류로써 무한대의 값을 배출시킬 수 있는 Observable과는 달리 작업을 수행한 뒤에 하나의 데이터만 발생시킬 수 있습니다. 따라서 작업이 성공했을 때 결과값을 배출시키는 onSuccess, 작업이 실패, 에러가 발생했을 때 에러를 배출시키는 onError 두 가지 메소드를 사용할 수 있습니다.

 

Single을 많이 사용하는 경우는 비동기 작업을 요청한 뒤 결과값을 가져오는 것으로 대표적으로 HTTP 요청 후 결과값을 가져올 때 사용할 수 있습니다.

Single.create(sub -> {
	try {
    	// http request and get result
    	sub.onSuccess(result);
    }catch(HTTPRequestException e){
    	sub.onError(e);
    }
});

 Completable

Completable은 별도로 발생시키는 데이터 없이 작업의 성공, 실패 여부만 전파합니다. 따라서 작업이 성공했을 때 성공했을 때 onComplete, 작업이 실패했을 때 onError 두 가지만을 가집니다.

Completable.create(sub -> {
	try {
    	//sql insert
    	sub.onComplete();
    } catch(SQLException e){
    	sub.onError(e);
    }
});

 

 Maybe

Maybe는 이름 그대로 값이 배출될수도 있고 배출되지 않을수도 있는 경우에 사용됩니다. Single과 Completable 두 가지가 합쳐졌다고 생각하시면 됩니다. 따라서 성공하여 값이 발생했을 때 onSuccess, 성공하였지만 값이 없을 때 onComplete, 실패했을 때 onError 세 가지를 사용할 수 있습니다.

Maybe.just("Hello World")
    .delay(10, TimeUnit.SECONDS, Schedulers.io())
    .subscribeWith(new DisposableMaybeObserver<String>() {
        @Override
        public void onSuccess(String value) {
            System.out.println("Success: " + value);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done!");
        }
    });

마치며


rx를 처음 접했을때는 처음 보는 개념과 처음보는 형식의 코드로 인해 많이 방황했었습니다. 최근 들어서 rx를 자주 다루고 효과적으로 사용하기 위해 고민을 많이 하게 되어서 기초부터 공부하며 글을 작성하였습니다. 문제가 있거나 틀린 부분도 있을테니 지적해주시면 감사하겠습니다.

 

 참고자료


 

ReactiveX/RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. - ReactiveX/RxJava

github.com

 

RxJS: Observables, Observers and Operators Introduction | Ultimate Courses™

RxJS is an incredible tool for reactive programming, and today we're going to dive a little deeper into what Observables and observers are - as well as

ultimatecourses.com

 

The introduction to Reactive Programming you've been missing

The introduction to Reactive Programming you've been missing - introrx.md

gist.github.com

댓글
댓글쓰기 폼
Total
354,596
Today
15
Yesterday
658
링크
«   2019/11   »
          1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함