In-process stream
createStreamDrain) — for subscribers running in the same Node process. For browser tabs, CLIs, or external devtools that connect over HTTP, see Stream server.createStreamDrain() exposes the events flowing through evlog as an in-process pub/sub. It's the primitive any local consumer can subscribe to without re-implementing a drain.
import { createStreamDrain } from 'evlog/stream'
const stream = createStreamDrain({ buffer: 200 })
// Register as a normal evlog drain (Nitro hook or plugin runner):
nitroApp.hooks.hook('evlog:drain', stream.drain)
Subscribing
Two consumption styles are supported.
Sync listener
const unsubscribe = stream.subscribe((event) => {
if (event.level === 'error') notifyOps(event)
})
// Later:
unsubscribe()
Listener errors are caught and logged — they never affect other subscribers or the drain.
Async iterator
for await (const event of stream.events()) {
console.log(event.timestamp, event.action ?? event.message)
if (shouldStop(event)) break // breaking cleanly unsubscribes
}
Each call to events() returns a fresh independent iterator. Past buffered events are not replayed; pair with stream.recent() to seed history.
Replay buffer
stream.recent() returns a defensive copy of the most recent events (oldest first). The default buffer holds 500 events; pass buffer: 0 to disable, or set it explicitly:
const stream = createStreamDrain({ buffer: 1000 })
const initial = stream.recent()
for (const past of initial) seedDashboard(past)
stream.subscribe(liveEvent => updateDashboard(liveEvent))
Backpressure
A slow async-iterator consumer never blocks the drain. Each iterator has a per-subscriber queue (default 1000); when it overflows, the oldest queued events are dropped and stream.droppedCount increments.
Filter
Events that fail the optional filter predicate are not buffered nor delivered:
const errors = createStreamDrain({
filter: event => event.level === 'error' || event.status >= 500,
})
Default singleton
When several pieces of code in the same process need to share a single stream — typically a framework integration that wires the drain on one side and the stream server on the other — use the singleton accessors:
import { getDefaultStream, setDefaultStream } from 'evlog/stream'
// Lazily creates a singleton on first call
const stream = getDefaultStream({ buffer: 500 })
// Reset (mostly useful in tests)
setDefaultStream(null)
The mini stream server uses this singleton internally, so anything draining into getDefaultStream() automatically reaches all SSE clients.
Going further
- Network bridge — expose this stream over HTTP for browser tabs / CLIs / external devtools. See the Stream server.
- Recipes — concrete consumer patterns (devtool, replay-then-live, aggregation). See Consumer recipes.
Overview
Live event observation, custom drains, plugins, custom enrichers, identity headers, framework integration — everything you can wire on top of the evlog pipeline.
Fanout
Send the same wide event to several destinations in parallel — Axiom plus Datadog plus Sentry plus the local fs drain — through a single pipeline.