※ merge

: merge는 Observable들이 발행한 아이템을 결합한다. 아래 그림을 보자. "20, 40, 60, 80, 100"을 발행하는 Observable과 "1, 1"을 발행하는 Observable이 있다. 이들을 merge 하면 맨 아래의 그림과 같이 된다. 다음 코드를 보면 이해 갈 것이다. 코드를 보면 "1, 2, 3"을 발행하는 Observable과 "str1, str2, str3"를 발행하는 Observable을 merge 했다. 결과를 보면 "1 2 3 str1 str2 str3"가 출력됐다. merge를 할 시 만약 merge를 하는 Observable에서 데이터를 발행하는 동안 오류가 발생하면 merge를 바로 중단한다. 하지만 mergeDelayError()를 사용하면 Observable에서 오류를 발생한더라도 merge 작업을 멈추지 않는다. merge가 완료된 후 onError을 발생시킨다. 


public class main {
	public static void main(String[] args){
		
		String str1 = "Apple";
		String str2 = "Banana";
		String str3 = "Boy";
		
		Observable<String> tim = Observable.just("1", "2","3");
		Observable<String> ss = Observable.just(str1,str2,str3);
		
		
		Observable.merge(tim, ss)
		.subscribe(new DefaultObserver<String>() {

			@Override
			public void onNext(String t) {
				System.out.println(t);
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("Error!!!");}

			@Override
			public void onComplete() {
				System.out.println("Complete!!!");
			}
		});
	}
}

- 결과



※ zip

: zip은 여러 Observable에서 발행한 데이터를 결합하고 map과 유사하게 특정 함수에 따라 변환시킨다. 아래 그림을 보면 merge랑 다른 점을 알 수 있다. merge와 달리 Observable이 발행한 데이터를 결합한다. 다음 예제를 보자. 참고로 해당 예제는 안드로이드에서 작성했다. TextView의 text를 2초 간격으로 바꾸는 예제이다. 


아래 코드는 textView1에 2초 간격으로 str1 -> str2 -> str3을 출력한다. 코드에서 보이는 Observable.interval(2, TimeUnit.SECONDS); 는 2초마다 Observer에게 데이터를 보내는 Observable이다. 첫 번째 인자는 시간이고 두 번째 인자는 시간 단위이다. 아래에서는 문자열 데이터를 발행하는 Observable과 2초마다 데이터를 발행하는 Observable을 zip 했다. 따라서 이 둘은 결합해서 각 문자열은 2초마다 Observer에게 발행된다. 여기서 observeOn 은 지금 몰라도 된다. 다음 포스트에서 설명할 예정이다. 

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        textView1 = (TextView)findViewById(R.id.textView1);

        String str1 = "first";
        String str2 = "second";
        String str3 = "third";

        Observable<String> s = Observable.just(str1,str2,str3);
        Observable<Long> t = Observable.interval(2, TimeUnit.SECONDS);

        Observable.zip(t, s, new BiFunction<Long, String, String>() {
            @Override
            public String apply(@NonNull Long aLong, @NonNull String s) throws Exception {
                return aLong.toString()+s;
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new DefaultObserver<String>() {
            @Override
            public void onNext(@NonNull String s) {
                textView1.setText(s);
            }

            @Override
            public void onError(@NonNull Throwable e) {}

            @Override
            public void onComplete() {}
        });
    }



※ combineLatest

: combineLatest는 zip과 비슷하게 동작하지만 결합 대상이 다르다. zip은 가장 최근에 zip 되지 않는 데이터들을 zip했다. combineLastest는 가장 최근에 발행한 아이템을 zip한다. 아래 그림을 보면 알 수 있다. 




※ map

: Observable이 발행한 모든 데이터를 Function 객체를 적용해 일괄적으로 바꿔서 Observer에게 전달한다. 다음 예를 보자.

public class main {
	public static void main(String[] args){
		
		String str1 = "Apple";
		String str2 = "Banana";
		String str3 = "Boy";
		String str4 = "Girl";
		
		Observable.just(str1, str2, str3, str4)
			.map(new Function<String, String>(){
				@Override
				public String apply(String t) throws Exception {
					return "String : " + t;
				}
				
			})
			.subscribe(new DefaultObserver<String>() {

				@Override
				public void onNext(String t) {
					System.out.println(t);
				}

				@Override
				public void onError(Throwable e) {
					System.out.println("Error!!!");}

				@Override
				public void onComplete() {
					System.out.println("Complete!!!");
				}
			});
	}
}

- 결과

위 코드에서 map()에 대해 람다를 적용하면 다음과 같은 코드가 된다.

.map(str-> "String: " + str)


※ flatMap, concatMap, switchMap

: map은 Observable가 Observer에게 전달할 모든 데이터를 변경한다. 즉 map은 데이터들을 반환하는 것이다. flatMap은 데이터가 아닌 Observable이 발행한 데이터에 flatMap에서 기술한 Observable을 적용한다. 다음 예를 보자. flatMap 안에서 Observable을 기술한 걸 볼 수 있다. 처음 Observable이 발행한 str1, str2, str3 각각에 대해서 flatMap에서 기술한 Observable이 적용된다. 결과 사진을 보면 확실히 알 수 있다.
public class main {
	public static void main(String[] args){
		
		String str1 = "Apple";
		String str2 = "Banana";
		String str3 = "Boy";
		
		Observable.just(str1, str2, str3)
			.flatMap(str->Observable.just("1. "+str, "2. "+str, "3. "+str," End"))
			.subscribe(new DefaultObserver<String>() {

				@Override
				public void onNext(String t) {
					System.out.println(t);
				}

				@Override
				public void onError(Throwable e) {
					System.out.println("Error!!!");}

				@Override
				public void onComplete() {
					System.out.println("Complete!!!");
				}
			});
	}
}

- 결과

flatMap을 사용할 경우 데이터의 순서가 보장이 안 된다. 위 예에서는 데이터 순서가 보장됬지만 복잡한 로직을 가지고 있다면 "Banana"가 "Apple"보다 먼저 나올 수도 있다. 이렇게 순서가 중요하다면 concatMap()을 사용하면 된다. switchMap()은 flatMap과 비슷하다. 하지만 위 예에서 switchMap()을 사용하고 1.Banana가 발행되고 1.Boy, 2.Boy, 3.Boy, End가 발행됬다면 1.Banana는 Observer에게 전달되지 않는다.   


※ scan

: scan은 Observable이 Observer에 전달한 데이터를 다음에 Observer에 전달할 데이터에 이용한다. 말로는 어려운데 다음 예를 보면 바로 알 수 있다. 결과를 보면 제일 먼저 Observer에 전달된 str1이 scan 내부 sum+str에서 sum에 대입된다. 따라서 Observer에 str1+str2가 전달된다. 이와 마찬가지로 다음에는 str1+str2가 sum에 대입되 str1+str2+str3가 Observer에 전달된다.

public class main {
	public static void main(String[] args){
		
		String str1 = "Apple";
		String str2 = "Banana";
		String str3 = "Boy";
		
		Observable.just(str1, str2, str3)
			.scan((sum, str) -> sum+str)
			.subscribe(new DefaultObserver<String>() {

				@Override
				public void onNext(String t) {
					System.out.println(t);
				}

				@Override
				public void onError(Throwable e) {
					System.out.println("Error!!!");}

				@Override
				public void onComplete() {
					System.out.println("Complete!!!");
				}
			});
	}
}
- 결과




+ Recent posts