NodeJs中的stream(流)- 基础篇
  • <dl id="ebcgd"></dl>
    <ol id="ebcgd"><source id="ebcgd"></source></ol>

    <b id="ebcgd"><fieldset id="ebcgd"><canvas id="ebcgd"></canvas><ins id="ebcgd"><progress id="ebcgd"></progress></ins></fieldset></b><sub id="ebcgd"></sub>

      <noscript id="ebcgd"></noscript><i id="ebcgd"><ruby id="ebcgd"></ruby></i>

        <bdo id="ebcgd"><sup id="ebcgd"><div id="ebcgd"><bdo id="ebcgd"></bdo></div></sup></bdo>

      1. <dd id="ebcgd"></dd>
            1. <map id="ebcgd"><embed id="ebcgd"><dd id="ebcgd"><source id="ebcgd"></source></dd><i id="ebcgd"><textarea id="ebcgd"><i id="ebcgd"><option id="ebcgd"><rp id="ebcgd"></rp></option></i></textarea></i><embed id="ebcgd"></embed></embed></map><noscript id="ebcgd"></noscript>

              <p id="ebcgd"><audio id="ebcgd"><map id="ebcgd"><i id="ebcgd"></i><canvas id="ebcgd"></canvas><tbody id="ebcgd"></tbody></map><blockquote id="ebcgd"></blockquote><map id="ebcgd"><figcaption id="ebcgd"><bdo id="ebcgd"></bdo></figcaption></map></audio><cite id="ebcgd"></cite></p>

              NodeJs中的stream(流)- 基础篇

              作者:第一期架构课学员日期:2018-04-08 18:20:02 点击:249

              一、什么是Stream(流)

              流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。

              流是可读的、可写的,或是可读写的。

              二、NodeJs中的Stream的几种类型

              Node.js 中有四种基本的流类型:

              • Readable - 可读的流(fs.createReadStream())
              • Writable - 可写的流(fs.createWriteStream())
              • Duplex - 可读写的流(net.Socket)
              • Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate())

              NodeJs中关于流的操作被封装到了Stream模块中,这个模块也被多个核心模块所引用。

              const stream = require('stream');

              在 NodeJS 中对文件的处理多数使用流来完成

              • 普通文件
              • 设备文件(stdin、stdout)
              • 网络文件(http、net)

              注:在NodeJs中所有的Stream(流)都是EventEmitter的实例

              Example:

              1.将1.txt的文件内容读取为流数据

              const fs = require('fs');
               
              // 创建一个可读流(生产者)
              let rs = fs.createReadStream('./1.txt');

              通过fs模块提供的createReadStream()可以轻松创建一个可读的文件流。但我们并有直接使用Stream模块,因为fs模块内部已经引用了Stream模块并做了封装。所以说 流(stream)在 Node.js 中是处理流数据的抽象接口,提供了基础Api来构建实现流接口的对象。

              var rs = fs.createReadStream(path,[options]);

              1.path 读取文件的路径

              2.options

              • flags打开文件的操作, 默认为’r’
              • mode 权限位 0o666
              • encoding默认为null
              • start开始读取的索引位置
              • end结束读取的索引位置(包括结束位置)
              • highWaterMark读取缓存区默认的大小64kb

              Node.js 提供了多种流对象。 例如:

              • HTTP 请求 (request response)
              • process.stdout 就都是流的实例。

              2.创建可写流(消费者)处理可读流

              将1.txt的可读流 写入到2.txt文件中 这时我们需要一个可写流

              const fs = require('fs');
              // 创建一个可写流
              let ws = fs.createWriteStream('./2.txt');
              // 通过pipe让可读流流入到可写流 写入文件
              rs.pipe(ws);
              var ws = fs.createWriteStream(path,[options]);

              1.path 读取文件的路径

              2.options

              • flags打开文件的操作, 默认为’w’
              • mode 权限位 0o666
              • encoding默认为utf8
              • autoClose:true是否自动关闭文件
              • highWaterMark读取缓存区默认的大小16kb

              pipe 它是Readable流的方法,相当于一个”管道”,数据必须从上游 pipe 到下游,也就是从一个 readable 流 pipe 到 writable 流。
              后续将深入将介绍pipe。




               

              如上图,我们把文件比作装水的桶,而水就是文件里的内容,我们用一根管子(pipe)连接两个桶使得水从一个桶流入另一个桶,这样就慢慢的实现了大文件的传输过程。

              三、为什么应该使用 Stream

              当有用户在线看视频,假定我们通过HTTP请求返回给用户视频内容

              const http = require('http');
              const fs = require('fs');
               
              http.createServer((req, res) => {
              fs.readFile(videoPath, (err, data) => {
              res.end(data);
              });
              }).listen(8080);

              但这样有两个明显的问题

              1.视频文件需要全部读取完,才能返回给用户,这样等待时间会很长
              2.视频文件一次全放入内存中,内存吃不消

              用流可以将视频文件一点一点读到内存中,再一点一点返回给用户,读一部分,写一部分。(利用了 HTTP 协议的 Transfer-Encoding: chunked 分段传输特性),用户体验得到优化,同时对内存的开销明显下降

              const http = require('http');
              const fs = require('fs');
               
              http.createServer((req, res) => {
              fs.createReadStream(videoPath).pipe(res);
              }).listen(8080);

              四、可读流(Readable Stream)

              可读流(Readable streams)是对提供数据的源头(source)的抽象。

              例如:

              • HTTP responses, on the client
              • HTTP requests, on the server
              • fs read streams
              • TCP sockets
              • process.stdin

              所有的 Readable 都实现了 stream.Readable 类定义的接口。

              可读流的两种模式(flowing 和 paused)

              1.在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。

              2.在 paused 模式下,必须显式调用 stream.read()方法来从流中读取数据片段。

              所有初始工作模式为paused的Readable流,可以通过下面三种途径切换为flowing模式:

              • 监听’data’事件
              • 调用stream.resume()方法
              • 调用stream.pipe()方法将数据发送到Writable

              流动模式flowing

              流切换到流动模式 监听data事件

              const rs = fs.createReadStream('./1.txt');
              const ws = fs.createWriteStream('./2.txt');
              rs.on('data', chunk => {
              ws.write(chunk);
              });
              ws.on('end', () => {
              ws.end();
              });

              如果写入的速度跟不上读取的速度,有可能导致数据丢失。正常的情况应该是,写完一段,再读取下一段,如果没有写完的话,就让读取流先暂停,等写完再继续。

              var fs = require('fs');
              // 读取highWaterMark(3字节)数据,读完之后填充缓存区,然后触发data事件
              var rs = fs.createReadStream(sourcePath, {
              highWaterMark: 3
              });
              var ws = fs.createWriteStream(destPath, {
              highWaterMark: 3
              });
               
              rs.on('data', function(chunk) { // 当有数据流出时,写入数据
              if (ws.write(chunk) === false) { // 如果没有写完,暂停读取流
              rs.pause();
              }
              });
               
              ws.on('drain', function() { // 缓冲区清空触发drain事件 这时再继续读取
              rs.resume();
              });
               
              rs.on('end', function() { // 当没有数据时,关闭数据流
              ws.end();
              });

              或者使用更直接的pipe

              fs.createReadStream(sourcePath).pipe(fs.createWriteStream(destPath));

              暂停模式paused

              1.在流没有 pipe() 时,调用 pause() 方法可以将流暂停
              2.pipe() 时,需要移除所有 data 事件的监听,再调用 unpipe() 方法

              read(size)

              流在暂停模式下需要程序显式调用 read() 方法才能得到数据。read() 方法会从内部缓冲区中拉取并返回若干数据,当没有更多可用数据时,会返回null。read()不会触发’data’事件。

              使用 read() 方法读取数据时,如果传入了 size 参数,那么它会返回指定字节的数据;当指定的size字节不可用时,则返回null。如果没有指定size参数,那么会返回内部缓冲区中的所有数据。
              NodeJS 为我们提供了一个 readable 的事件,事件在可读流准备好数据的时候触发,也就是先监听这个事件,收到通知又数据了我们再去读取就好了:

              const fs = require('fs');
              rs = fs.createReadStream(sourcePath);
               
              // 当你监听 readable事件的时候,会进入暂停模式
              rs.on('readable', () => {
              console.log(rs._readableState.length);
              // read如果不加参数表示读取整个缓存区数据
              // 读取一个字段,如果可读流发现你要读的字节小于等于缓存字节大小,则直接返回
              let ch = rs.read(1);
              });

              暂停模式 缓存区的数据以链表的形式保存在BufferList中

              五、可写流(Writable Stream)

              可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者 TCP、HTTP 等网络响应。

              Writable 的例子包括了:

              • HTTP requests, on the client
              • HTTP responses, on the server
              • fs write streams
              • zlib streams
              • crypto streams
              • TCP sockets
              • child process stdin
              • process.stdout, process.stderr

              所有 Writable 流都实现了 stream.Writable 类定义的接口。

              process.stdin.pipe(process.stdout);

              process.stdout 是一个可写流,程序把可读流 process.stdin 传过来的数据写入的标准输出设备。在了解了可读流的基础上理解可写流非常简单,流就是有方向的数据,其中可读流是数据源,可写流是目的地,中间的管道环节是双向流。

              可写流使用

              调用可写流实例的 write() 方法就可以把数据写入可写流

              const fs = require('fs');
              const rs = fs.createReadStream(sourcePath);
              const ws = fs.createWriteStream(destPath);
               
              rs.setEncoding('utf-8'); // 设置编码格式
              rs.on('data', chunk => {
              ws.write(chunk); // 写入数据
              });

              监听了可读流的 data 事件就会使可读流进入流动模式,我们在回调事件里调用了可写流的 write() 方法,这样数据就被写入了可写流抽象的设备destPath中。

              write() 方法有三个参数

              • chunk {String| Buffer},表示要写入的数据
              • encoding 当写入的数据是字符串的时候可以设置编码
              • callback 数据被写入之后的回调函数

              ‘drain’事件

              如果调用 stream.write(chunk) 方法返回 false,表示当前缓存区已满,流将在适当的时机(缓存区清空后)触发 ‘drain

              const fs = require('fs');
              const rs = fs.createReadStream(sourcePath);
              const ws = fs.createWriteStream(destPath);
               
              rs.setEncoding('utf-8'); // 设置编码格式
              rs.on('data', chunk => {
              let flag = ws.write(chunk); // 写入数据
              if (!flag) { // 如果缓存区已满暂停读取
              rs.pause();
              }
              });
               
              ws.on('drain', () => {
              rs.resume(); // 缓存区已清空 继续读取写入
              });

              六、总结

              stream(流)分为可读流(flowing mode 和 paused mode)、可写流、可读写流,Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。它们底层都调用了stream模块并进行封装。

              后续我们将继续对stream深入解析以及Readable Writable pipe的实现

              <

              上一篇: Promise 详解与实现(遵循Promise/A+规范)

              下一篇: 理解TCP/IP协议(一)

              73099威尼斯城73099.com |威尼斯73099.com官方网站登录 |威尼斯手机娱乐官网 | |手机版 | | 澳门威尼斯3544.com,威尼斯3544.com官网登陆,威尼斯3544.com备用网址-0327.com|威尼斯澳门官方6799.com,www.6799.com,威尼斯官方娱乐6799.com登陆-vns566.net|6799.com威尼斯手机版,6799.com威尼斯网址,6799.com威尼斯官网登陆-59859.com|86087威尼斯城86087.com,威尼斯86087.com官方网站登录,威尼斯手机娱乐官网-83855.com|澳门威利斯人手机版,www.86087.com官方入口,澳门威利斯人在线娱乐-944185.com|威尼斯澳门官方4886.com,www.4886.com,威尼斯官方娱乐4886.com登陆-k92988.com|