针对RxJS 是不是前端开发的未来? 这个问题,大家可以各抒己见!如果还不了解 RxJS,可以跟着本文一起大概的去学习一下,再做出自己的判断。
学习一项新的技术,我觉得首先应该是去先看官方文档,和 github 开源仓库等。先从本源上学习了解到设计者的思想和官方用法。自己学习完后,如有一些还不理解,可以找一些社区教程、书籍、文档等资源来辅助学习理解。这样才能保证尽可能从本源学习,有自己的所见所闻,得出自己的理解,不是人云亦云、口口相传的可能有错误的答案。
学习 RxJS 我的主要有三大步骤:
- 理解相关概念及思想
- 熟悉各种操作符
- 联想使用场景
github 地址:https://github.com/ReactiveX/rxjs
官方文档地址(建议直接看官方的):https://rxjs.dev/
中文社区文档地址(辅助阅读):https://cn.rx.js.org/manual/overview.html
中文文档(辅助阅读):https://rxjs.tech/
学习RxJS操作符和响应式编程原则: https://reactive.how/
RxJS 可视化理解: https://rxviz.com/
RxJS 简介
本篇文章是基于 V7.8.0
官方简介:JavaScript 的响应式扩展库
RxJS(Reactive Extensions for JavaScript) 是一个使用 Observables 进行响应式编程的库,可以更轻松地编写异步或基于回调的代码。
该项目是对 Reactive-Extensions/RxJS(RxJS 4) 的重写,具有更好的性能、更好的模块化、更好的可调试调用堆栈,同时保持大部分向后兼容,只有一些破坏性的变更(breaking changes)是为了减少外层的 API 。
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 Array 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。
Think of RxJS as Lodash for events.
RxJS 的 Logo 是鱼,是因为 RxJS 的概念和操作符可以被看作是一条鱼在游动,数据流就像是鱼在水中游动的路径。鱼的形象也代表了 RxJS 的灵活性和可扩展性,可以适应不同的应用场景和需求。此外,鱼也象征着 RxJS 的响应式编程思想,即数据流的变化会引起相应的响应和处理。
代表“流”的变量标示符,都是用 \$ 符号结尾,这是 RxJS 编程中普遍使用的风格,被称为“芬兰式命名法”(Finnish Notation)。
为什么选 RxJS
随着时代的发展,技术也在不断更新换代。
面向对象式编程 ——》 函数式编程
函数式编程就是非常强调使用函数来解决问题的一种编程方式。
函数式编程对函数的使用有一些特殊的要求,这些要求包括以下几点:
- 声明式(Declarative)
- 纯函数(Pure Function)
- 数据不可变性(Immutability)
从语言角度讲,JavaScript 当然不算一个纯粹意义上的函数式编程语言,但是,JavaScript 中的函数有第一公民的身份,因为函数本身就是一个对象,可以被赋值给一个变量,可以作为参数传递,由此可以很方便地应用函数式编程的许多思想。
JavaScript 并不是纯粹的函数式编程语言,但是,通过应用一些编程规范,再借助一点工具的帮助,我们完全可以用 JavaScript 写出函数式的代码,RxJS 就是辅助我们写出函数式代码的一种工具。
指令式编程 ——》 响应式编程
RxJS 就是兼具函数式和响应式两种先进编程风格的框架。
RxJS 是一个组织异步逻辑的库,它有很多 operator,可以极大的简化异步逻辑的编写。
它是由数据源产生数据,经过一系列 operator 的处理,最后传给接收者。
但是 RxJS 的 operator 多呀,组合起来可以实现非常复杂的异步逻辑处理。
还能帮助我们解决一些问题:
- 如何控制大量代码的复杂度
- 如何保持代码可读性
- 如何处理异步操作
- 数据流抽象了很多现实问题
相关概念介绍
说明:以下部分主要出自官方文档(因为最好的概念介绍就是官方),对一些核心的概念、知识点做了一些总结(是摘录总结翻译,不是全盘抄录)。同时也是对官方文档意译文,帮助辅助中文式阅读。
Reactive Extension
Reactive Extension,也叫 ReactiveX,简称 Rx,是基于响应式的扩展,是各种语言实现的一个统称。
Rx 是一套通过可监听流来做异步编程的API。
ReactiveX 将 Observer 模式与 Iterator 模式以及函数式编程与集合相结合,以满足对管理事件序列的理想方式的需求。
Observer 模式
Iterator 模式
函数式编程
集合
RxJS 中解决异步事件管理的基本概念是:
- Observable:表示一个可调用的未来值或事件的集合的概念。
- Observer:是回调的集合,知道如何监听 Observable 传递的值。
- Subscription:表示 Observable 的执行,主要用于取消执行。
- Operators:采用函数式编程风格的纯函数,支持使用
map
、filter
、concat
、reduce
等操作处理集合。 - Subject:相当于一个 EventEmitter,是将一个值或事件多播给多个 Observers 的唯一途径。
- Schedulers:是控制并发的集中式调度程序,允许我们在计算发生时进行协调,例如
setTimeout
或requestAnimationFrame
或其它。
Observable *
被观察者,用来产生消息/数据。
Observable | Promise | |
---|---|---|
使用场景 | 同步、异步均可使用 | 用 Promise 包裹的多数是异步场景 |
执行时机 | 声明式惰性执行,只有在订阅后才会执行 | 创建时就立即执行 |
执行次数 | 多次调用 subscribe 函数会执行多次 | 只有第一次执行,后续都是取值 |
流程控制 | 相较于 Promise 有更为全面的操作符 | 提供串行、并行的函数 |
错误处理 | subscribe 函数捕获错误 | .catch 捕获 |
Observable 是多个值的惰性推送集合。本质其实就是一个随时间不断产生数据的一个集合,称之为流更容易理解。
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
// 订阅查看这些值的变化
console.log('just before subscribe');
observable.subscribe({
next(x) {
console.log('got value ' + x);
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
},
});
console.log('just after subscribe');
// 控制台打印出的订阅数据
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
Pull vs Push
pull 和 push 是两种不同的协议,用来描述数据生产者 (Producer) 如何与数据消费者 (Consumer) 进行通信的。
Observables 作为函数的泛化
Observables 像是没有参数, 但可以泛化为多个值的函数。
订阅 Observable 类似于调用函数。
Observables 能够同步或异步传递值。
Observable 剖析
Observable 的核心关注点:
- 创建 Observables
- 订阅 Observables
- 执行 Observables
- 清理 Observables
创建 Observables:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi');
}, 1000);
});
订阅 Observables
observable.subscribe((x) => console.log(x));
// 订阅一个 Observable 就像调用一个函数,在数据将被传送到的地方提供回调。
执行 Observables
在 Observable 执行中, 可能会发送零个到无穷多个 “Next” 通知。如果发送的是 “Error” 或 “Complete” 通知的话,那么之后不会再发送任何通知了。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Is not delivered because it would violate the contract
});
const observable2 = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err); // delivers an error if it caught one
}
});
清理 Observables
import { from } from 'rxjs';
const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later:
subscription.unsubscribe();
// 当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。
// 只要调用 `unsubscribe()` 方法就可以取消执行。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
// Keep track of the interval resource
const intervalId = setInterval(() => {
subscriber.next('hi');
}, 1000);
// Provide a way of canceling and disposing the interval resource
return function unsubscribe() {
clearInterval(intervalId);
};
});
function subscribe(subscriber) {
const intervalId = setInterval(() => {
subscriber.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalId);
};
}
const unsubscribe = subscribe({ next: (x) => console.log(x) });
// Later:
unsubscribe(); // dispose the resources
Observer
从行为上来看,无非就是定义了如何处理上述流产生的数据,称之为流的处理方法。
const observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
Operators
观察者,用来消费消息/数据。
Subscription *
Subscription 是表示可清理资源的对象,它是由 Observable 执行之后产生的。
本质就是暂存了一个启动后的流,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在subscription
中,提供了unsubscribe
,来停止这个流。
Subject *
Subject
是一类特殊的Observable
,它可以向多个Observer
多路推送数值。
Observable | Subject | |
---|---|---|
角色 | 生产者(单向) | 生产者、消费者(双向) |
消费策略 | 单播 | 多播 |
流转方式 | 内部发送/接收数据 | 外部发送/接收数据 |
数据特性 | 冷数据流 | 热数据流 |
消费时机 | 调用 subscribe | 调用 next |
Scheduler
简单 demo
推荐在 https://stackblitz.com/ 练习。
事件监听器
// 以前
document.addEventListener('click', () => console.log('我被“惦记”了!'));
// 现在
import { fromEvent } from 'rxjs';
fromEvent(document, 'click').subscribe(() => console.log('我被“惦记”了!'));
Purity
使 RxJS 强大的是它使用纯函数产生值的能力。这意味着您的代码不太容易出错。
所谓纯函数,指的是满足下面两个条件的函数:
- 函数的执行过程完全由输入参数决定,不会受除参数之外的任何数据影响。
- 函数不会修改任何外部状态,比如修改全局变量或传入的参数对象。
// 你的其他代码部分可能会弄乱你的状态
let count = 0;
document.addEventListener('click', () => console.log(`我被“惦记”了 ${++count} 次`));
// 使用 RxJS 可以隔离状态
import { fromEvent, scan } from 'rxjs';
fromEvent(document, 'click')
.pipe(scan((count) => count + 1, 0))
.subscribe((count) => console.log(`我被“惦记”了 ${count} 次`));
// scan 运算符的工作方式与数组的 reduce 类似。
// 它接受一个暴露给回调的值。回调的返回值将成为下次运行回调时下一个暴露的值。
Flow
RxJS 有一整套运算符,可以帮助您控制事件如何通过 observables 流动。
// 每秒最多点击一次的方式
let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
if (Date.now() - lastClick >= rate) {
console.log(`我被“惦记”了 ${++count} 次`);
lastClick = Date.now();
}
});
//
import { fromEvent, throttleTime, scan } from 'rxjs';
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
scan((count) => count + 1, 0)
)
.subscribe((count) => console.log(`我被“惦记”了 ${count} 次`));
Values
// 在纯 JavaScript 中为每次点击添加当前鼠标 x 位置
let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', (event) => {
if (Date.now() - lastClick >= rate) {
count += event.clientX;
console.log(count);
lastClick = Date.now();
}
});
//
import { fromEvent, throttleTime, map, scan } from 'rxjs';
fromEvent(document, 'click')
.pipe(
throttleTime(1000),
map((event) => event.clientX),
scan((count, clientX) => count + clientX, 0)
)
.subscribe((count) => console.log(count));
场景介绍
都有哪些场景能用到 RxJS 呢,以下举出了一些场景例子。供大家参考,其实还有更广泛的场景,还望大家自己发觉。
异步场景
- AJAX / XHR(XMLHttpRequest) / fetch API
- Service Worker / Node Stream
- setTimeout / setInterval
- Promise
事件场景
- 各种 DOM 事件(click,dbclick,keyup、keydown…)
- css3 动画事件(transition)
- html5 Geolocation
- WebSocket / Server-Sent Events
微前端通信
// 主应用 main.js 部分内容
import { Subject } from 'rxjs' // 按需引入减少依赖包大小
const pager = new Subject()
// 在主应用注册呼机监听器,这里可以监听到其他应用的广播
pager.subscribe(v => {
console.log(`main主应用监听到子应用${v.from}发来消息:`, v)
// store.dispatch('app/setToken', v.token) // 这里处理主应用监听到改变后的逻辑
// rxjs 事件
qkEvent.toggleSidebar(v)
qkEvent.generateHeaderNavDoc(v)
qkEvent.generateDropdownMenu(v)
})
// 结合下章主应用下发资源给子应用,将pager作为一个模块传入子应用
const msg = {
// data: store.getters, // 从主应用仓库读出的数据
data: {},
// components: LibraryUi, // 从主应用读出的组件库
// utils: LibraryJs, // 从主应用读出的工具类库
// emitFnc: childEmit, // 从主应用下发emit函数来收集子应用反馈
pager // 从主应用下发应用间通信呼机
}
export const qiankunRegisterMicroApps = [
{
name: FUSANG_NAME,
entry: qiankunEntryConfig(FUSANG_NAME),
container: '#appContainer',
activeRule: '/ops-fe',
loader (loading) {
},
props: { store, pager: msg, parentRoute: router }
},
]
// 子应用
// 引入主应用Subject实例
if (props.pager) {
Vue.prototype.$pager = props.pager
}
// 使用
this.$pager && this.$pager.next({
from: 'logcenter-fe', // 从哪来 子应用的名字
event: 'sendDoc', // 跳转文档
data: [{ path: 'https://xxxx.com/docs/logcenter/', target: '_blank' }]
// 传什么数据
})
实现一个批量请求函数 multiRequest(urls, maxNum)
要求如下:
- 要求最大并发数 maxNum
- 每当有一个请求返回,就留下一个空位,可以增加新的请求
- 所有请求完成后,结果按照 urls 里面的顺序依次打出
// promise
function multiRequest(urls = [], maxNum) {
// 请求总数量
const len = urls.length;
// 根据请求数量创建一个数组来保存请求的结果
const result = new Array(len).fill(false);
// 当前完成的数量
let count = 0;
return new Promise((resolve, reject) => {
// 请求maxNum个
while (count < maxNum) {
next();
}
function next() {
let current = count++;
// 处理边界条件
if (current >= len) {
// 请求全部完成就将promise置为成功状态, 然后将result作为promise值返回
!result.includes(false) && resolve(result);
return;
}
const url = urls[current];
console.log(`开始 ${current}`, new Date().toLocaleString());
fetch(url)
.then((res) => {
// 保存请求结果
result[current] = res;
console.log(`完成 ${current}`, new Date().toLocaleString());
// 请求没有全部完成, 就递归
if (current < len) {
next();
}
})
.catch((err) => {
console.log(`结束 ${current}`, new Date().toLocaleString());
result[current] = err;
// 请求没有全部完成, 就递归
if (current < len) {
next();
}
});
}
});
}
// RxJS
// 假设这是你的http请求函数
function httpGet(url) {
return new Promise(resolve => setTimeout(() => resolve(`Result: ${url}`), 2000));
}
const array = [
'https://httpbin.org/ip',
'https://httpbin.org/user-agent',
'https://httpbin.org/delay/3',
];
// mergeMap 是专门用来处理并发处理的 rxjs 操作符
// mergeMap 第二个参数2的意思是,from(array)每次并发量是2,只有promise执行结束才接着取array里面的数据
// mergeMap第一个参数 httpGet的意思是每次并发,从from(array)中取的数据如何包装,这里是作为httpGet的参数
const source = from(array)
.pipe(mergeMap(httpGet, 2))
.subscribe(val => console.log(val));
RxJS 和 Nest
nest 的 interceptor 就用了 rxjs 来处理响应,但常用的 operator 也就几个:
- tap: 不修改响应数据,执行一些额外逻辑,比如记录日志、更新缓存等
- map:对响应数据做修改,一般都是改成 {code, data, message} 的格式
- catchError:在 exception filter 之前处理抛出的异常,可以记录或者抛出别的异常
- timeout:处理响应超时的情况,抛出一个 TimeoutError,配合 catchErrror 可以返回超时的响应
使用 tap operator 来添加一些日志、缓存等逻辑:
import { AppService } from './app.service';
import { CallHandler, ExecutionContext, Injectable, Logger, NestInterceptor } from '@nestjs/common';
import { Observable, tap } from 'rxjs';
@Injectable()
export class TapTestInterceptor implements NestInterceptor {
constructor(private appService: AppService) {}
private readonly logger = new Logger(TapTestInterceptor.name);
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next.handle().pipe(tap((data) => {
// 这里是更新缓存的操作,这里模拟下
this.appService.getHello();
this.logger.log(`log something`, data);
}))
}
}
使用 map operator 来对 controller 返回的数据做一些修改:
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { map, Observable } from 'rxjs';
@Injectable()
export class MapTestInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next.handle().pipe(map(data => {
return {
code: 200,
message: 'success',
data
}
}))
}
}
使用 catchError 处理抛出的异常:
import { CallHandler, ExecutionContext, Injectable, Logger, NestInterceptor } from '@nestjs/common';
import { catchError, Observable, throwError } from 'rxjs';
@Injectable()
export class CatchErrorTestInterceptor implements NestInterceptor {
private readonly logger = new Logger(CatchErrorTestInterceptor.name)
intercept (context: ExecutionContext, next: CallHandler): Observable<any> {
return next.handle().pipe(catchError(err => {
this.logger.error(err.message, err.stack)
return throwError(() => err)
}))
}
}
RxJS 和 Angular
RxJS 和 Vue
RxJS 和 React
RxJS 和 React 一样,实践的都是响应式编程的概念,从取名上就可以看出来。
React 版本(无 RxJS):
import React, { useState } from 'react';
// 傻瓜组件,无状态组件
const CounterView = ({ count, onIncrement, onDecrement }) => (
<div>
<h1>Count: {count}</h1>
<button onClick={onIncrement}>+</button>
<button onClick={onDecrement}>-</button>
</div>
);
// 聪明组件,有状态组件
const Counter = () => {
const [count, setCount] = useState(0);
const onIncrement = () => {
setCount(count + 1);
};
const onDecrement = () => {
setCount(count - 1);
};
return (
<CounterView
count={count}
onIncrement={onIncrement}
onDecrement={onDecrement}
/>
);
};
export default Counter;
React + RxJS 版本:
- 把 onIncrement 和 onDecrement 的函数调用转化为数据流中的数据。(Observable)
- 把数据流中的数据改变转变为对组件状态的修改。(Observer)
在 RxJS 中 Subject 能既扮演 Observable 又扮演 Observer 的角色。
import React, { useState, useEffect } from 'react';
import { Subject } from 'rxjs/Subject';
import { scan } from 'rxjs/operators';
const Counter = () => {
const [count, setCount] = useState(0);
// 创造了一个Subject对象
// 这个对象就是连接 RxJS 和 React 的纽带
const counter = new Subject();
useEffect(() => {
const observer = value => setCount(value);
// 利用 scan 累计 counter 中所有数据的总和
// scan 产生的 Observable 对象吐出的每个数据都通过 setCount 来修改当前组件的状态就可以
counter
.pipe(scan((result, inc) => result + inc, 0))
.subscribe(observer);
}, [counter]);
// 从 Observable 的角度,将 counter 代表的 Observable 定位成所有加减数字的数据流
// 当需要加 1 时,往 counter 里推送一个 1;当需要减 1 时,往 counter 里推送一个 -1
// 如果有需求改变想要加减其他的数值,那也只需要往 counter 里推送对应的正数或者负数就可以了
return (
<div>
<h1>{count}</h1>
<button onClick={() => counter.next(1)}>+</button>
<button onClick={() => counter.next(-1)}>-</button>
</div>
);
};
export default Counter;
Subject:
scan:https://rxjs.dev/api/index/function/scan
用于封装和管理状态。在使用 seed
值(第二个参数)或来自源的第一个值建立了初始状态之后,对来自源的每个值调用累加器(或“reducer 函数”)。
React 和 RxJS 高阶组件版本:
import React, { useState } from 'react';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { scan } from 'rxjs/operators';
const CounterView = ({ count, onIncrement, onDecrement }) => (
<div>
<h1>Count: {count}</h1>
<button onClick={onIncrement}>+</button>
<button onClick={onDecrement}>-</button>
</div>
);
const useCounter = () => {
const [counter] = useState(() => new BehaviorSubject(0));
const count$ = counter.pipe(
scan((result, inc) => result + inc, 0)
);
const onIncrement = () => counter.next(1);
const onDecrement = () => counter.next(-1);
const [count, setCount] = useState(0);
count$.subscribe(value => setCount(value));
return { count, onIncrement, onDecrement };
};
const Counter = () => {
const { count, onIncrement, onDecrement } = useCounter();
return (
<CounterView
count={count}
onIncrement={onIncrement}
onDecrement={onDecrement} />
);
};
export default Counter;
BehaviorSubject:https://rxjs.dev/api/index/class/BehaviorSubject
Subject的一个变体,需要初始值,并在订阅时发出其当前值。
BehaviorSubject 可以指定一个“默认数据”,如果不给某个 BehaviorSubject 塞任何数据,每一个观察者在订阅 BehaviorSubject 的时候依然可以获得一个数据,这非常适合 Counter 这个应用的要求,因为计数器需要一个初始默认值为 0。
React 和 RxJS 版本的一个秒表⏱️:
import React, { useState, useEffect } from "react";
import { Subject, BehaviorSubject, interval, of, EMPTY
} from "rxjs";
import {
scan,
switchMap,
mergeMap,
map,
timeInterval,
take,
} from "rxjs/operators";
import padStart from "lodash/padStart";
const ms2Time = (milliseconds) => {
let ms = parseInt(milliseconds % 1000, 10);
let seconds = parseInt((milliseconds / 1000) % 60, 10);
let minutes = parseInt((milliseconds / (1000 * 60)) % 60, 10);
let hours = parseInt(milliseconds / (1000 * 60 * 60), 10);
return (
padStart(hours, 2, "0") +
":" +
padStart(minutes, 2, "0") +
":" +
padStart(seconds, 2, "0") +
"." +
padStart(ms, 3, "0")
);
};
const StopWatchView = ({ milliseconds, onStart, onStop, onReset }) => {
return (
<div>
<h1>{ms2Time(milliseconds)}</h1>
<button onClick={onStart}>开始</button>
<button onClick={onStop}>停止</button>
<button onClick={onReset}>重设</button>
</div>
);
};
const START = "start";
const STOP = "stop";
const RESET = "reset";
const StopWatch = () => {
const [milliseconds, setMilliseconds] = useState(0);
useEffect(() => {
// button 代表秒表上按钮点击动作的数据流
const button = new Subject();
// time$代表秒表当前应该展示的时间,无论哪个按钮被点击,都会打断time$原有的产生数据方式
const time$ = button.pipe(
switchMap((value) => {
switch (value) {
case START: {
// 当点击“开始”时,我们使用 interval 配合 scan 来产生累积递增的毫秒数,
// 精确度是 10 毫秒而不是 1,
// 是因为 JS 运行环境往往也不会达到毫秒级别的绝对精确。
return interval(10).pipe(
timeInterval(),
scan((result, ti) => result + ti.interval, 0)
);
}
case STOP:
return EMPTY;
case RESET:
return of(0);
default:
return throwError("Invalid value ", value);
}
})
);
const stopWatch = new BehaviorSubject(0);
const subscription = stopWatch
.pipe(
mergeMap((value) => time$),
map((value) => setMilliseconds(value))
)
.subscribe();
return () => subscription.unsubscribe();
}, []);
const onStop = () => setMilliseconds(STOP);
const onStart = () => setMilliseconds(START);
const onReset = () => setMilliseconds(RESET);
return (
<StopWatchView
milliseconds={milliseconds}
onStart={onStart}
onStop={onStop}
onReset={onReset}
/>
);
};
export default StopWatch;
三个按钮的点击操作当然可以看作数据流来看待。对于 StopWatchView,需要渲染 milliseconds 属性,而且这个 milliseconds 的序列也可以看作一个数据流,当点击开始之后,这个数据流应该是持续不断地产生新的数据;当点击停止之后,这个数据流就不应该再产生数据。
EMPTY:https://rxjs.dev/api/index/const/EMPTY
常量,一个简单的 Observable,不向 Observer 发出任何项,并立即发出完整的通知。
RxJS 和 Redux
Redux 原版:
// Store.js
import {createStore} from 'redux';
import reducer from './Reducer.js';
const initValues = {
count: 0
};
const store = createStore(reducer, initValues);
export default store;
Redux 和 RxJS 版本
// Store.js
// 不使用 Redux 的 createStore 来创造 Store 对象,
// 而是使用我们自己定制的 createReactiveStore
import createReactiveStore from './createReactiveStore';
import reducer from './Reducer.js';
const initValues = {
count: 0
};
const store = createReactiveStore(reducer, initValues);
export default store;
import { Subject } from 'rxjs';
import { scan, startWith, tap } from 'rxjs/operators';
const createReactiveStore = (reducer, initialState) => {
const action$ = new Subject();
let currentState = initialState;
const store$ = action$.pipe(
startWith(initialState),
scan(reducer),
tap((state) => {
currentState = state;
})
);
return {
dispatch: (action) => {
return action$.next(action);
},
getState: () => currentState,
subscribe: (func) => {
store$.subscribe(func);
},
};
};
export default createReactiveStore;
createReactiveStore 和 createStore 基本能达到一致的效果,但是还是有一点小小的功能差异,因为 createReactiveStore 依赖于 RxJS 的数据流,而数据流如果不被订阅的话,整个管道上每个环节的操作是不会运行的。
假设,在调用 createReactiveStore 产生的 Store 对象的 subscribe 之前,先利用这个 Store 的 dispatch 函数派送了 action 对象,是不会引起数据流操作的,所以对应的 currentState 也不会发生改变,这样,当晚些时候调用 Store 的 subscribe 时候,得到的状态就不是正确的结果。
为了克服这个问题,一定要保证 createReactiveStore 产生的 Store 对象第一时间被订阅,这并不是什么困难的事情,react-redux 的 connect 函数实际上就替我们做了对 Store 的订阅。如下:
import React from 'react';
import {connect} from 'react-redux';
import * as Actions from './Actions.js';
const CounterView = ({count, onIncrement, onDecrement}) => (
<div>
<h1>Count: {count}</h1>
<button onClick={onIncrement}>+</button>
<button onClick={onDecrement}>-</button>
</div>
);
function mapStateToProps(state, ownProps) {
return {
count: state.count
}
}
function mapDispatchToProps(dispatch, ownProps) {
return {
onIncrement: () => dispatch(Actions.increment()),
onDecrement: () => dispatch(Actions.decrement()),
}
}
const Counter = connect(
mapStateToProps,
mapDispatchToProps
)(CounterView);
export default Counter;
最后
其实 RxJS 最重要的部分就是操作符,但是由于操作符太多,也就不一一介绍了,可根据自己需求去官网找对应的操作符。找操作服的方法推荐去:https://rxjs.dev/operator-decision-tree
RxJS 是不是前端开发的未来?还请各位小伙伴做出自己的思考!’