Rxjava2Demo: a small problem with Flowable demo18 #1

@teaim

Flowable
        .create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                int i = 0;
                while (true) {
                    if (e.requested() == 0) continue; // added so that the Flowable only emits data on demand
                    System.out.println("emit---->" + i);
                    i++;
                    e.onNext(i);
                }
            }
        }, BackpressureStrategy.MISSING)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Subscriber<Integer>() {
            private Subscription mSubscription;

            @Override
            public void onSubscribe(Subscription s) {
                s.request(1);            // set the initial request amount to 1
                mSubscription = s;
            }

            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.sleep(50);
                    System.out.println("receive------>" + integer);
                    mSubscription.request(1); // request one more item for each item received
                } catch (InterruptedException ignore) {
                }
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
            }
        });

I ran this code as-is and nothing happened; no log output was printed at all. What is going on?
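To make the intended behaviour concrete, here is a minimal sketch of the "emit only on demand" idea using plain JDK classes instead of RxJava. Everything in it (`DemandDrivenSketch`, `Demand`, `tryConsume`, `run`) is a hypothetical name made up for illustration; `Demand` only stands in for the counter that `e.requested()` reports, and the loop is bounded so that it terminates. It is not a diagnosis of the snippet above: here the counter is updated directly from the consumer thread, and the sketch does not show whether the RxJava operators deliver requests the same way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class DemandDrivenSketch {

    // Hypothetical stand-in for the outstanding-request counter behind e.requested().
    static final class Demand {
        private final AtomicLong requested = new AtomicLong();

        void request(long n) {
            requested.addAndGet(n);
        }

        // Takes one unit of demand if available; returns false when none is outstanding.
        boolean tryConsume() {
            for (;;) {
                long r = requested.get();
                if (r == 0L) {
                    return false;
                }
                if (requested.compareAndSet(r, r - 1)) {
                    return true;
                }
            }
        }
    }

    // Emits 1..total on a producer thread, one item per unit of demand.
    static List<Integer> run(int total) {
        Demand demand = new Demand();
        BlockingQueue<Integer> channel = new LinkedBlockingQueue<>();
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= total; i++) {
                while (!demand.tryConsume()) {
                    Thread.yield(); // no demand yet: wait for the consumer to request more
                }
                channel.add(i);
            }
        });
        producer.start();

        try {
            demand.request(1); // initial request of one item
            for (int k = 0; k < total; k++) {
                received.add(channel.take());
                demand.request(1); // one more request per item received
            }
            producer.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

The producer never puts an item on the channel without first consuming one unit of demand, which is the behaviour the `e.requested() == 0` check in the snippet is meant to achieve.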
