※ 스케줄러(Schedulers)

: 안드로이드는 메인 스레드에서 네트워킹/파일 IO 등 복잡한 연산을 최소화해야 하므로 멀티스레딩은 필수이다. 스케줄러는 안드로이드에 쉽고 간단하게 멀티스레드를 적용해준다. 먼저 다음 예제를 보자. 안드로이드 스튜디오에서 실행했다. 아래 결과를 보면 Observable과 Observer가 메인 스레드에서 실행된 걸 알 수 있다. 

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Integer arr[] = new Integer[]{1,2,3};
                for(Integer i : arr){
                    Log.d("thread", Thread.currentThread().getName()+" Observable"+i);
                    e.onNext(i);
                }
            }
        }).subscribe(new DefaultObserver<Integer>() {
            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d("thread",Thread.currentThread().getName()+" Observer"+integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {}

            @Override
            public void onComplete() {}
        });
    }
- 결과



- Schedulers.io()
: 비동기 I/O 작업을 위한 스케줄러이다. 스레드풀을 가진다. 아래 코드를 보면 subscribeOn(Schedulers.io()) 가 있다. subscribeOn은 Observable의 실행 스레드를 결정한다. 따라서 해당 Observable은 Schedulers.io 스레드에서 동작한다. 아래 결과를 보면 Observable과 Observer가 위 예제와 달리 main 스레드가 아닌 별도의 스레드에서 실행하는 걸 알 수 있다.
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Integer arr[] = new Integer[]{1,2,3};
                for(Integer i : arr){
                    Log.d("thread", Thread.currentThread().getName()+" Observable"+i);
                    e.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .subscribe(new DefaultObserver<Integer>() {
            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d("thread",Thread.currentThread().getName()+" Observer"+integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {}

            @Override
            public void onComplete() {}
        });
    }

 결과



- Schedulers.computation()
: I/O와 관련없는 연산이나 콜백 처리를 위해 사용한다. Schedulers.io()와 다른 점은 스레드 풀의 스레드 갯수가 CPU 갯수로 제한된다. 이전 코드에서 Schedulers.io()를 Schedulers.computation()으로 바꾸면 된다. 결과를 보면 Schedulers.io()의 스레드와 다른 걸 알 수 있다.
.subscribeOn(Schedulers.computation())

 결과



- Schedulers.newThread()
: 새로운 스레드에서 작업한다.

.subscribeOn(Schedulers.newThread())

 결과



- Schedulers.trampoline()
: 현재 해당 코드가 실행되는 스레드에서 해당 Observable을 실행한다. Schedulers.trampoline()을 통해 스레드 내에서 순차적으로 큐 처리 작업을 할 수 있다. 아래 결과를 보면 안드로이드 메인 스레드에서 실행했으므로 trampoline()을 통해 main 스레드에서 Observable이 실행된다.

.subscribeOn(Schedulers.trampoline())

 결과



- Schedulers.from(Executor)
: 매개변수로 전달한 Executor 상에서 Observable을 실행한다. 아래 결과를 보면 Observable이 실행된 스레드는 아래 코드에서 만든 es 인걸 알 수 있다.
        ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(@android.support.annotation.NonNull Runnable runnable) {
                return new Thread(runnable,"RxJavaSchedulers.from()");
            }
        });

	......

	.subscribeOn(Schedulers.from(es))

 결과



- AndroidSchedulers.mainThread()

: 안드로이드에서 백그라운드 스레드에서 작업 후 UI 변경을 위해 메인 스레드에 접근할 때 사용된다. 아래 코드를 보면 .observeOn(AdnroidSchedulers.mainThread())를 호출한다. observeOn은 observeOn 호출 뒤의 Observer의 작업이 수행될 스레드를 지정하는 것이다. 따라서 subscribeOn 전 작업은 Schedulers.io()에 의해 생성된 스레드로 진행되고 observeOn 뒤 작업은 안드로이드 메인 스레드에 의해 작업이 진행된다. 아래 예에 대한 결과를 보면 이를 잘 알 수 있다. 

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Integer arr[] = new Integer[]{1,2,3};
                for(Integer i : arr){
                    Log.d("thread", Thread.currentThread().getName()+" Observable"+i);
                    e.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new DefaultObserver<Integer>() {
            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d("thread",Thread.currentThread().getName()+" Observer"+integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {}

            @Override
            public void onComplete() {}
        });
    }

 결과


+ Recent posts