Chapter 13: Messaging and Integration Patterns
Summary
If scalability (Ch12) is about distributing systems, integration is about connecting them. This final chapter explores how distributed Node.js applications communicate using messaging patterns beyond direct API calls. The book references Enterprise Integration Patterns by Hohpe and Woolf (the "Bible" of messaging, 65 patterns in 700+ pages) and covers the most important ones from Node.js's perspective.
Three messaging technologies are compared throughout: Redis (simple broker with pub/sub and streams), ZeroMQ (brokerless peer-to-peer), and RabbitMQ/AMQP (full-featured broker with delivery guarantees).
Key Concepts
Messaging Fundamentals
Four fundamental elements of messaging systems:
- Direction: one-way or request/reply
- Purpose: determines message content (command, event, document)
- Timing: synchronous (phone call) or asynchronous (SMS)
- Delivery: peer-to-peer or via a broker
Message Types
- Command -- a serialized Command object (Ch9) that triggers an action on the receiver. A RESTful HTTP call is a simple example: the verb tells the server what to do.
- Event -- notifies that something happened. Used to implement a distributed Observer pattern.
- Document -- carries data for processing, with no instruction on what to do with it.
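As a rough illustration, the three message types might look like this as plain JavaScript objects. The field names (`kind`, `name`, `payload`), the example values, and the `describe` helper are assumptions made for this sketch; the chapter does not prescribe a message format.

```javascript
// Hypothetical message shapes; field names and values are illustrative only.
const command = {
  kind: 'command',
  name: 'resizeImage', // tells the receiver what to do
  payload: { imageId: 'img-1', width: 200 }
}

const event = {
  kind: 'event',
  name: 'imageResized', // reports something that already happened
  payload: { imageId: 'img-1' }
}

const documentMessage = {
  kind: 'document',
  payload: { imageId: 'img-1', width: 200, height: 100 } // data only, no instruction
}

// A receiver can branch on the message kind.
function describe (message) {
  switch (message.kind) {
    case 'command': return `execute ${message.name}`
    case 'event': return `react to ${message.name}`
    case 'document': return 'store or process the data'
    default: throw new Error(`unknown message kind: ${message.kind}`)
  }
}
```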
Asynchronous messaging is like SMS: it doesn't require the recipient to be connected when the message is sent. Messages can be stored in a queue (each message is consumed once, then removed) or in a stream/log (append-only; entries persist after consumption and can be replayed).
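The difference between the two storage models can be sketched with two small in-memory classes. `SimpleQueue` and `SimpleLog` are hypothetical names used only here, and the sketch ignores persistence, concurrency, and acknowledgments.

```javascript
// In-memory sketch: a queue removes a message on consumption,
// a log keeps every entry and lets each reader choose where to start.
class SimpleQueue {
  constructor () { this.items = [] }
  enqueue (msg) { this.items.push(msg) }
  // Returns the oldest message and removes it (undefined when empty).
  dequeue () { return this.items.shift() }
}

class SimpleLog {
  constructor () { this.entries = [] }
  // Appends and returns the entry's offset, which acts as its ID.
  append (msg) { return this.entries.push(msg) - 1 }
  // Reads everything from `offset` onward without removing anything.
  readFrom (offset = 0) { return this.entries.slice(offset) }
}
```

Reading the log twice from offset 0 returns the same entries both times, which is what makes replay possible; a second `dequeue()` on the queue never returns the same message again.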
Peer-to-Peer vs Broker-Based
Peer-to-peer:          Node <──> Node (nodes exchange messages directly)
With a message broker: Node ──> Broker ──> Node (every message passes through the broker)
P2P advantages: no single point of failure (SPOF), no broker to scale, lower latency. Broker advantages: decoupling, persistence, routing, protocol bridging. RabbitMQ, for example, supports AMQP natively and MQTT (IoT) and STOMP (text-based) through plugins.
Publish/Subscribe Pattern
Pub/Sub is a distributed Observer pattern. Publishers emit messages without knowing receivers. Subscribers register interest in topics.
Building a Real-Time Chat App
The book builds a minimal chat with the ws WebSocket package:
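import { createServer } from 'http'
import staticHandler from 'serve-handler'
// Named exports as provided by ws 8+; older releases exposed ws.Server on the default export
import { WebSocketServer, WebSocket } from 'ws'
// Serve the static chat page from the www directory
const server = createServer((req, res) => {
  return staticHandler(req, res, { public: 'www' })
})
const wss = new WebSocketServer({ server })
wss.on('connection', client => {
  console.log('Client connected')
  client.on('message', msg => {
    console.log(`Message: ${msg}`)
    // ws 8+ delivers messages as Buffers; convert so clients receive text frames
    broadcast(msg.toString())
  })
})
// Send a message to every client connected to this instance
function broadcast (msg) {
  for (const client of wss.clients) {
    if (client.readyState === WebSocket.OPEN) {
      client.send(msg)
    }
  }
}
server.listen(process.argv[2] || 8080)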
The Scaling Problem
Running two instances (node index.js 8080 and node index.js 8081), clients
on different instances can't exchange messages. broadcast() only iterates
wss.clients for that instance. The servers are not talking to each other.
Redis as a Simple Message Broker
The solution: integrate instances via Redis pub/sub. Each instance is both publisher and subscriber.
import Redis from 'ioredis'
const redisSub = new Redis() // separate connection for subscribing
const redisPub = new Redis() // separate connection for publishing
// When a WebSocket client sends a message, publish it to Redis
// instead of broadcasting it directly
wss.on('connection', client => {
  client.on('message', msg => {
    redisPub.publish('chat', msg) // reaches every subscribed instance, including this one
  })
})
// Subscribe to receive messages published by any instance
redisSub.subscribe('chat')
redisSub.on('message', (channel, msg) => {
  broadcast(msg) // broadcast to local WebSocket clients
})
Redis Pub/Sub Limitations
Redis pub/sub is fire-and-forget. Messages are not persisted. If a subscriber is disconnected when a message is published, that message is lost forever. For reliable delivery, use Redis Streams or AMQP.
Why Two Redis Connections?
A Redis connection in subscriber mode can only receive messages and manage
its subscriptions (SUBSCRIBE, UNSUBSCRIBE, and related commands) -- it cannot
execute PUBLISH or other regular commands. Hence redisSub and redisPub are
separate connections.
ZeroMQ PUB/SUB (Peer-to-Peer)
import zmq from 'zeromq'
// Publisher (runs in its own process)
const pub = new zmq.Publisher()
await pub.bind('tcp://127.0.0.1:5000')
// Messages sent before a subscriber has connected are dropped
await pub.send(['chat', JSON.stringify({ user: 'alice', text: 'hello' })])
// Subscriber (runs in a separate process)
const sub = new zmq.Subscriber()
sub.connect('tcp://127.0.0.1:5000')
sub.subscribe('chat')
for await (const [topic, message] of sub) {
  console.log(topic.toString(), JSON.parse(message.toString()))
}
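Key difference: no broker. The publisher calls pub.bind() and each subscriber
calls sub.connect(). Messages are dropped if no subscriber is connected when
they are sent. Subscriptions are prefix matches on the topic frame; over TCP,
ZeroMQ 3.x and later apply the filter on the publisher side, so unmatched
messages are not sent to the subscriber at all.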
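The prefix rule itself is simple enough to sketch in a few lines. `isDelivered` is a hypothetical helper for illustration, not part of the zeromq package, and it compares strings where ZeroMQ compares raw bytes.

```javascript
// Sketch of ZeroMQ-style subscription matching: a message is delivered
// when its topic starts with any of the subscribed prefixes.
function isDelivered (topic, prefixes) {
  return prefixes.some(prefix => topic.startsWith(prefix))
}
```

Under this rule a subscription to `'chat'` also matches a topic such as `'chat.room1'`, and an empty-string subscription matches every topic.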
Reliable Message Delivery with Queues (AMQP)
AMQP via RabbitMQ provides reliable messaging:
Producer -> Exchange -> Binding -> Queue -> Consumer
Exchange Types
| Type | Routing Behavior |
|---|---|
| Direct | Exact routing key match |
| Fanout | Broadcast to all bound queues |
| Topic | Pattern matching on dot-separated words: * (exactly one word), # (zero or more words) |
| Headers | Match on message header attributes |
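To make the topic-exchange wildcards concrete, here is a minimal sketch of the matching rule. `topicMatches` is a hypothetical helper written for this note; RabbitMQ performs this matching inside the broker, and its actual implementation is not shown here.

```javascript
// Sketch of topic-exchange matching: routing keys are dot-separated words,
// '*' matches exactly one word and '#' matches zero or more words.
function topicMatches (pattern, routingKey) {
  const p = pattern === '' ? [] : pattern.split('.')
  const k = routingKey === '' ? [] : routingKey.split('.')

  function match (pi, ki) {
    if (pi === p.length) return ki === k.length
    if (p[pi] === '#') {
      // '#' either absorbs nothing, or absorbs one word and stays active
      return match(pi + 1, ki) || (ki < k.length && match(pi, ki + 1))
    }
    if (ki === k.length) return false
    if (p[pi] === '*' || p[pi] === k[ki]) return match(pi + 1, ki + 1)
    return false
  }

  return match(0, 0)
}
```

For example, the pattern `chat.*` accepts `chat.room1` but not `chat.room1.alerts`, while `chat.#` accepts both, as well as plain `chat`.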
Durable Subscribers
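import amqplib from 'amqplib'
const conn = await amqplib.connect('amqp://localhost')
const ch = await conn.createChannel()
// A durable exchange and a durable, named queue survive broker restarts
await ch.assertExchange('chat', 'fanout', { durable: true })
const { queue } = await ch.assertQueue('service-chat-queue', { durable: true })
await ch.bindQueue(queue, 'chat', '')
// Publisher side: `chatMessage` is an example payload
const chatMessage = { user: 'alice', text: 'hello' }
ch.publish('chat', '', Buffer.from(JSON.stringify(chatMessage)), {
  persistent: true // ask the broker to write the message to disk
})
// Consumer side
ch.consume(queue, (msg) => {
  if (msg === null) return // consumer was cancelled by the broker
  console.log(JSON.parse(msg.content.toString()))
  ch.ack(msg) // acknowledge after processing
})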
Durability Checklist
For truly reliable delivery: (1) durable exchange, (2) durable queue with a
stable name, (3) persistent messages (persistent: true), (4) manual
acknowledgment (ch.ack(msg) after successful processing).
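Point (4) of the checklist implies a decision about what to do when processing fails. The following is one possible sketch, not the book's code: `createReliableHandler` and `processMessage` are hypothetical names, and `channel` is assumed to expose `ack(msg)` and `nack(msg, allUpTo, requeue)` the way amqplib channels do.

```javascript
// Wraps a processing function so the message is acknowledged only on success.
// `processMessage` is a hypothetical application callback.
function createReliableHandler (channel, processMessage) {
  return async function onMessage (msg) {
    if (msg === null) return // amqplib passes null when the consumer is cancelled
    try {
      await processMessage(JSON.parse(msg.content.toString()))
      channel.ack(msg)
    } catch (err) {
      // Reject without requeueing to avoid an endless redelivery loop;
      // pair this with a dead-letter exchange if the message must be kept.
      channel.nack(msg, false, false)
    }
  }
}
```

With a real channel this would be wired up as `ch.consume(queue, createReliableHandler(ch, handleChatMessage))`, where `handleChatMessage` is whatever the application does with a message. Whether a failed message should be requeued, dropped, or dead-lettered depends on the application.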
Reliable Messaging with Streams
Streams vs Queues
| Property | Queue | Stream |
|---|---|---|
| After consumption | Message removed | Message persists |
| Multiple consumers | Competing (one gets it) | Each group reads independently |
| Replay | Not possible | Possible (seek by ID or timestamp) |
| Ordering | Per-queue FIFO | Global append order with IDs |
| Use case | Task distribution | Event sourcing, audit logs, replay |
Redis Streams
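// The camelCase calls below match the node-redis v4 client (the `redis` package), not ioredis
import { createClient } from 'redis'
const redis = createClient()
await redis.connect()
// Producer: append to stream ('*' lets Redis generate the entry ID)
await redis.xAdd('events', '*', { type: 'order', data: '...' })
// Simple consumer: read new entries
const entries = await redis.xRead(
  { key: 'events', id: '$' }, // '$' = only entries added after this call
  { COUNT: 10, BLOCK: 5000 }
)
// Consumer group: competing consumers within a group
// (xGroupCreate throws a BUSYGROUP error if the group already exists)
await redis.xGroupCreate('events', 'workers', '0', { MKSTREAM: true })
const groupEntries = await redis.xReadGroup('workers', 'worker-1',
  { key: 'events', id: '>' }, // '>' = entries never delivered to this group
  { COUNT: 10, BLOCK: 5000 }
)
// Acknowledge each processed entry (the reply is null if the read timed out)
for (const { id } of groupEntries?.[0]?.messages ?? []) {
  await redis.xAck('events', 'workers', id)
}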
Task Distribution Patterns
ZeroMQ Fan-out/Fan-in (PUSH/PULL)
                  ┌─── PULL worker-1 ───┐
PUSH ventilator ──┼─── PULL worker-2 ───┼── PUSH -> PULL collector (sink)
                  └─── PULL worker-3 ───┘
The book builds a distributed hashsum cracker: the ventilator splits the keyspace into chunks, workers try each chunk, and the sink collects the result. PUSH distributes tasks round-robin. PULL collects results.
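The ventilator's two jobs, splitting the work and handing it out in rotation, can be sketched without any sockets. `generateBatches` and `distributeRoundRobin` are hypothetical helpers for illustration; they are not the book's implementation and do not use ZeroMQ, which performs the rotation itself inside the PUSH socket.

```javascript
// Splits the index range [0, total) into contiguous batches of at most `batchSize`.
function * generateBatches (total, batchSize) {
  for (let start = 0; start < total; start += batchSize) {
    yield { start, end: Math.min(start + batchSize, total) - 1 }
  }
}

// Assigns tasks to workers in rotation, similar to how a PUSH socket
// rotates across its connected PULL sockets.
function distributeRoundRobin (tasks, workerCount) {
  const assignments = Array.from({ length: workerCount }, () => [])
  tasks.forEach((task, i) => assignments[i % workerCount].push(task))
  return assignments
}
```

Each batch is described only by its start and end index, so the task messages stay small regardless of how many candidates a batch covers.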
AMQP Competing Consumers
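Multiple consumers consume from the same queue. RabbitMQ dispatches messages to them round-robin, and each message is delivered to only one consumer at a time (it may be redelivered to another consumer if it is never acknowledged):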
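await ch.prefetch(1) // at most one unacked message per consumer (fairness)
ch.consume(taskQueue, async (msg) => {
  if (msg === null) return // consumer was cancelled by the broker
  try {
    await processTask(JSON.parse(msg.content.toString()))
    ch.ack(msg)
  } catch (err) {
    ch.nack(msg, false, true) // on failure, requeue so the task can be retried
  }
})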
Redis Streams Consumer Groups
Consumer groups combine both patterns: competing consumers within a group (task distribution) and independent reading across groups (fan-out):
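// `redis` is assumed to be a connected node-redis v4 client
await redis.xGroupCreate('tasks', 'workers', '0', { MKSTREAM: true })
// Worker reads from the group (each worker receives different entries)
const tasks = await redis.xReadGroup('workers', 'worker-1',
  { key: 'tasks', id: '>' },
  { COUNT: 1, BLOCK: 5000 }
)
// Acknowledge each entry once processed (the reply is null if the read timed out)
for (const { id } of tasks?.[0]?.messages ?? []) {
  await redis.xAck('tasks', 'workers', id)
}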
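The two behaviors can be modeled in memory to see how they interact. `StreamModel` is a hypothetical class for illustration only: it uses array indexes as entry IDs and leaves out blocking reads, claiming of stale pending entries, and everything else a real Redis stream provides.

```javascript
// In-memory model of a stream with consumer groups.
class StreamModel {
  constructor () {
    this.entries = [] // append-only log
    this.groups = new Map() // group name -> { next, pending }
  }

  add (value) {
    this.entries.push(value)
    return this.entries.length - 1 // index doubles as the entry ID
  }

  createGroup (name, startIndex = 0) {
    this.groups.set(name, { next: startIndex, pending: new Map() })
  }

  // Delivers the next entry not yet delivered to this group,
  // similar to reading with the '>' ID. Returns null when nothing is new.
  readGroup (name, consumer) {
    const group = this.groups.get(name)
    if (!group) throw new Error(`no such group: ${name}`)
    if (group.next >= this.entries.length) return null
    const id = group.next++
    group.pending.set(id, consumer) // stays pending until acknowledged
    return { id, value: this.entries[id] }
  }

  // Removes the entry from the group's pending list; true if it was pending.
  ack (name, id) {
    const group = this.groups.get(name)
    return group ? group.pending.delete(id) : false
  }
}
```

Two consumers in the same group split the entries between them, while a second group starts from its own position and sees every entry again.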
Request/Reply Patterns
Implementing request/reply over async messaging requires two sub-patterns:
Correlation Identifier
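import { randomUUID } from 'crypto'
// Assumes `ch` is an open amqplib channel and `replyQueue` is the queue replies arrive on
const pending = new Map() // correlationId -> { resolve, reject }
// Sends a request and returns a promise that settles when the matching reply arrives
function request (data) {
  return new Promise((resolve, reject) => {
    const correlationId = randomUUID()
    pending.set(correlationId, { resolve, reject })
    ch.publish('requests', '', Buffer.from(JSON.stringify({
      correlationId,
      data
    })))
  })
}
// On reply channel: match each reply to its pending request
ch.consume(replyQueue, (msg) => {
  const { correlationId, result } = JSON.parse(msg.content.toString())
  const handler = pending.get(correlationId)
  if (handler) {
    handler.resolve(result)
    pending.delete(correlationId)
  }
}, { noAck: true }) // replies are not acknowledged individually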
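One gap in a plain pending map is that a request whose reply never arrives stays in the map forever. The sketch below adds a timeout and shows that replies can arrive in any order. `PendingRequests` is a hypothetical class, the transport is left out entirely, and a counter stands in for `randomUUID()` to keep the example dependency-free.

```javascript
// Requester-side bookkeeping: maps each correlation ID to its waiting promise.
class PendingRequests {
  constructor (timeoutMs = 5000) {
    this.timeoutMs = timeoutMs
    this.pending = new Map() // correlationId -> { resolve, timer }
    this.nextId = 0 // counter used instead of randomUUID() in this sketch
  }

  // Registers a new request; returns its ID and a promise for the reply.
  create () {
    const correlationId = String(this.nextId++)
    const promise = new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId)
        reject(new Error(`request ${correlationId} timed out`))
      }, this.timeoutMs)
      this.pending.set(correlationId, { resolve, timer })
    })
    return { correlationId, promise }
  }

  // Routes a reply to the matching request; returns false for unknown or late IDs.
  settle (correlationId, result) {
    const entry = this.pending.get(correlationId)
    if (!entry) return false
    clearTimeout(entry.timer)
    this.pending.delete(correlationId)
    entry.resolve(result)
    return true
  }
}
```

In a real requester, `create()` would be called just before publishing the request, and `settle()` from the reply consumer with the ID and result parsed from the incoming message.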
Return Address
// Requester: create an exclusive, server-named reply queue
const { queue: replyQueue } = await ch.assertQueue('', { exclusive: true })
// `data` is the serialized request payload (string or Buffer)
ch.sendToQueue('requests', Buffer.from(data), {
  correlationId: randomUUID(),
  replyTo: replyQueue // return address
})
// Responder: read replyTo from the incoming message
// (`processRequest` stands for the application logic and returns a string or Buffer)
ch.consume('requests', (msg) => {
  const result = processRequest(msg.content)
  ch.sendToQueue(msg.properties.replyTo, Buffer.from(result), {
    correlationId: msg.properties.correlationId
  })
  ch.ack(msg)
})
AMQP's Built-in Support
AMQP natively supports correlationId and replyTo as message properties,
making it well-suited for request/reply over async messaging.
Technology Comparison
| Feature | Redis Pub/Sub | ZeroMQ PUB/SUB | RabbitMQ (AMQP) | Redis Streams |
|---|---|---|---|---|
| Architecture | Broker | Peer-to-peer | Broker | Broker |
| Persistence | No | No | Yes (durable) | Yes (append-only) |
| Delivery guarantee | At-most-once | At-most-once | At-least-once | At-least-once |
| Consumer groups | No | No | Competing consumers | Yes (XREADGROUP) |
| Replay | No | No | No | Yes |
| Complexity | Low | Low | Medium-High | Medium |
Connections
- Previous: Chapter 12 -- Scalability and Architectural Patterns
- Builds on the message broker integration pattern from Ch12's microservices section
- Command messages build on the Command pattern from Chapter 9
- Pub/Sub is a distributed version of the Observer pattern from Chapter 3