※ repeat()

: repeat는 말 그대로 데이터를 반복해서 전달한다는 뜻이다. 다음 예를 보면 정확히 이해될 것이다. 아래 예에서 문자열들에 대해 repeat(3)를 호출했다. 따라서 just에 전달된 first, second, third 문자열 객체가 Observer에게 전달되는 데 3번 반복해서 전달한다. 따라서 결과를 보면 first, second, third가 3번씩 출력되는 걸 알 수 있다.

public class main {
	public static void main(String[] args){
		String first = "first";
		String second = "second";
		String third = "third";
		
		Observable.just(first,second,third)
			.repeat(3)
			.subscribe(new DefaultObserver<String>() {

				@Override
				public void onNext(String t) {
					System.out.println(t);
				}

				@Override
				public void onError(Throwable e) {
					System.out.println("Error!!!!");
				}

				@Override
				public void onComplete() {
					System.out.println("Complete!!!");
				}
			});
	}
}

- 결과


※ range()

: range는 특정 값 X로부터 N개의 정수 데이터를 전달한다. 다음 예를 보자. 다음 예는 정수 10, 11, 12를 Observer에게 전달한다.

public class main {

public static void main(String[] args){

Observable.range(10,3)

.subscribe(new DefaultObserver<Integer>() {


@Override

public void onNext(Integer t) {

System.out.println("Number : "+t);

}


@Override

public void onError(Throwable e) {

System.out.println("Error!!!!");

}


@Override

public void onComplete() {

System.out.println("Complete!!!");

}

});

}

}

- 결과



※ filter()

: filter는 Observable에서 Observer에 전달한 데이터 중에서 특정 조건에 만족하는 데이터만 Observer에 전달해준다. 다음의 예를 보자. 일반적으로 filter는 null 값을 확인할 때 많이 사용한다. 

public class main {
	public static void main(String[] args){
		
		String str1 = "Apple";
		String str2 = "Banana";
		String str3 = "Boy";
		String str4 = "Girl";
		
		Observable.just(str1, str2, str3, str4)
			.filter(new Predicate<String>() {
				@Override
				public boolean test(String t) throws Exception {
					return !t.startsWith("B");
				}			    
			})
			.subscribe(new DefaultObserver<String>() {

				@Override
				public void onNext(String t) {
					System.out.println(t);
				}

				@Override
				public void onError(Throwable e) {
					System.out.println("Error!!!");}

				@Override
				public void onComplete() {
					System.out.println("Complete!!!");
				}
			});
	}
}

- 결과

위 예제 코드의 filter에 람다를 적용하면 간결해진다. 

.filter(str-> !str.startsWith("B"))


※ take(), takeLast()

: take(i)는 Observable에서 Observer에 전달하는 데이터 중 첫 번째 데이터부터 i번째 데이터까지만 Observer에 전달한다는 의미이다. takeLast(i)는 반대로 마지막 데이터부터 뒤로 i번째 데이터까지만 Observer에 전달한다는 의미이다.



※ distinct()

: 데이터의 중복을 제거해준다. distinctUntilChanged()는 똑같은 값을 옵저버블이 발행하다가 새로운 값을 발행할 때 Observer에게 데이터를 전달한다.

public class main {
	public static void main(String[] args){
		
		String str1 = "Apple";
		String str2 = "Banana";
		String str4 = "Girl";
		
		Observable.just(str1, str2, str2, str4)
			.distinct()
			.subscribe(new DefaultObserver<String>() {

				@Override
				public void onNext(String t) {
					System.out.println(t);
				}

				@Override
				public void onError(Throwable e) {
					System.out.println("Error!!!");}

				@Override
				public void onComplete() {
					System.out.println("Complete!!!");
				}
			});
	}
}

- 결과



※ first(), last()

: 이름 그대로 Observable이 첫 번째랑 마지막 데이터만 Observer에게 전달한다.


※ skip(), skipLast()

: take()와 반대라고 생각하면 된다. skip(i)는 Observable의 모든 데이터에서 첫 번째부터 i번째 데이터를 생략하고 Observer에게 보낸다. skipLast()는 마지막부터 뒤에서 i번째 데이터를 생략하고 Observer에게 보낸다.


※ elementAt()

: Observable이 발행한 데이터 중에서 i번째 데이터만 Observer에게 보낸다.


※ sample(), timeout(), debounce()

: sample(30, TimeUnit.SECONDS)를 Observable에게 적용하면 30초 마다 Observable이 가장 최근에 발행한 데이터를 Observer에게 보낸다. 시간 간격에서 가장 마지막(최근)이 아닌 첫 번째 데이터를 받고 싶다면 throttleFirst()를 사용하면 된다. timeout()은 sample()과 거의 비슷하지만 시간 간격 내에 Observable이 데이터를 발행하지 않을 경우 Observer의 onError을 호출한다. debounce()는 timeout()과 달리 시간 간격 내에 Observable이 데이터를 발행하지 않을 경우 가장 최근에 발행한 데이터를 Observer에게 전달한다.



: 이번 포스트는 이전 포스트 "2. RxJava - Observer와 SingleObserver" 에 이어서 DisposableObserver와 SingleObserver, DisposableSingleObserver, SingleObserver, CompletableObserver, Maybe에 대해서 설명한다. 따라서 이전 포스트에서 보았던 예제를 이어서 사용한다. 그러니 해당 포스트를 읽기 전에 이전 포스트 "2. RxJava - Observer와 SingleObserver"를 읽기 바란다. 참고로 Rxjava2 관련 책 같은 게 없어 인터넷 검색으로 터득했다. 한글로 된 문서도 거의 없다. 그래서 여기서 설명한 내용이 틀릴 수 있다.


※ DisposableObserver

: DisposableObserver는 DefaultObserver와 거의 비슷하다. DefaultObserver 객체를 DisposableObserver을 사용해도 프로그램은 정상적으로 동작한다. 이전 포스트에서 DefaultObserver를 사용한 예를 DisposableObserver로 바꿔보면 똑같은 결과가 나온다. DisposableObserver가 DefaultObserver와 다른 점은 DisposableObserver.dispose() 메서드가 있다는 것이다. 다음은 DisposableObserver.dispose()를 사용한 예이다. 아래 코드를 보면 Observer의 onNext 부분에 this.dispose()를 사용한 게 보인다. this.dispose는 이전 포스트의 DefaultObserver에서 보았던 cancel과 거의 동일한 역할을 한다. this.dispose()가 호출되면 Observable과 Observer의 연결이 끊킨다. 따라서 결과 이미지와 같이 나온다. 그렇다면 cancel과 dispose의 차이점은 무엇일까? Observer에서 해당 함수 호출을 Observable에서 명시적으로 알 수 있느냐이다. Observable의 subscribe 메서드를 보면 e.isDisposed() 메서드를 호출한다. 이 메서드는 Observer가 dispose 메서드 호출 유무를 리턴해준다. 하지만 e.isCancelled() 라는 메서드는 없다. 따라서 dispose를 사용하면 Observable에서 Observer의 Dispose 유무를 명시적으로 확인 가능하므로 Observable은 불필요한 추가 작업을 할 필요가 없어진다. Observer에서 this.cancel을 사용했을 시 Observable에서 명시적으로 알 방법이 없어 Observable은 자신의 작업을 다 수행한다. 따라서 List<Sample> list에 모든 데이터가 차게 된다. 하지만 dispose를 사용하면 Observable에서 isDisposable 메서드를 통해 Observer와의 관계를 보고 자신의 작업을 끊낼 수 있다. 따라서 아래의 예제에서는 List<Sample> list 변수에 "G:Goal" 데이터까지만 들어간다. 

public class main {
	private static Observable<Sample> getSamples() {
		return Observable.create(new ObservableOnSubscribe<Sample>() {

			@Override
			public void subscribe(ObservableEmitter<Sample> e) throws Exception {
				List<Sample> list = new ArrayList<Sample>();
				try {
					BufferedReader br = new BufferedReader(new FileReader("./samplelist.txt"));
					Sample sample;
					while (true) {
						String line = br.readLine();
						if (line == null)
							break;
						sample = new Sample(line.charAt(0), line.substring(2));
						list.add(sample);
						
						e.onNext(sample);
						if(e.isDisposed()){
							System.out.println("Observer Disposed!!!");
							return;
						}
					}
					br.close();
				} catch (Exception ex) {
					ex.printStackTrace();
				}finally{
					e.onComplete();
				}
			}
		});
	}

	private static void printSamples() {
		getSamples().subscribe(new DisposableObserver<Sample>() {

			@Override
			public void onNext(Sample t) {
				System.out.println(t.getPrefix()+":"+t.getName());
				if(t.getPrefix() == 'G')
					this.dispose();
			}

			@Override
			public void onError(Throwable e) {
				System.out.println("Error");
			}

			@Override
			public void onComplete() {
				System.out.println("Finish!!!");
			}
		});
	}

	public static void main(String[] args) {
		printSamples();
	}
}

- 결과



※ SingleObserver

: SingleObserver는 Observable에게 하나의 데이터를 전달받을 때 사용한다. 다수의 데이터를 전달받으면 에러가 난다. 하나의 데이터를 받을 때 DefaultObserver나 DisposableObserver, Observer을 사용하면 에러가 난다.  다음은 SingleObserver을 이용한 예이다. toSortedList()는 옵저버블에서 받은 다수의 데이터들을 정렬해 List 로 반환한다. Sample 클래스는 Comparable을 구현해 알파벳 순으로 정렬된다. toSortedList가 반환하는 것은 무조건 하나이므로 SingleObserver를 사용했다. SingleObserver는 추가로 cancel이나 dispose 메서드가 없는 데 dispose를 사용하고 싶다면 SingleObserver가 아닌 DisposableSingleObserver을 사용하면 된다. 

private static void printSamples() {

getSamples().toSortedList().subscribe(new SingleObserver<List<Sample>>(){

@Override

public void onSubscribe(Disposable d) {

System.out.println("Subscribed!!!");

}


@Override

public void onSuccess(List<Sample> t) {

for(Sample s : t) 

System.out.println(s.getName());

}


@Override

public void onError(Throwable e) {

System.out.println("Error!!!");

}

});

}

- 결과




※ CompletableObserver

: SingleObserver는 단 하나의 데이터를 다룰 때 사용한다고 배웠다. CompletableObserver는 Observable에서 데이터를 받지 않고 작업의 성공 유무만 알고 싶을 때 사용한다. 다음은 이에 대한 예이다.

public class main {

private static Completable getSamples() {

return Completable.create(new CompletableOnSubscribe() {

@Override

public void subscribe(CompletableEmitter e) throws Exception {

List<Sample> list = new ArrayList<Sample>();

try {

BufferedReader br = new BufferedReader(new FileReader("./samplelist.txt"));

Sample sample;

while (true) {

String line = br.readLine();

if (line == null)

break;

sample = new Sample(line.charAt(0), line.substring(2));

list.add(sample);

}

br.close();

} catch (Exception ex) {

ex.printStackTrace();

}finally{

e.onComplete();

}

}

});

}


private static void printSamples() {

getSamples().subscribe(new CompletableObserver(){


@Override

public void onSubscribe(Disposable d) {

System.out.println("Subscribed()!!!");

}


@Override

public void onComplete() {

System.out.println("Completed!!!");

}


@Override

public void onError(Throwable e) {

System.out.println("Error!!!");

}

});

}


public static void main(String[] args) {

printSamples();

}

}

- 결과


※ Maybe

: Maybe는 Observable에서 데이터를 하나도 받지 않거나 단 하나를 받을 때 사용한다. 위에서 배운 Single과 Completable을 합친거라고 할 수 있다.


기본 Observer는 다수의 데이터를 받는다. Single은 하나, Completable은 None이다. Maybe는 하나 아니면 None 이다. 이것만 기억하면 될 것 같다. 또한 실제로 다수의 데이터를 받을 때는 별로 없다. 될 수 있으면 Single이나 Completable을 사용하는 게 좋다.


+ Recent posts