前言
前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了Future和CompletableFuture;之后有同学在下面留言提到了RxJava,刚好最近在看微服务设计这本书,里面提到了响应式扩展(Reactive extensions,Rx),而RxJava是Rx在JVM上的实现,所有打算对RxJava进一步了解。
RxJava简介
RxJava的官网地址:https://github.com/ReactiveX/RxJava,
其中对RxJava进行了一句话描述:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
大意就是:一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。
更详细的说明在Netflix技术博客的一篇文章中描述了RxJava的主要特点:
1.易于并发从而更好的利用服务器的能力。
2.易于有条件的异步执行。
3.一种更好的方式来避免回调地狱。
4.一种响应式方法。
与CompletableFuture对比
之前提到CompletableFuture真正的实现了异步的编程模式,一个比较常见的使用场景:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(耗时函数);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println(v);
System.out.println(e);
});
System.out.println("other...");
下面用一个简单的例子来看一下RxJava是如何实现异步的编程模式:
Observable<Long> observable = Observable.just(1, 2)
.subscribeOn(Schedulers.io()).map(new Func1<Integer, Long>() {
@Override
public Long call(Integer t) {
try {
Thread.sleep(1000); //耗时的操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return (long) (t * 2);
}
});
observable.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("error" + e);
}
@Override
public void onNext(Long result) {
System.out.println("result = " + result);
}
});
System.out.println("other...");
Func1中以异步的方式执行了一个耗时的操作,Subscriber(观察者)被订阅到Observable(被观察者)中,当耗时操作执行完会回调Subscriber中的onNext方法。
其中的异步方式是在subscribeOn(Schedulers.io())中指定的,Schedulers.io()可以理解为每次执行耗时操作都启动一个新的线程。
结构上其实和CompletableFuture很像,都是异步的执行一个耗时的操作,然后在有结果的时候主动告诉我结果。那我们还需要RxJava干嘛,不知道你有没有注意,上面的例子中其实提供2条数据流[1,2],并且处理完任何一个都会主动告诉我,当然这只是它其中的一项功能,RxJava还有很多好用的功能,在下面的内容会进行介绍。
异步观察者模式
上面这段代码有没有发现特别像设计模式中的:观察者模式;首先提供一个被观察者Observable,然后把观察者Subscriber添加到了被观察者列表中;
RxJava中一共提供了四种角色:Observable、Observer、Subscriber、Subjects
Observables和Subjects是两个被观察者,Observers和Subscribers是观察者;
当然我们也可以查看一下源码,看一下jdk中的Observer和RxJava的Observer
jdk中的Observer:
public interface Observer {
void update(Observable o, Object arg);
}
RxJava的Observer:
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
同时可以发现Subscriber是implements Observer的:
public abstract class Subscriber<T> implements Observer<T>, Subscription
可以发现RxJava中在Observer中引入了2个新的方法:onCompleted()和onError()
onCompleted():即通知观察者Observable没有更多的数据,事件队列完结
onError():在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
正是因为RxJava提供了同步和异步两种方式进行事件的处理,个人觉得异步的方式更能体现RxJava的价值,所以这里给他命名为异步观察者模式。
好了,下面正式介绍RxJava的那些灵活的操作符,这里仅仅是简单的介绍和简单的实例,具体用在什么场景下,会在以后的文章中介绍
Maven引入
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.2.4</version>
</dependency>
创建Observable
1.create()创建一个Observable,并为它定义事件触发规则
Observable<Integer> observable = Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
for (int i = 0; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
});
observable.subscribe(new Observer<Integer>() {...});
2.from()可以从一个列表中创建一个Observable,Observable将发射出列表中的每一个元素
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items);
observable.subscribe(new Observer<Integer>() {...});
3.just()将传入的参数依次发送出来
Observable<Integer> observable = Observable.just(1, 2, 3);
observable.subscribe(new Observer<Integer>() {...});
过滤Observable
1.filter()来过滤我们观测序列中不想要的值
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items).filter(
new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer t) {
return t == 1;
}
});
observable.subscribe(new Observer<Integer>() {...});
2.take()和taskLast()分别取前几个元素和后几个元素
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items).take(3);
observable.subscribe(new Observer<Integer>() {...});
Observable<Integer> observable = Observable.from(items).takeLast(2);
3.distinct()和distinctUntilChanged()
distinct()过滤掉重复的值
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(10);
Observable<Integer> observable = Observable.from(items).distinct();
observable.subscribe(new Observer<Integer>() {...});
distinctUntilChanged()列发射一个不同于之前的一个新值时让我们得到通知
List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(100);
items.add(100);
items.add(200);
Observable<Integer> observable = Observable.from(items).distinctUntilChanged();
observable.subscribe(new Observer<Integer>() {...});
4.first()和last()分别取第一个元素和最后一个元素
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
// Observable<Integer> observable = Observable.from(items).first();
Observable<Integer> observable = Observable.from(items).last();
observable.subscribe(new Observer<Integer>() {...});
5.skip()和skipLast()分别从前或者后跳过几个元素
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
// Observable<Integer> observable = Observable.from(items).skip(2);
Observable<Integer> observable = Observable.from(items).skipLast(2);
observable.subscribe(new Observer<Integer>() {...});
6.elementAt()取第几个元素进行发射
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items).elementAt(2);
observable.subscribe(new Observer<Integer>() {...});
7.sample()指定发射间隔进行发射
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 50000; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items).sample(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...});
8.timeout()设定的时间间隔内如果没有得到一个值则发射一个错误
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items).timeout(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...onError()...});
9.debounce()在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items).debounce(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...});
转换Observable
1.map()接收一个指定的Func对象然后将它应用到每一个由Observable发射的值上
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items).map(
new Func1<Integer, Integer>() {
@Override
public Integer call(Integer t) {
return t * 2;
}
});
observable.subscribe(new Observer<Integer>() {...});
2.flatMap()函数提供一种铺平序列的方式,然后合并这些Observables发射的数据
final Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(3));
List<Integer> items = new ArrayList<Integer>();
for (int i = 0; i < 5; i++) {
items.add(i);
}
Observable<Integer> observable = Observable.from(items).flatMap(
new Func1<Integer, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> 声明:所有来源为“聚合数据”的内容信息,未经本网许可,不得转载!如对内容有异议或投诉,请与我们联系。邮箱:marketing@think-land.com
支持全球约2.4万个城市地区天气查询,如:天气实况、逐日天气预报、24小时历史天气等
支持识别各类商场、超市及药店的购物小票,包括店名、单号、总金额、消费时间、明细商品名称、单价、数量、金额等信息,可用于商品售卖信息统计、购物中心用户积分兑换及企业内部报销等场景
涉农贷款地址识别,支持对私和对公两种方式。输入地址的行政区划越完整,识别准确度越高。
根据给定的手机号、姓名、身份证、人像图片核验是否一致
通过企业关键词查询企业涉讼详情,如裁判文书、开庭公告、执行公告、失信公告、案件流程等等。