-
Notifications
You must be signed in to change notification settings - Fork 4
Open
Description
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
int i = 0;
while (true) {
if (e.requested() == 0) continue;//此处添加代码,让flowable按需发送数据
System.out.println("发射---->" + i);
i++;
e.onNext(i);
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
private Subscription mSubscription;
@Override
public void onSubscribe(Subscription s) {
s.request(1); //设置初始请求数据量为1
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(50);
System.out.println("接收------>" + integer);
mSubscription.request(1);//每接收到一条数据增加一条请求量
} catch (InterruptedException ignore) {
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
这段代码我直接运行并没有反应啊, 都没有打日志, 什么情况
Metadata
Metadata
Assignees
Labels
No labels