โ† All Chapters

Chapter 11

Advanced Recipes

Pages 427-473 · nodejs · async-init · batching · caching · cancellation

Chapter 11: Advanced Recipes

Summary

This chapter takes a problem-solution approach, presenting four practical recipes for common Node.js challenges. Each arises from the tension between async programming and the single-threaded event loop: components that need async initialization before accepting calls, optimizing throughput via request batching and caching, providing cancellation for async operations, and running CPU-intensive work without blocking I/O.

The thread running through all four recipes is the same: Node.js's single-threaded, event-loop architecture creates both constraints and opportunities. These patterns work with the architecture rather than fighting it.

Key Concepts

Dealing with Async Initialized Components

The book introduces a DB class that extends EventEmitter with a connected property. The connect() method simulates an async connection (via setTimeout), sets connected = true, and emits 'connected'. The query() method throws 'Not connected yet' if called before the connection is established.

This is a common problem with database drivers, message queue clients, and any component requiring network handshakes during initialization.

โ„น๏ธInfo

Three Approaches (plus a State Pattern refinement)

  1. Local initialization check -- await once(db, 'connected') before every query
  2. Delayed startup -- initialize().then(() => { /* use db */ })
  3. Pre-initialization queue -- buffer commands, replay after connect
  4. State pattern -- InitializedState / QueuingState classes

Local Initialization Check

import { once } from 'events'
import { db } from './db.js'

db.connect()

async function updateLastAccess () {
  if (!db.connected) {
    await once(db, 'connected')
  }
  await db.query(`INSERT (${Date.now()}) INTO "LastAccesses"`)
}

Drawback: every caller must perform this check, which pushes the burden onto the consumer and breaks silently the moment someone forgets it.

Delayed Startup

async function initialize () {
  db.connect()
  await once(db, 'connected')
}

initialize().then(() => {
  updateLastAccess()
  setTimeout(() => { updateLastAccess() }, 600)
})

Drawbacks: every consumer must be known in advance, startup is delayed until the connection is ready, and reinitialization (e.g. after a dropped connection) is not handled.

Pre-Initialization Queue

class DB extends EventEmitter {
  connected = false
  commandsQueue = []

  async query (queryString) {
    if (!this.connected) {
      console.log(`Request queued: ${queryString}`)
      return new Promise((resolve, reject) => {      // (1)
        const command = () => {
          this.query(queryString)
            .then(resolve, reject)
        }
        this.commandsQueue.push(command)
      })
    }
    console.log(`Query executed: ${queryString}`)
  }

  connect () {
    setTimeout(() => {
      this.connected = true
      this.emit('connected')
      this.commandsQueue.forEach(command => command()) // (2)
      this.commandsQueue = []
    }, 500)
  }
}
💡 Tip

Best of Both Worlds The pre-initialization queue makes the component appear immediately available. Callers don't need to know about initialization state. The command closure re-invokes this.query() after connection, forwarding the result to the original promise via resolve/reject.

State Pattern Refinement

The book applies the State pattern from Chapter 9 with two state classes:

class InitializedState {
  async query (queryString) {
    console.log(`Query executed: ${queryString}`)
  }
}

const METHODS_REQUIRING_CONNECTION = ['query']
const deactivate = Symbol('deactivate')

class QueuingState {
  constructor (db) {
    this.db = db
    this.commandsQueue = []

    METHODS_REQUIRING_CONNECTION.forEach(methodName => {
      this[methodName] = function (...args) {
        console.log('Command queued:', methodName, args)
        return new Promise((resolve, reject) => {
          const command = () => {
            db[methodName](...args).then(resolve, reject)
          }
          this.commandsQueue.push(command)
        })
      }
    })
  }

  [deactivate] () {
    this.commandsQueue.forEach(command => command())
    this.commandsQueue = []
  }
}

class DB extends EventEmitter {
  constructor () {
    super()
    this.state = new QueuingState(this)          // (1)
  }

  async query (queryString) {
    return this.state.query(queryString)          // (2)
  }

  connect () {
    setTimeout(() => {
      this.connected = true
      this.emit('connected')
      const oldState = this.state
      this.state = new InitializedState(this)    // (3)
      oldState[deactivate] && oldState[deactivate]()
    }, 500)
  }
}
โ„น๏ธInfo

In the Wild Mongoose (the MongoDB ODM) queues operations issued before the connection opens and executes them once connected. pg (the PostgreSQL client) queues every query regardless of state and immediately tries to flush the queue. Both choices provide excellent developer experience (DX).

Async Request Batching and Caching

The book builds an e-commerce API server with a totalSales(product) function that scans 100,000 transactions in a LevelDB database. The query is intentionally slow to demonstrate the impact of batching and caching.

Batching with Promises

Two key properties of promises make batching elegant:

  • Multiple then() listeners can attach to the same promise
  • then() is guaranteed to be invoked asynchronously, even if the promise is already resolved
const runningRequests = new Map()

export async function totalSales (product) {
  if (runningRequests.has(product)) {
    return runningRequests.get(product)    // piggyback on in-flight request
  }
  const promise = actualTotalSales(product)
  runningRequests.set(product, promise)
  try {
    return await promise
  } finally {
    runningRequests.delete(product)        // allow fresh requests after completion
  }
}
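The piggybacking can be made observable with a self-contained sketch (the `actualTotalSales` stub and `dbHits` counter are illustrative stand-ins for the slow LevelDB scan, not from the book): two concurrent calls for the same product share one underlying query.

```javascript
const runningRequests = new Map()
let dbHits = 0                                       // counts real query executions

// Stub standing in for the slow 100,000-transaction scan
async function actualTotalSales (product) {
  dbHits++
  await new Promise(resolve => setTimeout(resolve, 50))
  return 42
}

async function totalSales (product) {
  if (runningRequests.has(product)) {
    return runningRequests.get(product)              // piggyback on in-flight request
  }
  const promise = actualTotalSales(product)
  runningRequests.set(product, promise)
  try {
    return await promise
  } finally {
    runningRequests.delete(product)
  }
}

// Two concurrent requests for the same product share one execution
Promise.all([totalSales('book'), totalSales('book')])
  .then(([a, b]) => console.log(a, b, dbHits))       // 42 42 1
```

The second call finds the in-flight promise in the Map and simply attaches to it, which is exactly the "multiple then() listeners" property at work.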

Caching (Combined with Batching)

const cache = new Map()
const runningRequests = new Map()

export async function totalSales (product) {
  if (cache.has(product)) {
    return cache.get(product)              // serve from cache
  }
  if (runningRequests.has(product)) {
    return runningRequests.get(product)     // batch with in-flight request
  }
  const promise = actualTotalSales(product)
  runningRequests.set(product, promise)
  try {
    const result = await promise
    cache.set(product, result)
    setTimeout(() => cache.delete(product), ttlMs)  // TTL-based expiry
    return result
  } finally {
    runningRequests.delete(product)
  }
}
โš ๏ธWarning

Zalgo Anti-Pattern Since we are dealing with async APIs, always return cached values asynchronously, even when accessing the cache is synchronous. This avoids the Zalgo anti-pattern from Chapter 3 -- a function that sometimes behaves synchronously and sometimes asynchronously.
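For callback-style APIs the fix looks like this (a sketch; `cachedLookup` and its behavior are illustrative, not from the book): when serving from the cache, defer the callback so the function never behaves synchronously.

```javascript
const cache = new Map()

// Hypothetical callback-style wrapper around a slow lookup
function cachedLookup (key, callback) {
  if (cache.has(key)) {
    // Calling callback(null, cache.get(key)) directly here would be Zalgo:
    // defer it so the caller always observes asynchronous behavior
    process.nextTick(() => callback(null, cache.get(key)))
    return
  }
  setTimeout(() => {                    // stands in for real async I/O
    const value = key.toUpperCase()
    cache.set(key, value)
    callback(null, value)
  }, 10)
}
```

Promise-based code gets this guarantee for free: an async function always returns a promise, and then() callbacks are always deferred to a microtask.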

Canceling Async Operations

Most async I/O cannot be truly aborted at the OS level. Cancellation is cooperative: the wrapper discards the result rather than stopping the underlying operation.

The cancelObj Pattern

async function cancelableOperation(cancelObj) {
  const result1 = await step1()
  if (cancelObj.cancelRequested) throw new Error('Canceled')

  const result2 = await step2(result1)
  if (cancelObj.cancelRequested) throw new Error('Canceled')

  return result2
}

// Usage
const cancelObj = { cancelRequested: false }
const promise = cancelableOperation(cancelObj)
// Later...
cancelObj.cancelRequested = true

AbortController (Standard API)

const controller = new AbortController()

// Pass signal to APIs that support it
const response = await fetch(url, { signal: controller.signal })

// Cancel from anywhere
controller.abort()
โ„น๏ธInfo

AbortController Support AbortController has been a Node.js global since v15. Signals are accepted natively by fetch, fs.readFile, stream.pipeline, the promise-based setTimeout from timers/promises, and many other core APIs. Libraries such as Axios accept signals too.
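Your own async functions can honor the same signal. A sketch (the `cancelableTask` function and its timings are illustrative) that checks `signal.aborted` between steps, mirroring the cancelObj pattern above:

```javascript
// Honors an AbortSignal between async steps
async function cancelableTask (signal) {
  for (let step = 0; step < 3; step++) {
    if (signal.aborted) {
      throw new Error('Canceled')
    }
    await new Promise(resolve => setTimeout(resolve, 20))  // simulated async step
  }
  return 'done'
}

const ac = new AbortController()
const task = cancelableTask(ac.signal)
setTimeout(() => ac.abort(), 30)                 // cancel mid-flight
task.then(
  () => {},
  err => console.log(err.message)                // Canceled
)
```

As with cancelObj, the cancellation is cooperative: the loop only notices the abort at the next checkpoint, and the in-flight step itself is not interrupted.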

Running CPU-Bound Tasks

🔴 Danger

The Core Problem Any synchronous computation longer than a few milliseconds blocks the event loop. During that time, zero I/O events are processed. A 2-second computation means all connected clients experience a 2-second freeze.

The book demonstrates three solutions using the subset sum problem (NP-complete, exponential worst case) as a realistic CPU-bound workload.
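The strategy snippets below call a computeSubsetSum function without defining it. A minimal synchronous sketch (an illustrative implementation, returning every subset that sums to the target) might look like:

```javascript
// Returns all subsets of `set` whose elements sum to `target`
function computeSubsetSum (set, target) {
  const results = []
  const totalCombinations = 2 ** set.length            // one bitmask per subset
  for (let mask = 1; mask < totalCombinations; mask++) {
    const subset = set.filter((_, i) => mask & (1 << i))
    if (subset.reduce((a, b) => a + b, 0) === target) {
      results.push(subset)
    }
  }
  return results
}

console.log(computeSubsetSum([1, 2, 3, 7, 8], 10).length)  // 3 matching subsets
```

Run synchronously on a large set, this exponential loop is exactly the kind of computation that freezes the event loop, which motivates the three strategies.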

Strategy 1: setImmediate Interleaving

Break computation into chunks. Yield to the event loop between chunks.

function subsetSumInterleaved (set, target, callback) {
  const totalCombinations = 2 ** set.length  // one bitmask per subset
  const results = []
  let index = 0

  function step () {
    const startTime = Date.now()
    // Process for ~10ms, then yield
    while (index < totalCombinations && Date.now() - startTime < 10) {
      const subset = set.filter((_, i) => index & (1 << i))
      if (subset.reduce((a, b) => a + b, 0) === target) {
        results.push(subset)
      }
      index++
    }
    if (index < totalCombinations) {
      setImmediate(step)     // yield to the event loop, then continue
    } else {
      callback(null, results)
    }
  }

  step()
}

Tradeoff: server stays responsive, but total computation takes longer.

Strategy 2: External Process (child_process.fork)

// main.js
const { fork } = require('child_process')
const worker = fork('./subset-sum-worker.js')

worker.send({ set: [1, 2, 3, 7, 8], target: 10 })
worker.on('message', result => {
  console.log('Subsets:', result)
})

// subset-sum-worker.js
process.on('message', ({ set, target }) => {
  const result = computeSubsetSum(set, target)
  process.send(result)
})

Full process isolation. The main event loop is completely unaffected. Cost: higher memory (separate V8 heap + OS process) and IPC serialization.

Strategy 3: Worker Threads

// main.js
const { Worker } = require('worker_threads')

const worker = new Worker('./subset-sum-worker.js', {
  workerData: { set: [1, 2, 3, 7, 8], target: 10 }
})

worker.on('message', result => console.log('Subsets:', result))
worker.on('error', err => console.error(err))

// subset-sum-worker.js
const { workerData, parentPort } = require('worker_threads')
const result = computeSubsetSum(workerData.set, workerData.target)
parentPort.postMessage(result)
💡 Tip

Worker Threads vs Child Processes Worker threads run in the same OS process with their own V8 isolate and event loop. They are lighter than child processes, support SharedArrayBuffer for true shared memory, and transferable objects for zero-copy data transfer. Use them for CPU-bound work in production.

Production Pattern: Thread Pool

const { Worker } = require('worker_threads')

class ThreadPool {
  constructor(workerScript, size = 4) {
    this.workers = Array.from({ length: size }, () => ({
      worker: new Worker(workerScript),
      busy: false
    }))
    this.queue = []
  }

  run(data) {
    return new Promise((resolve, reject) => {
      const available = this.workers.find(w => !w.busy)
      if (available) {
        this._dispatch(available, data, resolve, reject)
      } else {
        this.queue.push({ data, resolve, reject })
      }
    })
  }

  _dispatch(entry, data, resolve, reject) {
    entry.busy = true
    const onMessage = result => {
      entry.worker.off('error', onError)   // avoid piling up stale error listeners
      entry.busy = false
      resolve(result)
      if (this.queue.length > 0) {
        const next = this.queue.shift()
        this._dispatch(entry, next.data, next.resolve, next.reject)
      }
    }
    const onError = err => {
      entry.worker.off('message', onMessage)
      entry.busy = false
      reject(err)
    }
    entry.worker.once('message', onMessage)
    entry.worker.once('error', onError)
    entry.worker.postMessage(data)
  }
}

Mind Map

  • Zalgo anti-pattern from: Chapter 3 -- Callbacks and Events
  • Worker threads connect to: Chapter 12 -- Scalability (multi-process patterns)