الوحدة Stream

من موسوعة حسوب

الاستقرار: 2-مستقر

المجرى هو واجهة مجرّدة للعمل مع البيانات المتدفقة في Node.js. توفّر الوحدة stream واجهة برمجية (API) أساسية تجعل من السهل بناء كائنات تتعامل مع واجهة المجرى.

يوجد العديد من كائنات المجرى التي توفرها Node.js. على سبيل المثال، http.IncomingMessage (طلبيات الخادم HTTP) و process.stdout هما نسخ من الصنف stream.

يمكن أن تكون المجاري قابلة للقراءة (readable)، أو قابلة للكتابة (writable)، أو كليهما. كل المجاري هي نسخ من الصنف EventEmitter.

يمكن الوصول إلى الوحدة stream باستخدام:

const stream = require('stream');

لمَّا كان من الضروري فهم كيفية عمل المجاري، فإنَّ الوحدة stream بحد ذاتها هي أكثر فائدةً للمطورين الذين ينشئون أنواعًا جديدةً من نسخ المجاري. المطورون الذين يستهلكون كائنات المجاري بالمقام الأول نادرًا ما يحتاجون لاستخدام الوحدة stream بشكل مباشر.

تنظيم هذا المستند

يُقسّم هذا التوثيق إلى قسمين رئيسيين مع قسم ثالث للملاحظات الإضافية. يشرح القسم الأول عناصر الواجهة البرمجية للمجرى (stream API) التي تكون مطلوبة لاستخدام المجاري ضمن أي تطبيق. يشرح القسم الثاني عناصر الواجهة البرمجية (API) التي تُكون مطلوبةً لتطبيق أنواع جديدة من المجاري.

أنواع المجاري

يوجد أربعة أنواع رئيسية للمجاري ضمن Node.js هي:

بالإضافة إلى ذلك، تحوي هذه الوحدة الدوال الخدمية pipeline و finished.

نمط الكائن

كل المجاري المُنشأة من قبل واجهات Node.js تعمل حصريًا على السلاسل النصية والكائنات Buffer (أو Uint8Array). مع ذلك، فمن الممكن لتطبيقات المجاري أن تعمل مع أنواع بيانات أخرى في JavaScript (باستثناء النوع null، الذي يخدم غرضًا خاصًا ضمن المجاري). مثل هذه المجاري يؤخذ بالحسبان تشغيلها في "نمط الكائن" (object mode).

تُبدَّل نُسَخ المجاري إلى نمط الكائن باستخدام الخيار objectMode عند إنشاء المجرى. محاولة قلب مجرى موجود إلى نمط الكائن ليست آمنةً.

التخزين المؤقت Buffering

سوف يخزن كلا المجريين Writable و Readable البيانات في مخزن مؤقت داخلي لكي يمكن استعادتها باستخدام التابع writable.writableBuffer أو readable.readableBuffer على التوالي.

كمية البيانات القابلة للتخزين تعتمد على الخيار highWaterMark المُمرر إلى باني المجرى. من أجل المجاري القياسية، يحدد الخيار highWaterMark العدد الكلي من البايتات. أمَّا من أجل المجاري المُشغلة في نمط الكائن، فيحدد الخيار highWaterMark العدد الكلي من الكائنات.

تُخزّن البيانات مؤقتًا في المجاري التي من النوع Readable عندما يستدعي التابع stream.push(chunk)‎. إذا لم يستدعي من يستخدم المجرى التابع stream.read()‎‎، فستتوضع البيانات ضمن طابور داخلي إلى أن تُستخدَم.

حالما يصل الحجم الكلي لذاكرة القراءة الداخلية المؤقتة عتبةَ محددة من قبل highWaterMark، سيوقفُ المجرى مؤقتًا قراءة البيانات من المصادر الأساسية حتى تُستهلك البيانات الحالية المخزنة مؤقتًا (ذلك أنّ المجرى سيوقف استدعاء التابع readable._read()‎‎ المحلي الذي يُستخدم لوضع البيانات في ذاكرة القراءة المؤقتة).

تُخزّن البيانات مؤقتًا في المجاري التي من النوع Writable عندما يُستدعى التابع writable.write(chunk)‎ مرارًا وتكرارًا. طالما أنّ الحجم الكلي لذاكرة الكتابة المؤقتة الداخلية هو أدنى من العتبة المضبوطة بمقدار highWaterMark، سوف يعيد استدعاء writable.write()‎ القيمة true. حالما يصل أو يتجاوز حجم ذاكرة التخزين المؤقت الداخلية القيمة highWaterMark، سوف تُعاد القيمة false.

الهدف الأساسي من واجهات الوحدة stream البرمجية - على وجه الخصوص التابع stream.pipe()‎ - هو تقييد حجم التخزين المؤقت للبيانات إلى مستويات مقبولة من أجل التوافق بين المصادر (source) والوجهات (destination) ذات السرعات المختلفة لكي لا تمتلئ الذاكرة المتوافرة.

بما أن كلا المجريين Duplex و Transform يجمعان بين النوعين Readable و Writable، يمتلك كل واحد منها ذاكرتي تخزين مؤقتة. هاتان الذاكراتان داخليتين ومنفصلتين عن بعضهما وتستخدمان للقراءة والكتابة يف آن واحد، مما يسمح لكل طرف من المجرى بالتشغيل بشكل مستقل عن الآخر بينما يضمن تدفق بيانات مناسب وفعّال. على سبيل المثال، النُسخ net.Socket هي مجاري من النوع Duplex طرفها القابل للقراءة (Readable) يسمح باستهلاك البيانات المُستقبَلة من المقبس وطرفها القابل للكتابة (Writable) يسمح بكتابة البيانات على المقبس. بسبب أن البيانات قد تُكتب إلى المقبس بمعدل أسرع أو أبطأ من البيانات التي تُستقبل وتكتب على المجرى، فمن الضروري لكل طرف أن يعمل (ويخزِّن البيانات) بشكل مستقل عن الآخر.

الواجهات البرمجية لمستخدمي المجرى

كل تطبيقات Node.js تقريبًا، مهما كانت بسيطة، تستخدم المجاري بطريقةٍ ما. الآتي هو مثال لاستخدام المجاري في تطبيق Node.js والذي ينفّذ مُخدّم HTTP:

const http = require('http');

const server = http.createServer((req, res) => {
  


// والذي هو مجرى قابل للقراءة ،http.IncomingMessage هو Req  
//والذي هو مجرى قابل للكتابة http.ServerResponse هو Req  


  let body = '';


//utf8 احصل على البيانات كسلاسل 
//إذا لم يُجهّز الترميز ستُستقبل كائنات ذاكرة مؤقتة

  req.setEncoding('utf8');



//حالما يُضاف مستمع 'data'تطلق المجاري القابلة للقراءة أحداث  

  req.on('data', (chunk) => {
    body += chunk;
  });

يشير الحدث 'end' أن الجسم الكامل قد استُقبل.

  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // اكتب شيئًا يثير اهتمام المستخدم
      res.write(typeof data);
      res.end();
    } catch (er) {



// !سيئة json ،أوه 
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1

المجاري ذات النوع Writable (مثل res في المثال) توفر توابع مثل write()‎ و end()‎ والتي تُستخدم لكتابة البيانات على المجرى.

المجاري ذات النوع Readable تستخدم واجهات الصنف EventEmitter البرمجية من أجل اشعار شيفرة التطبيق عندما تكون البيانات متوفرة للقراءة من المجرى. يمكن قراءة هذه البيانات المتوفرة من المجرى بعدة طرق.

تستخدم المجاري التي من النوع Writable و Readable واجهات الصنف EventEmitter البرمجية بطرق متنوعة للبقاء على اتصال دائم بالحالة الحالية للمجرى.

تذكر أن المجاري التي من النوع Duplex و Transform هي مجاري قابلة للكتاية (Writable) والقراءة (Readable).

لا يُطلب من التطبيقات التي إمّا تكتب البيانات أو تقرؤها من المجرى تنفيذ واجهات الوحدة stream بشكل مباشر، وبذلك لا يوجد عمومًا سبب لاستدعاء الوحدة stream عبر require('stream')‎‎.

ينبغي على المطورين الراغبين بإنشاء أنواع جديدة من المجاري الرجوع إلى القسم الواجهات البرمجية لمنفذي المجاري.

المجاري القابلة للكتابة

المجاري القابلة للكتابة إجمالًا تمثِّل مكانًا قابلًا لكتابة البيانات فيه.

تتضمن الأمثلة التالية مجارٍ قابلة للكتابة:

بعض هذه الأمثلة هي فعليًا مجاري من النوع Duplex والتي تنفّذ الواجهة Writable.

كل المجاري التي من النوع Writable تنفّذ الواجهة المعرّفة بالصنف stream.Writable.

بينما قد تختلف نسخ المجاري التي من النوع Writable بطرق متنوعة، كل المجاري Writable تتبع نمط الاستخدام الأساسي كما هو موضّح في المثال أدناه:

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');

الصنف stream.Writable

أضيف في الإصدار: 0.9.4.

الحدث 'close'

يُطلَق الحدث 'close' عندما يكون المجرى أو أحد موارده الأساسية (واصف الملفات مثلًا) قد أُغلق. يشير هذا الحدث أنّه لن تُطلق المزيد من الأحداث، ولن تحصل المزيد من العمليات المتعلقة بالمجرى.

لا تُطلق كل المجاري Writable الحدث 'close'.

الحدث 'drain'

أُضيف في الإصدار:0.9.4.

إذا أعاد التابع stream.write(chunk)‎ القيمة false، فسوف يُطلَق الحدث 'drain' عندما يكون من المناسب استئناف كتابة البيانات على المجرى.

//كتابة البيانات إلى المجرى القابل للكتابة المزوّد لمليون مرّة
//كن منتبهًا للضغط العائد


function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {

//آخر مرّة

        writer.write(data, encoding, callback);
      } else {



//انظر إذا كان ينبغي لنا الإستمرار أو الإنتظار
//لا تمرر دالة رد النداء لأنّنا لم ننته بعد

        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {

//وجب التوقف باكرًا
//اكتب بعض المزيد حالما تنضب

      writer.once('drain', write);
    }
  }
}
الحدث 'error'

أضيف في الإصدار: 0.9.4.

يُطلق الحدث 'error' إذا حصل خطأ أثناء الكتابة على أو إرسال البيانات إلى المجرى. سوف يُمرَّر إلى دالة رد النداء المنصتة الوسيط Error فقط عند استدعائها.

لن يُغلق المجرى عندما يُطلق الحدث 'error'.

الحدث 'finish'

أضيف في الإصدار: 0.9.4.

سوف يُطلق الحدث 'finish' بعد استدعاء التابع  stream.end()‎‎، وبعد أن دُفعت كل البيانات للنظام الأساسي.

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.error('All writes are now complete.');
});
الحدث  'pipe'

أُضيف في الإصدار: 0.9.4.

  • src:‏ <stream.Readable>مجرى القراءة والذي يُوصل إلى هذا المجرى القابل للكتابة.

يٌطلَق الحدث 'pipe' عندما  يُستدعى التابع stream.pipe()‎ على مجرًى قابلٍ للقراءة مُضيفًا المجرى القابل للكتابة إلى مجموعة وجهاته.

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
الحدث 'unpipe'

أضيف في الإصدار: 0.9.4.

  • src:‏ <stream.Readable> مجرى المصدر الذي لغى الاتصال بهذا المجرى القابل للكاتبة.

يُطلَق الحدث 'unpipe' عندما يُستدعى التابع stream.unpipe()‎‎ على مجرًى قابلٍ للقراءة، مزيلًا المجرى القابل لكتابة من مجموعة وجهاته.

يُطلق هذا الحدث أيضًا في الحالة التي يطلق فيها المجرى القابل للكاتبة خطأً عندما يُوصل به مجرى قابل للقراءة.

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()‎

أضيف في الإصدار: 0.11.2.

يجبر التابع writable.cork()‎ كل البيانات المكتوبة على أن تُخزّن مؤقتًا في الذاكرة. ستُدفع البيانات المخزّنة عندما يُستدعى أحد التابعين stream.uncork()‎‎‎‎ أو stream.end()‎.

المقصد الأساسي من التابع writable.cork()‎ هو تجنب حالة يحدث فيها كتابة العديد من القطع الصغيرة من البيانات إلى المجرى دون جلب نسخةٍ منها إلى المخزن المؤقت الداخلي، إذ سيكون لذلك تأثير عكسي على الأداء. في مثل هذه الحالات، يمكن أن تنجز التطبيقات التي تنّفذ التابع writable._writev()‎ عملية الكتابة المخزّنة مؤقتًا بأسلوب أكثر مثاليةً.

انظر أيضًا: التابع writable.uncork()‎.

writable.destroy([error]‎)‎

أُضيف في الإصدار: 8.0.0

يهدم التابع المجرى، ويطلق الأحداث 'error' و 'close' المُمررة. بعد هذا الاستدعاء، تكون المجاري القابلة للكتابة قد انتهت والاستدعاءات اللاحقة للتابع write()‎ والتابع end()‎ سوف تُفضي إلى خطأ ERR_STREAM_DESTROYED. لا ينبغي أن يعيد المنفّذون تعريف هذا التابع، ولكن يمكنهم أن يستعملوا التابع writable._destroy()‎ بدلًا عن ذلك.

writable.end([chunk][, encoding][, callback]‎)‎

سجل التغييرات

الإصدار التغييرات
10.0.0 يعيد هذا التابع الآن مرجعًا إلى مجرًى من النوع writable.
8.0.0 يمكن الآن أن يكون الوسيط chunk نسخة من النوع Uint8Array.
0.9.4 أضيف هذا التابع في الإصدار 0.9.4
  • chunk:‏ ‎<string>‎ | <Buffer>‎ | <Uint8Array>‎ | <any>‎ بيانات اختيارية يراد كتابتها. من أجل المجاري التي لا تعمل في وضع الكائن، يجب أن يكون الوسيط chunk سلسلةً نصيةً (string) أو كائنًا من النوع Buffer أو النوع Uint8Array. من أجل المجاري في نمط الكائن، يمكن أن تكون chunk أي قيمة غير null.
  • encoding:‏ <string> التشفير، إذا كانت  chunk سلسلة نصية.
  • <Function> :‎callback‎ دالة رد نداء اختيارية عندما يُنهى المجرى.
  • القيمة المعادة: ‎<this>‎‎

يشير استدعاء التابع writable.end()‎ أنّه لا مزيد من البيانات يراد كتابتها على المجرى القابل للكتابة (Writable). سيسمح الوسيطان الاختياريان chunk و encoding أن تُكتب قطعة إضافية واحدة أخيرة من البيانات مباشرةً قبل إغلاق المجرى. إذا زُودت، سترفق دالة رد نداء callback اختيارية كمنصت للحدث 'finish'.

سيثير استدعاء التابع stream.write()‎ بعد استدعاء stream.end()‎ خطأً.

// 'world!‎' ومن ثم أنهِ ب ' hello,‎' اكتب 

const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');

//كتابة المزيد ليست مسموحة الآن
writable.setDefaultEncoding(encoding)‎

سجل التغييرات

الإصدار التغييرات
6.1.0 يعيد هذا التابع الآن مرجعًا إلى مجرًى من النوع writable.
0.11.15 أضيف في 0.11.15

يضبط التابع writable.setDefaultEncoding()‎ الترميزَ الافتراضي (encoding) للمجرى القابل للكتابة (Writable).

writable.uncork()‎

أُضيف في الإصدار: 0.11.2.

يفرِّغ التابع writable.uncork()‎ كل البيانات التي خُزِّنت منذ أن استُدعي stream.cork()‎.

عند استخدام التابع writable.cork()‎ والتابع ()writable.uncork لإدارة التخزين المؤقت أثناء الكتابة على مجرًى، يُنصح بأن يؤخر استدعاء writable.uncork()‎ باستخدام process.nextTick()‎. يسمح القيام بذلك بتجميع كل استدعاءات التابع writable.write()‎ التي تحصل خلال طورٍ واحدٍ لحلقة أحداث Node.js مُعطاة.

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

إذا استُدعي التابع writable.cork()‎ عدة مرات لمجرىً ما، فيجب أن يُستدعى التابع writable.uncork()‎ نفس عدد الاستدعاءات لتفريغ البيانات المخزنة.

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();


//مرّة ثانية uncork()‎لن تُفرّغ البيانات حتى يُستدعى التابع 

  stream.uncork();
});

انظر أيضًا: التابع writable.cork()‎.

writable.writableHighWaterMark

أُضيفت في الإصدار: 9.3.0.

يعيد القيمة highWaterMark التي مُرِّرت عند إنشاء المجرى القابل للكتابة (Writable).

writable.writableLength

أُضيفت في الإصدار: 9.4.0.

تحوي هذه الخاصية عدد البايتات (أو الكائنات) الجاهزة للكتابة والموجودة في الطابور. توفر هذه القيمة بيانات داخلية متعلقة بحالة القيمة highWaterMark.

writable.write(chunk[, encoding][, callback]‎)‎

سجل التغييرات

الإصدار التغييرات
8.0.0 يمكن الآن أن يكون الوسيط نسخة من النوع Uint8Array.
6.0.0 سوف يعتبر دائمًا تمريرالقيمة null كمعامل chunk غير صالح الآن، حتى في نمط الكائن.
0.9.4 أُضيف هذا التابع.
  • chunk‏: ‎<string>‎ |‎ <Buffer>‎ |‎ <Uint8Array>‎ |‎ <any‎>‎  بيانات اختيارية للكتابة. من أجل المجاري التي لا تعمل في نمط الكائن، يجب أن تكون chunk سلسلة نصية أو Buffer أو Uint8Array. من أجل مجاري نمط الكائن، يمكن أن تكون chunk أي نوع من أنواع بيانات JavaScript غير null.
  • <string>‎  :encoding الترميز، إذا كان الوسيط chunk سلسلةً نصيةً.
  • callback:‏ <Function‎> دالة رد النداء المراد استدعاؤها عندما تُدفع قطعة البيانات.
  • القيمة المُعادة: ‎<boolean>‎‎ قيمة منطقية تكون false إذا كان المجرى يريد للشيفرة المستدعية أن تنتظر انطلاق الحدث 'drain' قبل الاستمرار بكتابة المزيد من البيانات؛ وإلّا تكون true.

يكتب التابع writable.write()‎ بعض البيانات إلى المجرى. ويستدعي رد النداء (callback) المُزوّد حالما تكون البيانات المكتوبة قد عولجت بشكل كامل. إذا حصل خطأ، ربما يُستدعى callback أو لا مع تمرير الخطأ كوسيطه الأول. للكشف عن أخطاء الكتابة بشكل موثوق، أضف مُنصتًا إلى الحدث 'error'.

القيمة المعادة هي true إذا كانت ذاكرة التخزين الداخلية أقل من القيمة highWaterMark المُهيئة عندما أُنشئ المجرى بعد إقرار chunk. إذا أُعيد false، ينبغي أن تتوقف المحاولات الإضافية لكتابة البيانات على المجرى حتى يُطلق الحدث 'drain'.

طالما أن المجرى لم يفرغ، فإن استدعاء write()‎ سوف يخزّن chunk مؤقتًا، ويعيد خطأ. حالما تُستهلك كل القطع الحالية المخزنة مؤقتًا (قُبلَت للتسليم من قبل نظام التشغيل)، سوف يُطلق الحدث 'drain'. حين يعيد write()‎ القيمة false، فمن المستحسن ألّا تكتب المزيد من القطع حتى يُطلق الحدث 'drain'. بينما يُسمح باستدعاء write()‎ على مجرًى لم يفرغ بعد، فسوف تخزن Node.js مؤقتًا كل القطع المكتوبة حتى يحصل استخدام ذاكرة أعظمي،عند هذه النقطة ستتوقف دون قيود. حتى قبل حصول التوقف، سيسبب الاستخدام المرتفع للذاكرة أداءًا ضعيفًا لجامع القمامة (منظّف الذاكرة) وارتفاع مجموعة الذاكرة المُقِيمة (RSS) [والذي لن يعاد استرجاعه للنظام بشكل طبيعي، حتى بعد عدم الحاجة للذاكرة]. بما أن المقابس TCP قد لا تنضب أبدًا إذا لم يقرأ الند (peer) البعيد البيانات، ربما تقود الكتابة على مقبس لا يُصرّف لحصول ثغرة قابلة للاستغلال (exploitable vulnerability).

كتابة البيانات على مجرى لا ينفد هو معضلة خصوصًا مع مجاري التحويل ( Transform)، لأنّ مجاري Transform  تتوقف افتراضيًا حتى تنقل تُتصل بمجرى آخر أو يضاف معالج للحدث 'data' أو الحدث 'readable'.

اذا أمكن توليد البيانات التي ستُكتب أو جلبها عند الطلب، فمن المستحسن تغليفها في نفس المجرى ثم تحويلها إلى المجرى القابل للقراءة عبر استخدام التابع stream.pipe()‎.

ولكن إذا كان رُجّج استدعاء write()‎، فمن الممكن مراعاة الضغط العائد وتجنب مشاكل الذاكرة باستخدام الحدث 'drain':

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}


//قبل القيام بأي كتابة أخرى cb  الانتظار حتى يُستدعى 

write('hello', () => {
  console.log('write completed, do more writes now');
});

سوف يهمل المجرى Writable في نمط الكائن الوسيط encoding دائمًا.

المجاري القابلة للقراءة

المجاري القابلة للقراءة هي فكرة مجرّدة عن مصدرٍ تُستهلك منه البيانات.

أمثلة عن المجاري المقروءة (Readable) هي:

كل المجاري  القابلة للقراءة (Readable) تطبّق الواجهة المعرّفة من قبل الصنف stream.Readable.

نمطا القراءة

تعمل المجاري القابلة للقراءة (Readable) بفاعلية في أحد النمطين التاليين: نمط التدفق (flowing mode) ونمط التوقف المؤقتً (paused mode). هذه الأنماط منفصلة عن نمط الكائن. يمكن أن يكون المجرى المقروء في نمط الكائن أو لا، بغض النظر عن إذا كان في نمط التدفق أو نمط التوقف المؤقت.

  • في نمط التدفق، تُقرأ البيانات من النظام الرئيسي بشكل تلقائي وتُقدّم للتطبيق بأسرع ما يمكن باستخدام الأحداث عبر الواجهة EventEmitter.
  • في نمط التوقف المؤقت، يجب أن يُستدعى stream.read()‎ بشكل صريح لقراءة قطع البيانات من المجرى.

تبدأ كل المجاري القابلة للقراءة في نمط التوقف المؤقت ولكن يمكن تبديلها إلى نمط التدفق بإحدى الطرق التالية:

يمكن نقل Readable بالعكس إلى نمط التوقف باستخدام أحد التالي:

  • إذا لم يكن هناك أنبوب متصل بالوجهات، فيمكن ذلك عبر استدعاء التابع stream.pause()‎.
  • إذا كان هناك أنبوب متصل بالوجهات، فيمكن ذلك عبر إزالة كل أنابيب الوجهات. قد تُزال العديد من أنابيب الوجهات باستدعاء التابع stream.unpipe()‎.

المبدأ الأساسي الذي يجب تذكره دومًا هو أنّ المجرى القابل للقراءة (Readable) لن يولّد بيانات حتى تتوافر آلية إمّا لاستهلاك تلك البيانات أو تجاهلها. إذا كانت آلية الاستهلاك معطلة أو مُستبعدة، سوف يحاول Readable إيقاف توليد البيانات.

من أجل التوافقية مع الإصدارات السابقة، لن توقف ازالة معالج حدث 'data' المجرى تلقائيًا. أيضًا، إذا كان هنالك ممرات للوجهات، فحين ذلك  لن يضمَن استدعاء stream.pause()‎ أن يبقى المجرى متوقفًا حالما تفرغ هذه الوجهات وتسأل عن المزيد من البيانات.

إذا قُلب Readable إلى نمط التدفق ولم يتوفر مستهلكون لمعالجة البيانات، ستضيع تلك البيانات. يمكن أن يحصل هذا على سبيل المثال، عندما يُستدعى تابع readable.resume()‎ دون منصت مرفق بحدث 'data'، أو عندما يُزال معالج حدث 'data' من المجرى.

إضافة معالج حدث 'readable' يجعل توقف تدفق المجرى تلقائيًا، وتُتستهلك البيانات بواسطة readable.read()‎. إذا أُزيل معالج حدث 'readable'، حينذاك سيبدأ المجرى بالتدفق مُجدّدًا إذا كان هناك معالج حدث 'data' موجود.

الحالات الثلاث

هناك "وضعا" تشغيل للمجرى القابل للقراءة (Readable)، هما تلخيص مبسّط لإدارة حالات داخلية كثر تعقيدًا والتي تحصل خلال تطبيق مجرى قابل للقراءة (Readable).

على وجه التحديد وفي أي نقطة معطاة من الزمن يأخذ كل مجرى القابل للقراءة (Readable) إحدى الحالات الثلاث التالية:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

عندما تكون readable.readableFlowing هي null ، لن تكون هناك آلية مُقدمة لاستهلاك بيانات المجرى. لذلك، لن يولد المجرى بيانات. بينما في هذه الحالة،  سيبدل إرفاق مُنصت إلى الحدث 'data' أو استدعاء التابع readable.pipe()‎ أو استدعاء التابع readable.resume()‎ قيمة الخاصية readable.readableFlowing

إلى القيمة true، متسببًا ببدء  Readable بإطلاق الأحداث بفاعلية حين توّلد البيانات.

سوف يتسبب استدعاء readable.pause()‎‎ أو readable.unpipe()‎ أو استقبال الضغط العائد بضبط قيمة الخاصية readable.readableFlowing إلى false، قاطعًا تدفق الأحداث مؤقتًا ولكن غير قاطعٍ لتدفق البيانات. بينما لن يقلب إرفاق منصت بالحدث 'data' قيمة الخاصية  readable.readableFlowing إلى true في هذه الحالة.

const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);


//التدفق القابل للقراءة متوقف الآن

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok');  

// 'data'لن تطلق الحدث


pass.resume();    

//'data' يجب أن تُستدعى لجعل المجرى يطلق الحدث

بينما تكون قيمة readable.readableFlowing هي false، قد تتراكم البيانات داخل الذاكرة المؤقتة الداخلية للمجرى.

اختر أسلوب واجهة برمجية وحيد

تطورت الواجهات البرمجية (API) للمجرى القابل للقراءة (Readable) عبر عدة إصدارات Node.js وقدّمت عدّة توابع لاستهلاك بيانات المجرى. بشكل عام، ينبغي أن يختار المطورون واحدة من توابع استهلاك البيانات ولا ينبغي استخدام عدة توابع لاستهلاك البيانات من مجرى واحد. بشكل خاص، قد يقود استخدام مزيج من on('data')‎ أو on('readable')‎‎ أو pipe()‎ أو توابع تكرارية غير متزامنة إلى سلوك غير متوقع.

ينصح باستخدام التابع readable.pipe()‎ لأغلب المستخدمين بما أنه نُفّذ لتقديم أسهل طريقة لاستهلاك بيانات المجرى.

يمكن أن يستخدم المطورون الذين يطلبون المزيد من التحكم الدقيق على نقل وتوليد البيانات الصنف EventEmitter  و readable.on('readable')/readable.read()‎ أو الواجهات readable.pause()/readable.resume()‎.

الصنف stream.Readable

أُضيف في الإصدار: 0.9.4.

الحدث:'close'

أُضيف في الإصدار: 0.9.4.

يُطلق الحدث'close' عندما يكون المجرى أو أحد موارده الأساسية (واصف الملف مثلًا) قد أُغلق. يشير الحدث أنّه لن تُطلق مزيد من الأحداث ولن يحصل المزيد من العمليات المتعلقة بالمجرى.

لا تُطلق كل المجاري القابلة للقراءة الحدث 'close'.

الحدث:'data'

أُضيف في الإصدار: 0.9.4.

  • <Buffer> | <string> | <any> ‎:‎chunk قطع البيانات. من أجل المجاري التي لا تعمل في نمط الكائن، ستكون القطعة إمّا سلسلة نصية أو كائنًا من النوع Buffer. من أجل المجاري في نمط الكائن، يمكن أن تكون القطعة أي نوع من أنواع البيانات باستثناء null.

سيُطلق الحدث 'data' كلما تخلى المجرى عن حيازة قطعة من البيانات إلى المستهلك. قد يحصل هذا كلّما بُدِّل المجرى في نمط الكائن باستدعاء readable.pipe()‎ أو readable.resume()‎ أو بإرفاق رد نداء منصت للحدث 'data'. سوف يُطلق الحدث 'data' أيضًا كلّما استُدعي التابع readable.read()‎ وكانت قطعة من البيانات متوفرة لتُعاد.

إرفاق مُنصت بالحدث 'data'  لمجرىً لم يُوقَف بشكل صريح سوف يقلب المجرى إلى نمط التدفق. ستُمرر البيانات بعدئذٍ حالما تكون متوفرة.

سوف يمرر ردُ نداء المُنصت قطعةَ البيانات كسلسلة نصية إذا كان التشفير الافتراضي مُحدّدًا باستخدام التابع readable.setEncoding()‎‎، وإلّا ستمرر البيانات ضمن كائن من النوع Buffer.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
الحدث: 'end'

أُضيف في الإصدار: 0.9.4.

يُطلق الحدث 'end' عندما لا يكون هناك المزيد من البيانات لتُقرَأ من المجرى.

لن يُطلق الحدث 'end' مالم تُستهلك البيانات بالكامل. يمكن أن يُحقَّق ذلك بقلب المجرى إلى نمط التدفق، أو باستدعاء stream.read()‎ مرارًا حتى يتم استهلاك كل البيانات.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});
الحدث: 'error'

أُضيف في الإصدار: 0.9.4.

ربما يُطلق الحدث 'error' من قبل إجراء مجرى قابل للقراءة (Readable) في أي وقت. عادةً، قد يحصل ذلك إذا كان المجرى الأساسي غير قادر على توليد بيانات بسبب فشل داخلي أساسي، أو عندما يحاول إجراء المجرى دفع قطعة بيانات غير صالحة.

سيُمرَّر إلى دالة رد نداء المُنصت كائنٌ من النوع Error فقط.

الحدث: 'readable'

سجل التغييرات

الإصدار التغييرات
10.0.0 يُطلق الحدث 'readable' دائمًا في النبضة التالية بعد أن يُستدعى ‎.push()‎
10.0.0 يتطلب استخدام 'readable' استدعاءَ ‎.read()‎
0.9.4 أُضيف في الإصدار: 0.9.4.

يُطلق التابع 'readable' عندما يكون هناك بيانات متوفرة للقراءة من المجرى. في بعض الحالات، سوف يسبب ربط مُنصت بالحدث 'readable' أن تُقرَأ  بعض كمية البيانات إلى ذاكرة مؤقتة داخلية.

const readable = getReadableStreamSomehow();
readable.on('readable', function() {
 
//يوجد بعض البيانات لتُقرأ الآن

  let data;

  while (data = this.read()) {
    console.log(data);
  }
});

سوف يُطلق الحدث 'readable' أيضًا حالما يتم التوصل إلى نهاية بيانات المجرى ولكن قبل أن يُطلق الحدث 'end'. فعليًا، يشير الحدث 'readable' أن المجرى يملك معلومات جديدة وهي: إمّا بيانات جديدة متوفرة أو تم الوصول إلى نهاية المجرى. في الحالة الأولى، سوف يعيد stream.read()‎ البيانات المتوفرة. في الحالة الثانية، سوف يعيد stream.read()‎ القيمة null . على سبيل المثال، في المثال التالي، foo.txt هو ملف فارغ:

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
});

خرج تشغيل هذا السكربت هو:

$ node test.js
readable: null
end

بشكل عام، فإنّ  آليات readable.pipe()‎ والحدث  'data' هي أسهل للفهم من الحدث 'readable'. ولكن، معالجة 'readable' قد تنتج زيادة بمعدل الانتاجية .

إذا استُخدم  كلا الحدثين 'readable' و 'data'  في نفس الوقت، فإن الحدث  'readable' يأخذ الأولوية في التحكم بالتدفق؛  أي أنّ الحدث 'data' سوف يُطلق فقط إذا استُدعي stream.read()‎. قد تصبح قيمة الخاصية  readableFlowing هي false . إذا كان هناك مُنصتات للحدث 'data' عندما يُزال الحدث 'readable'، سيبدأ المجرى بالتدفق؛ أي أن الأحداث 'data' سوف تُطلق دون استدعاء ‎.resume()

readable.destroy([error]‎)‎

أُضيف في الإصدار: 8.0.0.

  • ‎‎<Error>‎‎ :error الخطأ الحاصل الذي سيُمرَّر عند تحميل الحدث 'error'.
  • القيمة المُعادة: ‎<this>‎‎

يهدم التابع المجرى ويطلق الحدث 'error' و 'close'. بعد هذا الاستدعاء، سوف يحرر المجرى القابل للقراءة أيّة موارد داخلية وسوف تُتجاهل الاستدعاءات اللاحقة للتابع push()‎ . لا ينبغي على المنفذين إعادة تعريف هذا التابع، ولكن يمكنهم تطبيق readable._destroy()‎‎ بدلًا عنه.

readable.isPaused()‎

أُضيف في الإصدار: 0.11.14.

  • القيمة المُعادة: ‎<boolean>‎‎

يعيد التابع readable.isPaused()‎ حالة التشغيل الحالية للمجرى Readable . يُستخدم هذا التابع بالمقام الأول من قبل آليات ترتكز على التابع readable.pipe()‎. في معظم الحالات الطبيعية، لا يوجد حاجة لاستخدام هذا التابع بشكل مباشر.

const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()‎

أضيف في الإصدار: 0.9.4.

القيمة المُعادة: ‎<this>

سوف يسبب التابع readable.pause()‎ وقف إطلاق أحداث 'data' في نمط التدفق، مع إخراج المجرى من نمط التدفق. أية بيانات تصبح متوفرة ستبقى في الذاكرة المؤقتة الداخلية.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});

لا يملك التابع readable.pause()‎ أي تأثير إذا كان هناك مُنصت للحدث 'readable'.

readable.pipe(destination[, options]‎)‎

أُضيف في الإصدار: 0.9.4.

  • <stream.Writable>‎ :destination الوجهة التي ستُكتَب فيها البيانات.
  • <Object> :options‎ خيارات النقل.
    • <boolean>‎ :‎end إنهاء الكاتب عندما ينتهي القارئ. القيمة الافتراضية: true.
  • القيمة المُعادة: ‎<stream.Writable>‎ تعاد الوجهة destination سامحةً بسلسلة من الأنابيب والممرات إذا كانت المجاري من النوع المزدوج (Duplex) أو مجاري تحويل (Transform).

يربط التابع readable.pipe()‎ مجرًى قابلًا للكتابة مع مجرًى قابلٍ للقراءة (Readable) مسببًا قلبه تلقائيّا إلى نمط التدفق ودفع جميع بياناته إلى المجرى القابل للكتابة المرتبط به. أي كأن هذا التابع يضع أنبوبًا بين المجريين لتتدفق البيانات من أحدهما إلى الآخر عبره. سيضبط تدفق البيانات تلقائيًا لذلك لن يُغمر مجرى Writable الهدف من قبل مجرى قابل للقراءة(Readable) أسرع منه.

ينقل المثال التالي كل البيانات من المجرى readable إلى ملف اسمه file.txt:

const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');



//'file.txt' تذهب كل البيانات من المجرى القابل للقراءة إلى

readable.pipe(writable);

من الممكن ربط عدة مجاري قابلة للكتابة (Writable) مع مجرى واحد قابل للقراءة (Readable). يعيد التابع readable.pipe()‎ مرجعًا إلى مجرى الوجهة جاعلًا من الممكن إقامة سلسلة من المجاري المتصلة ببعضها بأنابيب:

const fs = require('fs');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

بشكل افتراضي، يُستدعى stream.end()‎ على مجرى الوجهة القابل للكتابة (Writable) عندما يطلق مجرى Readable المصدر الحدث 'end'. لذلك لن تكون الوجهة قابلةً للكتابة بعد الآن. لتعطيل هذا السلوك الافتراضي، يمكن أن تمرير القيمة false إلى الخيار end مسبّبًا بقاء الوجهة مفتوحة:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

أحد التحذيرات المهمة أنّه إذا  أطلق المجرى Readable خطأً أثناء المعالجة، لن يُغلق مجرى الوجهة Writable تلقائيًا، لذا من الضروري في هذه الحالة إغلاق كل مجرى يدويًا لمنع التسريب في الذاكرة.

لن يغلق المجريان process.stderr و process.stdout القابلان للكتابة (Writable) أبدًا حتى تنتهي عملية Node.js بغض النظر عن الخيارات المحدّدة.

readable.read([size]‎)‎

أضيف في الإصدار: 0.9.4.

يسحب التابع readable.read()‎ بعض البيانات من الذاكرة المؤقتة الداخلية ويعيدها. إذا لم يكن هناك بيانات متوافرة للقراءة، يعيد التابع القيمة null. بشكل افتراضي، ستعاد البيانات في كائن من النوع Buffer  إلّا إذا حُدد الترميز باستخدام التابع readable.setEncoding()‎ أو إذا كان المجرى يشتغل في نمط الكائن.

يحدد الوسيط الاختياري size عدد البايتات المحدد للقراءة. إذا لم يكن هنالك بايتات بالحجم size متوافرة للقراءة، ستُعاد القيمة null إلّا إذا كان المجرى قد انتهى. في هذه الحالة، ستُعاد كل البيانات المتبقية في ذاكرة التخزين المؤقت الداخلية.

إذا لم يكن الوسيط size محدّدًا، ستُعاد كل البيانات المحتواة في ذاكرة التخزين المؤقت الداخلية.

ينبغي أن يُستدعى التابع readable.read()‎ على مجاري Readable المُشغّلة في نمط التوقف المؤقت فقط، في نمط التدفق، يُستدعى readable.read()‎ تلقائيًا حتى فراغ ذاكرة التخزين المؤقت الداخلية بالكامل.

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
})

سيعيد المجرى Readable في نمط الكائن عنصرًا وحيدًا دائمًا من استدعاء readable.read(size)‎ بغض النظر عن عن قيمة الوسيط size.

إذا أعاد التابع readable.read()‎ قطعة من البيانات، سيُطلق أيضًا الحدث 'data'.

استدعاء stream.read([size]‎)‎ بعد أن يكون الحدث 'end' قد أُطلق سيعيد null. لن تُطلَق أي أخطاء وقت التشغيل (runtime).

readable.readableHighWaterMark

أضيفت في الإصدار: 9.3.0.

  • القيمة المعادة: ‎<number>‎‎

تعيد قيمة الخاصية highWaterMark المُمرّرة عند إنشاء مجرى Readable هذا.

readable.readableLength

أُضيفت في الإصدار:9.4.0.

تحوي هذه الخاصية عدد البايتات (أو الكائنات) الجاهزة للقراءة في الطابور. تقدم هذه القيمة البيانات الداخلية المخزنة بما يتوافق مع حالة highWaterMark.

readable.resume()‎

سجل التغييرات

الإصدار التغييرات
10.0.0 لا يملك resume()‎‎ تأثير إذا كان هناك استماع للحدث  'readable'.
0.9.4 أُضيف في الإصدار 0.9.4.

يسبب التابع readable.resume()‎ إيقاف صريح لمجرى Readable  لاستئناف إطلاق أحداث 'data'، قالبًا المجرى إلى نمط التدفق.

يمكن أن يُستخدم التابع readable.resume()‎ لاستهلاك كامل للبيانات من المجرى دون معالجة أي منها فعليًا.

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });

لا يملك التابع readable.resume()‎ تأثير إذا كان هناك مُنصت للحدث  'readable'.

readable.setEncoding(encoding)‎

أُضيف في الإصدار: 0.9.4.

  • ‎‎<string>‎ :encoding الترميز المراد استعماله.
  • القيمة المعادة: ‎<this>

يضبط التابع readable.setEncoding()‎ ترميز المحارف للبيانات المقروءة من المجرى Readable.

بشكل افتراضي، لا يُخصص أي ترميز وستعاد بيانات المجرى في كائنات من النوع Buffer، ضبط الترميز يتسبب بأن تُعاد بيانات المجرى كسلسلة نصية مرمّزة بالترميز المحدد بدلًا من إعادتها في كائنات من النوع Buffer. على سبيل المثال، سيسسب استدعاء readable.setEncoding('utf8')‎ بأن تُفسر بيانات الخرج كبيانات مرمزة بالترميز UTF-8، وتُمرر كسلاسل نصية. سوف يسبب استدعاء readable.setEncoding('hex')‎ أن تُرمّز البيانات بشكل سلسلة نصية ستة عشرية.

سيعالج المجرى Readable المحارف متعددة البايتات المُستلمة خلال المجرى بصورة صحيحة وإلّا قد تصبح مرمّزة بشكل غير صحيح إذا سُحبت من المجرى ببساطة في كائنات من النوع Buffer.

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});
readable.unpipe([destination]‎)‎

أضيف في الإصدار:0.9.4.

  • <stream.Writable>‎ :‎destination مجرى محدد اختياريًا
  • القيمة المُعادة: ‎<this>

يفصل التابع readable.unpipe()‎ مجرى Writable المربوط مؤخرًا باستخدام التابع  stream.pipe()‎.

إذا لم يُحدد مجرى الوجهة (destination)، ستفصل حينذاك كل الأنابيب(الممرات).

إذا كان destination محددًا، ولكن لا يوجد أنابيب(قنوات) مُنصّبة عليه، لن يفعل التابع حينذاك شيئًا.

const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');



// 'file.txt' إلى readable تذهب كل البيانات من المجرى القابل للقراءة  
//ولكن فقط للثانية الأولى

readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt');
  readable.unpipe(writable);
  console.log('Manually close the file stream');
  writable.end();
}, 1000);
readable.unshift(chunk)‎

سجل التغييرات

الإصدار التغييرات
8.0.0 يمكن أن يكون الآن الوسيط  chunk نسخةَ Uint8Array.
0.9.11 أُضيف في الإصدار: 0.9.11.
  • <Buffer>‎ ‎| ‎<Uint8Array>‎ ‎| ‎<string>‎ ‎| ‎<any>‎  :chunk  قطع البيانات المراد منعها من القفز (الإزاحة) فوق الطابور. من أجل المجاري غير المشغّلة في نمط الكائن، يجب أن تكون chunk سلسلة نصية أو كائنًا من النوع Buffer أو النوع Uint8Array. من أجل المجاري في نمط الكائن، يمكن أن يقبل chunk أي نوع من أنواع بيانات JavaScript باستثناء null.

يدفع التابع readable.unshift()‎‎ قطع البيانات خلفًا إلى ذاكرة التخزين المؤقتة الداخلية. هذا مفيد في حالات معيّنة تُقرَأ فيها بيانات محدَّدة لا حاجة لقراءتها من المجرى، لذلك يُمكن أن تُزاح هذه البيانات لتُمرر أخرى إلى طرف آخر.

هذا مفيد في حالات معينة حيث يُستهلك المجرى بشيفرات تتطلب "عدم استهلاك" لبعض كمية البيانات المسحوبة بشكل مثالي من المصدر، لذلك يمكن أن تُمرر البيانات إلى بعض الأطراف الأخرى.

لايمكن أن يُستدعى التابع stream.unshift(chunk)‎ بعد أن أُطلق الحدث 'end' أو سوف يُرمى خطأ وقت التشغيل.

ينبغي على المطورين المستخدمين للتابع stream.unshift()‎ التبديل إلى استعمال مجرى التحويل (Transform) بدلًا عن المجرى Readable. انظر مقطع الواجهات البرمجية لمنفذي المجاري للمزيد من المعلومات.

// \n\nانتزاع العنوان المحدّد ب 
//إذا حصلت على الكثير unshift()استخدم
//(error, header, stream) (استدعِ دالة رد النداء مع (الخطأ، المجرى، العنوان 

const { StringDecoder } = require('string_decoder');
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        
        //أوجد حدود العنوان
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        
       // unshift() قبل استدعاء 'readable' أزل مُنصت   

        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
       
        // يمكن الآن قراءة جسم الرسالة من المجرى
        callback(null, header, stream);
      } else {
       
       //لا يزال يقرأ العنوان
        header += str;
      }
    }
  }
}

خلافًا للتابع stream.push(chunk)‎، لن ينهي التابع stream.unshift(chunk)‎ عملية القراءة بإعادة تصفير  حالة القراءة الداخلية للمجرى. يمكن أن يسبب هذا نتائج غير متوقعة إذا استُدعي readable.unshift()‎ أثناء القراءة (أي من داخل تنفيذ stream._read()‎ على مجرى مخصص).

اتباع استدعاء التابع readable.unshift()‎‎ بالتابع stream.push('')‎  مباشرةً سوف يعيد تصفير حالة القراءة بشكل مناسب، ولكن الأفضل ببساطة هو تجنب استدعاء readable.unshift()‎‎ أثناء عملية تنفيذ القراءة.

readable.wrap(stream)‎

أضيف في الإصدار: 0.9.4.

  • <Stream>‎ :stream مجرى قابل للقراءة من "نمط قديم" (old style).
  • القيمة المُعادة: ‎<this>‎

قبل الإصدار Node.js 0.10، لم تطبّق المجاري كامل الواجهة البرمجية للوحدة stream كما هي معرّفة الآن. (انظر التوافق مع إصدارات Node.js الأقدم للمزيد من المعلومات.)

عند استخدام مكتبات Node.js أقدم والتي تطلق الأحداث 'data' وفيها التابع stream.pause()‎ الذي هو إرشادي فقط، يمكن أن يُستخدم التابع readable.wrap()‎ لإنشاء مجرًى قابل للقراءة يستخدم المجرى القديم كمصدر لبياناته.

سيكون من النادر الحاجة إلى استخدام readable.wrap()‎ ولكن التابع قُدّم كملائمة للتفاعل مع تطبيقات Node.js أقدم.

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); //...الخ
});
readable[Symbol.asyncIterator]‎(‎)‎

أضيف في الإصدار: 10.0.0.

الاستقرار: 1- تجريبي

القيمة المعادة:  ‎<AsyncIterator>‎ لاستهلاك كامل المجرى.

const fs = require('fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const k of readable) {
    data += k;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.log);

إذا انتهت الحلقة التكرارية ب break أو throw، فسوف يُدمّر المجرى. بعبارة أخرى، سوف يستهلك التكرار عبر المجرى جميع بياناته ويقرأ المجرى كاملًا. سيُقرَأ المجرى بقطع بيانات ذات حجم مساوٍ لقيمة الخيار highWaterMark. في مثال الشيفرة في الأعلى، ستكون البيانات في قطعة وحيدة إذا حوى الملف على أقل من 64 كيلو بت من البيانات لأنه قيمة الخيار  highWaterMark  لم تُقدّم للتابع fs.createReadStream()‎.

المجاري المزدوجة (Duplex) ومجاري التحويل (Transform)

الصنف stream.Duplex

سجل التغييرات

الإصدار التغييرات
6.8.0 ستعيد الآن نُسخ Duplex القيمة true عند فحص instanceof stream.Writable
0.9.4 أضيف في 0.9.4.

المجاري المزدوجة (Duplex) هي المجاري التي تطبّق كلا واجهتي المجرى القابل للقراءة (Readable) والمجرى القابل للكتابة (Writable).

تتضمن أمثلة مجاري Duplex:

الصنف: stream.Transform

أُضيف في الإصدار: 0.9.4.

مجاري التحويل (Transform streams) هي مجاري من النوع المزدوج (Duplex) حيث أنَّ الخرج مرتبط بطريقة ما مع الدخل. مثل كل مجاري Duplex، تطبّق مجاري Transform كلا واجهتي المجرى القابل للقراءة (Readable) والمجرى القابل للكتابة (Writable).

من الأمثلة عن المجاري Transform:

transform.destroy([error]‎)‎

أُضيف في الإصدار: 8.0.0.

يدّمر المجرى، ويطلق الحدث 'error'. بعد هذا الاستدعاء، سيحرر مجرى التحويل أي مصادر داخلية، ينبغي على المستخدمين ألّا يعيدو تعريف هذا التابع، ولكن يمكنهم تطبيق readable._destroy()‎ بدلًا عنه. التنفيذ الافتراضي لتابع ‎_destroy()‎ من أجل المجرى Transform يطلق أيضًا الحدث 'close'.

stream.finished(stream, callback)‎

أُضيف في الإصدار:  10.0.0.

  • <Stream>‎ :‎stream مجرى قابل للقراءة و/أو قابل للكتابة.
  • <Function>‎ :callback دالة رد نداء والتي تأخذ وسيط خطأ اختياري.

دالة تستعمل للحصول على إشعارات عندما لا يعود المجرى قابلًا للقراءة، أو قابلًا للكتابة أو واجه خطأً ما، أو جرى إغلاقه بشكل سابق لأوانه.

const { finished } = require('stream');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

rs.resume(); //تفريغ المجرى

هذا التابع مفيد خصوصًا في حالات معالجة الخطأ الذي يُدمّر فيه المجرى بشكل سابق لأوانه (مثل طلب HTTP مُجهض) ولم يُطلق الحدث 'end' أو 'finish'. الواجهة finished البرمجية هي promisify'able  قابلة للتعامل مع الوعود (promise) كذلك.

const finished = util.promisify(stream.finished);

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading');
}

run().catch(console.error);
rs.resume(); // تفريغ المجرى

stream.pipeline(...streams[, callback]‎)‎

أُضيف في الإصدار: 10.0.0.

  • <Stream>‎  : ‎.‎.‎.‎streams مجرييان أو أكثر للربط بينهما.
  • <Function>‎ :callback دالة رد نداء تأخذ وسيط خطأ اختياري.

تابع نموذجي للنقل (الربط) بين المجاري موجهةً الأخطاء ومنظفةً بشكل ملائم و مُزوِدةً  بدالة رد نداء عندما ينتهي خط النقل.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');



//البرمجية لربط سلسلة مجاري معًا بسهولة والحصول على إشعارات pipeline استخدم واجهة  
//عندما يتم التوصيل والنقل بشكل كامل
// :ضخمًا بكفاءة tar يحتمل ملف gzip خط النقل إلى   
 



pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

الواجهة البرمجية pipeline  قابلة للتعامل مع الوعود (promise) أيضًا:

const pipeline = util.promisify(stream.pipeline);

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded');
}

run().catch(console.error);

الواجهات البرمجية لمنفذي المجاري

صُممت الواجهات البرمجية للوحدة stream لتجعل بالإمكان تنفيذ المجاري بسهولة باستعمال وحدة الوراثة النموذجية في JavaScript.

أولًا، سيعرّف مطوّر المجرى صنف جافا سكريبت جديد والذي سيكون توسعة لأحد أصناف المجاري الأساسية الأربعة

(stream.Writable أو stream.Readable أو stream.Duplex أو stream.Transform) ضامنًا أنها تستدعي باني صنف الأب المناسب:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }
}

ثم يجب أن ينفّذ صنف المجرى الجديد واحدة أو اكثر من التوابع المحددة بالاعتماد على نوع المجرى الذي يُننشأ، كما هو مفصّل في المخطط أدناه:

حالة الاستخدام الصنف توابع للتطبيق
قراءة فقط Readable ‎_read
كتابة فقط Writable _write, ‎_writev‎, _final
قراءة وكتابة Duplex _read,‎ ‎_write‎, ‎_writev‎, ‎_final‎

لا ينبغي أبدًا أن تستدعي شيفرة التنفيذ للمجرى التوابع "العامة" (public) للمجرى والتي ضُمّنت للاستخدام من قبل المستهلكين (كما هو موصوف في مقطع الواجهات البرمجية لمستهلكي المجرى). فعل ذلك قد يقود إلى تأثيرات جانبية معاكسة في شيفرة التطبيق المستهلك للمجرى.

البناء المبسّط (Simplified Construction)

أُضيف في الإصدار: 1.2.0.

من أجل العديد من الحالات البسيطة، من الممكن انشاء مجرىً دون الإعتماد على الوراثة، يمكن أن يُنجز هذا بانشاء مباشر لنسخ من الكائنات stream.Writable أو stream.Readable أو stream.Duplex أو stream.Transform وتمرير التوابع المناسبة كخيارات للباني.

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

تنفيذ مجرى قابل للكتابة

وُسِع الصنف stream.Writable لينفذ المجرى Writable.

يجب أن تستدعي مجاري Writable المخصصة الباني new stream.Writable([options]‎)‎ وتنفّذ التابع

writable._write(‎)‎‎. ويمُكن أن يُنفّذ التابع ‎‎writable._writev(‎)‎ أيضًا.

new stream.Writable([options‎]‎)‎

الإصدار التغييرات
10.0.0 أضيف الخيار emitClose  ليحدد فيما إذا أطلق 'close' عند الهدم.
  •  ‎<Object>options
    • ‎‎<number>‎ :highWaterMark مستوى الذاكرة المؤقتة عندما يبدأ stream.write()‎ بإعادة false. القيمة الإفتراضية:  16384 (16kb) أو 16 من أجل المجاري objectMode.
    • <boolean>‎ :decodeStrings قيمة منطقية تحدد إذا كان يراد ترميز السلاسل النصية ككائنات من النوع Buffer أم لا  قبل تمريرها إلى stream._write()‎، مستخدمًا الترميز المحدد في استدعاء stream.write()‎. القيمة الإفتراضية: true.
    • <string>‎ :defaultEncoding الترميز الافتراضي الذي يُستخدم عندما لا يحدد ترميز كوسيط في stream.write()‎. القيمة الإفتراضية: 'utf8'.
    • ‎‎<boolean>‎ :objectMode قيمة منطقية تحدد فيما إذا كانت stream.write(anyObj)‎ عملية صالحة أم لا. عند ضبطها، يصبح من الممكن كتابة قيم من أي نوع من أنواع  JavaScript باستثناء السلاسل النصية، أو Buffer أو Uint8Array  إذا دُعمت من قبل منفّذ المجرى. القيمة الإفتراضية: false.
    • ‎‎<boolean>‎ :emitClose قيمة منطقية تحدد فيما إذا كان ينبغي أن يطلقَ المجرى الحدثَ 'close' بعد أن يُدمّر أو لا. القيمة الإفتراضية: true.
    • <Function>‎ :write تنفيذ للتابع stream._write()‎.
    • ‎‎‎<Function>‎ :writev تنفيذ للتابع stream._writev()‎ .
    • <Function>‎ :destroy تنفيذ للتابع stream._destroy()‎.
    • ‎‎<Function>‎ :final تنفيذ للتابع stream._final()‎.
const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
   
//stream.Writable() استدعِ باني ‎ 

    super(options);
    // ...
  }
}

أو عند استخدام البواني وفق النمط الذي يسبق ES6:

const { Writable } = require('stream');
const util = require('util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable);

أو، باستخدام نهج الباني المبسّط :

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  }
});

writable._write(chunk, encoding, callback)‎

  • <Buffer>‎ |‎ <string>‎ |‎ <any>‎ :chunk قطعة البيانات المراد كتابتها. سوف تكون دائمًا من النوع buffer إلا إذا كان الخيار decodeStrings مضبوطًا إلى القيمة false أو أن المجرى يعمل في نمط الكائن.
  • <string>‎ :encoding إذا كانت القطعة هي سلسلة نصية فإن encoding سيكون هو ترميز المحارف لهذه السلسلة. إذا كانت القطعة chunck من النوع Buffer، أو أن المجرى يعمل في نمط الكائن، سيُتجاهل الوسيط encoding.
  • <Function>‎ :callback يستدعي هذه الدالة (مع وسيط خطأ اختياري) عندما تنتهي معالجة القطع المعطاة.

يجب أن توفر كل مجاري Writable التابع writable._write()‎  لإرسال البيانات إلى المصادر الأساسية.

تقدّم مجاري التحويل (Transform) تنفيذها الخاص للتابع  writable._write()‎.

يجب ألّا تُستدعَى هذه الدالة من قبل الشيفرة التطبيق مباشرةً. يجب أن تُنفّذ باستخدام صنف ابن، وتُستدعى من قبل توابع الصنف Writable الداخلية فقط.

يجب أن تستدعَى الدالة callback لتشير إمّا إلى انتهاء الكتابة بنجاح أو فشلها مع خطأ. يجب أن يكون الوسيط الأول المُمرر إلى callback هو كائن Error إذا فشل الإستدعاء أو null إذا نجحت الكتابة.

كل الاستدعاءات للتابع writable.write()‎ التي تحصل بين وقت استدعاء writable._write()‎ واستدعاء الدالة callback سوف تؤدي إلى تخزين البيانات المكتوبة مؤقتًا. عندما تستدعى الدالة callback، قد يطلق المجرى الحدث 'drain'. إذا كان تطبيق المجرى قادرًا على معالجة قطع متعددة من البيانات مرّة واحدة، فينبغي أن يُنفَّذ التابع writable._writev()‎.

إذا كانت الخاصية decodeStrings  مضبوطة بشكل صريح إلى false في خيارات الباني. عند ذلك ستبقى chunk نفس الكائن الذي يمُرر إلى التابع ‎‎.write()‎  وقد تكون سلسلة نصية بدلًا من Buffer. هذا من أجل دعم التطبيقات التي تمتلك معالجة مثالية لبعض ترميزات البيانات النصية. في هذه الحالة، سوف يحدد الوسيط encoding ترميز محارف السلسلة النصية. وإلّا، سوف يُهمل الوسيط encoding بشكل آمن.

سيكون التابع writable._write()‎  مسبوقًا بالرمز _ لأنه داخلي للصنف الذي عرّفه، وينبغي ألّا يُستدعى مطلقًا بشكل مباشر من قبل برامج المستخدم.

writable._writev(chunks, callback)

  • <Object‎[‎]‎>‎ :chunks القطع التي ستُكتب. كل قطعة لها الشكل التالي: ‎{ chunk:‎ ..., encoding:‎ ... }‎‎.
  • <Function>‎ :callback دالة رد نداء (مع وسيط خطأ بشكل اختياري) لتُستدعى عند انتهاء المعالجة للقطع المُزوّدة.

يجب ألّا تُستدعى هذه الدالة من قبل شيفرة التطبيق مباشرةً، يجب أن تُنفّذ من قبل صنف ابن، وتُستدعى من قبل توابع الصنف Writable الداخلية فقط.

يمكن أن يُنفّذ التابع writable._writev()‎ بالإضافة إلى writable._write()‎ في تطبيقات المجاري والتي تمتلك القدرة على معالجة قطع متعددة من البيانات في المرة الواحدة. إذا نُفّذت، سوف يُستدعى التابع مع كل قطع البيانات المخزّنة حاليًا في طابور الكتابة.

يُسبق التابع writable._writev()‎  بالرمز _ لأنّه داخلي للصنف الذي عرّفه، وينبغي ألّا يُستدعى مطلقًا من قبل من قبل برامج المستخدم.

writable._destroy(err, callback)

أضيف في الإصدار:  8.0.0.

  • ‎‎<Error>‎ :err خطأ محتمل.
  • <Function>‎ :callback دالة رد نداء والتي تأخذ وسيط خطأ اختياري.

يُستدعى التابع ‎_destroy()‎ من قبل التابع writable.destroy()‎. يمكن أن يُعاد تعريفه من قبل صنف ابن ولكن يجب ألّا يُستدعى بشكل مباشر.

writable._final(callback)‎‎

أضيف في الإصدار: 8.0.0.

<Function>‎ :callback يستدعي هذه الدالة (مع وسيط خطأ اختياري) عند انتهاء كتابة أي بيانات متبقية.

يجب ألّا يُستدعى التابع ‎_final()‎ بشكل مباشر. يمكن تنفيذه من قبل صنف ابن؛ وإذا كان ذلك، سوف يُستدعى مع توابع الصنف Writable الداخلية فقط.

سوف تُستدعى هذه الدالة الاختيارية قبل أن يغلق المجرى، مؤخرةً الحدث 'finish' حتى تستدعى الدالة callback. هذا مفيد لإغلاق المصادر أو كتابة البيانات المخزّنة مؤقتًا قبل أن ينتهي المجرى.

أخطاء أثناء الكتابة

يُنصح بأن يُكتب تقرير بالأخطاء الحاصلة أثناء معالجة التابعين writable._write()‎ و writable._writev()‎ عن طريق استدعاء دالة رد النداء وتمرير الخطأ كأول وسيط. هذا سوف يسبب انطلاق الحدث 'error' من قِبَل المجرى Writable. رمي خطأ Error أثناء writable._write()‎ يمكن أن ينتج سلوك غير متوقّع ومتضارب بالاعتماد على كيفية استعمال المجرى. يضمن استخدام دالة رد النداء معالجة أخطاء متناسقة ومتوقعة.

إذا كان المجرى Readable موصولًا بأنبوب مع المجرى Writable وأطلق Writable خطأً في هذه الأثناء، فسوف ينفصل المجرى Readable.

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
});

مثالٌ عن مجرى قابل للكتابة (Writable)

يشرح التالي تنفيذ مجرى مخصص من النوع Writable  وبسيط إلى حد ما (عديم الجدوى إلى حدٍ ما). بينما تكون نسخة المجرى Writable المحدد ليست ذات فائدة محددة حقيقية. يشرح المثال كل العناصر المطلوبة لنسخة مجرى Writable مخصصة:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }

  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}

فك ترميز الذواكر المؤقتة في المجرى القابل للكتابة (Writable)

فك ترميز الذواكر المؤقتة هو مهمة شائعة، على سبيل المثال، عند استخدام محوّلات دخلها هو سلسلة نصية. فهي ليست عملية بديهية عند استخدام ترميز محارف متعدد البايتات، مثل UTF-8. يعرض المثال التالي كيفية فك ترميز سلاسل متعددة البايتات باستخدام StringDecoder و Writable.

const { Writable } = require('stream');
const { StringDecoder } = require('string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); //€ :عملة

تنفيذ مجرى قابل للقراءة (Readable)

يُوسّع الصنف stream.Readable لينفّذ مجرى قابل للقراءة (Readable).

يجب أن تستدعي مجاري Readable المخصصة الباني stream.Readable([options]‎)‎ وتنفّذ التابع readable._read()‎.

new stream.Readable([options]‎)‎

  • ‎<Object> :options
    •  ‎‎<number> ‎:highWaterMark العدد الأعظمي من البايتات لتُخزّن في الذاكرة المؤقتة الداخلية قبل ابطال القراءة من المصادر الأساسية. القيمة الإفتراضية: 16384 (16kb) أو 16  لأجل مجاري objectMode.
    • <string> ‎:encoding إذا كان محدّدًا،عندئذ سوف يُفك ترميز الذاكرة إلى سلاسل نصية باستخدام الترميز المحدد. القيمة الإفتراضية: null.
    • <boolean> ‎:objectMode قيمة منطقية تحدد فيما إذا كان ينبغي أن يتصرف هذا المجرى كمجرى من الكائنات. يعني أن يعيد stream.read(n)‎ قيمة وحيدة بدلًا من Buffer بالحجم n. القيمة الإفتراضية: false.
    • <Function> ‎:read تنفيذ للتابع stream._read()‎.
    • ‎‎<Function>‎ :destroy تنفيذ للتابع stream._destroy()‎.
const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    
//stream.Readable(options)‎  يستدعي الباني 
    super(options);
    // ...
  }
}

أو عند استخدام البواني بنمط سابق للنمط ES6:

const { Readable } = require('stream');
const util = require('util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable);

أو عند استخدام نهج الباني المبسّط:

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    // ...
  }
});

readable._read(size)‎

سجل التغييرات

الإصدار التغييرات
10.0.0 استدعاء ‎_‎‎read()‎ لمرّة واحدة في كل جزء صغير من النبضة (microtick).
0.9.4 أُضيف في الإصدار: 0.9.4.
  • <number>‎ :size عدد البايتات المراد قراءتها بشكل غير متزامن.

يجب ألّا تُستدعى هذه الدالة من قبل شيفرة التطبيق مباشرةً. يجب أن تنفّذ من قبل أصناف ابن. وتُستدعى من قبل توابع الصنف Readable الداخلية فقط.

كل تطبيقات المجاري القابلة للقراءة (Readable) يجب أن توفر تنفيذًا للتابع readable._read()‎ لجلب البيانات من المصادر الأساسية.

عندما يُستدعى readable‎.‎_‎read()‎، إذا كانت البيانات متوافرة من المصدر، ينبغي أن يبدأ التطبيق بدفع البيانات إلى داخل طابور القراءة باستخدام التابع this.‎push(dataChunk)‎. ينبغي أن يكمل ‎_‎read()‎ القراءة من المصادر ودفع البيانات حتى يعيد readable.push()‎ القيمة false. فقط عندما يُستدعى ‎_‎read()‎ مجدّدًا بعد أن توقف، ينبغي أن يستأنف دفع بيانات إضافية فوق الطابور.

حالما استُدعي التابع readable._read()‎، لن يُستدعى مجدّدًا حتى يُستدعى التابع readable.push()‎. يُضمن أن يُستدعى readable._read()‎ لمرّة واحدة فقط خلال التنفيذ المتزامن؛ أي -6 ^10 من النبضة (microtick).

الوسيط size هو إرشادي. من أجل تطبيقات تكون فيها القراءة هي عملية وحيدة تعيد بيانات، يمكن أن تستخدم الوسيط size  لتحديد كمية البيانات المراد جلبها. قد تتجاهل تطبيقات أخرى هذا الوسيط وتقدّم البيانات ببساطة عندما تصبح متوافرة. لا يوجد حاجة للإنتظار حتى تتوافر بايتات بالحجم size قبل استدعاء stream.push(chunk)‎.

يُسبق التابع readable.‎_‎read()‎ بالرمز _ لأنّه داخلي للصنف الذي عرّفه، ولا ينبغي أبدًا أن يُستدعى بشكل مباشر من قبل برامج المستخدم.

readable._destroy(err, callback)‎

أُضيف في الإصدار: 8.0.0

  • <Error>‎ :err خطأ مُحتمل.
  • <Function>‎ :callback دالة رد نداء والتي تأخذ وسيط خطأ اختياري.

يُستدعى تابع ‎_‎destroy()‎  من قبل readable.destroy()‎. يمكن أن يُعاد تعريفه من قبل صنف ابن ولكن يجب ألّا يُستدعى بشكل مباشر.

readable.push(chunk[, encoding]‎)‎

سجل التغييرات

الإصدار التغييرات
8.0.0 يمكن الآن أن يكون الوسيط chunk نسخة من Uint8Array.
  • <Buffer>‎ | <Uint8Array>‎ | <string>‎ | <null>‎ | <any>‎ :chunk قطعة من البيانات للدفع إلى طابور القراءة. من أجل المجاري التي لا تعمل في نمط الكائن، يجب أن تكون chunk هي سلسلة نصية أو Buffer أو Uint8Array. من أجل المجاري التي تعمل في نمط الكائن، يمكن أن تكون chunk أي نوع من أنواع بيانات JavaScript.
  • <string>‎ :encoding ترميز قطع السلاسل النصية. يجب أن يكون ترميز Buffer صالحًا، مثل 'utf8' أو 'ascii'.
  • القيمة المُعادة: ‎<boolean>‎‎  تكون القيمة true إذا كان هناك قطع اضافية من البيانات قد يستمر دفعها، وإلّا فستكون false.

عندما تكون chunk هي من النوع Buffer أو Uint8Array أو string، ستضاف البيانات التي تحتويها إلى الطابور الداخلي لمستخدمي المجاري لتصبح متوافرة للقراءة والاستهلاك من قبل مستخدمي المجرى. تمرير القيمة null إلى الخيار chunk يشير إلى نهاية المجرى ‎(EOF)‎، أي لا يوجد بعدها المزيد من البيانات لتكتب.

عندما يعمل Readable في نمط التوقف المؤقت (paused mode)، يمكن قراءة البيانات المضافة مع التابع readable.push()‎ باستدعاء التابع readable.read()‎ عندما يُطلق الحدث 'readable'.

عندما يعمل Readable في نمط التدفق، ستُستلم كل البيانات المضافة مع readable.push()‎ بمجرد إطلاق الحدث 'data'.

يُصمم التابع readable.push()‎ ليكون مرنًا قدر الإمكان. على سبيل المثال، عند تغليف مصدر منخفض المستوى (lower-level source) والذي يقدّم بعض أشكال آليات التوقف أو الإستئناف، ورد نداء البيانات، يمكن أن يُغلَّف المصدر منخفض المستوى من قبل نسخة Readable مخصصة:

//readStop()‎ و readStart() المصدر هو كائن مع التابعين
//والذي يحصل على استدعاء عندما يمتلك بيانات  ‎`ondata` ومع العضو 
//والذي يحصل على استدعاء عندما تنتهي البيانات `onend` ومع العضو


class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowlevelSourceObject();

   
//كل مرّة توجد بيانات ، ادفعها إلى داخل الذاكرة المؤقتة الداخلية 


    this._source.ondata = (chunk) => {

//  عند ذلك أوقف القراءة من المصدر،false القيمة push()إذا أعاد التابع

      if (!this.push(chunk))
        this._source.readStop();
    };

   

//`null`عندما ينتهي المصدر، ادفع قطعة اشارات النهاية

    this._source.onend = () => {
      this.push(null);
    };
  }



// عندما يريد المجرى سحب المزيد من البيانات إليه _read‎‎ سوف يُستدعى
//يُتجاهل وسيط الحجم الموصى به في هذه الحالة

  _read(size) {
    this._source.readStart();
  }
}

يُعَدّ التابع eadable.push()‎  للاستدعاء فقط من قبل مُنفّذات المجرى Readable، وفقط من داخل التابع readable._read()‎.

من أجل المجاري التي لا تعمل في نمط الكائن، إذا كان معامل chunk الخاص بالتابع  readable.push()‎‎ هو undefined، سيُعامل كسلسلة نصية فارغة أو ذاكرة تخزين مؤقت فارغة. انظر readable.push('')‎ للمزيد من المعلومات.

أخطاء أثناء القراءة

يوصى بأن تُطلَق الأخطاء الحاصلة خلال معالجة التابع readable._read()‎ باستخدام الحدث 'error' بدلًا من أن تُرمى فقط. قد ينتج رمي خطأٍ من داخل readable._read()‎ إلى سلوك غير متوقع و متضارب بالإعتماد على إذا كان المجرى يعمل في نمط التدفق أو النمط المتوقف. يضمن استخدام الحدث 'error' معالجة أخطاء متوقعة ومتناسقة.

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    if (checkSomeErrorCondition()) {
      process.nextTick(() => this.emit('error', err));
      return;
    }
// القيام ببعض الأعمال
  }
});

مثالٌ عن مجرى عدٍّ (Counting Stream)

التالي هو مثال أساسي لمجرى قابل للقراءة (Readable) والذي يطلق الأعداد من 1 إلى 1,000,000 بترتيب تصاعدي ومن ثم ينتهي.

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = String(i);
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
}

تنفيذ مجرى مزدوج (Duplex)

المجرى المزدوج (Duplex) هو الذي ينفِّذ كلا الواجهتين Readable و Writable، مثل اتصال المقبس TCP.

لأنّ JavaScript لا تدعم الوراثة المتعددة، وُسِّع الصنف stream.Duplex لينفّذ المجرى Duplex (كمقابل لتوسعة الصنفين stream.Readable  و stream.Writable).

يرث stream.Duplex من الصنف stream.Readable بشكل نموذجي (prototypically) ومن الصنف stream.Writable بشكل طفيلي (parasitically)، ولكن سوف يعمل instanceof بشكل صحيح لأجل كلا الصنفين الأساسيين بسبب إعادة تعريف Symbol.hasInstance على stream.Writable.

يجب أن تستدعي مجاري Duplex المخصصة الباني new stream.Duplex([options]‎)‎ وتنفّذ كلا التابعين readable._read()‎ و writable._write()‎.

new stream.Duplex(options)‎

سجل التغييرات

الإصدار التغييرات
8.4.0 دُعم الآن الخياران readableHighWaterMark و writableHighWaterMark
  • <Object> ‎:options يمرر إلى كلا البانيين Writable و Readable. ويملك أيضًا الحقول الآتية:
    • <boolean>‎ :allowHalfOpen قيمة منطقية إذا كانت false،فسوف ينهي المجرى تلقائيًا الطرف القابل للكتابة عندما ينتهي الطرف القابل للقراءة. القيمة الافتراضية: true.
    • <boolean> ‎:readableObjectMode يضبط هذا الخيار الوضع objectMode للطرف القابل للقراءة في المجرى. ليس له تأثير إذا كانت objectMode هي true. القيمة الافتراضية: false.
    • ‎<boolean>‎ :writableObjectMode يضبط هذا الخيار الوضع objectMode للطرف القابل للكتابة في المجرى. ليس له تأثير إذا كانت objectMode  هي true. القيمة الافتراضية: false.
    • <number>‎ :readableHighWaterMark يضبط القيمة highWaterMark لأجل الطرف القابل للقراءة من المجرى. لا يملك تأثيرًا إذا كانت القيمة highWaterMark معطاة.
    • <number> :writableHighWaterMark‎ يضبط القيمة highWaterMark لأجل الطرف القابل للكتابة من المجرى. لا يملك تأثيرًا إذا كانت القيمة highWaterMark معطاة.
const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
}

أو عند استخدام البواني بنمط سابق للنمط ES6:

const { Duplex } = require('stream');
const util = require('util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

أو باستخدام نهج الباني المُبسّط:

const { Duplex } = require('stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});

مثال على مجرى مزدوج (Duplex)

يشرح التالي مثال بسيط عن مجرى من النوع المزدوج (Duplex) والذي يغلّف كائن مصدر افتراضي منخفض المستوى إلى أي بيانات يمكن أن تُكتب وأن تُقرأ، ولو باستخدام واجهات برمجية غير متوافقة مع مجاري Node.js. يشرح المثال البسيط التالي مجرى Duplex والذي يخزّن مؤقتًا البيانات القادمة والمكتوبة عليه بواسطة الواجهة Writable ثم يعاد قراءتها مجدَّدًا (والتي تُقرأ تراجعيًا) بواسطة الواجهة Readable.

const { Duplex } = require('stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
  
// المصادر الأساسية تتعامل فقط مع السلاسل النصية
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

النقطة الأكثر أهمية في مجاري Duplex هي أنّ كل طرف من الطرفين Readable و Writable  يعمل بشكل مستقل عن الآخر رغم تواجدهما داخل نسخة كائن واحدة.

تشغيل المجاري المزدوجة بنمط الكائن

من أجل المجاري المزدوجة (Duplex)، يمكن أن يُضبط الخيار objectMode على وجه الحصر إمّا على الطرف Readable أو الطرف Writable باستخدام الخيارات readableObjectMode و writableObjectMode على التوالي.

في المثال التالي، على سبيل المثال، يُنشأ مجرى جديد من النوع Transform (والذي هو نوع من المجاري المزدوجة (Duplex)) والذي يمتلك طرف قابل للكتابة (Writable) بنمط الكائن والذي يقبل أرقام JavaScript  حُوّلت إلى سلاسل نصية ستة عشرية على الطرف Readable.

const { Transform } = require('stream');



كل مجاري التحويل (Transform) هي أيضًا مجاري مزدوجة (Duplex)//

const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
   

//ضغط القطعة إلى رقم إذا كان ضروريًا
    chunk |= 0;

   
// تحويل القطعة إلى شيء مختلف
    const data = chunk.toString(16);

   
//ادفع البيانات إلى الطابور القابل للقراءة
    callback(null, '0'.repeat(data.length % 2) + data);
  }
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// يطبع: 01
myTransform.write(10);
// 0a :يطبع
myTransform.write(100);
// يطبع: 64

تطبيق مجرى التحويل (Transform)

مجرى التحويل (Transform) هو مجرى مزدوج (Duplex) حيث أن الخرج محسوب بطريقة ما من الدخل. مجاري الوحدة zlib أو مجاري الوحدة crypto هي مثال على هذا النوع من المجاري والتي تضغط أو تشفّر أو تفك تشفير البيانات.

لايوجد ضرورة أن يكون الخرج بنفس حجم الدخل أو نفس عدد القطع أو أن يصل في نفس الوقت. على سبيل المثال، المجرى Hash سوف يملك دائًما وأبدًا قطعة خرج واحدة والتي تُوفّر عندما يُنهَى الدخل. سوف ينتج المجرى zlib خرجًا إمّا أصغر بكثير أو أكبر بكثير من الدخل.

وُسّع الصنف stream.Transform لينفّذ مجرى التحويل (Transform).

يرث الصنف stream.Transform بشكل نموذجي من stream.Duplex ويطبّق إصداراته الخاصة من التابعين writable._write()‎ و readable._read()‎ .يجب أن تنفّذ  تطبيقات مجاري Transform المخصصة التابع transform._transform()‎ وربما تنفّذ أيضًا التابع transform._flush()‎.

يجب توخي الحذر عند استخدام مجاري Transform، إذ أنّ البيانات المكتوبة إلى المجرى قد تؤدي إلى أن يصبح الطرف Writable للمجرى متوقفًا إذا كان الخرج على الطرف Readable غير مُستهلك.

new stream.Transform([options]‎)‎

const { Transform } = require('stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
}

أو عند استخدام البواني بنمط سابق للنمط ES6:

const { Transform } = require('stream');
const util = require('util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform);

أو باستخدام نهج الباني المبسّط:

const { Transform } = require('stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  }
});

الحدثان 'finish' و 'end'

الحدثان 'finish' و 'end' هما من الصنفين stream.Writable و stream.Readable على التوالي. يُطلق الحدث 'finish' بعد أن يُستدعى stream.end()‎ وقد عُولجت كل القطع من قبل stream._transform()‎ .يُطلق الحدث 'end' بعد أن أُخرجت كل البيانات والذي يحصل بعدما استُدعي رد النداء في التابع transform._flush()‎.

transform._flush(callback)‎

  • ‎<Function> :callback‎ دالة رد نداء (مع وسيط خطأ وبيانات بشكل اختياري) لتُستدعى عندما تُدفع البيانات المتبقية.

يجب ألّا تُستدعى الدالة من قبل شيفرة التطبيق مباشرةً. يجب أن تُنفّذ من قبل أصناف ابن، وتُستدعى من قبل توابع الصنف Readable الداخلية فقط.

في بعض الحالات، قد تحتاج عملية التحويل إلى إطلاق بت إضافي من البيانات في نهاية المجرى. على سبيل المثال، سوف يخزّن مجرى الضغط  zlib كمية من الحالة الداخلية المستخدمة لضغط الخرج بشكل مثالي. ولكن عندما ينتهي المجرى، تحتاج البيانات الإضافية إلى أن تُدفَع بحيث ستكتمل البيانات المضغوطة.

قد تنفّذ مجاري تحويل (Transform) مخصصة التابع transform._flush()‎. سوف يُستدعى هذا عندما لا يكون هناك المزيد من البيانات المكتوب لتُستهلك (لتُقرأ)، ولكن قبل إطلاق الحدث 'end' مشيرًا إلى نهاية المجرى القابل للقراءة.

خلال تنفيذ transform._flush()‎، قد لا يُستدعى التابع readable.push()‎ مطلقًا أو يستدعى أكثر من مرة حسب الحاجة. يجب أن تُستدعى الدالة callback عند انتهاء عملية دفع البيانات خارج المجرى.

التابع transform._flush()‎ مسبوق بالرمز _ لأنّه داخلي للصنف الذي عرّفه، وينبغي ألّا يُستدعى مباشرةً من قبل برامج المستخدم.

transform._transform(chunk, encoding, callback)‎

  • ‎‎<Buffer> | <string> |‎ <any>‎: chunk قطعة البيانات المراد تحويلها. ستكون دائمًا كائنًا من النوع Buffer إلّا إذا ضُبط الخيار decodeStrings إلى القيمة false أو أن المجرى يعمل في نمط الكائن.
  • <string>encoding إذا كانت القطعة سلسلة نصية، عندها ستكون قيمة encoding هي نوع الترميز. إذا كانت القطعة buffer ومن ثم ستكون قيمة encoding هي قيمة خاصة 'buffer'، تتجاهلها في هذه الحالة.
  • <Function>‎: callback دالة رد نداء (مع وسيط خطأ وبيانات اختيارية) لتُستدعى بعد أن تتم معالجة chunk المزوّدة.

يجب ألّا تُستدعى الدالة من قبل شيفرة التطبيق مباشرةً. يجب أن تُنفّذ من قبل الأصناف الأبناء وتُستدعى من قبل توابع الصنف Readable الداخلية فقط.

كل تطبيقات المجاري Transform يجب أن تقدّم التابع ‎_‎transform()‎ ليقبل دخلًا وينتج خرجًا. تنفيذ transform._transform()‎ يعالج البايتات التي تُكتب، يحسب الخرج، ومن ثم يمرر ذاك الخرج خارجًا إلى الجزء القابل للقراءة باستخدام التابع readable.push()‎.

قد لا يُستدعى التابع transform.push()‎ أبدًا أو يستدعى أكثر من مرة لتوليد خرج من قطعة دخل واحدة، معتمدًا على الكمية المراد إخراجها كنتيجة للقطعة.

من الممكن ألّا يولّد خرجٌ من أي قطعة مكتوبة من بيانات الدخل.

يجب أن تُستدعى الدالة callback فقط عندما تُستهلك القطعة الحالية بالكامل. يجب أن يكون الوسيط الأول المُمرر إلى callback هو كائن من النوع Error إذا حصل خطأ أثناء معالجة الدخل أو null  بصورة أخرى. إذا مُرر وسيط ثاني إلى الدالة callback، فسوف يُحال إلى التابع  readable.push()‎. بعبارة أخرى يعادل التالي:

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
};

يُسبق التابع transform._transform()‎ بالرمز _ لأنّه داخلي بالنسبة للصنف الذي عرّفه، وينبغي ألّا يُستدعى مباشرةً من قبل برامج المستخدم.

لايُستدعى التابع transform._transform()‎ على التفرّع مطلقًا؛ تطبّق المجاري آلية الطوابير ويجب أن تستدعى الدالة callback لاستقبال القطعة التالية إمّا بشكل متزامن أو غير متزامن.

الصنف: stream.PassThrough

الصنف stream.PassThrough هو تنفيذ بسيط لمجرى التحويل (Transform) والذي يمرر بايتات الدخل ببساطة عبره إلى الخرج. الغرض منه في المقام الأول هو الأمثلة والفحص، ولكن هناك بعض حالات الاستخدام حيث يكون stream.PassThrough مفيدًا ككتلة بناء لأنواع غير مألوفة من المجاري.

ملاحظات إضافية

التوافق مع إصدارات Node.js الأقدم

قبل الإصدار Node.js 0.10، كانت واجهة المجرى Readable أبسط، ولكن أيضًا أقل قوةً وأقل فائدة.

  • بدل الإنتظار حتى استدعاء التابع stream.read()‎، سوف يبدأ إطلاق الأحداث 'data' فورًا. التطبيقات التي ستحتاج إلى إنجاز بعض كمية العمل لتقرر كيفية معالجة البيانات تكون مطالبةً بتخزين البيانات المقروءة إلى ذاكرة تخزين مؤقتة داخلية لكي لا تفقد البيانات.
  • كان التابع stream.pause()‎‎ توجيهيًا، بدلًا من أن يكون مضمونًا. عنى هذا أنّه بقي من الضروري أن يكون مستعدًّا لاستقبال الأحداث 'data' حتى عندما يكون المجرى في حالة التوقف المؤقت.

في الإصدار Node.js 0.10، أُضيف الصنف Readable. من أجل التوافقية مع برامج Node.js أقدم، تنقلب المجاري Readable إلى "نمط التدفق" عندما يُضاف معالج الحدث 'data' أو عندما يُستدعى التابع  stream.resume()‎. التأثير هو أنّه حتى عند عدم استخدام التابع stream.read()‎ و الحدث 'readable'، لم يعد من الضروري القلق حول فقد قطع 'data'.

بينما سوف تستمر معظم التطبيقات بالعمل بشكل اعتيادي إلا أن ذلك قد خلق حالةً حديةً في الحالات التالية:

  • لن يُضاف مستمع للحدث 'data'.
  • لن يُستدعى التابع أبدًا ‎stream.resume()‎.
  • لن توصل المجاري مع أي وجهة قابلة للكتابة.

على سبيل المثال، ألق نظرة فاحصة على الشيفرة التالية:

// !تحذير! معطل


net.createServer((socket) => {



 // ولكن لن نستهلك البيانات أبدًا'end' سوف نضيف مُنصتًا لحدث 

  socket.on('end', () => {


// لن تصل أبدًا إلى هنا
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337);

قبل الإصدار Node.js 0.10، ستُحذف رسالة البيانات القادمة ببساطة. ولكن، في الإصدار Node.js 0.10 وما بعده، يبقى المقبس متوقفًا إلى الأبد. الحل البديل لهذه المشكلة هو استدعاء التابع stream.resume()‎ لبدء تدفق البيانات:

// الحل البديل
net.createServer((socket) => {
  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });



//  ابدأ تدفق البيانات، ارميها
  socket.resume();
}).listen(1337);

بالإضافة إلى قلب المجاري Readable الجديدة إلى نمط التدفق، يمكن تغليف المجاري ذات النمط السابق ل 0.10 في الصنف Readable باستخدام التابع readable.wrap()‎ .

readable.read(0)‎

يوجد بعض الحالات حيث من الضروري إطلاق التحديث لآليات المجاري الأساسية القابلة للقراءة، دون استهلاك فعلي لأية بيانات. في هذه الحالات، من الممكن استدعاء readable.read(0)‎،والتي ستعيد null دائمًا.

إذا كانت ذاكرة التخزين المؤقتة الداخلية  أقل من الحد highWaterMark، ولا يقرأ المجرى الآن، ومن ثمّ سيثير استدعاء التابع stream.read(0)‎  استدعاء تابع stream._read()‎ منخفض المستوى.

لما كانت معظم التطبيقات على الغالب ليست بحاجة لفعل ذلك أبدًا، يوجد حالات داخلNode.js حيث يُفعل ذلك، خصوصًا داخل مجاري الصنف Readable.

readable.push('')‎

لا يُنصح باستخدام readable.push('')‎.

دفع سلسلة نصية أو كائن من النوع Buffer أو Uint8Array بصفر من البايتات إلى مجرى لا يعمل في نمط الكائن له تأثير جانبي مثير للانتباه. ذلك لأنّ استدعاء التابع readable.push()‎، سوف ينهي الاستدعاء عملية القراءة. على أي حال، لأنّ الوسيط هو سلسلة فارغة، لن تضاف أي بيانات إلى الذاكرة المؤقتة القابلة للقراءة وبذلك لا يوجد شيء ليستهلكه المستخدم.

اختلاف highWaterMark بعد استدعاء readable.setEncoding()‎

سوف يغير استخدام التابع readable.setEncoding()‎‎‎ السلوك الذي تسلكه القيمة highWaterMark مع مجرى يعمل بدون تفعيل نمط الكائن.

عادةً، يقاس حجم الذاكرة المؤقتة الحالية اعتمادًا على القيمة highWaterMark بالبايت. ولكن بعد أن يُستدعى ‎setEncoding()‎‎، ستبدأ دالة المقارنة بقياس حجم الذاكرة المؤقتة بالمحارف.

هذه ليست اشكالية في الحالات الشائعة مع الترميز latin1 أو ascii. ولكن يُنصح بأن تكون مدركًا لهذا السلوك عند العمل مع سلاسل نصية يمكن أن تحتوي محارف متعددة البايتات.

مصادر

صفحة Stream في توثيق Node.js الرسمي.