首页 值得一看 🔍️

针对RxJS 是不是前端开发的未来? 这个问题,大家可以各抒己见!如果还不了解 RxJS,可以跟着本文一起大概的去学习一下,再做出自己的判断。

image.png

学习一项新的技术,我觉得首先应该是去先看官方文档,和 github 开源仓库等。先从本源上学习了解到设计者的思想和官方用法。自己学习完后,如有一些还不理解,可以找一些社区教程、书籍、文档等资源来辅助学习理解。这样才能保证尽可能从本源学习,有自己的所见所闻,得出自己的理解,不是人云亦云、口口相传的可能有错误的答案。

学习 RxJS 我的主要有三大步骤:

  1. 理解相关概念及思想
  2. 熟悉各种操作符
  3. 联想使用场景

github 地址:https://github.com/ReactiveX/rxjs

官方文档地址(建议直接看官方的):https://rxjs.dev/

中文社区文档地址(辅助阅读):https://cn.rx.js.org/manual/overview.html

中文文档(辅助阅读):https://rxjs.tech/

学习RxJS操作符和响应式编程原则: https://reactive.how/

RxJS 可视化理解: https://rxviz.com/

RxJS 简介

本篇文章是基于 V7.8.0

官方简介:JavaScript 的响应式扩展库

RxJS(Reactive Extensions for JavaScript) 是一个使用 Observables 进行响应式编程的库,可以更轻松地编写异步或基于回调的代码。

该项目是对 Reactive-Extensions/RxJS(RxJS 4) 的重写,具有更好的性能、更好的模块化、更好的可调试调用堆栈,同时保持大部分向后兼容,只有一些破坏性的变更(breaking changes)是为了减少外层的 API 。

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable附属类型 (Observer、 Schedulers、 Subjects) 和受 Array 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。

Think of RxJS as Lodash for events.

image.png

RxJS 的 Logo 是鱼,是因为 RxJS 的概念和操作符可以被看作是一条鱼在游动,数据流就像是鱼在水中游动的路径。鱼的形象也代表了 RxJS 的灵活性和可扩展性,可以适应不同的应用场景和需求。此外,鱼也象征着 RxJS 的响应式编程思想,即数据流的变化会引起相应的响应和处理。

代表“流”的变量标示符,都是用 \$ 符号结尾,这是 RxJS 编程中普遍使用的风格,被称为“芬兰式命名法”(Finnish Notation)。

为什么选 RxJS

随着时代的发展,技术也在不断更新换代。

面向对象式编程 ——》 函数式编程

函数式编程就是非常强调使用函数来解决问题的一种编程方式。

函数式编程对函数的使用有一些特殊的要求,这些要求包括以下几点:

  • 声明式(Declarative)
  • 纯函数(Pure Function)
  • 数据不可变性(Immutability)

从语言角度讲,JavaScript 当然不算一个纯粹意义上的函数式编程语言,但是,JavaScript 中的函数有第一公民的身份,因为函数本身就是一个对象,可以被赋值给一个变量,可以作为参数传递,由此可以很方便地应用函数式编程的许多思想。

JavaScript 并不是纯粹的函数式编程语言,但是,通过应用一些编程规范,再借助一点工具的帮助,我们完全可以用 JavaScript 写出函数式的代码,RxJS 就是辅助我们写出函数式代码的一种工具。

指令式编程 ——》 响应式编程

RxJS 就是兼具函数式和响应式两种先进编程风格的框架。

RxJS 是一个组织异步逻辑的库,它有很多 operator,可以极大的简化异步逻辑的编写。

它是由数据源产生数据,经过一系列 operator 的处理,最后传给接收者。

但是 RxJS 的 operator 多呀,组合起来可以实现非常复杂的异步逻辑处理。

还能帮助我们解决一些问题:

  • 如何控制大量代码的复杂度
  • 如何保持代码可读性
  • 如何处理异步操作
  • 数据流抽象了很多现实问题

相关概念介绍

说明:以下部分主要出自官方文档(因为最好的概念介绍就是官方),对一些核心的概念、知识点做了一些总结(是摘录总结翻译,不是全盘抄录)。同时也是对官方文档意译文,帮助辅助中文式阅读。

Reactive Extension

Reactive Extension,也叫 ReactiveX,简称 Rx,是基于响应式的扩展,是各种语言实现的一个统称。

Rx 是一套通过可监听流来做异步编程的API。

image.png

ReactiveXObserver 模式Iterator 模式以及函数式编程集合相结合,以满足对管理事件序列的理想方式的需求。

Observer 模式

Iterator 模式

函数式编程

集合

RxJS 中解决异步事件管理的基本概念是:

  • Observable:表示一个可调用的未来值或事件的集合的概念。
  • Observer:是回调的集合,知道如何监听 Observable 传递的值。
  • Subscription:表示 Observable 的执行,主要用于取消执行。
  • Operators:采用函数式编程风格的纯函数,支持使用 mapfilterconcatreduce 等操作处理集合。
  • Subject:相当于一个 EventEmitter,是将一个值或事件多播给多个 Observers 的唯一途径。
  • Schedulers:是控制并发的集中式调度程序,允许我们在计算发生时进行协调,例如 setTimeoutrequestAnimationFrame 或其它。

Observable *

被观察者,用来产生消息/数据。

Observable Promise
使用场景 同步、异步均可使用 用 Promise 包裹的多数是异步场景
执行时机 声明式惰性执行,只有在订阅后才会执行 创建时就立即执行
执行次数 多次调用 subscribe 函数会执行多次 只有第一次执行,后续都是取值
流程控制 相较于 Promise 有更为全面的操作符 提供串行、并行的函数
错误处理 subscribe 函数捕获错误 .catch 捕获

Observable 是多个值的惰性推送集合。本质其实就是一个随时间不断产生数据的一个集合,称之为流更容易理解。

image.png

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

// 订阅查看这些值的变化
console.log('just before subscribe');
observable.subscribe({
  next(x) {
    console.log('got value ' + x);
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
    console.log('done');
  },
});
console.log('just after subscribe');

// 控制台打印出的订阅数据
just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

Pull vs Push

pullpush 是两种不同的协议,用来描述数据生产者 (Producer) 如何与数据消费者 (Consumer) 进行通信的。

image.png

Observables 作为函数的泛化

Observables 像是没有参数, 但可以泛化为多个值的函数。

订阅 Observable 类似于调用函数。

Observables 能够同步或异步传递值。

Observable 剖析

Observable 的核心关注点:

  • 创建 Observables
  • 订阅 Observables
  • 执行 Observables
  • 清理 Observables

创建 Observables:

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
});

订阅 Observables

observable.subscribe((x) => console.log(x));
// 订阅一个 Observable 就像调用一个函数,在数据将被传送到的地方提供回调。

执行 Observables

在 Observable 执行中, 可能会发送零个到无穷多个 “Next” 通知。如果发送的是 “Error” 或 “Complete” 通知的话,那么之后不会再发送任何通知了。

import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Is not delivered because it would violate the contract
});


const observable2 = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // delivers an error if it caught one
  }
});

清理 Observables

import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// Later:
subscription.unsubscribe();
// 当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。
// 只要调用 `unsubscribe()` 方法就可以取消执行。
import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
  // Keep track of the interval resource
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);

  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalId);
  };
});
function subscribe(subscriber) {
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalId);
  };
}

const unsubscribe = subscribe({ next: (x) => console.log(x) });

// Later:
unsubscribe(); // dispose the resources

Observer

从行为上来看,无非就是定义了如何处理上述流产生的数据,称之为流的处理方法。

const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

Operators

观察者,用来消费消息/数据。

Subscription *

Subscription 是表示可清理资源的对象,它是由 Observable 执行之后产生的。

本质就是暂存了一个启动后的流,每一个启动后的流都是相互独立的,而这个启动后的流,就存储在subscription中,提供了unsubscribe,来停止这个流。

Subject *

Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。

Observable Subject
角色 生产者(单向) 生产者、消费者(双向)
消费策略 单播 多播
流转方式 内部发送/接收数据 外部发送/接收数据
数据特性 冷数据流 热数据流
消费时机 调用 subscribe 调用 next

Scheduler

简单 demo

推荐在 https://stackblitz.com/ 练习。

image.png

事件监听器

// 以前
document.addEventListener('click', () => console.log('我被“惦记”了!'));

// 现在
import { fromEvent } from 'rxjs';

fromEvent(document, 'click').subscribe(() => console.log('我被“惦记”了!'));

Purity

使 RxJS 强大的是它使用纯函数产生值的能力。这意味着您的代码不太容易出错。

所谓纯函数,指的是满足下面两个条件的函数:

  • 函数的执行过程完全由输入参数决定,不会受除参数之外的任何数据影响。
  • 函数不会修改任何外部状态,比如修改全局变量或传入的参数对象。
// 你的其他代码部分可能会弄乱你的状态
let count = 0;
document.addEventListener('click', () => console.log(`我被“惦记”了 ${++count} 次`));

// 使用 RxJS 可以隔离状态
import { fromEvent, scan } from 'rxjs';

fromEvent(document, 'click')
  .pipe(scan((count) => count + 1, 0))
  .subscribe((count) => console.log(`我被“惦记”了 ${count} 次`));

// scan 运算符的工作方式与数组的 reduce 类似。
// 它接受一个暴露给回调的值。回调的返回值将成为下次运行回调时下一个暴露的值。

Flow

RxJS 有一整套运算符,可以帮助您控制事件如何通过 observables 流动。

// 每秒最多点击一次的方式
let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate) {
    console.log(`我被“惦记”了 ${++count} 次`);
    lastClick = Date.now();
  }
});

// 
import { fromEvent, throttleTime, scan } from 'rxjs';

fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    scan((count) => count + 1, 0)
  )
  .subscribe((count) => console.log(`我被“惦记”了 ${count} 次`));

Values

// 在纯 JavaScript 中为每次点击添加当前鼠标 x 位置
let count = 0;
const rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', (event) => {
  if (Date.now() - lastClick >= rate) {
    count += event.clientX;
    console.log(count);
    lastClick = Date.now();
  }
});

// 
import { fromEvent, throttleTime, map, scan } from 'rxjs';

fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    map((event) => event.clientX),
    scan((count, clientX) => count + clientX, 0)
  )
  .subscribe((count) => console.log(count));

场景介绍

都有哪些场景能用到 RxJS 呢,以下举出了一些场景例子。供大家参考,其实还有更广泛的场景,还望大家自己发觉。

异步场景

  • AJAX / XHR(XMLHttpRequest) / fetch API
  • Service Worker / Node Stream
  • setTimeout / setInterval
  • Promise

事件场景

  • 各种 DOM 事件(click,dbclick,keyup、keydown…)
  • css3 动画事件(transition)
  • html5 Geolocation
  • WebSocket / Server-Sent Events

微前端通信

// 主应用 main.js 部分内容
import { Subject } from 'rxjs' // 按需引入减少依赖包大小

const pager = new Subject()

// 在主应用注册呼机监听器,这里可以监听到其他应用的广播
pager.subscribe(v => { 
  console.log(`main主应用监听到子应用${v.from}发来消息:`, v)
  // store.dispatch('app/setToken', v.token) // 这里处理主应用监听到改变后的逻辑
  // rxjs 事件
  qkEvent.toggleSidebar(v)
  qkEvent.generateHeaderNavDoc(v)
  qkEvent.generateDropdownMenu(v)
})

// 结合下章主应用下发资源给子应用,将pager作为一个模块传入子应用
const msg = { 
  // data: store.getters, // 从主应用仓库读出的数据
  data: {},
  // components: LibraryUi, // 从主应用读出的组件库
  // utils: LibraryJs, // 从主应用读出的工具类库
  // emitFnc: childEmit, // 从主应用下发emit函数来收集子应用反馈
  pager // 从主应用下发应用间通信呼机
}

export const qiankunRegisterMicroApps = [
  {
    name: FUSANG_NAME,
    entry: qiankunEntryConfig(FUSANG_NAME),
    container: '#appContainer',
    activeRule: '/ops-fe',
    loader (loading) {
    },
    props: { store, pager: msg, parentRoute: router }
  },
]
// 子应用
// 引入主应用Subject实例
if (props.pager) {
    Vue.prototype.$pager = props.pager
}

// 使用
this.$pager && this.$pager.next({
  from: 'logcenter-fe', // 从哪来 子应用的名字
  event: 'sendDoc', // 跳转文档
  data: [{ path: 'https://xxxx.com/docs/logcenter/', target: '_blank' }] 
  // 传什么数据
})

实现一个批量请求函数 multiRequest(urls, maxNum)

要求如下:

  • 要求最大并发数 maxNum
  • 每当有一个请求返回,就留下一个空位,可以增加新的请求
  • 所有请求完成后,结果按照 urls 里面的顺序依次打出
// promise
function multiRequest(urls = [], maxNum) {
  // 请求总数量
  const len = urls.length;
  // 根据请求数量创建一个数组来保存请求的结果
  const result = new Array(len).fill(false);
  // 当前完成的数量
  let count = 0;

  return new Promise((resolve, reject) => {
    // 请求maxNum个
    while (count < maxNum) {
      next();
    }
    function next() {
      let current = count++;
      // 处理边界条件
      if (current >= len) {
        // 请求全部完成就将promise置为成功状态, 然后将result作为promise值返回
        !result.includes(false) && resolve(result);
        return;
      }
      const url = urls[current];
      console.log(`开始 ${current}`, new Date().toLocaleString());
      fetch(url)
        .then((res) => {
          // 保存请求结果
          result[current] = res;
          console.log(`完成 ${current}`, new Date().toLocaleString());
          // 请求没有全部完成, 就递归
          if (current < len) {
            next();
          }
        })
        .catch((err) => {
          console.log(`结束 ${current}`, new Date().toLocaleString());
          result[current] = err;
          // 请求没有全部完成, 就递归
          if (current < len) {
            next();
          }
        });
    }
  });
}
// RxJS

// 假设这是你的http请求函数
function httpGet(url) {
  return new Promise(resolve => setTimeout(() => resolve(`Result: ${url}`), 2000));
}

const array = [
  'https://httpbin.org/ip', 
  'https://httpbin.org/user-agent',
  'https://httpbin.org/delay/3',
];

// mergeMap 是专门用来处理并发处理的 rxjs 操作符
// mergeMap 第二个参数2的意思是,from(array)每次并发量是2,只有promise执行结束才接着取array里面的数据
// mergeMap第一个参数 httpGet的意思是每次并发,从from(array)中取的数据如何包装,这里是作为httpGet的参数
const source = from(array)
    .pipe(mergeMap(httpGet, 2))
    .subscribe(val => console.log(val));

RxJS 和 Nest

nest 的 interceptor 就用了 rxjs 来处理响应,但常用的 operator 也就几个:

  • tap: 不修改响应数据,执行一些额外逻辑,比如记录日志、更新缓存等
  • map:对响应数据做修改,一般都是改成 {code, data, message} 的格式
  • catchError:在 exception filter 之前处理抛出的异常,可以记录或者抛出别的异常
  • timeout:处理响应超时的情况,抛出一个 TimeoutError,配合 catchErrror 可以返回超时的响应

使用 tap operator 来添加一些日志、缓存等逻辑:

import { AppService } from './app.service';
import { CallHandler, ExecutionContext, Injectable, Logger, NestInterceptor } from '@nestjs/common';
import { Observable, tap } from 'rxjs';

@Injectable()
export class TapTestInterceptor implements NestInterceptor {
  constructor(private appService: AppService) {}

  private readonly logger = new Logger(TapTestInterceptor.name);

  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    return next.handle().pipe(tap((data) => {

      // 这里是更新缓存的操作,这里模拟下
      this.appService.getHello();

      this.logger.log(`log something`, data);
    }))
  }
}

使用 map operator 来对 controller 返回的数据做一些修改:

import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { map, Observable } from 'rxjs';

@Injectable()
export class MapTestInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    return next.handle().pipe(map(data => {
      return {
        code: 200,
        message: 'success',
        data
      }
    }))
  }
}

使用 catchError 处理抛出的异常:

import { CallHandler, ExecutionContext, Injectable, Logger, NestInterceptor } from '@nestjs/common';
import { catchError, Observable, throwError } from 'rxjs';

@Injectable()
export class CatchErrorTestInterceptor implements NestInterceptor {
  private readonly logger = new Logger(CatchErrorTestInterceptor.name)

  intercept (context: ExecutionContext, next: CallHandler): Observable<any> {
    return next.handle().pipe(catchError(err => {
      this.logger.error(err.message, err.stack)
      return throwError(() => err)
    }))
  }
}

RxJS 和 Angular

RxJS 和 Vue

RxJS 和 React

RxJS 和 React 一样,实践的都是响应式编程的概念,从取名上就可以看出来。

React 版本(无 RxJS):

import React, { useState } from 'react';

// 傻瓜组件,无状态组件
const CounterView = ({ count, onIncrement, onDecrement }) => (
  <div>
    <h1>Count: {count}</h1>
    <button onClick={onIncrement}>+</button>
    <button onClick={onDecrement}>-</button>
  </div>
);

// 聪明组件,有状态组件
const Counter = () => {
  const [count, setCount] = useState(0);

  const onIncrement = () => {
    setCount(count + 1);
  };

  const onDecrement = () => {
    setCount(count - 1);
  };

  return (
    <CounterView
      count={count}
      onIncrement={onIncrement}
      onDecrement={onDecrement}
    />
  );
};

export default Counter;

React + RxJS 版本:

  1. 把 onIncrement 和 onDecrement 的函数调用转化为数据流中的数据。(Observable)
  2. 把数据流中的数据改变转变为对组件状态的修改。(Observer)

在 RxJS 中 Subject 能既扮演 Observable 又扮演 Observer 的角色。

import React, { useState, useEffect } from 'react';
import { Subject } from 'rxjs/Subject';
import { scan } from 'rxjs/operators';

const Counter = () => {
  const [count, setCount] = useState(0);

  // 创造了一个Subject对象
  // 这个对象就是连接 RxJS 和 React 的纽带
  const counter = new Subject();

  useEffect(() => {
    const observer = value => setCount(value);

    // 利用 scan 累计 counter 中所有数据的总和
    // scan 产生的 Observable 对象吐出的每个数据都通过 setCount 来修改当前组件的状态就可以
    counter
        .pipe(scan((result, inc) => result + inc, 0))
        .subscribe(observer);
  }, [counter]);

  // 从 Observable 的角度,将 counter 代表的 Observable 定位成所有加减数字的数据流
  // 当需要加 1 时,往 counter 里推送一个 1;当需要减 1 时,往 counter 里推送一个 -1
  // 如果有需求改变想要加减其他的数值,那也只需要往 counter 里推送对应的正数或者负数就可以了
  return (
    <div>
      <h1>{count}</h1>
      <button onClick={() => counter.next(1)}>+</button>
      <button onClick={() => counter.next(-1)}>-</button>
    </div>
  );
};

export default Counter;

Subject:

scan:https://rxjs.dev/api/index/function/scan

用于封装和管理状态。在使用 seed 值(第二个参数)或来自源的第一个值建立了初始状态之后,对来自源的每个值调用累加器(或“reducer 函数”)。

image.png

React 和 RxJS 高阶组件版本:

import React, { useState } from 'react';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { scan } from 'rxjs/operators';

const CounterView = ({ count, onIncrement, onDecrement }) => (
  <div>
    <h1>Count: {count}</h1>
    <button onClick={onIncrement}>+</button>
    <button onClick={onDecrement}>-</button>
  </div>
);

const useCounter = () => {
  const [counter] = useState(() => new BehaviorSubject(0));

  const count$ = counter.pipe(
    scan((result, inc) => result + inc, 0)
  );

  const onIncrement = () => counter.next(1);
  const onDecrement = () => counter.next(-1);

  const [count, setCount] = useState(0);

  count$.subscribe(value => setCount(value));

  return { count, onIncrement, onDecrement };
};

const Counter = () => {
  const { count, onIncrement, onDecrement } = useCounter();

  return (
      <CounterView
          count={count}
          onIncrement={onIncrement}
          onDecrement={onDecrement} />
  );
};

export default Counter;

BehaviorSubject:https://rxjs.dev/api/index/class/BehaviorSubject

Subject的一个变体,需要初始值,并在订阅时发出其当前值。

BehaviorSubject 可以指定一个“默认数据”,如果不给某个 BehaviorSubject 塞任何数据,每一个观察者在订阅 BehaviorSubject 的时候依然可以获得一个数据,这非常适合 Counter 这个应用的要求,因为计数器需要一个初始默认值为 0。

React 和 RxJS 版本的一个秒表⏱️:

import React, { useState, useEffect } from "react";

import { Subject, BehaviorSubject, interval, of, EMPTY
} from "rxjs";
import {
  scan,
  switchMap,
  mergeMap,
  map,
  timeInterval,
  take,
} from "rxjs/operators";
import padStart from "lodash/padStart";

const ms2Time = (milliseconds) => {
  let ms = parseInt(milliseconds % 1000, 10);
  let seconds = parseInt((milliseconds / 1000) % 60, 10);
  let minutes = parseInt((milliseconds / (1000 * 60)) % 60, 10);
  let hours = parseInt(milliseconds / (1000 * 60 * 60), 10);

  return (
    padStart(hours, 2, "0") +
    ":" +
    padStart(minutes, 2, "0") +
    ":" +
    padStart(seconds, 2, "0") +
    "." +
    padStart(ms, 3, "0")
  );
};

const StopWatchView = ({ milliseconds, onStart, onStop, onReset }) => {
  return (
    <div>
      <h1>{ms2Time(milliseconds)}</h1>
      <button onClick={onStart}>开始</button>
      <button onClick={onStop}>停止</button>
      <button onClick={onReset}>重设</button>
    </div>
  );
};

const START = "start";
const STOP = "stop";
const RESET = "reset";

const StopWatch = () => {
  const [milliseconds, setMilliseconds] = useState(0);

  useEffect(() => {
    // button 代表秒表上按钮点击动作的数据流
    const button = new Subject();

    // time$代表秒表当前应该展示的时间,无论哪个按钮被点击,都会打断time$原有的产生数据方式
    const time$ = button.pipe(
      switchMap((value) => {
        switch (value) {
          case START: {
             // 当点击“开始”时,我们使用 interval 配合 scan 来产生累积递增的毫秒数,
             // 精确度是 10 毫秒而不是 1,
             // 是因为 JS 运行环境往往也不会达到毫秒级别的绝对精确。
            return interval(10).pipe(
              timeInterval(),
              scan((result, ti) => result + ti.interval, 0)
            );
          }
          case STOP:
            return EMPTY;
          case RESET:
            return of(0);
          default:
            return throwError("Invalid value ", value);
        }
      })
    );

    const stopWatch = new BehaviorSubject(0);

    const subscription = stopWatch
      .pipe(
        mergeMap((value) => time$),
        map((value) => setMilliseconds(value))
      )
      .subscribe();

    return () => subscription.unsubscribe();
  }, []);

  const onStop = () => setMilliseconds(STOP);
  const onStart = () => setMilliseconds(START);
  const onReset = () => setMilliseconds(RESET);

  return (
    <StopWatchView
      milliseconds={milliseconds}
      onStart={onStart}
      onStop={onStop}
      onReset={onReset}
    />
  );
};

export default StopWatch;

三个按钮的点击操作当然可以看作数据流来看待。对于 StopWatchView,需要渲染 milliseconds 属性,而且这个 milliseconds 的序列也可以看作一个数据流,当点击开始之后,这个数据流应该是持续不断地产生新的数据;当点击停止之后,这个数据流就不应该再产生数据。

EMPTY:https://rxjs.dev/api/index/const/EMPTY

常量,一个简单的 Observable,不向 Observer 发出任何项,并立即发出完整的通知。

RxJS 和 Redux

Redux 原版:

// Store.js
import {createStore} from 'redux';
import reducer from './Reducer.js';

const initValues = {
  count: 0
};
const store = createStore(reducer, initValues);

export default store;

Redux 和 RxJS 版本

// Store.js
// 不使用 Redux 的 createStore 来创造 Store 对象,
// 而是使用我们自己定制的 createReactiveStore
import createReactiveStore from './createReactiveStore';
import reducer from './Reducer.js';

const initValues = {
  count: 0
};

const store = createReactiveStore(reducer, initValues);

export default store;
import { Subject } from 'rxjs';
import { scan, startWith, tap } from 'rxjs/operators';

const createReactiveStore = (reducer, initialState) => {
  const action$ = new Subject();
  let currentState = initialState;

  const store$ = action$.pipe(
    startWith(initialState),
    scan(reducer),
    tap((state) => {
      currentState = state;
    })
  );

  return {
    dispatch: (action) => {
      return action$.next(action);
    },
    getState: () => currentState,
    subscribe: (func) => {
      store$.subscribe(func);
    },
  };
};

export default createReactiveStore;

createReactiveStore 和 createStore 基本能达到一致的效果,但是还是有一点小小的功能差异,因为 createReactiveStore 依赖于 RxJS 的数据流,而数据流如果不被订阅的话,整个管道上每个环节的操作是不会运行的。

假设,在调用 createReactiveStore 产生的 Store 对象的 subscribe 之前,先利用这个 Store 的 dispatch 函数派送了 action 对象,是不会引起数据流操作的,所以对应的 currentState 也不会发生改变,这样,当晚些时候调用 Store 的 subscribe 时候,得到的状态就不是正确的结果。

为了克服这个问题,一定要保证 createReactiveStore 产生的 Store 对象第一时间被订阅,这并不是什么困难的事情,react-redux 的 connect 函数实际上就替我们做了对 Store 的订阅。如下:

import React from 'react';
import {connect} from 'react-redux';

import * as Actions from './Actions.js';

const CounterView = ({count, onIncrement, onDecrement}) => (
  <div>
    <h1>Count: {count}</h1>
    <button onClick={onIncrement}>+</button>
    <button onClick={onDecrement}>-</button>
  </div>
);

function mapStateToProps(state, ownProps) {
  return {
    count: state.count
  }
}

function mapDispatchToProps(dispatch, ownProps) {
  return {
    onIncrement: () => dispatch(Actions.increment()),
    onDecrement: () => dispatch(Actions.decrement()),
  }
}

const Counter = connect(
    mapStateToProps,
    mapDispatchToProps
)(CounterView);

export default Counter;

最后

其实 RxJS 最重要的部分就是操作符,但是由于操作符太多,也就不一一介绍了,可根据自己需求去官网找对应的操作符。找操作服的方法推荐去:https://rxjs.dev/operator-decision-tree

RxJS 是不是前端开发的未来?还请各位小伙伴做出自己的思考!’



文章评论

未显示?请点击刷新