: 이번 포스트에서는 예를 통해 Observer와 DefaultObserver에 대해 알아보겠다. 참고로 Rxjava2 관련 책 같은 게 없어 인터넷 검색으로 터득했다. 한글로 된 문서도 거의 없다. 그래서 여기서 설명한 내용이 틀릴 수 있다.


: 여기서 구현할 예제는 다음과 같은 텍스트 파일을 읽어서 Sample이라는 객체에 데이터를 저장하고 출력해주는 프로그램이다. 

위 데이터를 담을 Sample 클래스는 다음과 같다. mPrefix는 위 텍스트 문장에서 콜론 앞에 부분이고 mName은 콜론 뒷 부분이다. 추가적으로 알파벳 순 정렬을 위해서 Comparable을 구현했다.

public class Sample implements Comparable<Object> { private char mPrefix; private String mName; public Sample(char prefix, String name) { this.mPrefix = prefix; this.mName = name; } public char getPrefix() { return mPrefix; } public String getName() { return mName; } @Override public int compareTo(Object arg0) { Sample s = (Sample)arg0; return mName.compareTo(s.getName()); } }



※ Observer

: 다양한 Observer들은 Observer 인터페이스를 상속한다. 가장 기본적인 개체이다. 다음 코드를 분석해보자. "1. Rxjava - 시작" 포스트를 봤다면 이해가 바로 갈 것이다. Observable에서 텍스트 파일을 읽어 Sample 객체를 ArrayList에 담는다. ArrayList에 Sample 객체를 담을 때 마다 Observer의 onNext가 호출되고 Sample 데이터가 Observer에 전달된다. Observer의 onNext는 Observable에게 전달 받은 Sample 객체를 출력한다. 텍스트 파일을 다 읽었으면 Observer의 onComplete가 호출된다. 추가적으로 Observer 클래스에서 구현해야 하는 메서드 onSubscribe가 보인다. onSubscribe는 Observable과 Observer가 subscribe로 구독될 때 호출된다. Disposable 객체를 인자로 받는 데 d.dispose()를 호출하면 Observable과 Observer의 관계가 끊킨다. 즉 Observer에서는 onSubscribe만 호출된다. 아래 예의 결과는 "Start!!!"만 출력된다. 단, Observable에서 수행하는 작업은 끊키지 않는다. 아래 예를 통해 보면 d.dispose()를 호출한다고 해도 Sample 객체를 저장하는 list 변수에는 텍스트 파일에 있는 모든 Sample 객체가 저장된다.

public class main {
	private static Observable<Sample> getSamples() {
		return Observable.create(new ObservableOnSubscribe<Sample>() {

			@Override
			public void subscribe(ObservableEmitter<Sample> e) throws Exception {
				List<Sample> list = new ArrayList<Sample>();
				try {
					BufferedReader br = new BufferedReader(new FileReader("./samplelist.txt"));
					Sample sample;
					while (true) {
						String line = br.readLine();
						if (line == null)
							break;
						sample = new Sample(line.charAt(0), line.substring(2));
						list.add(sample);
						
						e.onNext(sample);
					}
					br.close();
				} catch (Exception ex) {
					ex.printStackTrace();
				}finally{
					e.onComplete();
				}
			}
		});
	}

	private static void printSamples() {

		getSamples().subscribe(new Observer<Sample>() {
			
			@Override
			public void onSubscribe(Disposable d) {
				// TODO Auto-generated method stub
				System.out.println("Start!!!");
			}

			@Override
			public void onNext(Sample t) {
				// TODO Auto-generated method stub
				System.out.println(t.getPrefix()+":"+t.getName());
			}

			@Override
			public void onError(Throwable e) {
				// TODO Auto-generated method stub
				System.out.println("Error");
			}

			@Override
			public void onComplete() {
				// TODO Auto-generated method stub
				System.out.println("Finish!!!");
			}
		});
	}

	public static void main(String[] args) {
		printSamples();
	}
}

- 결과



※ DefaultObserver

: 위 예제 코드와 printSamples()만 다르다. DefaultObserver는 아래의 구현해야 하는 메서드 말고 onStart 메서드를 추가로 구현 가능하다. public void onStart(); 메서드는 Observer의 onSubscribe와 역할이 거의 같다. 하지만 매개변수가 없다. 그래서 위 예제 결과와 같게 하려면 onStart 메서드를 오버라이딩한 다음에 sysout("Start!!!")를 메서드 내에 기술하면 된다.

	private static void printSamples() {
		getSamples().subscribe(new DefaultObserver<Sample>() {

			@Override
			public void onNext(Sample t) {
				// TODO Auto-generated method stub
				System.out.println(t.getPrefix()+":"+t.getName());				
			}

			@Override
			public void onError(Throwable e) {
				// TODO Auto-generated method stub
				System.out.println("Error");
			}

			@Override
			public void onComplete() {
				// TODO Auto-generated method stub
				System.out.println("Finish!!!");
			}	
		});
	}

DefaultObserver는 Observer에 없는 메서드 cancel()이 있다. cancel() 메서드를 호출하면 Observable과의 연결이 끊긴다. 즉 Observable에서 작업한 결과물이 Observer로 전송이 안 된다. 따라서 onComplete 메서드도 호출 안 된다. 단, Observable은 작업을 완료한다.

@Override public void onNext(Sample t) { // TODO Auto-generated method stub System.out.println(t.getPrefix()+":"+t.getName()); if(t.getPrefix() == 'G') this.cancel(); }

- 결과


: Rxjava는 안드로이드에서 비동기 작업을 손쉽게 구현할 수 있게 해준다. 안드로이드는 UI 변경을 메인 스레드에서 밖에 못 한다. 따라서 메인 스레드에서의 작업을 최소화해 UI 끊킴 현상을 없애야 한다. 그러기 위해서 별도의 스레드를 만들어서 비동기 작업을 해야 한다. 이를 위해 흔히 AsyncTask, Handler, Service 등의 클래스를 사용한다. 만약 웹 서버와의 통신 후 결과를 UI에 반영한다면 별도의 AsyncTask를 구현한 후 doInBackground나 onPostExecute 등의 콜백 메서드에 작업 내용을 기술해야 한다. 하지만 Rxjava를 사용하면 별도의 클래스 구현없이 간단하게 구현할 수 있다. Retrofit라는 라이브러리까지 사용하면 코드 몇 줄로 구현 가능하다. 


다음은 Rxjava의 특징이다. Rxjava를 다 배우게 되면 아래 특징들에 대해 이해가 갈 것이다.

1. 손쉬운 동시성

2. 콜백에서 벗어날 수 있다.

3. 리액티브 접근

4. 손쉬운 비동기 실행


Rxjava을 이해하기 위해 가장 중요하게 필요한 개념이 옵저버 패턴이다. 옵저버 패턴에 대해 모른다면 "옵저버 패턴(1) - JDK 활용" 포스트"옵저버 패턴(2) - 직접 구현" 포스트를 보는 걸 권장한다. 옵저버 패턴에 대해 간단히 설명하면 옵저버블에 연결된 여러 개의 옵저버들이 있다. 옵저버블에서 상태가 변경되면 연결된 옵저버들에게 이를 알린다. 옵저버들이 옵저버블과 연결된 것을 subscribe 즉, 구독한다 라고 한다. Rxjava에는 Observable과 Subject 개체가 Observable에 해당하고 Observer와 Subject 개체는 옵저버에 해당한다. 간단하게 Rxjava를 사용한 예를 보자. 참고로 필자는 Rxjava를 공부했는 데 Rxjava2가 나오고 Rxjava에 대한 지원이 곧 이루어지지 않는다고 해 Rxjava2 코드를 사용할 것이고 이를 기반으로 설명할 것이다. Rxjava2에 대해 익숙하지 않아 틀린 점이 있을 수도 있으니 양해 바란다. 다음은 Rxjava2를 사용한 간단한 예이다. 

public class main { public static void main(String[] args) { Observable<String> observable = Observable.create( new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> arg0) throws Exception { arg0.onNext("Hello world");                         arg0.onComplete(); } }); observable.subscribe(new HelloworldSubscriber()); } } class HelloworldSubscriber extends DefaultObserver<String> { @Override public void onComplete() { System.out.println("Complete"); } @Override public void onError(Throwable arg0) { System.out.println("Error"); } @Override public void onNext(String item) { System.out.println("Receive: " + item); } }


- 결과


: 위 코드를 알아보자. 먼저 Observable.create를 통해 옵저버블을 생성한다. Create 메서드는 ObservableOnSubscribe 객체를 인자로 가진다. 옵저버가 옵저버블을 구독(subscribe) 하면 ObservableOnSubscribe 객체의 subscribe 메서드가 실행된다. 옵저버는 onComplete, onError, onNext 메서드를 구현해야 한다. onNext는 옵저버블에서 생성된 데이터 즉, 위 예에서는 "Hello world"를 옵저버에게 전달하는 역할을 한다. onComplete는 옵저버블이 옵저버에게 전달할 데이터가 없을 때 모든 작업을 마쳤을 때 알리는 기능을 한다. onError는 에러가 났다는 걸 옵저버에게 알리는 역할을 한다.  


여기서 구독하는 방식에 대해서 알아보자. 옵저버블은 핫과 콜드로 나뉜다. 핫 옵저버블은 데이터(아이템/시퀀스)가 생성되자 마자 발행을 시작한다. 그러면 옵저버는 처음이 아닌 중간 데이터에 대한 결과값부터 받게 된다. 콜드 옵저버블은 데이터를 발행하기 전에 옵저버가 구독할 때까지 대기한다. 위의 예제가 콜드 옵저버블이다. 따라서 옵저버는 모든 데이터에 대한 결과값을 받는다. 


Rxjava는 위 예제와 같은 패턴으로 동작한다. 더 쉬운 Observable.from과 Observable.just에 대해 알아보자. 


 - Observable.from~

: from은 리스트, 배열 안 객체를 데이터로 하는 옵저버블을 생성하거나 Future 객체의 get 결과값을 데이터로 하는 옵저버블을 생성할 수 있다. 

public class main { public static void main(String[] args) { List<Integer> items = new ArrayList<Integer>(); items.add(1); items.add(2); items.add(3); items.add(4); items.add(5); items.add(6); Observable.fromIterable(items) .subscribe(new DefaultObserver<Integer>(){ @Override public void onComplete() { System.out.println("finished"); } @Override public void onError(Throwable arg0) { System.out.println("Error!!!"); } @Override public void onNext(Integer arg0) { System.out.println(arg0+ " received"); } }); } }

- 결과

결과값을 보다시피 리스트 안에 있는 객체 차례대로 Observer에게 전달하는 걸 알 수 있다. 모든 데이터에 대해 작업이 완료되면 자동으로 onComplete가 호출된다.


 - Observable.just

: just()는 함수의 결과 값을 데이터로 하는 옵저버블을 생성할 때 주로 사용한다. 



public class main {

	public static String Hello(){
		return "Hello";
	}
	
	public static String RxJava(){
		return "RxJava";
	}
	
	public static void main(String[] args) {
			Observable.just(Hello(), " ", RxJava())
					   .subscribe(new DefaultObserver<String>(){

						@Override
						public void onComplete() {
							System.out.format("\n%s", "finish");
						}

						@Override
						public void onError(Throwable arg0) {
							System.out.println("Error");
						}

						@Override
						public void onNext(String arg0) {
							System.out.print(arg0);
						}						   
			                   });
	}
}

- 결과

just 함수의 인자값은 최대 9개까지 가능하다.

+ Recent posts