Observable Android
Introduce RxJava and Android
Bài đăng này đã không được cập nhật trong 5 năm Show ReactiveX là gì?ReactiveX API tập trung vào đồng bộ dữ liệu, là kết hợp tốt nhất từ các pattern Observer, Iterator và ngôn ngữ lập trình hàm. Lấy dữ liệu theo thời gian thực là vấn đề thông dụng đòi hỏi giải pháp rạch ròi, tối ưu, và có khả năng mở rộng. Sử dụng Observables và các toán tử, ReactiveX cung cấp một tổ hợp các API linh hoạt để tạo và thao tác trên dòng dữ liệu giải quyết vấn đề đồng bộ. Giới thiệu RxJavaRxJava là thư viện mã nguồn mở implement ReactiveX trên Java. Có 2 lớp chính là Observable và Subscriber.
Một Observable có thể có một hoặc nhiều Subcriber
Khởi tạo một Observable và đăng kí event từ Observable đó như sau:Observable integerObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); subscriber.onCompleted(); }}); Observable này đưa ra các message 1,2,3 rồi kết thúc. Tiếp theo chúng ta tạo một Subcriber để nhận lấy dữ liệu từ ObservableSubscriber integerSubscriber = new Subscriber() { @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer value) { System.out.println("onNext: " + value); }}; Đăng kí subcriber với Observable và kiểm tra kết quả của chương trình:integerObservable.subscribe(integerSubscriber);// Outputs:// onNext: 1// onNext: 2// onNext: 3// Complete! Viết gọn quá trình trên bằng block sau:Observable.just(1, 2 ,3).subscribe(new Subscriber() { @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) {} @Override public void onNext(Integer value) { System.out.println("onNext: " + value); }}); Hàm Obserable.just() tạo một Observable cho phép đưa ra các giá trị 1, 2, 3 và nhận lớp inner subcriber đăng kí. OperatorTạo và subcrie một Observable khá đơn giản, tuy nhiên subcriber không cần lắng nghe hết các event của Obervable tạo ra. Ở ví dụ dưới đây, một Observable sẽ sử dụng filter để lọc dữ liệu là số lẻ trả về cho subcriber: Observable.just(1, 2, 3, 4, 5, 6) // add more numbers .filter(new Func1() { @Override public Boolean call(Integer value) { return value % 2 == 1; } }) .subscribe(new Subscriber() { @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer value) { System.out.println("onNext: " + value); } });// Outputs:// onNext: 1// onNext: 3// onNext: 5// Complete! Hàm filter() sẽ lọc qua dữ liệu mỗi lần Observable bắn data ra, filter sẽ trả về true nếu dữ liệu là số lẻ, khi đó các subcriber sẽ nhận được data này ở onNext(). RxJava còn cung cấp một toán tử khác cho phép custom lại dữ liệu bắn ra từ Observable là map(), xem ví dụ sau:Observable.just(1, 2, 3, 4, 5, 6) // add more numbers .filter(new Func1() { @Override public Boolean call(Integer value) { return value % 2 == 1; } }) .map(new Func1() { @Override public Double call(Integer value) { return Math.sqrt(value); } }) .subscribe(new Subscriber() { // notice Subscriber type changed to @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Double value) { System.out.println("onNext: " + value); } });// Outputs:// onNext: 1.0// onNext: 1.7320508075688772// onNext: 2.23606797749979// Complete! Dữ liệu sau khi filter gồm các số lẻ, được tính ra căn bậc 2 để trả về cho Subcriber. Operator này cho phép trả về các dữ liệu một cách linh hoạt. ** ###Integrating RxJava with Android ** ImplementĐể implement RxAndroid, trong maven dependence, ta thêm line saucompile 'io.reactivex:rxandroid:1.0.1'. Simple Threading in AndroidMột concept thường gặp trên Android là thực hiện dữ liệu ở tầng background, khi task hoàn thành, đưa kết quả hiển thị lên main thread. Với Android có nhiều cách để làm điều này như sử dụng AsynTasks, Loaders, Services, etc. Tuy nhiên những phương pháp này chưa phải tốt nhất, như AsynTasks có thể dễ dẫn đến lỗi Memory Leak, CursorLoader và ContentProvider thì đòi hỏi nhiều setup và config trong code, Services lại đòi hỏi quá nhiều tài nguyên trong background khi nó cứ chạy suốt. Hãy xem cách RxJava có thể giúp giải quyết vấn đề như thé nào.
Và task để tính toán, chạy dưới background:public String longRunningOperation() { try { Thread.sleep(2000); } catch (InterruptedException e) { // error } return "Complete!";} private class SampleAsyncTask extends AsyncTask { @Override protected String doInBackground(Void... params) { return longRunningOperation(); } @Override protected void onPostExecute(String result) { Snackbar.make(rootView, result, Snackbar.LENGTH_LONG).show(); startAsyncTaskButton.setEnabled(true); }} Và giờ convert Asyntask trên sang RxJavafinal Observable operationObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(longRunningOperation()); subscriber.onCompleted(); }}); Tạo Subcriber đăng kí nhận eventstartRxOperationButton.setOnClickListener(new View.OnClickListener() { @Override public void onClick(final View v) { v.setEnabled(false); operationObservable.subscribe(new Subscriber() { @Override public void onCompleted() { v.setEnabled(true); } @Override public void onError(Throwable e) {} @Override public void onNext(String value) { Snackbar.make(rootView, value, Snackbar.LENGTH_LONG).show(); } }); }}); Định nghĩa thread cho Observablefinal Observable operationObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(longRunningOperation()); subscriber.onCompleted(); }}) .subscribeOn(Schedulers.io()) // subscribeOn the I/O thread .observeOn(AndroidSchedulers.mainThread()); // observeOn the UI Thread Done! RxAndroid là một extension nhẹ cung cấp đồng bộ trên Main Thread , cũng như cho phép đồng bộ trên bất kì Handler nào. RxJava là giải pháp tốt cho multithread trên Android. Ví dụ về convert một Asyntask sang RxJava có thể tìm thấy ở github repo sau: https://github.com/alex-townsend/GettingStartedRxAndroid Reference https://www.captechconsulting.com/blogs/getting-started-with-rxjava-and-android Introduce RxJava and Android
Bài đăng này đã không được cập nhật trong 5 năm ReactiveX là gì?ReactiveX API tập trung vào đồng bộ dữ liệu, là kết hợp tốt nhất từ các pattern Observer, Iterator và ngôn ngữ lập trình hàm. Lấy dữ liệu theo thời gian thực là vấn đề thông dụng đòi hỏi giải pháp rạch ròi, tối ưu, và có khả năng mở rộng. Sử dụng Observables và các toán tử, ReactiveX cung cấp một tổ hợp các API linh hoạt để tạo và thao tác trên dòng dữ liệu giải quyết vấn đề đồng bộ. Giới thiệu RxJavaRxJava là thư viện mã nguồn mở implement ReactiveX trên Java. Có 2 lớp chính là Observable và Subscriber.
Một Observable có thể có một hoặc nhiều Subcriber
Khởi tạo một Observable và đăng kí event từ Observable đó như sau:Observable integerObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); subscriber.onCompleted(); }}); Observable này đưa ra các message 1,2,3 rồi kết thúc. Tiếp theo chúng ta tạo một Subcriber để nhận lấy dữ liệu từ ObservableSubscriber integerSubscriber = new Subscriber() { @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer value) { System.out.println("onNext: " + value); }}; Đăng kí subcriber với Observable và kiểm tra kết quả của chương trình:integerObservable.subscribe(integerSubscriber);// Outputs:// onNext: 1// onNext: 2// onNext: 3// Complete! Viết gọn quá trình trên bằng block sau:Observable.just(1, 2 ,3).subscribe(new Subscriber() { @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) {} @Override public void onNext(Integer value) { System.out.println("onNext: " + value); }}); Hàm Obserable.just() tạo một Observable cho phép đưa ra các giá trị 1, 2, 3 và nhận lớp inner subcriber đăng kí. OperatorTạo và subcrie một Observable khá đơn giản, tuy nhiên subcriber không cần lắng nghe hết các event của Obervable tạo ra. Ở ví dụ dưới đây, một Observable sẽ sử dụng filter để lọc dữ liệu là số lẻ trả về cho subcriber: Observable.just(1, 2, 3, 4, 5, 6) // add more numbers .filter(new Func1() { @Override public Boolean call(Integer value) { return value % 2 == 1; } }) .subscribe(new Subscriber() { @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer value) { System.out.println("onNext: " + value); } });// Outputs:// onNext: 1// onNext: 3// onNext: 5// Complete! Hàm filter() sẽ lọc qua dữ liệu mỗi lần Observable bắn data ra, filter sẽ trả về true nếu dữ liệu là số lẻ, khi đó các subcriber sẽ nhận được data này ở onNext(). RxJava còn cung cấp một toán tử khác cho phép custom lại dữ liệu bắn ra từ Observable là map(), xem ví dụ sau:Observable.just(1, 2, 3, 4, 5, 6) // add more numbers .filter(new Func1() { @Override public Boolean call(Integer value) { return value % 2 == 1; } }) .map(new Func1() { @Override public Double call(Integer value) { return Math.sqrt(value); } }) .subscribe(new Subscriber() { // notice Subscriber type changed to @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Double value) { System.out.println("onNext: " + value); } });// Outputs:// onNext: 1.0// onNext: 1.7320508075688772// onNext: 2.23606797749979// Complete! Dữ liệu sau khi filter gồm các số lẻ, được tính ra căn bậc 2 để trả về cho Subcriber. Operator này cho phép trả về các dữ liệu một cách linh hoạt. ** ###Integrating RxJava with Android ** ImplementĐể implement RxAndroid, trong maven dependence, ta thêm line saucompile 'io.reactivex:rxandroid:1.0.1'. Simple Threading in AndroidMột concept thường gặp trên Android là thực hiện dữ liệu ở tầng background, khi task hoàn thành, đưa kết quả hiển thị lên main thread. Với Android có nhiều cách để làm điều này như sử dụng AsynTasks, Loaders, Services, etc. Tuy nhiên những phương pháp này chưa phải tốt nhất, như AsynTasks có thể dễ dẫn đến lỗi Memory Leak, CursorLoader và ContentProvider thì đòi hỏi nhiều setup và config trong code, Services lại đòi hỏi quá nhiều tài nguyên trong background khi nó cứ chạy suốt. Hãy xem cách RxJava có thể giúp giải quyết vấn đề như thé nào.
Và task để tính toán, chạy dưới background:public String longRunningOperation() { try { Thread.sleep(2000); } catch (InterruptedException e) { // error } return "Complete!";} private class SampleAsyncTask extends AsyncTask { @Override protected String doInBackground(Void... params) { return longRunningOperation(); } @Override protected void onPostExecute(String result) { Snackbar.make(rootView, result, Snackbar.LENGTH_LONG).show(); startAsyncTaskButton.setEnabled(true); }} Và giờ convert Asyntask trên sang RxJavafinal Observable operationObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(longRunningOperation()); subscriber.onCompleted(); }}); Tạo Subcriber đăng kí nhận eventstartRxOperationButton.setOnClickListener(new View.OnClickListener() { @Override public void onClick(final View v) { v.setEnabled(false); operationObservable.subscribe(new Subscriber() { @Override public void onCompleted() { v.setEnabled(true); } @Override public void onError(Throwable e) {} @Override public void onNext(String value) { Snackbar.make(rootView, value, Snackbar.LENGTH_LONG).show(); } }); }}); Định nghĩa thread cho Observablefinal Observable operationObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(longRunningOperation()); subscriber.onCompleted(); }}) .subscribeOn(Schedulers.io()) // subscribeOn the I/O thread .observeOn(AndroidSchedulers.mainThread()); // observeOn the UI Thread Done! RxAndroid là một extension nhẹ cung cấp đồng bộ trên Main Thread , cũng như cho phép đồng bộ trên bất kì Handler nào. RxJava là giải pháp tốt cho multithread trên Android. Ví dụ về convert một Asyntask sang RxJava có thể tìm thấy ở github repo sau: https://github.com/alex-townsend/GettingStartedRxAndroid Reference https://www.captechconsulting.com/blogs/getting-started-with-rxjava-and-android |