: 이번 포스트는 이전 포스트 "2. RxJava - Observer와 SingleObserver" 에 이어서 DisposableObserver와 SingleObserver, DisposableSingleObserver, SingleObserver, CompletableObserver, Maybe에 대해서 설명한다. 따라서 이전 포스트에서 보았던 예제를 이어서 사용한다. 그러니 해당 포스트를 읽기 전에 이전 포스트 "2. RxJava - Observer와 SingleObserver"를 읽기 바란다. 참고로 Rxjava2 관련 책 같은 게 없어 인터넷 검색으로 터득했다. 한글로 된 문서도 거의 없다. 그래서 여기서 설명한 내용이 틀릴 수 있다.
※ DisposableObserver
: DisposableObserver는 DefaultObserver와 거의 비슷하다. DefaultObserver 객체를 DisposableObserver을 사용해도 프로그램은 정상적으로 동작한다. 이전 포스트에서 DefaultObserver를 사용한 예를 DisposableObserver로 바꿔보면 똑같은 결과가 나온다. DisposableObserver가 DefaultObserver와 다른 점은 DisposableObserver.dispose() 메서드가 있다는 것이다. 다음은 DisposableObserver.dispose()를 사용한 예이다. 아래 코드를 보면 Observer의 onNext 부분에 this.dispose()를 사용한 게 보인다. this.dispose는 이전 포스트의 DefaultObserver에서 보았던 cancel과 거의 동일한 역할을 한다. this.dispose()가 호출되면 Observable과 Observer의 연결이 끊킨다. 따라서 결과 이미지와 같이 나온다. 그렇다면 cancel과 dispose의 차이점은 무엇일까? Observer에서 해당 함수 호출을 Observable에서 명시적으로 알 수 있느냐이다. Observable의 subscribe 메서드를 보면 e.isDisposed() 메서드를 호출한다. 이 메서드는 Observer가 dispose 메서드 호출 유무를 리턴해준다. 하지만 e.isCancelled() 라는 메서드는 없다. 따라서 dispose를 사용하면 Observable에서 Observer의 Dispose 유무를 명시적으로 확인 가능하므로 Observable은 불필요한 추가 작업을 할 필요가 없어진다. Observer에서 this.cancel을 사용했을 시 Observable에서 명시적으로 알 방법이 없어 Observable은 자신의 작업을 다 수행한다. 따라서 List<Sample> list에 모든 데이터가 차게 된다. 하지만 dispose를 사용하면 Observable에서 isDisposable 메서드를 통해 Observer와의 관계를 보고 자신의 작업을 끊낼 수 있다. 따라서 아래의 예제에서는 List<Sample> list 변수에 "G:Goal" 데이터까지만 들어간다.
public class main { private static Observable<Sample> getSamples() { return Observable.create(new ObservableOnSubscribe<Sample>() { @Override public void subscribe(ObservableEmitter<Sample> e) throws Exception { List<Sample> list = new ArrayList<Sample>(); try { BufferedReader br = new BufferedReader(new FileReader("./samplelist.txt")); Sample sample; while (true) { String line = br.readLine(); if (line == null) break; sample = new Sample(line.charAt(0), line.substring(2)); list.add(sample); e.onNext(sample); if(e.isDisposed()){ System.out.println("Observer Disposed!!!"); return; } } br.close(); } catch (Exception ex) { ex.printStackTrace(); }finally{ e.onComplete(); } } }); } private static void printSamples() { getSamples().subscribe(new DisposableObserver<Sample>() { @Override public void onNext(Sample t) { System.out.println(t.getPrefix()+":"+t.getName()); if(t.getPrefix() == 'G') this.dispose(); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onComplete() { System.out.println("Finish!!!"); } }); } public static void main(String[] args) { printSamples(); } }
- 결과
※ SingleObserver
: SingleObserver는 Observable에게 하나의 데이터를 전달받을 때 사용한다. 다수의 데이터를 전달받으면 에러가 난다. 하나의 데이터를 받을 때 DefaultObserver나 DisposableObserver, Observer을 사용하면 에러가 난다. 다음은 SingleObserver을 이용한 예이다. toSortedList()는 옵저버블에서 받은 다수의 데이터들을 정렬해 List 로 반환한다. Sample 클래스는 Comparable을 구현해 알파벳 순으로 정렬된다. toSortedList가 반환하는 것은 무조건 하나이므로 SingleObserver를 사용했다. SingleObserver는 추가로 cancel이나 dispose 메서드가 없는 데 dispose를 사용하고 싶다면 SingleObserver가 아닌 DisposableSingleObserver을 사용하면 된다.
private static void printSamples() {
getSamples().toSortedList().subscribe(new SingleObserver<List<Sample>>(){
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed!!!");
}
@Override
public void onSuccess(List<Sample> t) {
for(Sample s : t)
System.out.println(s.getName());
}
@Override
public void onError(Throwable e) {
System.out.println("Error!!!");
}
});
}
- 결과
※ CompletableObserver
public class main {
private static Completable getSamples() {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
List<Sample> list = new ArrayList<Sample>();
try {
BufferedReader br = new BufferedReader(new FileReader("./samplelist.txt"));
Sample sample;
while (true) {
String line = br.readLine();
if (line == null)
break;
sample = new Sample(line.charAt(0), line.substring(2));
list.add(sample);
}
br.close();
} catch (Exception ex) {
ex.printStackTrace();
}finally{
e.onComplete();
}
}
});
}
private static void printSamples() {
getSamples().subscribe(new CompletableObserver(){
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed()!!!");
}
@Override
public void onComplete() {
System.out.println("Completed!!!");
}
@Override
public void onError(Throwable e) {
System.out.println("Error!!!");
}
});
}
public static void main(String[] args) {
printSamples();
}
}
- 결과
※ Maybe
'안드로이드 > Rxjava, RxAndroid' 카테고리의 다른 글
6. RxJava - merge, zip, conbineLatest (0) | 2017.08.18 |
---|---|
5. RxJava - map, flatMap, concatMap, switchMap (0) | 2017.08.17 |
4. RxJava - repeat, range, filter, distinct, take, first, skip, elementAt, sample (0) | 2017.08.16 |
2. RxJava - Observer와 DefaultObserver 란 (0) | 2017.08.14 |
1. Rxjava - 시작 (0) | 2017.08.13 |