Node.js/stream

من موسوعة حسوب
< Node.js
مراجعة 11:55، 20 نوفمبر 2018 بواسطة رهف-النجار (نقاش | مساهمات) (القسم الثاني من الصفحة)
اذهب إلى التنقل اذهب إلى البحث

الاستقرار: 2-مستقر

المجرى هو واجهة مجرّدة للعمل مع البيانات المتدفقة في Node.js. توفّر الوحدة stream واجهة برمجية (API) أساسية تجعل من السهل بناء كائنات تتعامل مع واجهة المجرى.

يوجد العديد من كائنات المجرى التي توفرها Node.js. على سبيل المثال، http.IncomingMessage (طلبيات الخادم HTTP) و process.stdout هما نسخ من الصنف stream.

يمكن أن تكون المجاري قابلة للقراءة (readable)، أو قابلة للكتابة (writable)، أو كليهما. كل المجاري هي نسخ من الصنف EventEmitter.

يمكن الوصول إلى الوحدة stream باستخدام:

const stream = require('stream');

لمَّا كان من الضروري فهم كيفية عمل المجاري، فإنَّ الوحدة stream بحد ذاتها هي أكثر فائدةً للمطورين الذين ينشئون أنواعًا جديدةً من نسخ المجاري. المطورون الذين يستهلكون كائنات المجاري بالمقام الأول نادرًا ما يحتاجون لاستخدام الوحدة stream بشكل مباشر.

تنظيم هذا المستند

يُقسّم هذا التوثيق إلى قسمين رئيسيين مع قسم ثالث للملاحظات الإضافية. يشرح القسم الأول عناصر الواجهة البرمجية للمجرى (stream API) التي تكون مطلوبة لاستخدام المجاري ضمن أي تطبيق. يشرح القسم الثاني عناصر الواجهة البرمجية (API) التي تُكون مطلوبةً لتطبيق أنواع جديدة من المجاري.

أنواع المجاري

يوجد أربعة أنواع رئيسية للمجاري ضمن Node.js هي:

  • Writable: المجاري التي يمكن أن تُكتب عليها البيانات (مثل fs.createWriteStream()‎)، و
  • Readable: المجاري التي يمكن أن تُقرأ منها البيانات. (مثل fs.createReadStream()‎‎)، و
  • Duplex: المجاري التي تجمع بين النوع Readable والنوع Writable (مثل net.Socket)
  • Transform: المجاري Duplex التي يمكن أن تعدّل أو تحوّل البيانات حسبما تُكتب أو تُقرأ (مثل zlib.createDeflate()‎).

بالإضافة إلى ذلك، تحوي هذه الوحدة الدوال الخدمية pipeline و finished.

نمط الكائن

كل المجاري المُنشأة من قبل واجهات Node.js تعمل حصريًا على السلاسل النصية والكائنات Buffer (أو Uint8Array). مع ذلك، فمن الممكن لتطبيقات المجاري أن تعمل مع أنواع بيانات أخرى في JavaScript (باستثناء النوع null، الذي يخدم غرضًا خاصًا ضمن المجاري). مثل هذه المجاري يؤخذ بالحسبان تشغيلها في "نمط الكائن" (object mode).

تُبدَّل نُسَخ المجاري إلى نمط الكائن باستخدام الخيار objectMode عند إنشاء المجرى. محاولة قلب مجرى موجود إلى نمط الكائن ليست آمنةً.

Buffering

سوف يخزن كلا المجريين Writable و Readable البيانات في مخزن مؤقت داخلي لكي يمكن استعادتها باستخدام التابع writable.writableBuffer أو readable.readableBuffer على التوالي.

كمية البيانات القابلة للتخزين تعتمد على الخيار highWaterMark المُمرر إلى باني المجرى. من أجل المجاري القياسية، يحدد الخيار highWaterMark العدد الكلي من البايتات. أمَّا من أجل المجاري المُشغلة في نمط الكائن، فيحدد الخيار highWaterMark العدد الكلي من الكائنات.

تُخزّن البيانات مؤقتًا في المجاري التي من النوع Readable عندما يستدعي التابع stream.push(chunk)‎. إذا لم يستدعي من يستخدم المجرى التابع cstream.read()‎‎، فستتوضع البيانات ضمن طابور داخلي إلى أن تُستخدَم.

حالما يصل الحجم الكلي لذاكرة القراءة الداخلية المؤقتة عتبةَ محددة من قبل highWaterMark، سيوقفُ المجرى مؤقتًا قراءة البيانات من المصادر الأساسية حتى تُستهلك البيانات الحالية المخزنة مؤقتًا (ذلك أنّ المجرى سيوقف استدعاء التابع readable._read()‎‎ المحلي الذي يُستخدم لوضع البيانات في ذاكرة القراءة المؤقتة).

تُخزّن البيانات مؤقتًا في المجاري التي من النوع Writable عندما يُستدعى التابع writable.write(chunk)‎ مرارًا وتكرارًا. طالما أنّ الحجم الكلي لذاكرة الكتابة المؤقتة الداخلية هو أدنى من العتبة المضبوطة بمقدار highWaterMark، سوف يعيد استدعاء writable.write()‎ القيمة true. حالما يصل أو يتجاوز حجم ذاكرة التخزين المؤقت الداخلية القيمة highWaterMark، سوف تُعاد القيمة false.

الهدف الأساسي من واجهات الوحدة stream البرمجية - على وجه الخصوص التابع stream.pipe()‎ - هو تقييد حجم التخزين المؤقت للبيانات إلى مستويات مقبولة من أجل التوافق بين المصادر (source) والوجهات (destination) ذات السرعات المختلفة لكي لا تمتلئ الذاكرة المتوافرة.

بما أن كلا المجريين Duplex و Transform يجمعان بين النوعين Readable و Writable، يمتلك كل واحد منها ذاكرتي تخزين مؤقتة. هاتان الذاكراتان داخليتين ومنفصلتين عن بعضهما وتستخدمان للقراءة والكتابة يف آن واحد، مما يسمح لكل طرف من المجرى بالتشغيل بشكل مستقل عن الآخر بينما يضمن تدفق بيانات مناسب وفعّال. على سبيل المثال، النُسخ net.Socket هي مجاري من النوع Duplex طرفها القابل للقراءة (Readable) يسمح باستهلاك البيانات المُستقبَلة من المقبس وطرفها القابل للكتابة (Writable) يسمح بكتابة البيانات على المقبس. بسبب أن البيانات قد تُكتب إلى المقبس بمعدل أسرع أو أبطأ من البيانات التي تُستقبل وتكتب على المجرى، فمن الضروري لكل طرف أن يعمل (ويخزِّن البيانات) بشكل مستقل عن الآخر.

الواجهات البرمجية لمستخدمي المجرى

كل تطبيقات Node.js تقريبًا، مهما كانت بسيطة، تستخدم المجاري بطريقةٍ ما. الآتي هو مثال لاستخدام المجاري في تطبيق Node.js والذي ينفّذ مُخدّم HTTP:

const http = require('http');

const server = http.createServer((req, res) => {
  


Req هو http.IncomingMessage، والذي هو مجرى قابل للقراءة //
Req هو http.ServerResponse، والذي هو مجرى قابل للكتابة //


  let body = '';


احصل على البيانات كسلاسل utf8 //
إذا لم يُجهّز الترميز ستُستقبل كائنات ذاكرة مؤقتة. //

  req.setEncoding('utf8');



تطلق المجاري القابلة للقراءة أحداث 'data' حالما يُضاف مستمع. //

  req.on('data', (chunk) => {
    body += chunk;
  });

يشير الحدث 'end' أن الجسم الكامل قد استُقبل.

  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {



أوه، json سيئة! //
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1

المجاري ذات النوع Writable (مثل res في المثال) توفر توابع مثل write()‎ و end()‎ والتي تُستخدم لكتابة البيانات على المجرى.

المجاري ذات النوع Readable تستخدم واجهات الصنف EventEmitter البرمجية من أجل اشعار شيفرة التطبيق عندما تكون البيانات متوفرة للقراءة من المجرى. يمكن قراءة هذه البيانات المتوفرة من المجرى بعدة طرق.

تستخدم المجاري التي من النوع Writable و Readable واجهات الصنف EventEmitter البرمجية بطرق متنوعة للبقاء على اتصال دائم بالحالة الحالية للمجرى.

تذكر أن المجاري التي من النوع Duplex و Transform هي مجاري قابلة للكتاية (Writable) والقراءة (Readable).

لا يُطلب من التطبيقات التي إمّا تكتب البيانات أو تقرؤها من المجرى تنفيذ واجهات الوحدة stream بشكل مباشر، وبذلك لا يوجد عمومًا سبب لاستدعاء الوحدة stream عبر require('stream')‎‎.

ينبغي على المطورين الراغبين بإنشاء أنواع جديدة من المجاري الرجوع إلى القسم الواجهات البرمجية لمنفذي المجاري.

المجاري القابلة للكتابة

المجاري القابلة للكتابة إجمالًا تمثِّل مكانًا قابلًا لكتابة البيانات فيه.

تتضمن الأمثلة التالية مجارٍ قابلة للكتابة:

  • طلب HTTP، من طرف العميل.
  • استجابة HTTP، من طرف الخادم.
  • مجاري الوحدة fs القابلة للكتابة.
  • مجاري الوحدة zlib.
  • مجاري الوحدة crypto.
  • المقابس TCP.
  • العملية الابن للمجرى stdin (هي subprocess.stdin).
  • المجرى process.stdout و process.stderr.

بعض هذه الأمثلة هي فعليًا مجاري من النوع Duplex والتي تنفّذ الواجهة Writable.

كل المجاري التي من النوع Writable تنفّذ الواجهة المعرّفة بالصنف stream.Writable.

بينما قد تختلف نسخ المجاري التي من النوع Writable بطرق متنوعة، كل المجاري Writable تتبع نمط الاستخدام الأساسي كما هو موضّح في المثال أدناه:

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');

الصنف stream.Writable

أضيف في الإصدار: 0.9.4.

الحدث 'close'

يُطلَق الحدث 'close' عندما يكون المجرى أو أحد موارده الأساسية (واصف الملفات مثلًا) قد أُغلق. يشير هذا الحدث أنّه لن تُطلق المزيد من الأحداث، ولن تحصل المزيد من العمليات المتعلقة بالمجرى.

لا تُطلق كل المجاري Writable الحدث 'close'.

الحدث 'drain'

أُضيف في الإصدار:0.9.4.

إذا أعاد التابع stream.write(chunk)‎ القيمة false، فسوف يُطلَق الحدث 'drain' عندما يكون من المناسب استئناف كتابة البيانات على المجرى.

كتابة البيانات إلى المجرى القابل للكتابة المزوّد لمليون مرّة.//
كن منتبهًا للضغط العائد//


function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {

آخر مرّة//

        writer.write(data, encoding, callback);
      } else {



انظر إذا كان ينبغي لنا الإستمرار أو الإنتظار//
لا تمرر دالة رد النداء لأنّنا لم ننته بعد//

        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {

وجب التوقف باكرًا//
اكتب بعض المزيد حالما تنضب//

      writer.once('drain', write);
    }
  }
}
الحدث 'error'

أضيف في الإصدار: 0.9.4.

  • <Error>

يُطلق الحدث 'error' إذا حصل خطأ أثناء الكتابة على أو إرسال البيانات إلى المجرى. سوف يُمرَّر إلى دالة رد النداء المنصتة الوسيط Error فقط عند استدعائها.

لن يُغلق المجرى عندما يُطلق الحدث 'error'.

الحدث 'finish'

أضيف في الإصدار: 0.9.4.

سوف يُطلق الحدث 'finish' بعد استدعاء التابع  stream.end()‎‎، وبعد أن دُفعت كل البيانات للنظام الأساسي.

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.error('All writes are now complete.');
});
الحدث  'pipe'

أُضيف في الإصدار: 0.9.4.

  • Src:‏ <stream.Readable>مجرى القراءة والذي يُوصل إلى هذا المجرى القابل للكتابة.

يٌطلَق الحدث 'pipe' عندما  يُستدعى التابع stream.pipe()‎ على مجرًى قابلٍ للقراءة مُضيفًا المجرى القابل للكتابة إلى مجموعة وجهاته.

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
الحدث 'unpipe'

أضيف في الإصدار: 0.9.4.

  • Src:‏ <stream.Readable> مجرى المصدر الذي لغى الااتصال بهذا المجرى القابل للكاتبة.

يُطلَق الحدث 'unpipe' عندما يُستدعى التابع stream.unpipe()‎‎ على مجرًى قابلٍ للقراءة، مزيلًا المجرى القابل لكتابة من مجموعة وجهاته.

يُطلق هذا الحدث أيضًا في الحالة التي يطلق فيها المجرى القابل للكاتبة خطأً عندما يُوصل به مجرى قابل للقراءة.

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()‎

أضيف في الإصدار: 0.11.2.

يجبر التابع writable.cork()‎ كل البيانات المكتوبة على أن تُخزّن مؤقتًا في الذاكرة. ستُدفع البيانات المخزّنة عندما يُستدعى أحد التابعين stream.uncork()‎‎‎‎ أو stream.end()‎.

المقصد الأساسي من التابع writable.cork()‎ هو تجنب حالة يحدث فيها كتابة العديد من القطع الصغيرة من البيانات إلى المجرى دون جلب نسخةٍ منها إلى المخزن المؤقت الداخلي، إذ سيكون لذلك تأثير عكسي على الأداء. في مثل هذه الحالات، يمكن أن تنجز التطبيقات التي تنّفذ التابع writable._writev()‎ عملية الكتابة المخزّنة مؤقتًا بأسلوب أكثر مثاليةً.

انظر أيضًا: التابع writable.uncork()‎.

writable.destroy([error]‎)‎

أُضيف في الإصدار: 8.0.0

  • Error:‏ <Error>
  • القيمة المُعادة: ‎<this>‎‎

يهدم التابع المجرى، ويطلق الأحداث 'error' و 'close' المُمررة. بعد هذا الاستدعاء، تكون المجاري القابلة للكتابة قد انتهت والاستدعاءات اللاحقة للتابع write()‎ والتابع end()‎ سوف تُفضي إلى خطأ ERR_STREAM_DESTROYED. لا ينبغي أن يعيد المنفّذون تعريف هذا التابع، ولكن يمكنهم أن يستعملوا التابع writable._destroy()‎ بدلًا عن ذلك.

writable.end([chunk][, encoding][, callback]‎)‎

سجل التغييرات

الإصدار التغييرات
10.0.0 يعيد هذا التابع الآن مرجعًا إلى مجرًى من النوع writable.
8.0.0 يمكن الآن أن يكون الوسيط chunk نسخة من النوع Uint8Array.
0.9.4 أضيف هذا التابع في الإصدار 0.9.4
  • chunk:‏ ‎<string>‎ | <Buffer>‎ | <Uint8Array>‎ | <any>‎ بيانات اختيارية يراد كتابتها. من أجل المجاري التي لا تعمل في وضع الكائن، يجب أن يكون الوسيط chunk سلسلةً نصيةً (string) أو كائنًا من النوع Buffer أو النوع Uint8Array. من أجل المجاري في نمط الكائن، يمكن أن تكون chunk أي قيمة غير null.
  • encoding:‏ <string> التشفير، إذا كانت  chunk سلسلة نصية.
  • ‎<Function> :‎Callback‎ دالة رد نداء اختيارية عندما يُنهى المجرى.
  • القيمة المعادة: ‎<this>‎‎

يشير استدعاء التابع writable.end()‎ أنّه لا مزيد من البيانات يراد كتابتها على المجرى القابل للكتابة (Writable). سيسمح الوسيطان الاختياريان chunk و encoding أن تُكتب قطعة إضافية واحدة أخيرة من البيانات مباشرةً قبل إغلاق المجرى. إذا زُودت، سترفق دالة رد نداء اختيارية كمنصت للحدث 'finish'.

سيثير استدعاء التابع stream.write()‎ بعد استدعاء stream.end()‎ خطأً.

اكتب ' hello,‎'  ومن ثم أنهِ ب 'world!‎'//

const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');

كتابة المزيد ليست مسموحة الآن//
writable.setDefaultEncoding(encoding)‎

سجل التغييرات

الإصدار التغييرات
6.1.0 يعيد هذا التابع الآن مرجعًا إلى مجرًى من النوع writable.
0.11.15 أضيف في 0.11.15
  • string>‎ :encoding>
  • القيمة المعادة: <this>

يضبط التابع writable.setDefaultEncoding()‎ الترميزَ الافتراضي (encoding) للمجرى القابل للكتابة (Writable).

writable.uncork()‎

أُضيف في الإصدار: 0.11.2.

يفرِّغ التابع writable.uncork()‎ كل البيانات التي خُزِّنت منذ أن استُدعي stream.cork()‎.

عند استخدام التابع writable.cork()‎ والتابع writable.cork()‎‎ لإدارة التخزين المؤقت أثناء الكتابة على مجرًى، يُنصح بأن يؤخر استدعاء writable.uncork()‎ باستخدام process.nextTick()‎. يسمح القيام بذلك بتجميع كل استدعاءات التابع writable.write()‎ التي تحصل خلال طورٍ واحدٍ لحلقة أحداث Node.js مُعطاة.

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

إذا استُدعي التابع writable.cork()‎ عدة مرات لمجرىً ما، فيجب أن يُستدعى التابع writable.uncork()‎ نفس عدد الاستدعاءات لتفريغ البيانات المخزنة.

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();


لن تُفرّغ البيانات حتى يُستدعى التابع uncork() مرّة ثانية.//

  stream.uncork();
});

انظر أيضًا: التابع writable.cork()‎.

writable.writableHighWaterMark

أُضيفت في الإصدار: 9.3.0.

  • ‎<number>‎‎‎

يعيد القيمة highWaterMark التي مُرِّرت عند إنشاء المجرى القابل للكتابة.

writable.writableLength

أُضيفت في الإصدار: 9.4.0.

تحوي هذه الخاصية عدد البايتات (أو الكائنات) الجاهزة للكتابة والموجودة في الطابور. توفر هذه القيمة بيانات داخلية متعلقة بحالة القيمة highWaterMark.

writable.write(chunk[, encoding][, callback]‎)‎

سجل التغييرات

الإصدار التغييرات
8.0.0 يمكن الآن أن يكون الوسيط نسخة من النوع Uint8Array.
6.0.0 سوف يعتبر دائمًا تمريرالقيمة null كمعامل chunk غير صالح الآن، حتى في نمط الكائن.
0.9.4 أُضيف هذا التابع.
  • ‏chunk‏: <string> |‎ <Buffer>‎ |‎ <Uint8Array>‎ |‎ <any>‏‏‎   بيانات اختيارية للكتابة. من أجل المجاري التي لا تعمل في نمط الكائن، يجب أن تكون chunk سلسلة نصية أو Buffer أو Uint8Array. من أجل مجاري نمط الكائن، يمكن أن تكون chunk أي نوع من أنواع بيانات JavaScript غير null.
  • string>‎ :encoding> الترميز، إذا كان الوسيط chunk سلسلةً نصيةً.
  • callback:‏ <Function‎> دالة رد النداء المراد استدعاؤها عندما تُدفع قطعة البيانات.
  • القيمة المُعادة: ‎<boolean>‎‎ قيمة منطقية تكون false إذا كان المجرى يريد للشيفرة المستدعية أن تنتظر انطلاق الحدث 'drain' قبل الاستمرار بكتابة المزيد من البيانات؛ وإلّا تكون true.

يكتب التابع writable.write()‎ بعض البيانات إلى المجرى. ويستدعي رد النداء (callback) المُزوّد حالما تكون البيانات المكتوبة قد عولجت بشكل كامل. إذا حصل خطأ، ربما يُستدعى callback أو لا مع تمرير الخطأ كوسيطه الأول. للكشف عن أخطاء الكتابة بشكل موثوق، أضف مُنصتًا إلى الحدث 'error'.

القيمة المعادة هي true إذا كانت ذاكرة التخزين الداخلية أقل من القيمة highWaterMark المُهيئة عندما أُنشئ المجرى بعد إقرار chunk. إذا أُعيد false، ينبغي أن تتوقف المحاولات الإضافية لكتابة البيانات على المجرى حتى يُطلق الحدث 'drain'.

طالما أن المجرى لم يفرغ، فإن استدعاء write()‎ سوف يخزّن chunk مؤقتًا، ويعيد خطأ. حالما تُستهلك كل القطع الحالية المخزنة مؤقتًا (قُبلَت للتسليم من قبل نظام التشغيل)، سوف يُطلق الحدث 'drain'. حين يعيد write()‎ القيمة false، فمن المستحسن ألّا تكتب المزيد من القطع حتى يُطلق الحدث 'drain'. بينما يُسمح باستدعاء write()‎ على مجرًى لم يفرغ بعد، فسوف تخزن Node.js مؤقتًا كل القطع المكتوبة حتى يحصل استخدام ذاكرة أعظمي،عند هذه النقطة ستتوقف دون قيود. حتى قبل حصول التوقف، سيسبب الاستخدام المرتفع للذاكرة أداءًا ضعيفًا لجامع القمامة (منظّف الذاكرة) وارتفاع مجموعة الذاكرة المُقِيمة (RSS) [والذي لن يعاد استرجاعه للنظام بشكل طبيعي، حتى بعد عدم الحاجة للذاكرة]. بما أن المقابس TCP قد لا تنضب أبدًا إذا لم يقرأ الند (peer) البعيد البيانات، ربما تقود الكتابة على مقبس لا يُصرّف لحصول ثغرة قابلة للاستغلال (exploitable vulnerability).

كتابة البيانات على مجرى لا ينفد هو معضلة خصوصًا مع مجاري التحويل ( Transform)، لأنّ مجاري Transform  تتوقف افتراضيًا حتى تنقل تُتصل بمجرى آخر أو يضاف معالج للحدث 'data' أو الحدث 'readable'.

اذا أمكن توليد البيانات التي ستُكتب أو جلبها عند الطلب، فمن المستحسن تغليفها في نفس المجرى ثم تحويلها إلى المجرى القابل للقراءة عبر استخدام التابع stream.pipe()‎.

ولكن إذا كان رُجّج استدعاء write()‎، فمن الممكن مراعاة الضغط العائد وتجنب مشاكل الذاكرة باستخدام الحدث 'drain':

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}


الانتظار حتى يُستدعى cb قبل القيام بأي كتابة أخرى. // 

write('hello', () => {
  console.log('write completed, do more writes now');
});

سوف يهمل المجرى Writable في نمط الكائن الوسيط encoding دائمًا.

المجاري القابلة للقراءة

المجاري القابلة للقراءة هي فكرة مجرّدة عن مصدرٍ تُستهلك منه البيانات.

أمثلة عن المجاري المقروءة هي:

  • استجابة لطلبيات HTTP من طرف العميل.
  • طلبيات HTTP من طرف الخادم.
  • مجاري الوحدة fs القابلة للقراءة.
  • مجاري الوحدة zlib.
  • مجاري الوحدة crypto.
  • مقابس TCP.
  • المجرى stdout والمجرى stderr للعملية الابن.
  • المجرى Process.stdin للعملية الحالية.

كل المجاري  القابلة للقراءة (Readable) تطبّق الواجهة المعرّفة من قبل الصنف stream.Readable.

نمطا القراءة

تعمل المجاري القابلة للقراءة بفاعلية في أحد النمطين التاليين: نمط التدفق (flowing mode) ونمط التوقف المؤقتً (paused mode). هذه الأنماط منفصلة عن نمط الكائن. يمكن أن يكون المجرى المقروء في نمط الكائن أو لا، بغض النظر عن إذا كان في نمط التدفق أو نمط التوقف المؤقت.

  • في نمط التدفق، تُقرأ البيانات من النظام الرئيسي بشكل تلقائي وتُقدّم للتطبيق بأسرع ما يمكن باستخدام الأحداث عبر الواجهة EventEmitter.
  • في نمط التوقف المؤقت، يجب أن يُستدعى stream.read()‎ بشكل صريح لقراءة قطع البيانات من المجرى.

تبدأ كل مجاري في نمط التوقف المؤقت ولكن يمكن تبديلها إلى نمط التدفق بإحدى الطرق التالية:

  • إضافة معالج للحدث 'data'.
  • استدعاء التابع stream.resume()‎.
  • استدعاء التابع stream.pipe()‎ لإرسال البيانات إلى  المجرى القابل للكتابة (Writable).

يمكن الانتقال بالعكس إلى نمط التوقف باستخدام أحد التالي:

  • إذا لم يكن هناك أنبوب متصل بالوجهات، فيمكن ذلك عبر استدعاء التابع stream.pause()‎.
  • إذا كان هناك أنبوب متصل بالوجهات، فيمكن ذلك عبر إزالة كل أنابيب الوجهات. قد تُزال العديد من أنابيب الوجهات باستدعاء التابع stream.unpipe()‎.

المبدأ الأساسي الذي يجب تذكره دومًا هو أنّ المجرى القابل للقراءة (Readable) لن يولّد بيانات حتى تتوافر آلية إمّا لاستهلاك تلك البيانات أو تجاهلها. إذا كانت آلية الاستهلاك معطلة أو مُستبعدة، سوف يحاول Readable إيقاف توليد البيانات.

من أجل التوافقية مع الإصدارات السابقة، لن توقف ازالة معالج حدث 'data' المجرى تلقائيًا. أيضًا، إذا كان هنالك ممرات للوجهات، فحين ذلك  لن يضمَن استدعاء stream.pause()‎ أن يبقى المجرى متوقفًا حالما تفرغ هذه الوجهات وتسأل عن المزيد من البيانات.

إذا قُلب Readable إلى نمط التدفق ولم يتوفر مستهلكون لمعالجة البيانات، ستضيع تلك البيانات. يمكن أن يحصل هذا على سبيل المثال، عندما يُستدعى تابع readable.resume()‎ دون منصت مرفق بحدث 'data'، أو عندما يُزال معالج حدث 'data' من المجرى.

إضافة معالج حدث 'readable' يجعل توقف تدفق المجرى تلقائيًا، وتُتستهلك البيانات بواسطة readable.read()‎. إذا أُزيل معالج حدث 'readable'، حينذاك سيبدأ المجرى بالتدفق مُجدّدًا إذا كان هناك معالج حدث 'data' موجود.

الحالات الثلاث

هناك "وضعا" تشغيل للمجرى القابل للقراءة (Readable)، هما تلخيص مبسّط لإدارة حالات داخلية كثر تعقيدًا والتي تحصل خلال تطبيق مجرى قابل للقراءة (Readable).

على وجه التحديد وفي أي نقطة معطاة من الزمن يأخذ كل مجرى القابل للقراءة (Readable) إحدى الحالات الثلاث التالية:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

عندما تكون readable.readableFlowing هي null ، لن تكون هناك آلية مُقدمة لاستهلاك بيانات المجرى. لذلك، لن يولد المجرى بيانات. بينما في هذه الحالة،  سيبدل إرفاق مُنصت إلى الحدث 'data' أو استدعاء التابع readable.pipe()‎ أو استدعاء التابع readable.resume()‎ قيمة الخاصية readable.readableFlowing

إلى القيمة true، متسببًا ببدء  Readable بإطلاق الأحداث بفاعلية حين توّلد البيانات.

سوف يتسبب استدعاء readable.pause()‎‎ أو readable.unpipe()‎ أو استقبال الضغط العائد بضبط قيمة الخاصية readable.readableFlowing إلى false، قاطعًا تدفق الأحداث مؤقتًا ولكن غير قاطعٍ لتدفق البيانات. بينما لن يقلب إرفاق منصت بالحدث 'data' قيمة الخاصية  readable.readableFlowing إلى true في هذه الحالة.

const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);


//التدفق القابل للقراءة متوقف الآن

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok');  

لن تطلق الحدث 'data'//


pass.resume();    

يجب أن تُستدعى لجعل المجرى يطلق الحدث 'data'//

بينما تكون قيمة readable.readableFlowing هي false، قد تتراكم البيانات داخل الذاكرة المؤقتة الداخلية للمجرى.

اختر أسلوب واجهة برمجية وحيد

تطورت الواجهات البرمجية (API) للمجرى القابل للقراءة (Readable) عبر عدة إصدارات Node.js وقدّمت عدّة توابع لاستهلاك بيانات المجرى. بشكل عام، ينبغي أن يختار المطورون واحدة من توابع استهلاك البيانات ولا ينبغي استخدام عدة توابع لاستهلاك البيانات من مجرى واحد. بشكل خاص، قد يقود استخدام مزيج من on('data')‎ أو on('readable')‎‎ أو pipe()‎ أو توابع تكرارية غير متزامنة إلى سلوك غير متوقع.

ينصح باستخدام التابع readable.pipe()‎ لأغلب المستخدمين بما أنه نُفّذ لتقديم أسهل طريقة لاستهلاك بيانات المجرى.

يمكن أن يستخدم المطورون الذين يطلبون المزيد من التحكم الدقيق على نقل وتوليد البيانات الصنف EventEmitter  و readable.on('readable')/readable.read()‎ أو الواجهات readable.pause()/readable.resume()‎.

الصنف stream.Readable

أُضيف في الإصدار: 0.9.4.

الحدث:'close'

أُضيف في الإصدار: 0.9.4.

يُطلق الحدث'close' عندما يكون المجرى أو أحد موارده الأساسية (واصف الملف مثلًا) قد أُغلق. يشير الحدث أنّه لن تُطلق مزيد من الأحداث ولن يحصل المزيد من العمليات المتعلقة بالمجرى.

لا تُطلق كل المجاري القابلة للقراءة الحدث 'close'.

الحدث:'data'

أُضيف في الإصدار: 0.9.4.

  • ‎<Buffer> | <string> | <any> ‎:‎chunk قطع البيانات. من أجل المجاري التي لا تعمل في نمط الكائن، ستكون القطعة إمّا سلسلة نصية أو كائنًا من النوع Buffer. من أجل المجاري في نمط الكائن، يمكن أن تكون القطعة أي نوع من أنواع البيانات باستثناء null.

سيُطلق الحدث 'data' كلما تخلى المجرى عن حيازة قطعة من البيانات إلى المستهلك. قد يحصل هذا كلّما بُدِّل المجرى في نمط الكائن باستدعاء readable.pipe()‎ أو readable.resume()‎ أو بإرفاق رد نداء منصت للحدث 'data'. سوف يُطلق الحدث 'data' أيضًا كلّما استُدعي التابع readable.read()‎ وكانت قطعة من البيانات متوفرة لتُعاد.

إرفاق مُنصت بالحدث 'data'  لمجرىً لم يُوقَف بشكل صريح سوف يقلب المجرى إلى نمط التدفق. ستُمرر البيانات بعدئذٍ حالما تكون متوفرة.

سوف يمرر ردُ نداء المُنصت قطعةَ البيانات كسلسلة نصية إذا كان التشفير الافتراضي مُحدّدًا باستخدام التابع readable.setEncoding()‎‎، وإلّا ستمرر البيانات ضمن كائن من النوع Buffer.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

الحدث: 'end'

أُضيف في الإصدار: 0.9.4.

يُطلق الحدث 'end' عندما لا يكون هناك المزيد من البيانات لتُقرَأ من المجرى.

لن يُطلق الحدث 'end' مالم تُستهلك البيانات بالكامل. يمكن أن يُحقَّق ذلك بقلب المجرى إلى نمط التدفق، أو باستدعاء stream.read()‎ مرارًا حتى يتم استهلاك كل البيانات.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});

الحدث: 'error'

أُضيف في الإصدار: 0.9.4.

  • ‎<Error>‎

ربما يُطلق الحدث 'error' من قبل إجراء مجرى قابل للقراءة (Readable) في أي وقت. عادةً، قد يحصل ذلك إذا كان المجرى الأساسي غير قادر على توليد بيانات بسبب فشل داخلي أساسي، أو عندما يحاول إجراء المجرى دفع قطعة بيانات غير صالحة.

سيُمرَّر إلى دالة رد نداء المُنصت كائنٌ من النوع Error فقط.

الحدث: 'readable'

سجل التغييرات

الإصدار التغييرات
10.0.0 يُطلق الحدث 'readable' دائمًا في النبضة التالية بعد أن يُستدعى ‎.push()‎
10.0.0 يتطلب استخدام 'readable' استدعاءَ ‎.read()‎
0.9.4 أُضيف في الإصدار: 0.9.4.

يُطلق التابع 'readable' عندما يكون هناك بيانات متوفرة للقراءة من المجرى. في بعض الحالات، سوف يسبب ربط مُنصت بالحدث 'readable' أن تُقرَأ  بعض كمية البيانات إلى ذاكرة مؤقتة داخلية.

const readable = getReadableStreamSomehow();
readable.on('readable', function() {
 
يوجد بعض البيانات لتُقرأ الآن.//

  let data;

  while (data = this.read()) {
    console.log(data);
  }
});

سوف يُطلق الحدث 'readable' أيضًا حالما يتم التوصل إلى نهاية بيانات المجرى ولكن قبل أن يُطلق الحدث 'end'. فعليًا، يشير الحدث 'readable' أن المجرى يملك معلومات جديدة وهي: إمّا بيانات جديدة متوفرة أو تم الوصول إلى نهاية المجرى. في الحالة الأولى، سوف يعيد stream.read()‎ البيانات المتوفرة. في الحالة الثانية، سوف يعيد stream.read()‎ القيمة null . على سبيل المثال، في المثال التالي، foo.txt هو ملف فارغ:

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
});

خرج تشغيل هذا السكربت هو:

$ node test.js
readable: null
end

بشكل عام، فإنّ  آليات readable.pipe()‎ والحدث  'data' هي أسهل للفهم من الحدث 'readable'. ولكن، معالجة 'readable' قد تنتج زيادة بمعدل الانتاجية .

إذا استُخدم  كلا الحدثين 'readable' و 'data'  في نفس الوقت، فإن الحدث  'readable' يأخذ الأولوية في التحكم بالتدفق؛  أي أنّ الحدث 'data' سوف يُطلق فقط إذا استُدعي stream.read()‎. قد تصبح قيمة الخاصية  readableFlowing هي false . إذا كان هناك مُنصتات للحدث 'data' عندما يُزال الحدث 'readable'، سيبدأ المجرى بالتدفق؛ أي أن الأحداث 'data' سوف تُطلق دون استدعاء ‎.resume()‎

readable.destroy([error]‎)‎

أُضيف في الإصدار: 8.0.0.

  • Error>‎ :‎error> الخطأ الحاصل الذي سيُمرَّر عند تحميل الحدث 'error'.
  • القيمة المُعادة: ‎<this>‎‎

يهدم التابع المجرى ويطلق الحدث 'error' و 'close'. بعد هذا الاستدعاء، سوف يحرر المجرى القابل للقراءة أيّة موارد داخلية وسوف تُتجاهل الاستدعاءات اللاحقة للتابع push()‎ . لا ينبغي على المنفذين إعادة تعريف هذا التابع، ولكن يمكنهم تطبيق readable._destroy()‎‎ بدلًا عنه.

readable.isPaused()‎

أُضيف في الإصدار: 0.11.14.

  • القيمة المُعادة: ‎<boolean>‎‎

يعيد التابع readable.isPaused()‎ حالة التشغيل الحالية للمجرى Readable . يُستخدم هذا التابع بالمقام الأول من قبل آليات ترتكز على التابع readable.pipe()‎. في معظم الحالات الطبيعية، لا يوجد حاجة لاستخدام هذا التابع بشكل مباشر.

const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false

readable.pause()‎

أضيف في الإصدار: 0.9.4.

القيمة المُعادة: ‎<this>‎

سوف يسبب التابع readable.pause()‎ وقف إطلاق أحداث 'data' في نمط التدفق، مع إخراج المجرى من نمط التدفق. أية بيانات تصبح متوفرة ستبقى في الذاكرة المؤقتة الداخلية.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});

لا يملك التابع readable.pause()‎ أي تأثير إذا كان هناك مُنصت للحدث 'readable'.

readable.pipe(destination[, options]‎)‎

أُضيف في الإصدار: 0.9.4.

  • ‎<stream.Writable>‎ :destination الوجهة التي ستُكتَب فيها البيانات.
  • Object> :options‎> خيارات النقل.
    • boolean>‎ :‎end> إنهاء الكاتب عندما ينتهي القارئ. القيمة الافتراضية: true.
  • القيمة المُعادة: ‎<stream.Writable>‎ تعاد الوجهة destination سامحةً بسلسلة من الأنابيب والممرات إذا كانت المجاري من النوع المزدوج (Duplex) أو مجاري تحويل (Transform).

يربط التابع readable.pipe()‎ مجرًى قابلًا للكتابة مع مجرًى قابلٍ للقراءة مسببًا قلبه تلقائيّا إلى نمط التدفق ودفع جميع بياناته إلى المجرى القابل للكتابة المرتبط به. أي كأن هذا التابع يضع أنبوبًا بين المجريين لتتدفق البيانات من أحدهما إلى الآخر عبره. سيضبط تدفق البيانات تلقائيًا لذلك لن يُغمر مجرى Writable الهدف من قبل مجرى قابل للقراءة(Readable) أسرع منه.

ينقل المثال التالي كل البيانات من المجرى readable إلى ملف اسمه file.txt:

const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');



تذهب كل البيانات من المجرى القابل للقراءة إلى 'file.txt' //

readable.pipe(writable);

من الممكن ربط عدة مجاري قابلة للكتابة مع مجرى واحد قابل للقراءة. يعيد التابع readable.pipe()‎ مرجعًا إلى مجرى الوجهة جاعلًا من الممكن إقامة سلسلة من المجاري المتصلة ببعضها بأنابيب:

const fs = require('fs');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

بشكل افتراضي، يُستدعى stream.end()‎ على مجرى الوجهة القابل للكتابة (Writable) عندما يطلق مجرى Readable المصدر الحدث 'end'. لذلك لن تكون الوجهة قابلةً للكتابة بعد الآن. لتعطيل هذا السلوك الافتراضي، يمكن أن تمرير القيمة false إلى الخيار end مسبّبًا بقاء الوجهة مفتوحة:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

أحد التحذيرات المهمة أنّه إذا  أطلق المجرى Readable خطأً أثناء المعالجة، لن يُغلق مجرى الوجهة Writable تلقائيًا، لذا من الضروري في هذه الحالة إغلاق كل مجرى يدويًا لمنع التسريب في الذاكرة.

لن يغلق المجريان process.stderr و process.stdout القابلان للكتابة (Writable) أبدًا حتى تنتهي عملية Node.js بغض النظر عن الخيارات المحدّدة.

readable.read([size]‎)‎

أضيف في الإصدار: 0.9.4.

  • ‎<number>‎ :size وسيط اختياري يحدد كمية البيانات للقراءة.
  • القيمة المعادة: ‎<string>‎‎ | <Buffer>‎ | <null>‎ | <any>‎

يسحب التابع readable.read()‎ بعض البيانات من الذاكرة المؤقتة الداخلية ويعيدها. إذا لم يكن هناك بيانات متوافرة للقراءة، يعيد التابع القيمة null. بشكل افتراضي، ستعاد البيانات في كائن من النوع Buffer  إلّا إذا حُدد الترميز باستخدام التابع readable.setEncoding()‎ أو إذا كان المجرى يشتغل في نمط الكائن.

يحدد الوسيط الاختياري size عدد البايتات المحدد للقراءة. إذا لم يكن هنالك بايتات بالحجم size متوافرة للقراءة، ستُعاد القيمة null إلّا إذا كان المجرى قد انتهى. في هذه الحالة، ستُعاد كل البيانات المتبقية في ذاكرة التخزين المؤقت الداخلية.

إذا لم يكن الوسيط size محدّدًا، ستُعاد كل البيانات المحتواة في ذاكرة التخزين المؤقت الداخلية.

ينبغي أن يُستدعى التابع readable.read()‎ على مجاري Readable المُشغّلة في نمط التوقف المؤقت فقط، في نمط التدفق، يُستدعى readable.read()‎ تلقائيًا حتى فراغ ذاكرة التخزين المؤقت الداخلية بالكامل.

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
})

سيعيد المجرى Readable في نمط الكائن عنصرًا وحيدًا دائمًا من استدعاء readable.read(size)‎ بغض النظر عن عن قيمة الوسيط size.

إذا أعاد التابع readable.read()‎ قطعة من البيانات، سيُطلق أيضًا الحدث 'data'.

استدعاء stream.read([size]‎)‎ بعد أن يكون الحدث 'end' قد أُطلق سيعيد null. لن تُطلَق أي أخطاء وقت التشغيل (runtime).

readable.readableHighWaterMark

أضيفت في الإصدار: 9.3.0.

  • القيمة المعادة: ‎<number>‎‎

تعيد قيمة الخاصية highWaterMark المُمرّرة عند إنشاء مجرى Readable هذا.

readable.readableLength

أُضيفت في الإصدار:9.4.0.

  • القيمة المعادة:‎<number>‎‎

تحوي هذه الخاصية عدد البايتات (أو الكائنات) الجاهزة للقراءة في الطابور. تقدم هذه القيمة البيانات الداخلية المخزنة بما يتوافق مع حالة highWaterMark.

readable.resume()‎

سجل التغييرات

الإصدار التغييرات
10.0.0 لا يملك resume()‎‎ تأثير إذا كان هناك استماع للحدث  'readable'.
0.9.4 أُضيف في الإصدار 0.9.4.
  • القيمة المعادة: ‎<this>‎

يسبب التابع readable.resume()‎ إيقاف صريح لمجرى Readable  لاستئناف إطلاق أحداث 'data'، قالبًا المجرى إلى نمط التدفق.

يمكن أن يُستخدم التابع readable.resume()‎ لاستهلاك كامل للبيانات من المجرى دون معالجة أي منها فعليًا.

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });

لا يملك التابع readable.resume()‎ تأثير إذا كان هناك مُنصت للحدث  'readable'.

readable.setEncoding(encoding)‎

أُضيف في الإصدار: 0.9.4.

  • ‎‎<string>‎ :encoding الترميز المراد استعماله.
  • القيمة المعادة: ‎<this>‎

يضبط التابع readable.setEncoding()‎ ترميز المحارف للبيانات المقروءة من المجرى Readable.

بشكل افتراضي، لا يُخصص أي ترميز وستعاد بيانات المجرى في كائنات من النوع Buffer، ضبط الترميز يتسبب بأن تُعاد بيانات المجرى كسلسلة نصية مرمّزة بالترميز المحدد بدلًا من إعادتها في كائنات من النوع Buffer. على سبيل المثال، سيسسب استدعاء readable.setEncoding('utf8')‎ بأن تُفسر بيانات الخرج كبيانات مرمزة بالترميز UTF-8، وتُمرر كسلاسل نصية. سوف يسبب استدعاء readable.setEncoding('hex')‎ أن تُرمّز البيانات بشكل سلسلة نصية ستة عشرية.

سيعالج المجرى Readable المحارف متعددة البايتات المُستلمة خلال المجرى بصورة صحيحة وإلّا قد تصبح مرمّزة بشكل غير صحيح إذا سُحبت من المجرى ببساطة في كائنات من النوع Buffer.

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});

readable.unpipe([destination]‎)‎

أضيف في الإصدار:0.9.4.

  • ‎<stream.Writable>‎ :‎destination مجرى محدد اختياريًا
  • القيمة المُعادة: ‎<this>‎

يفصل التابع readable.unpipe()‎ مجرى Writable المربوط مؤخرًا باستخدام التابع  stream.pipe()‎.

إذا لم يُحدد مجرى الوجهة (destination)، ستفصل حينذاك كل الأنابيب(الممرات).

إذا كان destination محددًا، ولكن لا يوجد أنابيب(قنوات) مُنصّبة عليه، لن يفعل التابع حينذاك شيئًا.

const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');



تذهب كل البيانات من المجرى القابل للقراءة readable إلى 'file.txt'// 
ولكن فقط للثانية الأولى.//

readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt');
  readable.unpipe(writable);
  console.log('Manually close the file stream');
  writable.end();
}, 1000);