RxJS 应用之流式读取文件

最后更新日期:2018-02-06

  问题描述:我们要在浏览器读取用户的文件并计算其 sha1 hash ,但是用户文件可能很大,全部读取会占用很多内存,如何流式读取并流式计算 hash?

  解决方案:RxJS to the rescue!

  首先我们要用到 HTML5 的 FileReader API ,这个 API 是异步的,下面这个函数将一个 Blob 变成 Promise[ArrayBuffer]

/**
 * 读取一个 Blob
 *
 * @return Promise[ArrayBuffer]
 */
function readAsArrayBuffer(blob) {
    const reader = new FileReader();
    const p = new Promise((resolve, reject) => {
        reader.onload = function(e) {
            resolve(e.target.result);
        };
        reader.onerror = function(e) {
            reject(e.target.error);
        };
    });
    reader.readAsArrayBuffer(blob);
    return p;
}

  要流式读取,首先想到的是将这个过程抽象成一个 generator ,每次 yield 出来一个分片,但 generator 的 yield 必须是同步返回一个值,要想异步返回一个值,最好的抽象显然是 Observable

  或者说:Observable = List[Promise]:一个 Observable 就是一组 Promise 构成的序列(但需要注意的是 Observable 可能是无限长的)。

  下面这个函数会按照指定的分片大小(chunkSize)读取一个 Blob ,每次返回一个分片:

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/from';
import 'rxjs/add/operator/concatMap';

/**
 * 生成从 start 到 end 且步长为 step 的序列
 */
function* range(start, end, step) {
    for (let i = start; i < end; i += step) {
        yield i;
    }
}

/**
 * 分片读取 Blob
 *
 * @return Observable[ArrayBuffer]
 */
function readAsObservable(blob, chunkSize) {
    return Observable.from(range(0, blob.size, chunkSize)).concatMap(start => {
        const endOffset = Math.min(start + chunkSize, blob.size);
        return Observable.from(readAsArrayBuffer(blob.slice(start, endOffset)));
    });
}

  目前主流的 js hash 库(如 crypto / CryptoJS)都是支持流式计算的,那么使用的时候就简单了,只需要指定 3 个回调函数即可:

import CryptoJS from 'crypto-js/core';
import WordArray from 'crypto-js/lib-typedarrays';
import 'crypto-js/sha1';

import { readAsObservable } from '../../utils/fileutil';

// 流式读取文件内容并计算 SHA1
const CHUNK_SIZE = 8 * 1024 * 1024; // 计算 hash 时采用 8MB 的分片
const sha1Promise = new Promise((resolve, reject) => {
    const hash = CryptoJS.algo.SHA1.create();
    readAsObservable(file, CHUNK_SIZE).subscribe(
        buf => {
            hash.update(WordArray.create(buf));
        },
        err => {
            console.error(err);
            reject(err);
        },
        () => {
            const res = hash.finalize().toString();
            resolve(res);
        }
    );
});

const sha1 = await sha1Promise;

  以上代码基于 rxjs 5.5.6。