※ merge

: merge는 Observable들이 발행한 아이템을 결합한다. 아래 그림을 보자. "20, 40, 60, 80, 100"을 발행하는 Observable과 "1, 1"을 발행하는 Observable이 있다. 이들을 merge 하면 맨 아래의 그림과 같이 된다. 다음 코드를 보면 이해 갈 것이다. 코드를 보면 "1, 2, 3"을 발행하는 Observable과 "str1, str2, str3"를 발행하는 Observable을 merge 했다. 결과를 보면 "1 2 3 str1 str2 str3"가 출력됐다. merge를 할 시 만약 merge를 하는 Observable에서 데이터를 발행하는 동안 오류가 발생하면 merge를 바로 중단한다. 하지만 mergeDelayError()를 사용하면 Observable에서 오류를 발생한더라도 merge 작업을 멈추지 않는다. merge가 완료된 후 onError을 발생시킨다. 


public class main {
	public static void main(String[] args){
		
		String str1 = "Apple";
		String str2 = "Banana";
		String str3 = "Boy";
		
		Observable<String> tim = Observable.just("1", "2","3");
		Observable<String> ss = Observable.just(str1,str2,str3);
		
		
		Observable.merge(tim, ss)
		.subscribe(new DefaultObserver<String>() {

			@Override
			public void onNext(String t) {
				System.out.println(t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("Error!!!");}

			@Override
			public void onComplete() {
				System.out.println("Complete!!!");
			}
		});
	}
}

- 결과



※ zip

: zip은 여러 Observable에서 발행한 데이터를 결합하고 map과 유사하게 특정 함수에 따라 변환시킨다. 아래 그림을 보면 merge랑 다른 점을 알 수 있다. merge와 달리 Observable이 발행한 데이터를 결합한다. 다음 예제를 보자. 참고로 해당 예제는 안드로이드에서 작성했다. TextView의 text를 2초 간격으로 바꾸는 예제이다. 


아래 코드는 textView1에 2초 간격으로 str1 -> str2 -> str3을 출력한다. 코드에서 보이는 Observable.interval(2, TimeUnit.SECONDS); 는 2초마다 Observer에게 데이터를 보내는 Observable이다. 첫 번째 인자는 시간이고 두 번째 인자는 시간 단위이다. 아래에서는 문자열 데이터를 발행하는 Observable과 2초마다 데이터를 발행하는 Observable을 zip 했다. 따라서 이 둘은 결합해서 각 문자열은 2초마다 Observer에게 발행된다. 여기서 observeOn 은 지금 몰라도 된다. 다음 포스트에서 설명할 예정이다. 

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        textView1 = (TextView)findViewById(R.id.textView1);

        String str1 = "first";
        String str2 = "second";
        String str3 = "third";

        Observable<String> s = Observable.just(str1,str2,str3);
        Observable<Long> t = Observable.interval(2, TimeUnit.SECONDS);

        Observable.zip(t, s, new BiFunction<Long, String, String>() {
            @Override
            public String apply(@NonNull Long aLong, @NonNull String s) throws Exception {
                return aLong.toString()+s;
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new DefaultObserver<String>() {
            @Override
            public void onNext(@NonNull String s) {
                textView1.setText(s);
            }

            @Override
            public void onError(@NonNull Throwable e) {}

            @Override
            public void onComplete() {}
        });
    }



※ combineLatest

: combineLatest는 zip과 비슷하게 동작하지만 결합 대상이 다르다. zip은 가장 최근에 zip 되지 않는 데이터들을 zip했다. combineLastest는 가장 최근에 발행한 아이템을 zip한다. 아래 그림을 보면 알 수 있다. 




+ Recent posts