
Chapter 6

Coding with Streams

Pages 159-230 · nodejs, streams, readable, writable, transform

Summary

"Stream all the things!" -- Streams are one of the most powerful features in Node.js. They let you process data piece by piece instead of loading everything into memory. This gives spatial efficiency (the book's gzip-buffer.js fails on large files because a single Buffer has a size cap, about 2 GB in the book's example, while gzip-stream.js handles them in constant memory via pipe()) and time efficiency (Figure 6.2 shows the assembly-line effect: reading, compressing, and writing happen concurrently on different chunks). Streams are also inherently composable: adding encryption to the gzip pipeline is just one more .pipe(createCipheriv(...)) call.

This chapter covers all four stream types (Readable, Writable, Duplex, Transform), consuming streams in flowing mode, paused mode, and with async iterators, implementing custom streams (_read(), _write(), _transform(), _flush()), backpressure mechanics (highWaterMark and 'drain'), error handling with pipe() vs pipeline(), and advanced piping patterns (combining, forking, merging, multiplexing/demultiplexing).

Key Concepts

Why Streams Matter: Buffering vs Streaming

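ℹ️ Info

Buffering vs Streaming

Buffering (Figure 6.1): collect all data into memory, then process it. Simple, but it fails for large inputs: a single Buffer cannot exceed buffer.constants.MAX_LENGTH (about 2 GB in the book's example; the exact value depends on the Node.js version). The book's gzip-buffer.js throws a RangeError on files above that size.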

Streaming (Figure 6.2): process data chunk-by-chunk as it arrives. Constant memory usage regardless of total data size. gzip-stream.js works on any file size.

Buffering:   [====== read ALL ======] -> [====== process ALL ======] -> [====== write ALL ======]

Streaming:   [r1][r2][r3]...   (concurrent with processing and writing)
              [p1][p2][p3]...
               [w1][w2][w3]...
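
The contrast above can be sketched with two small functions. This is a minimal illustration, not the book's gzip-buffer.js or gzip-stream.js; the function names gzipBuffered and gzipStreamed are made up for this example.

```javascript
import { createReadStream, createWriteStream } from 'node:fs'
import { readFile, writeFile } from 'node:fs/promises'
import { pipeline } from 'node:stream/promises'
import { promisify } from 'node:util'
import { createGzip, gzip } from 'node:zlib'

const gzipAsync = promisify(gzip)

// Buffered: the whole input and the whole compressed output live in memory.
async function gzipBuffered (inputFile, outputFile) {
  const data = await readFile(inputFile) // entire file in one Buffer
  const compressed = await gzipAsync(data) // entire result in another Buffer
  await writeFile(outputFile, compressed)
}

// Streamed: only a few chunks are in flight at any time.
async function gzipStreamed (inputFile, outputFile) {
  await pipeline(
    createReadStream(inputFile),
    createGzip(),
    createWriteStream(outputFile)
  )
}
```

Both functions produce a valid gzip file for small inputs; only the streamed version keeps memory use roughly constant as the input grows.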

Three key advantages:

  1. Spatial efficiency -- process data larger than available memory
  2. Time efficiency -- assembly-line concurrency, start processing immediately
  3. Composability -- connect processing steps with pipe: read | gzip | encrypt | write

Composability: Adding Encryption

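import fs from 'node:fs'
import zlib from 'node:zlib'
import crypto from 'node:crypto'

// inputFile, outputFile, key, and iv are assumed to be defined elsewhere

// Original: read -> gzip -> write
fs.createReadStream(inputFile)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(outputFile))

// Adding encryption is one line (aes-256-cbc needs a 32-byte key and a 16-byte iv):
fs.createReadStream(inputFile)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipheriv('aes-256-cbc', key, iv))   // just add this
  .pipe(fs.createWriteStream(outputFile))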

The Four Stream Types

Type       Description                               Examples                                    Implement
Readable   Source of data                            fs.createReadStream, http.IncomingMessage   _read()
Writable   Destination for data                      fs.createWriteStream, http.ServerResponse   _write()
Duplex     Both readable and writable (independent)  net.Socket, WebSocket                       _read() + _write()
Transform  Duplex that transforms input to output    zlib.createGzip, crypto.createCipheriv      _transform() + _flush()

All streams are EventEmitters. They work in binary mode (Buffer/string) by default, or object mode (any JS value) when objectMode: true.
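
As a small illustration of object mode, the sketch below passes plain objects through a Transform. The createLabeler and labelUsers helpers and the { id, name } record shape are assumptions made up for this example.

```javascript
import { Readable, Transform } from 'node:stream'

// Hypothetical transform: turns { id, name } objects into "id:name" strings.
function createLabeler () {
  return new Transform({
    objectMode: true, // chunks are arbitrary JS values, not Buffers
    transform (user, _encoding, callback) {
      callback(null, `${user.id}:${user.name}`)
    }
  })
}

async function labelUsers (users) {
  const labels = []
  // Readable.from() creates an object-mode stream by default.
  // pipe() keeps the sketch short; it does not forward errors.
  for await (const label of Readable.from(users).pipe(createLabeler())) {
    labels.push(label)
  }
  return labels
}
```

In object mode each read yields exactly one pushed value, so the labels arrive one per input object.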

Readable Streams

Three ways to consume:

Flowing mode (push) -- data is pushed to the 'data' listener as soon as it is available, at the pace the source produces it:

readable.on('data', (chunk) => { /* process chunk */ })
// Triggered by: 'data' listener, .resume(), or .pipe()

Paused mode (pull, the default) -- the consumer pulls data on demand with read():

readable.on('readable', () => {
  let chunk
  while ((chunk = readable.read()) !== null) {
    // process chunk
  }
})

Async iterators (modern approach):

for await (const chunk of readable) {
  // process chunk -- backpressure handled automatically
}
💡 Tip

Async iterators are the cleanest approach

for await...of handles backpressure automatically and integrates cleanly with async/await code. This is the book's recommended modern approach.

Implementing a custom Readable:

import { Readable } from 'node:stream'

// Full class approach
class MyReadable extends Readable {
  _read(size) {
    this.push('some data') // placeholder payload
    this.push(null) // signal EOF
  }
}

// Simplified construction
const simplified = new Readable({
  read() {
    this.push('data')
    this.push(null)
  }
})

// From an iterable
const fromIterable = Readable.from(['a', 'b', 'c'])

Writable Streams

const ok = writable.write(chunk, encoding, callback) // returns boolean
writable.end(finalChunk, encoding, callback)
⚠️ Warning

Backpressure

When write() returns false, the internal buffer has reached or exceeded highWaterMark. Stop writing until the 'drain' event fires. Ignoring this lets the buffer grow without bound.

import { once } from 'node:events'

// Inside an async function: pause until the buffer drains
if (!writable.write(chunk)) {
  await once(writable, 'drain')
}
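
Put into a complete loop, the same check might look like the sketch below. The writeAll helper and its returned counter are assumptions added for illustration; they are not part of the Node.js API.

```javascript
import { once } from 'node:events'
import { finished } from 'node:stream/promises'

// Writes every chunk in order, waiting for 'drain' whenever write() says so.
// Returns how many times the producer had to wait (useful for observation).
async function writeAll (writable, chunks) {
  let waits = 0
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      waits++
      await once(writable, 'drain') // buffer is full: stop producing
    }
  }
  writable.end()
  await finished(writable) // resolves once everything has been flushed
  return waits
}
```

With a slow destination and a small highWaterMark the counter climbs quickly, which is a cheap way to see backpressure happening.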

Implementing a custom Writable:

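import { Writable } from 'node:stream'

class MyWritable extends Writable {
  _write(chunk, encoding, cb) {
    // Process chunk (save to DB, send to API, etc.).
    // doSomethingAsync is a placeholder for your own promise-returning function.
    doSomethingAsync(chunk).then(() => cb(), cb)
  }
  _final(cb) {
    // Runs after end(), once all pending writes are done and before 'finish'
    cb()
  }
}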

Duplex Streams

A Duplex has independent readable and writable sides. Data written to the writable side has no inherent relationship to data read from the readable side. Example: a TCP net.Socket -- you write request bytes, you read response bytes.
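
A minimal sketch of that independence, using a made-up TickerDuplex class: the readable side emits three fixed lines, while the writable side just records whatever it receives.

```javascript
import { Duplex } from 'node:stream'

// Hypothetical example class: the two sides share no data.
class TickerDuplex extends Duplex {
  constructor (options) {
    super(options)
    this.received = [] // chunks written to the writable side
    this.remaining = 3 // chunks still to emit on the readable side
  }

  _read () {
    // Emits "tick 3", "tick 2", "tick 1", then signals EOF with null
    this.push(this.remaining > 0 ? `tick ${this.remaining--}\n` : null)
  }

  _write (chunk, _encoding, callback) {
    this.received.push(chunk.toString())
    callback()
  }
}
```

Writing 'hello' to an instance has no effect on what you read from it; the ticks come out regardless.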

Transform Streams

A Transform is a special Duplex where the readable output is derived from the writable input. Implement _transform() for per-chunk processing and _flush() for any final data when the input ends.

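import { Transform } from 'node:stream'

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, cb) {
    // Per-chunk toString() can split a multi-byte character; fine for ASCII input
    this.push(chunk.toString().toUpperCase())
    cb()
  }
}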
💡 Tip

Filtering and Aggregating with Transform

Filtering: skip chunks by calling cb() without this.push().
Aggregating: accumulate data in _transform(), emit the result in _flush():

_flush(cb) {
  this.push(JSON.stringify(this.accumulated))
  cb()
}
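
Both ideas can be combined in one small object-mode chain. The sketch below is an assumption-laden illustration: the class names, the { country, profit } record shape, and the totalProfit helper are made up here.

```javascript
import { Readable, Transform } from 'node:stream'

// Filtering: only records for the chosen country are pushed downstream.
class FilterByCountry extends Transform {
  constructor (country, options = {}) {
    super({ ...options, objectMode: true })
    this.country = country
  }

  _transform (record, _encoding, callback) {
    if (record.country === this.country) this.push(record)
    callback() // called either way, so skipped records are simply dropped
  }
}

// Aggregating: nothing is pushed until the input ends.
class SumProfit extends Transform {
  constructor (options = {}) {
    super({ ...options, objectMode: true })
    this.total = 0
  }

  _transform (record, _encoding, callback) {
    this.total += record.profit
    callback()
  }

  _flush (callback) {
    this.push(this.total) // single result, emitted at end of input
    callback()
  }
}

async function totalProfit (records, country) {
  // pipe() keeps the sketch short; pipeline() is the safer choice in real code
  const results = Readable.from(records)
    .pipe(new FilterByCountry(country))
    .pipe(new SumProfit())
  let total
  for await (const value of results) total = value
  return total
}
```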

PassThrough Streams

A Transform that passes data through unchanged. Surprisingly useful:

  • Observability -- attach event listeners to monitor data without modifying it
  • Late piping -- buffer data in a PassThrough until the real destination is ready
  • Lazy streams -- return a PassThrough immediately, pipe the real source later
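
The observability use case can be sketched as a byte counter placed in the middle of a pipeline. The copyWithByteCount helper is a hypothetical name for this example.

```javascript
import { PassThrough } from 'node:stream'
import { pipeline } from 'node:stream/promises'

// Copies source to destination and reports how many bytes went through.
async function copyWithByteCount (source, destination) {
  let bytes = 0
  const monitor = new PassThrough()
  // The listener only observes; the data itself continues unchanged.
  monitor.on('data', (chunk) => { bytes += chunk.length })
  await pipeline(source, monitor, destination)
  return bytes
}
```

Because the monitor is an ordinary stream in the chain, it can be added or removed without touching the source or the destination.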

Connecting Streams: pipe() vs pipeline()

pipe() -- simple but limited:

readable.pipe(transform).pipe(writable)
// Backpressure: handled automatically
// Errors: NOT propagated -- must attach error handlers to EACH stream
// Cleanup: does NOT destroy streams on error

pipeline() -- the modern, safe way:

import fs from 'node:fs'
import zlib from 'node:zlib'
import { pipeline } from 'node:stream/promises'

await pipeline(
  fs.createReadStream('input.gz'),
  zlib.createGunzip(),
  fs.createWriteStream('output.txt')
)
// Errors: propagated automatically
// Cleanup: ALL streams destroyed on error
⚠️ Warning

Always prefer pipeline() over pipe()

pipe() is a footgun for error handling. If any stream errors, the other streams are NOT cleaned up, leading to memory leaks and dangling file descriptors. pipeline() forwards the error to a single place and destroys every stream in the chain.
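
To see the cleanup behavior, the sketch below runs a pipeline whose middle stage always fails and then inspects each stream. The runFailingPipeline function and the 'boom' error are made up for this illustration.

```javascript
import { Readable, Transform, Writable } from 'node:stream'
import { pipeline } from 'node:stream/promises'

async function runFailingPipeline () {
  const source = Readable.from(['a', 'b', 'c'])
  const failing = new Transform({
    transform (_chunk, _encoding, callback) {
      callback(new Error('boom')) // fail on the first chunk
    }
  })
  const sink = new Writable({
    write (_chunk, _encoding, callback) { callback() }
  })

  try {
    await pipeline(source, failing, sink)
    return { message: null, destroyed: [] }
  } catch (err) {
    // One catch covers the whole chain, and every stream has been destroyed
    return {
      message: err.message,
      destroyed: [source.destroyed, failing.destroyed, sink.destroyed]
    }
  }
}
```

With pipe() the same failure would need an 'error' listener on each of the three streams, plus manual destroy() calls.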

Piping Patterns

Combining -- wrap a pipeline into a single Duplex stream. Write to the first stream, read from the last. The combined stream is a black box.
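
One possible way to build such a black box is compose() from node:stream. It is marked experimental in the Node.js documentation, so check your version before relying on it. The createShout helper and its two stages are assumptions for this sketch.

```javascript
import { compose, Transform } from 'node:stream'

// Returns ONE Duplex that hides a two-stage pipeline: uppercase, then add "!".
function createShout () {
  const upper = new Transform({
    transform (chunk, _encoding, callback) {
      callback(null, chunk.toString().toUpperCase())
    }
  })
  const exclaim = new Transform({
    transform (chunk, _encoding, callback) {
      callback(null, `${chunk}!`)
    }
  })
  return compose(upper, exclaim) // write goes to upper, read comes from exclaim
}
```

Callers treat the result like any other Transform; an error in either inner stage surfaces on the combined stream.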

Forking -- one Readable piped to multiple Writables. Each gets the same data. The slowest consumer throttles the source via backpressure.
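
A fork can be sketched by piping one source into two hash streams. The forkHashes and readAll helpers are hypothetical names; the choice of sha256 and sha512 is arbitrary.

```javascript
import { createHash } from 'node:crypto'

// Collects everything a readable produces into one string.
async function readAll (stream) {
  let out = ''
  for await (const chunk of stream) out += chunk
  return out
}

// The same source feeds two independent destinations.
async function forkHashes (source) {
  // Both pipes are attached before any data flows, so neither branch misses chunks
  const sha256 = source.pipe(createHash('sha256').setEncoding('hex'))
  const sha512 = source.pipe(createHash('sha512').setEncoding('hex'))
  const [first, second] = await Promise.all([readAll(sha256), readAll(sha512)])
  return { sha256: first, sha512: second }
}
```

Attach every branch in the same tick; a branch added after data has started flowing only sees the chunks that come later.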

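Merging -- multiple Readables piped to one Writable. Pipe each source with { end: false } so the first source to finish does not close the destination, then end the destination once ALL sources have ended. Chunks from different sources interleave in no guaranteed order.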
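
A merge needs a counter of sources that are still open. The mergeInto helper below is a hypothetical name and a minimal sketch; it does not handle source errors.

```javascript
// Pipes every source into destination and ends it after the LAST source ends.
function mergeInto (sources, destination) {
  let pending = sources.length
  if (pending === 0) {
    destination.end()
    return destination
  }
  for (const source of sources) {
    // end: false stops pipe() from closing the destination early
    source.pipe(destination, { end: false })
    source.on('end', () => {
      if (--pending === 0) destination.end()
    })
  }
  return destination
}
```

The order of chunks from different sources is not guaranteed, so consumers should not depend on it.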

Multiplexing/Demultiplexing -- combine multiple logical channels into one physical stream. Each chunk is prepended with a header (channel ID + length). The demux side reads headers and routes to the correct output stream.
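
The framing can be sketched as follows, assuming a 1-byte channel ID followed by a 4-byte big-endian length; that exact layout, the function names, and the Buffer-only sources are assumptions of this example. Backpressure is ignored to keep it short.

```javascript
const HEADER_SIZE = 5 // assumed layout: 1-byte channel ID + 4-byte length

function encodePacket (channelId, payload) {
  const header = Buffer.alloc(HEADER_SIZE)
  header.writeUInt8(channelId, 0)
  header.writeUInt32BE(payload.length, 1)
  return Buffer.concat([header, payload])
}

// Each source's array index becomes its channel ID. Sources must emit Buffers.
function multiplex (sources, destination) {
  let open = sources.length
  sources.forEach((source, channelId) => {
    source
      .on('data', (chunk) => destination.write(encodePacket(channelId, chunk)))
      .on('end', () => { if (--open === 0) destination.end() })
  })
}

// Re-assembles packets (which may be split across chunks) and routes payloads.
function demultiplex (source, destinations) {
  let pending = Buffer.alloc(0)
  source
    .on('data', (chunk) => {
      pending = Buffer.concat([pending, chunk])
      while (pending.length >= HEADER_SIZE) {
        const length = pending.readUInt32BE(1)
        if (pending.length < HEADER_SIZE + length) break // wait for more data
        const channelId = pending.readUInt8(0)
        destinations[channelId].write(pending.subarray(HEADER_SIZE, HEADER_SIZE + length))
        pending = pending.subarray(HEADER_SIZE + length)
      }
    })
    .on('end', () => destinations.forEach((destination) => destination.end()))
}
```

In practice the shared stream would be something like a TCP socket, with multiplex() on one end and demultiplex() on the other.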

Connections

  • Previous: Chapter 5 -- Promises and Async/Await
  • Next: Chapter 7 -- Creational Design Patterns
  • Streams build on the event system from Chapter 3 -- all streams are EventEmitters
  • The reactor pattern from Chapter 1 underpins stream I/O
