Chapter 6: Coding with Streams
Summary
"Stream all the things!" -- Streams are one of the most powerful features in Node.js.
They let you process data piece-by-piece instead of loading everything into memory,
providing both spatial efficiency (the book shows gzip-buffer.js failing on large
files due to V8's ~2GB buffer limit, while gzip-stream.js handles them with constant
memory via pipe) and time efficiency (Figure 6.2 shows the assembly-line effect:
reading, compressing, and writing happen concurrently on different chunks). Streams
are also inherently composable -- adding encryption to the gzip pipeline is just
one more .pipe(crypto.createCipheriv(...)) stage.
This chapter covers all four stream types (Readable, Writable, Duplex, Transform),
consuming streams in flowing mode, paused mode, and with async iterators, implementing
custom streams (_read(), _write(), _transform(), _flush()), backpressure
mechanics (highWaterMark and 'drain'), error handling with pipe() vs pipeline(),
and advanced piping patterns (combining, forking, merging, multiplexing/demultiplexing).
Key Concepts
Why Streams Matter: Buffering vs Streaming
Buffering vs Streaming
Buffering (Figure 6.1): collect all the data into memory, then process it. Simple,
but it fails for large datasets: a single buffer cannot exceed V8's size limit
(roughly 2 GB, depending on platform and Node.js version). The book's gzip-buffer.js
crashes with a RangeError on large files.
Streaming (Figure 6.2): process data chunk-by-chunk as it arrives. Constant memory usage regardless of total data size. gzip-stream.js works on any file size.
Buffering: [====== read ALL ======] -> [====== process ALL ======] -> [====== write ALL ======]
Streaming: [r1][r2][r3]... (concurrent with processing and writing)
[p1][p2][p3]...
[w1][w2][w3]...
Three key advantages:
- Spatial efficiency -- process data larger than available memory
- Time efficiency -- assembly-line concurrency, start processing immediately
- Composability -- connect processing steps with pipe:
read | gzip | encrypt | write
Composability: Adding Encryption
// Original: read -> gzip -> write
fs.createReadStream(inputFile)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(outputFile))
// Adding encryption is one line:
fs.createReadStream(inputFile)
.pipe(zlib.createGzip())
.pipe(crypto.createCipheriv('aes-256-cbc', key, iv)) // just add this
.pipe(fs.createWriteStream(outputFile))
The Four Stream Types
| Type | Description | Examples | Implement |
|---|---|---|---|
| Readable | Source of data | fs.createReadStream, http.IncomingMessage | _read() |
| Writable | Destination for data | fs.createWriteStream, http.ServerResponse | _write() |
| Duplex | Both readable and writable (independent) | net.Socket, WebSocket | _read() + _write() |
| Transform | Duplex that transforms input to output | zlib.createGzip, crypto.createCipheriv | _transform() + _flush() |
All streams are EventEmitters. They work in binary mode (Buffer/string) by default,
or object mode (any JS value) when objectMode: true.
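The object-mode distinction is easy to demonstrate. A minimal sketch (the data and names are illustrative, not from the book): with objectMode: true, push() accepts arbitrary JavaScript values instead of Buffers or strings.

```javascript
import { Readable } from 'node:stream'

// An object-mode Readable that emits plain objects rather than bytes.
const source = new Readable({
  objectMode: true,
  read() {
    this.push({ id: 1, name: 'alice' })
    this.push({ id: 2, name: 'bob' })
    this.push(null) // signal EOF
  }
})

const names = []
for await (const obj of source) {
  names.push(obj.name) // each chunk is a whole object, not a Buffer
}
// names is now ['alice', 'bob']
```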
Readable Streams
Three ways to consume:
Flowing mode (push) -- data is pushed to 'data' listeners as soon as it is available:
readable.on('data', (chunk) => { /* process chunk */ })
// Triggered by: 'data' listener, .resume(), or .pipe()
Paused mode (pull) -- data pulled on demand:
readable.on('readable', () => {
let chunk
while ((chunk = readable.read()) !== null) {
// process chunk
}
})
Async iterators (modern approach):
for await (const chunk of readable) {
// process chunk -- backpressure handled automatically
}
Async iterators are the cleanest approach
for await...of handles backpressure automatically and integrates cleanly
with async/await code. This is the book's recommended modern approach.
Implementing a custom Readable:
// Full class approach
class MyReadable extends Readable {
_read(size) {
this.push(someData)
this.push(null) // signal EOF
}
}
// Simplified construction
const readable = new Readable({
read() {
this.push('data')
this.push(null)
}
})
// From an iterable
const readable = Readable.from(['a', 'b', 'c'])
Writable Streams
const ok = writable.write(chunk, encoding, callback) // returns boolean
writable.end(finalChunk, encoding, callback)
Backpressure
When write() returns false, the internal buffer has exceeded highWaterMark.
Stop writing until the 'drain' event fires. Ignoring this causes unbounded
memory growth.
import { once } from 'node:events'

if (!writable.write(chunk)) {
  await once(writable, 'drain')
}
Implementing a custom Writable:
class MyWritable extends Writable {
_write(chunk, encoding, cb) {
// Process chunk (save to DB, send to API, etc.)
doSomethingAsync(chunk).then(() => cb()).catch(cb)
}
_final(cb) {
// Cleanup when stream ends
cb()
}
}
Duplex Streams
A Duplex has independent readable and writable sides. Data written to the
writable side has no inherent relationship to data read from the readable side.
Example: a TCP net.Socket -- you write request bytes, you read response bytes.
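The independence of the two sides can be sketched with a toy Duplex (my own illustration, not the book's code): writes are collected on one side while the readable side emits unrelated data, mirroring how a socket's outbound and inbound bytes are separate flows.

```javascript
import { Duplex } from 'node:stream'

const received = []
const duplex = new Duplex({
  // Readable side: emits its own data, unrelated to what was written.
  read() {
    this.push('response bytes')
    this.push(null)
  },
  // Writable side: just records incoming chunks.
  write(chunk, encoding, cb) {
    received.push(chunk.toString())
    cb()
  }
})

duplex.write('request bytes')
duplex.end()

let readBack = ''
for await (const chunk of duplex) {
  readBack += chunk
}
// received: ['request bytes']; readBack: 'response bytes'
```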
Transform Streams
A Transform is a special Duplex where the readable output is derived from the
writable input. Implement _transform() for per-chunk processing and _flush()
for any final data when the input ends.
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, cb) {
this.push(chunk.toString().toUpperCase())
cb()
}
}
Filtering and Aggregating with Transform
Filtering: skip chunks by calling cb() without this.push().
Aggregating: accumulate data in _transform(), emit the result in _flush():
_flush(cb) {
this.push(JSON.stringify(this.accumulated))
cb()
}
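Both techniques fit in one small object-mode Transform. A sketch (class name and data are illustrative): negative numbers are filtered out, positives are accumulated, and the single aggregate is emitted in _flush().

```javascript
import { Readable, Transform } from 'node:stream'

class SumPositive extends Transform {
  constructor() {
    super({ objectMode: true })
    this.total = 0
  }
  _transform(n, encoding, cb) {
    if (n > 0) {
      this.total += n // aggregate instead of pushing
    }
    cb() // no push() here: nothing flows downstream per-chunk
  }
  _flush(cb) {
    this.push(this.total) // emit the aggregate once input ends
    cb()
  }
}

const results = []
const sum = Readable.from([3, -1, 4, -5, 2]).pipe(new SumPositive())
for await (const n of sum) {
  results.push(n)
}
// results is now [9]
```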
PassThrough Streams
A Transform that passes data through unchanged. Surprisingly useful:
- Observability -- attach event listeners to monitor data without modifying it
- Late piping -- buffer data in a PassThrough until the real destination is ready
- Lazy streams -- return a PassThrough immediately, pipe the real source later
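The late-piping idea can be sketched as follows (function name and timing are illustrative): hand out a PassThrough immediately, and attach the real source once it becomes available -- the PassThrough buffers in the meantime.

```javascript
import { PassThrough, Readable } from 'node:stream'

function getDataStream() {
  const placeholder = new PassThrough()
  // Pretend the real source only becomes available asynchronously.
  setTimeout(() => {
    Readable.from(['late ', 'data']).pipe(placeholder)
  }, 10)
  return placeholder // the consumer can start reading right away
}

let result = ''
for await (const chunk of getDataStream()) {
  result += chunk
}
// result is now 'late data'
```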
Connecting Streams: pipe() vs pipeline()
pipe() -- simple but limited:
readable.pipe(transform).pipe(writable)
// Backpressure: handled automatically
// Errors: NOT propagated -- must attach error handlers to EACH stream
// Cleanup: does NOT destroy streams on error
pipeline() -- the modern, safe way:
import { pipeline } from 'node:stream/promises'
await pipeline(
fs.createReadStream('input.gz'),
zlib.createGunzip(),
fs.createWriteStream('output.txt')
)
// Errors: propagated automatically
// Cleanup: ALL streams destroyed on error
Always prefer pipeline() over pipe()
pipe() is a footgun for error handling. If any stream errors, the other
streams are NOT cleaned up, leading to memory leaks and dangling file
descriptors. pipeline() solves all of this.
Piping Patterns
Combining -- wrap a pipeline into a single Duplex stream. Write to the first stream, read from the last. The combined stream is a black box.
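A hedged sketch of combining using Node's stream.compose (experimental as of Node 16.9+; libraries such as pumpify offer the same idea): the result is a single Duplex -- write into the first stage, read from the last.

```javascript
import { compose, Readable, Transform } from 'node:stream'

const upper = new Transform({
  transform(chunk, enc, cb) { cb(null, chunk.toString().toUpperCase()) }
})
const exclaim = new Transform({
  transform(chunk, enc, cb) { cb(null, chunk + '!') }
})

// compose() wraps the pipeline into one black-box Duplex.
const combined = compose(upper, exclaim)

let out = ''
Readable.from(['hi']).pipe(combined)
for await (const chunk of combined) {
  out += chunk
}
// out is now 'HI!'
```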
Forking -- one Readable piped to multiple Writables. Each gets the same data. The slowest consumer throttles the source via backpressure.
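Forking is just calling pipe() more than once on the same source. A small sketch (sink implementation is illustrative):

```javascript
import { Readable, Writable } from 'node:stream'
import { once } from 'node:events'

const source = Readable.from(['a', 'b', 'c'])

// A Writable that records each chunk into the given array.
const makeSink = (store) => new Writable({
  write(chunk, enc, cb) {
    store.push(chunk.toString())
    cb()
  }
})

const copy1 = []
const copy2 = []
const sink1 = makeSink(copy1)
const sink2 = makeSink(copy2)

// Both destinations receive the same data.
source.pipe(sink1)
source.pipe(sink2)

await Promise.all([once(sink1, 'finish'), once(sink2, 'finish')])
// copy1 and copy2 are both ['a', 'b', 'c']
```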
Merging -- multiple Readables piped to one Writable. Must track when ALL sources end before ending the destination. Data interleaves non-deterministically.
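The end-tracking detail is the tricky part: pipe with { end: false } so one source ending does not end the destination, then end it manually once all sources are done. A sketch with illustrative data:

```javascript
import { Readable, Writable } from 'node:stream'
import { once } from 'node:events'

const chunks = []
const dest = new Writable({
  write(chunk, enc, cb) { chunks.push(chunk.toString()); cb() }
})

const sources = [Readable.from(['a1', 'a2']), Readable.from(['b1'])]
let pending = sources.length
for (const src of sources) {
  src.pipe(dest, { end: false }) // don't auto-end the destination
  src.once('end', () => {
    if (--pending === 0) dest.end() // end only after ALL sources finish
  })
}
await once(dest, 'finish')
```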
Multiplexing/Demultiplexing -- combine multiple logical channels into one physical stream. Each chunk is prepended with a header (channel ID + length). The demux side reads headers and routes to the correct output stream.
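The packet framing can be sketched with two helpers (function names are mine; the layout mirrors the description above: 1-byte channel ID + 4-byte big-endian length + payload):

```javascript
// Frame a payload for one logical channel.
function packChunk(channelId, payload) {
  const header = Buffer.alloc(5)
  header.writeUInt8(channelId, 0)        // 1 byte: channel ID
  header.writeUInt32BE(payload.length, 1) // 4 bytes: payload length
  return Buffer.concat([header, payload])
}

// Reverse the framing on the demux side.
function unpackChunk(packet) {
  const channelId = packet.readUInt8(0)
  const length = packet.readUInt32BE(1)
  const payload = packet.subarray(5, 5 + length)
  return { channelId, payload }
}

// Round trip: a 5-byte payload becomes a 10-byte packet.
const packet = packChunk(3, Buffer.from('hello'))
const { channelId, payload } = unpackChunk(packet)
// channelId: 3, payload: 'hello'
```

The demux side would read the header first, then consume exactly `length` bytes and route them to the stream registered for that channel ID.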
Mind Map
Connections
- Previous: Chapter 5 -- Promises and Async/Await
- Next: Chapter 7 -- Creational Design Patterns
- Streams build on the event system from Chapter 3 -- all streams are EventEmitters
- The reactor pattern from Chapter 1 underpins stream I/O