
Chapter 13

Messaging and Integration Patterns

Pages 541-612 · messaging · pubsub · queues · streams · amqp


Summary

If scalability (Ch12) is about distributing systems, integration is about connecting them. This final chapter explores how distributed Node.js applications communicate using messaging patterns beyond direct API calls. The book references Enterprise Integration Patterns by Hohpe and Woolf (the "Bible" of messaging, 65 patterns in 700+ pages) and covers the most important ones from Node.js's perspective.

Three messaging technologies are compared throughout: Redis (simple broker with pub/sub and streams), ZeroMQ (brokerless peer-to-peer), and RabbitMQ/AMQP (full-featured broker with delivery guarantees).

Key Concepts

Messaging Fundamentals

Four fundamental elements of messaging systems:

  1. Direction: one-way or request/reply
  2. Purpose: determines message content (command, event, document)
  3. Timing: synchronous (phone call) or asynchronous (SMS)
  4. Delivery: peer-to-peer or via a broker
ℹ️Info

Message Types

  1. Command -- a serialized Command object (Ch9) that triggers an action; HTTP verbs in RESTful services are commands.
  2. Event -- notifies something happened. Used in distributed Observer pattern.
  3. Document -- carries data for processing, no instruction on what to do.

Asynchronous messaging is like SMS: doesn't require the recipient to be connected. Messages can be stored in a queue (consumed once, then removed) or a stream/log (append-only, persists after consumption, supports replay).

Peer-to-Peer vs Broker-Based

  Peer-to-peer                   With a message broker

  ┌──────┐      ┌──────┐        ┌──────┐    ┌────────┐    ┌──────┐
  │ Node │<────>│ Node │        │ Node │───>│        │───>│ Node │
  └──────┘      └──────┘        └──────┘    │ Broker │    └──────┘
  ┌──────┐      ┌──────┐        ┌──────┐    │        │    ┌──────┐
  │ Node │<────>│ Node │        │ Node │───>│        │───>│ Node │
  └──────┘      └──────┘        └──────┘    └────────┘    └──────┘

P2P advantages: no single point of failure, no broker to scale, lower latency. Broker advantages: decoupling, persistence, routing, protocol bridging. RabbitMQ supports AMQP, MQTT (IoT), and STOMP (text-based).

Publish/Subscribe Pattern

Pub/Sub is a distributed Observer pattern. Publishers emit messages without knowing receivers. Subscribers register interest in topics.

Building a Real-Time Chat App

The book builds a minimal chat with the ws WebSocket package:

import { createServer } from 'http'
import staticHandler from 'serve-handler'
import ws from 'ws'

const server = createServer((req, res) => {
  return staticHandler(req, res, { public: 'www' })
})

const wss = new ws.Server({ server })
wss.on('connection', client => {
  console.log('Client connected')
  client.on('message', msg => {
    console.log(`Message: ${msg}`)
    broadcast(msg)
  })
})

function broadcast (msg) {
  for (const client of wss.clients) {
    if (client.readyState === ws.OPEN) {
      client.send(msg)
    }
  }
}

server.listen(process.argv[2] || 8080)
⚠️Warning

The Scaling Problem: Running two instances (node index.js 8080 and node index.js 8081), clients connected to different instances can't exchange messages. broadcast() iterates only that instance's wss.clients; the servers are not talking to each other.

Redis as a Simple Message Broker

The solution: integrate instances via Redis pub/sub. Each instance is both publisher and subscriber.

import Redis from 'ioredis'

const redisSub = new Redis()         // dedicated connection for subscribing
const redisPub = new Redis()         // dedicated connection for publishing

// When a local client sends a message, publish it to Redis
wss.on('connection', client => {
  client.on('message', msg => {
    redisPub.publish('chat', msg)    // reaches every instance, including this one
  })
})

// Subscribe to receive messages published by any instance
redisSub.subscribe('chat')
redisSub.on('message', (channel, msg) => {
  broadcast(msg)                     // deliver to this instance's WebSocket clients
})
⚠️Warning

Redis Pub/Sub Limitations: Redis pub/sub is fire-and-forget. Messages are not persisted: if a subscriber is disconnected when a message is published, that message is lost forever. For reliable delivery, use Redis Streams or AMQP.

💡Tip

Why Two Redis Connections? A Redis client in subscriber mode can only receive messages -- it cannot execute PUBLISH or any other commands. Hence redisSub and redisPub are separate connections.

ZeroMQ PUB/SUB (Peer-to-Peer)

import zmq from 'zeromq'

// Publisher
const pub = new zmq.Publisher()
await pub.bind('tcp://127.0.0.1:5000')
await pub.send(['chat', JSON.stringify({ user: 'alice', text: 'hello' })])

// Subscriber
const sub = new zmq.Subscriber()
sub.connect('tcp://127.0.0.1:5000')
sub.subscribe('chat')
for await (const [topic, message] of sub) {
  console.log(topic.toString(), JSON.parse(message))
}

Key differences: there is no broker -- the publisher calls pub.bind() and subscribers call sub.connect(). Messages sent while no subscriber is connected are dropped, and subscribers filter by topic prefix on the client side.

Reliable Message Delivery with Queues (AMQP)

AMQP via RabbitMQ provides reliable messaging:

Producer -> Exchange -> Binding -> Queue -> Consumer

Exchange Types

| Type    | Routing behavior                                       |
| ------- | ------------------------------------------------------ |
| Direct  | Exact routing key match                                |
| Fanout  | Broadcast to all bound queues                          |
| Topic   | Pattern matching: * (one word), # (zero or more words) |
| Headers | Match on message header attributes                     |
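The * and # wildcards of the topic exchange can be illustrated with a small matcher. This is a hypothetical re-implementation of the routing rules for illustration only; matches is not part of amqplib or RabbitMQ:

```javascript
// Hypothetical sketch of topic-exchange wildcard matching:
// '*' matches exactly one dot-separated word, '#' matches zero or more.
function matches (pattern, routingKey) {
  const p = pattern.split('.')
  const k = routingKey.split('.')
  function go (i, j) {
    if (i === p.length) return j === k.length
    if (p[i] === '#') {
      // '#' may consume zero or more words
      for (let skip = j; skip <= k.length; skip++) {
        if (go(i + 1, skip)) return true
      }
      return false
    }
    if (j === k.length) return false
    if (p[i] === '*' || p[i] === k[j]) return go(i + 1, j + 1)
    return false
  }
  return go(0, 0)
}

console.log(matches('kern.*', 'kern.critical'))           // true: '*' matches one word
console.log(matches('lazy.#', 'lazy'))                    // true: '#' matches zero words
console.log(matches('*.orange.*', 'quick.orange.rabbit')) // true
```

In RabbitMQ itself this matching happens inside the broker when a message's routing key is compared against each queue's binding pattern.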

Durable Subscribers

import amqplib from 'amqplib'

const conn = await amqplib.connect('amqp://localhost')
const ch = await conn.createChannel()

await ch.assertExchange('chat', 'fanout', { durable: true })
const { queue } = await ch.assertQueue('service-chat-queue', { durable: true })
await ch.bindQueue(queue, 'chat', '')

ch.publish('chat', '', Buffer.from(JSON.stringify(msg)), {
  persistent: true
})

ch.consume(queue, (msg) => {
  console.log(JSON.parse(msg.content.toString()))
  ch.ack(msg)   // acknowledge after processing
})
💡Tip

Durability Checklist: For truly reliable delivery you need (1) a durable exchange, (2) a durable queue with a stable name, (3) persistent messages (persistent: true), and (4) manual acknowledgment (ch.ack(msg) after successful processing).

Reliable Messaging with Streams

Streams vs Queues

| Property           | Queue                   | Stream                             |
| ------------------ | ----------------------- | ---------------------------------- |
| After consumption  | Message removed         | Message persists                   |
| Multiple consumers | Competing (one gets it) | Each group reads independently     |
| Replay             | Not possible            | Possible (seek by ID or timestamp) |
| Ordering           | Per-queue FIFO          | Global append order with IDs       |
| Use case           | Task distribution       | Event sourcing, audit logs, replay |
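The table's core distinction can be shown with two toy in-memory models (illustration only, not a real broker): consuming from a queue is destructive, while reading a stream is not:

```javascript
// Toy queue: each item is delivered once, then gone.
class Queue {
  #items = []
  enqueue (item) { this.#items.push(item) }
  consume () { return this.#items.shift() }     // removed once read
}

// Toy stream: an append-only log addressed by entry ID, replayable at will.
class Stream {
  #log = []
  append (item) {
    this.#log.push(item)
    return this.#log.length - 1                 // entry ID = position in the log
  }
  readFrom (id) { return this.#log.slice(id) }  // replay from any past ID
}

const stream = new Stream()
stream.append('order-created')
stream.append('order-paid')
console.log(stream.readFrom(0))  // both entries
console.log(stream.readFrom(0))  // same again: reading doesn't remove entries
```

Real streams (Redis Streams, Kafka) follow this shape: consumers track their own last-read ID, which is what makes independent consumer groups and replay possible.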

Redis Streams

// (node-redis v4-style camelCase API)
// Producer: append to stream; '*' = let Redis generate the entry ID
await redis.xAdd('events', '*', { type: 'order', data: '...' })

// Simple consumer: read new entries
const entries = await redis.xRead(
  { key: 'events', id: '$' },        // '$' = only new messages
  { COUNT: 10, BLOCK: 5000 }
)

// Consumer group: competing consumers within a group
await redis.xGroupCreate('events', 'workers', '0', { MKSTREAM: true })
const groupEntries = await redis.xReadGroup('workers', 'worker-1',
  { key: 'events', id: '>' },        // '>' = entries not yet delivered to this group
  { COUNT: 10, BLOCK: 5000 }
)
await redis.xAck('events', 'workers', messageId)

Task Distribution Patterns

ZeroMQ Fan-out/Fan-in (PUSH/PULL)

                  ┌──> PULL worker-1 PUSH ──┐
PUSH ventilator ──┼──> PULL worker-2 PUSH ──┼──> PULL collector (sink)
                  └──> PULL worker-3 PUSH ──┘

The book builds a distributed hashsum cracker: the ventilator splits the keyspace into chunks, each worker brute-forces its chunk, and the sink collects the result. PUSH distributes tasks round-robin; PULL collects results.
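The ventilator's chunking step can be sketched as a pure function. This is a hypothetical illustration (generateBatches is not from the book, and the book's exact chunking logic may differ): it splits the space of candidate strings over an alphabet, of length 1 to maxLength, into index ranges of at most batchSize candidates:

```javascript
// Hypothetical ventilator helper: yield inclusive [start, end] index ranges
// covering every candidate string of length 1..maxLength over `alphabet`.
function * generateBatches (alphabet, maxLength, batchSize) {
  let total = 0
  for (let len = 1; len <= maxLength; len++) {
    total += alphabet.length ** len            // count of strings of exactly `len` chars
  }
  for (let start = 0; start < total; start += batchSize) {
    yield [start, Math.min(start + batchSize, total) - 1]  // inclusive range
  }
}

// With alphabet 'ab' and maxLength 2 there are 2 + 4 = 6 candidates
console.log([...generateBatches('ab', 2, 4)])  // [ [ 0, 3 ], [ 4, 5 ] ]
```

Each range would then be sent over the ventilator's PUSH socket, and the worker that receives it enumerates only the candidates in that range.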

AMQP Competing Consumers

Multiple consumers bind to the same queue and RabbitMQ distributes messages round-robin, so each message is processed by exactly one consumer:

ch.prefetch(1)   // one unacked message at a time (fairness)
ch.consume(taskQueue, async (msg) => {
  await processTask(JSON.parse(msg.content.toString()))
  ch.ack(msg)
})

Redis Streams Consumer Groups

Consumer groups combine both patterns: competing consumers within a group (task distribution) and independent reading across groups (fan-out):

await redis.xGroupCreate('tasks', 'workers', '0', { MKSTREAM: true })

// Worker reads from group (each gets different messages)
const tasks = await redis.xReadGroup('workers', 'worker-1',
  { key: 'tasks', id: '>' },
  { COUNT: 1, BLOCK: 5000 }
)

await redis.xAck('tasks', 'workers', taskId)

Request/Reply Patterns

Implementing request/reply over async messaging requires two sub-patterns:

Correlation Identifier

import { randomUUID } from 'crypto'

const pending = new Map()   // correlationId -> { resolve, reject }

function sendRequest (data) {
  return new Promise((resolve, reject) => {
    const correlationId = randomUUID()
    pending.set(correlationId, { resolve, reject })
    ch.publish('requests', '', Buffer.from(JSON.stringify({
      correlationId,
      data
    })))
  })
}

// On reply channel
ch.consume(replyQueue, (msg) => {
  const { correlationId, result } = JSON.parse(msg.content.toString())
  const handler = pending.get(correlationId)
  if (handler) {
    handler.resolve(result)
    pending.delete(correlationId)
  }
})

Return Address

// Requester: create exclusive reply queue
const { queue: replyQueue } = await ch.assertQueue('', { exclusive: true })

ch.sendToQueue('requests', Buffer.from(data), {
  correlationId: randomUUID(),
  replyTo: replyQueue              // return address
})

// Responder: read replyTo from incoming message
ch.consume('requests', (msg) => {
  const result = processRequest(msg.content)
  ch.sendToQueue(msg.properties.replyTo, Buffer.from(result), {
    correlationId: msg.properties.correlationId
  })
  ch.ack(msg)
})
ℹ️Info

AMQP's Built-in Support: AMQP natively supports correlationId and replyTo as message properties, making it well-suited for request/reply over asynchronous messaging.

Technology Comparison

| Feature            | Redis Pub/Sub | ZeroMQ PUB/SUB | RabbitMQ (AMQP)     | Redis Streams     |
| ------------------ | ------------- | -------------- | ------------------- | ----------------- |
| Architecture       | Broker        | Peer-to-peer   | Broker              | Broker            |
| Persistence        | No            | No             | Yes (durable)       | Yes (append-only) |
| Delivery guarantee | At-most-once  | At-most-once   | At-least-once       | At-least-once     |
| Consumer groups    | No            | No             | Competing consumers | Yes (XREADGROUP)  |
| Replay             | No            | No             | No                  | Yes               |
| Complexity         | Low           | Low            | Medium-high         | Medium            |

Connections

  • Previous: Chapter 12 -- Scalability and Architectural Patterns
  • Message broker integration pattern from Ch12's microservice section
  • Command messages reference the Command pattern from Chapter 9
  • Pub/Sub is distributed Observer pattern from Chapter 3
