File tree Expand file tree Collapse file tree 3 files changed +80
-0
lines changed
src/test/java/tool/rxjava Expand file tree Collapse file tree 3 files changed +80
-0
lines changed Original file line number Diff line number Diff line change 5252 <version >1.26.2</version >
5353 </dependency >
5454
55+ <!-- 异步化编程 工具类 -->
56+ <dependency >
57+ <groupId >io.reactivex.rxjava3</groupId >
58+ <artifactId >rxjava</artifactId >
59+ <version >3.1.8</version >
60+ </dependency >
61+
5562 </dependencies >
5663</project >
Original file line number Diff line number Diff line change 1+
2+
3+ ## RxJava
4+
5+ RxJava 是一个使用可观察序列来编写异步和基于事件的程序的库。
6+ 它扩展了观察者模式,支持数据/事件序列的丰富操作,如映射、过滤、合并等。
7+ RxJava 的核心思想是,数据(事件)的序列会在不同的时间点被发出、处理并最终交付给观察者(订阅者)
8+
9+ ### 核心概念
10+ - Observable:可观察序列,它定义了数据/事件序列的生成方式。
11+ - Observer:观察者,它定义了如何处理数据/事件序列。
12+ - Subscription:订阅,表示Observable与Observer之间的关联,使Observer能够取消订阅。
13+ - Schedulers:调度器,用于管理数据何时、在哪种线程以及如何并发地发出、处理和传递。
14+
15+
16+ ### 特点
17+ - 非阻塞操作:RxJava 支持在不阻塞主线程的情况下执行长时间运行的任务,例如网络调用或文件读写。
18+ - 线程调度:利用内建的Schedulers,可以灵活地调整任务执行所在的线程,提升系统效率。
19+ - 异常管理:RxJava 设计了一套通用的异常处理流程,在异步环境中捕捉和应对异常更加直接。
20+ - 多样化的操作符:一系列的操作符可供选择,用于对数据流进行变换和聚合,比如 map、filter 和 concat,这显著提升了编码的表现力和适应性。
21+
22+
23+
Original file line number Diff line number Diff line change 1+ package tool .rxjava ;
2+
3+ import io .reactivex .rxjava3 .core .Observable ;
4+ import io .reactivex .rxjava3 .schedulers .Schedulers ;
5+ import org .junit .jupiter .api .Test ;
6+
7+ import java .util .Arrays ;
8+ import java .util .concurrent .TimeUnit ;
9+
10+ /**
11+ * 在当前Java开发领域,异步编程已成为提高应用性能、响应能力和扩展性的重要方法之一。
12+ * RxJava 是一种基于观察者模式的Java虚拟机实现,旨在通过异步和事件驱动的方式构建应用程序。
13+ * 它不仅简化了异步和事件驱动编程,并且通过提供丰富多样的操作符来处理数据流,增强了其功能性和灵活性。
14+ */
15+ public class RxjavaTest {
16+
17+
18+ @ Test
19+ void test01 () {
20+
21+ // 创建一个Observable,模拟异步操作
22+ Observable <Integer > observable = Observable .fromIterable (Arrays .asList (1 , 2 , 3 ))
23+ .subscribeOn (Schedulers .io ()) // 指定Observable在IO线程上执行
24+ .observeOn (Schedulers .single ()) // // 指定Observer在单线程上接收数据
25+ .map (item -> {
26+ TimeUnit .SECONDS .sleep (2L );
27+ return item + 1 ;
28+ });
29+
30+ // 订阅Observable,并处理接收到的数据
31+ observable .subscribe (
32+ result -> System .out .println (result ),// 接收到的数据
33+ error -> error .printStackTrace (), // 错误处理
34+ () -> System .out .println ("complete" )// 完成通知
35+ );
36+
37+ // 主线程可以继续执行其他任务,无需等待Observable完成
38+
39+ System .out .println ("end" );
40+
41+ try {
42+ TimeUnit .SECONDS .sleep (10L );
43+ } catch (InterruptedException e ) {
44+ e .printStackTrace ();
45+ }
46+
47+ }
48+
49+
50+ }
You can’t perform that action at this time.
0 commit comments