<span id="mktg5"></span>

<i id="mktg5"><meter id="mktg5"></meter></i>

        <label id="mktg5"><meter id="mktg5"></meter></label>
        最新文章專題視頻專題問答1問答10問答100問答1000問答2000關鍵字專題1關鍵字專題50關鍵字專題500關鍵字專題1500TAG最新視頻文章推薦1 推薦3 推薦5 推薦7 推薦9 推薦11 推薦13 推薦15 推薦17 推薦19 推薦21 推薦23 推薦25 推薦27 推薦29 推薦31 推薦33 推薦35 推薦37視頻文章20視頻文章30視頻文章40視頻文章50視頻文章60 視頻文章70視頻文章80視頻文章90視頻文章100視頻文章120視頻文章140 視頻2關鍵字專題關鍵字專題tag2tag3文章專題文章專題2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章專題3
        問答文章1 問答文章501 問答文章1001 問答文章1501 問答文章2001 問答文章2501 問答文章3001 問答文章3501 問答文章4001 問答文章4501 問答文章5001 問答文章5501 問答文章6001 問答文章6501 問答文章7001 問答文章7501 問答文章8001 問答文章8501 問答文章9001 問答文章9501
        當前位置: 首頁 - 科技 - 知識百科 - 正文

        Node.js pipe實現源碼解析

        來源:懂視網 責編:小采 時間:2020-11-27 22:32:55
        文檔

        Node.js pipe實現源碼解析

        Node.js pipe實現源碼解析:從前面兩篇文章,我們了解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,然后寫入 Writable。換句話說,每次傳遞數據時,都需要寫如下的模板代碼 readable.on('readable', (err) => { if(err) thr
        推薦度:
        導讀Node.js pipe實現源碼解析:從前面兩篇文章,我們了解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,然后寫入 Writable。換句話說,每次傳遞數據時,都需要寫如下的模板代碼 readable.on('readable', (err) => { if(err) thr

        從前面兩篇文章,我們了解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,然后寫入 Writable。換句話說,每次傳遞數據時,都需要寫如下的模板代碼

        readable.on('readable', (err) => {
         if(err) throw err
        
         writable.write(readable.read())
        })
        

        為了方便使用,Node.js 提供了 pipe() 方法,讓我們可以優雅的傳遞數據

        readable.pipe(writable)

        現在,就讓我們來看看它是如何實現的吧

        pipe

        首先需要先調用 Readable 的 pipe() 方法

        // lib/_stream_readable.js
        
        Readable.prototype.pipe = function(dest, pipeOpts) {
         var src = this;
         var state = this._readableState;
        
         // 記錄 Writable
         switch (state.pipesCount) {
         case 0:
         state.pipes = dest;
         break;
         case 1:
         state.pipes = [state.pipes, dest];
         break;
         default:
         state.pipes.push(dest);
         break;
         }
         state.pipesCount += 1;
        
         // ...
        
         src.once('end', endFn);
        
         dest.on('unpipe', onunpipe);
         
         // ...
        
         dest.on('drain', ondrain);
        
         // ...
        
         src.on('data', ondata);
        
         // ...
        
         // 保證 error 事件觸發時,onerror 首先被執行
         prependListener(dest, 'error', onerror);
        
         // ...
        
         dest.once('close', onclose);
         
         // ...
        
         dest.once('finish', onfinish);
        
         // ...
        
         // 觸發 Writable 的 pipe 事件
         dest.emit('pipe', src);
        
         // 將 Readable 改為 flow 模式
         if (!state.flowing) {
         debug('pipe resume');
         src.resume();
         }
        
         return dest;
        };
        
        

        執行 pipe() 函數時,首先將 Writable 記錄到 state.pipes 中,然后綁定相關事件,最后如果 Readable 不是 flow 模式,就調用 resume() 將 Readable 改為 flow 模式

        傳遞數據

        Readable 從數據源獲取到數據后,觸發 data 事件,執行 ondata()

        ondata() 相關代碼:

        // lib/_stream_readable.js
        
         // 防止在 dest.write(chunk) 內調用 src.push(chunk) 造成 awaitDrain 重復增加,awaitDrain 不能清零,Readable 卡住的情況
         // 詳情見 https://github.com/nodejs/node/issues/7278
         var increasedAwaitDrain = false;
         function ondata(chunk) {
         debug('ondata');
         increasedAwaitDrain = false;
         var ret = dest.write(chunk);
         if (false === ret && !increasedAwaitDrain) {
         // 防止在 dest.write() 內調用 src.unpipe(dest),導致 awaitDrain 不能清零,Readable 卡住的情況
         if (((state.pipesCount === 1 && state.pipes === dest) ||
         (state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)
         ) && 
         !cleanedUp) {
         debug('false write response, pause', src._readableState.awaitDrain);
         src._readableState.awaitDrain++;
         increasedAwaitDrain = true;
         }
         // 進入 pause 模式
         src.pause();
         }
         }
        
        

        在 ondata(chunk) 函數內,通過 dest.write(chunk) 將數據寫入 Writable

        此時,在 _write() 內部可能會調用 src.push(chunk) 或使其 unpipe,這會導致 awaitDrain 多次增加,不能清零,Readable 卡住

        當不能再向 Writable 寫入數據時,Readable 會進入 pause 模式,直到所有的 drain 事件觸發

        觸發 drain 事件,執行 ondrain()

        // lib/_stream_readable.js
        
         var ondrain = pipeOnDrain(src);
        
         function pipeOnDrain(src) {
         return function() {
         var state = src._readableState;
         debug('pipeOnDrain', state.awaitDrain);
         if (state.awaitDrain)
         state.awaitDrain--;
         // awaitDrain === 0,且有 data 監聽器
         if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
         state.flowing = true;
         flow(src);
         }
         };
         }
        
        

        每個 drain 事件觸發時,都會減少 awaitDrain,直到 awaitDrain 為 0。此時,調用 flow(src),使 Readable 進入 flow 模式

        到這里,整個數據傳遞循環已經建立,數據會順著循環源源不斷的流入 Writable,直到所有數據寫入完成

        unpipe

        不管寫入過程中是否出現錯誤,最后都會執行 unpipe()

        // lib/_stream_readable.js
        
        // ...
        
         function unpipe() {
         debug('unpipe');
         src.unpipe(dest);
         }
        
        // ...
        
        Readable.prototype.unpipe = function(dest) {
         var state = this._readableState;
         var unpipeInfo = { hasUnpiped: false };
        
         // 啥也沒有
         if (state.pipesCount === 0)
         return this;
        
         // 只有一個
         if (state.pipesCount === 1) {
         if (dest && dest !== state.pipes)
         return this;
         // 沒有指定就 unpipe 所有
         if (!dest)
         dest = state.pipes;
        
         state.pipes = null;
         state.pipesCount = 0;
         state.flowing = false;
         if (dest)
         dest.emit('unpipe', this, unpipeInfo);
         return this;
         }
        
         // 沒有指定就 unpipe 所有
         if (!dest) {
         var dests = state.pipes;
         var len = state.pipesCount;
         state.pipes = null;
         state.pipesCount = 0;
         state.flowing = false;
        
         for (var i = 0; i < len; i++)
         dests[i].emit('unpipe', this, unpipeInfo);
         return this;
         }
        
         // 找到指定 Writable,并 unpipe
         var index = state.pipes.indexOf(dest);
         if (index === -1)
         return this;
        
         state.pipes.splice(index, 1);
         state.pipesCount -= 1;
         if (state.pipesCount === 1)
         state.pipes = state.pipes[0];
        
         dest.emit('unpipe', this, unpipeInfo);
        
         return this;
        };
        
        

        Readable.prototype.unpipe() 函數會根據 state.pipes 屬性和 dest 參數,選擇執行策略。最后會觸發 dest 的 unpipe 事件

        unpipe 事件觸發后,調用 onunpipe(),清理相關數據

        // lib/_stream_readable.js
        
         function onunpipe(readable, unpipeInfo) {
         debug('onunpipe');
         if (readable === src) {
         if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
         unpipeInfo.hasUnpiped = true;
         // 清理相關數據
         cleanup();
         }
         }
         }
        

        End

        在整個 pipe 的過程中,Readable 是主動方 ( 負責整個 pipe 過程:包括數據傳遞、unpipe 與異常處理 ),Writable 是被動方 ( 只需要觸發 drain 事件 )

        總結一下 pipe 的過程:

      1. 首先執行 readbable.pipe(writable),將 readable 與 writable 對接上
      2. 當 readable 中有數據時,readable.emit('data'),將數據寫入 writable
      3. 如果 writable.write(chunk) 返回 false,則進入 pause 模式,等待 drain 事件觸發
      4. drain 事件全部觸發后,再次進入 flow 模式,寫入數據
      5. 不管數據寫入完成或發生中斷,最后都會調用 unpipe()
      6. unpipe() 調用 Readable.prototype.unpipe(),觸發 dest 的 unpipe 事件,清理相關數據
      7. 參考:

        https://github.com/nodejs/node/blob/master/lib/_stream_readable.js

        https://github.com/nodejs/node/blob/master/lib/_stream_writable.js

        聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

        文檔

        Node.js pipe實現源碼解析

        Node.js pipe實現源碼解析:從前面兩篇文章,我們了解到。想要把 Readable 的數據寫到 Writable,就必須先手動的將數據讀入內存,然后寫入 Writable。換句話說,每次傳遞數據時,都需要寫如下的模板代碼 readable.on('readable', (err) => { if(err) thr
        推薦度:
        標簽: 實現 源碼 pipe
        • 熱門焦點

        最新推薦

        猜你喜歡

        熱門推薦

        專題
        Top
        主站蜘蛛池模板: 久99精品视频在线观看婷亚洲片国产一区一级在线| 免费播放一区二区三区| 手机在线毛片免费播放| 亚洲的天堂av无码| 57pao一国产成视频永久免费| 色婷婷六月亚洲婷婷丁香| 一区二区三区四区免费视频| 亚洲AV无码精品色午夜果冻不卡| 日韩在线免费看网站| 亚洲另类自拍丝袜第1页| 中文字幕无码免费久久99| 亚洲日韩精品A∨片无码加勒比| 女人18一级毛片免费观看| 亚洲av永久无码精品秋霞电影秋| 国产三级免费电影| 国产成人1024精品免费| 久久久久亚洲AV成人无码| 一二三四在线播放免费观看中文版视频| 久久精品国产亚洲AV蜜臀色欲| 男女啪啪永久免费观看网站| 日本免费精品一区二区三区| 久久精品国产亚洲网站| 波多野结衣在线免费观看| 亚洲熟妇AV日韩熟妇在线| 亚洲精品国产福利一二区| 永久免费av无码网站yy| 亚洲伊人久久大香线蕉啊| 日本一道本高清免费| 在线观看免费黄色网址| 亚洲乱码卡一卡二卡三| yy6080亚洲一级理论| 99在线观看精品免费99| 苍井空亚洲精品AA片在线播放| 亚洲精品国产精品乱码不99| 91青青国产在线观看免费| 精品国产_亚洲人成在线| 亚洲av中文无码乱人伦在线咪咕 | 日韩视频在线观看免费| 国产精品高清视亚洲一区二区| 亚洲毛片av日韩av无码| 91精品国产免费|