: 이번 포스트에서는 예를 통해 Observer와 DefaultObserver에 대해 알아보겠다. 참고로 Rxjava2 관련 책 같은 게 없어 인터넷 검색으로 터득했다. 한글로 된 문서도 거의 없다. 그래서 여기서 설명한 내용이 틀릴 수 있다.


: 여기서 구현할 예제는 다음과 같은 텍스트 파일을 읽어서 Sample이라는 객체에 데이터를 저장하고 출력해주는 프로그램이다. 

위 데이터를 담을 Sample 클래스는 다음과 같다. mPrefix는 위 텍스트 문장에서 콜론 앞에 부분이고 mName은 콜론 뒷 부분이다. 추가적으로 알파벳 순 정렬을 위해서 Comparable을 구현했다.

public class Sample implements Comparable<Object> { private char mPrefix; private String mName; public Sample(char prefix, String name) { this.mPrefix = prefix; this.mName = name; } public char getPrefix() { return mPrefix; } public String getName() { return mName; } @Override public int compareTo(Object arg0) { Sample s = (Sample)arg0; return mName.compareTo(s.getName()); } }



※ Observer

: 다양한 Observer들은 Observer 인터페이스를 상속한다. 가장 기본적인 개체이다. 다음 코드를 분석해보자. "1. Rxjava - 시작" 포스트를 봤다면 이해가 바로 갈 것이다. Observable에서 텍스트 파일을 읽어 Sample 객체를 ArrayList에 담는다. ArrayList에 Sample 객체를 담을 때 마다 Observer의 onNext가 호출되고 Sample 데이터가 Observer에 전달된다. Observer의 onNext는 Observable에게 전달 받은 Sample 객체를 출력한다. 텍스트 파일을 다 읽었으면 Observer의 onComplete가 호출된다. 추가적으로 Observer 클래스에서 구현해야 하는 메서드 onSubscribe가 보인다. onSubscribe는 Observable과 Observer가 subscribe로 구독될 때 호출된다. Disposable 객체를 인자로 받는 데 d.dispose()를 호출하면 Observable과 Observer의 관계가 끊킨다. 즉 Observer에서는 onSubscribe만 호출된다. 아래 예의 결과는 "Start!!!"만 출력된다. 단, Observable에서 수행하는 작업은 끊키지 않는다. 아래 예를 통해 보면 d.dispose()를 호출한다고 해도 Sample 객체를 저장하는 list 변수에는 텍스트 파일에 있는 모든 Sample 객체가 저장된다.

public class main {
	private static Observable<Sample> getSamples() {
		return Observable.create(new ObservableOnSubscribe<Sample>() {

			@Override
			public void subscribe(ObservableEmitter<Sample> e) throws Exception {
				List<Sample> list = new ArrayList<Sample>();
				try {
					BufferedReader br = new BufferedReader(new FileReader("./samplelist.txt"));
					Sample sample;
					while (true) {
						String line = br.readLine();
						if (line == null)
							break;
						sample = new Sample(line.charAt(0), line.substring(2));
						list.add(sample);
						
						e.onNext(sample);
					}
					br.close();
				} catch (Exception ex) {
					ex.printStackTrace();
				}finally{
					e.onComplete();
				}
			}
		});
	}

	private static void printSamples() {

		getSamples().subscribe(new Observer<Sample>() {
			
			@Override
			public void onSubscribe(Disposable d) {
				// TODO Auto-generated method stub
				System.out.println("Start!!!");
			}

			@Override
			public void onNext(Sample t) {
				// TODO Auto-generated method stub
				System.out.println(t.getPrefix()+":"+t.getName());
			}

			@Override
			public void onError(Throwable e) {
				// TODO Auto-generated method stub
				System.out.println("Error");
			}

			@Override
			public void onComplete() {
				// TODO Auto-generated method stub
				System.out.println("Finish!!!");
			}
		});
	}

	public static void main(String[] args) {
		printSamples();
	}
}

- 결과



※ DefaultObserver

: 위 예제 코드와 printSamples()만 다르다. DefaultObserver는 아래의 구현해야 하는 메서드 말고 onStart 메서드를 추가로 구현 가능하다. public void onStart(); 메서드는 Observer의 onSubscribe와 역할이 거의 같다. 하지만 매개변수가 없다. 그래서 위 예제 결과와 같게 하려면 onStart 메서드를 오버라이딩한 다음에 sysout("Start!!!")를 메서드 내에 기술하면 된다.

	private static void printSamples() {
		getSamples().subscribe(new DefaultObserver<Sample>() {

			@Override
			public void onNext(Sample t) {
				// TODO Auto-generated method stub
				System.out.println(t.getPrefix()+":"+t.getName());				
			}

			@Override
			public void onError(Throwable e) {
				// TODO Auto-generated method stub
				System.out.println("Error");
			}

			@Override
			public void onComplete() {
				// TODO Auto-generated method stub
				System.out.println("Finish!!!");
			}	
		});
	}

DefaultObserver는 Observer에 없는 메서드 cancel()이 있다. cancel() 메서드를 호출하면 Observable과의 연결이 끊긴다. 즉 Observable에서 작업한 결과물이 Observer로 전송이 안 된다. 따라서 onComplete 메서드도 호출 안 된다. 단, Observable은 작업을 완료한다.

@Override public void onNext(Sample t) { // TODO Auto-generated method stub System.out.println(t.getPrefix()+":"+t.getName()); if(t.getPrefix() == 'G') this.cancel(); }

- 결과


+ Recent posts