
Fanout and multi-drain

Send the same wide event to several destinations in parallel — Axiom, Datadog, Sentry, and the local fs drain — through a single pipeline.

In production, the same wide event often needs to reach more than one destination: a long-term store (Axiom), a metrics tool (Datadog), an error tracker (Sentry), and a local fs drain for incident replay. evlog's drain pipeline accepts multiple drains in one composition — the same batch is fanned out to all of them in parallel.

The recipe

import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createDatadogDrain } from 'evlog/datadog'
import { createSentryDrain } from 'evlog/sentry'
import { createFsDrain } from 'evlog/fs'
import type { DrainContext } from 'evlog'

const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 50, intervalMs: 2000 },
  retry: { maxAttempts: 3 },
  bufferSize: 1000,
})

export const drain = pipeline(
  createAxiomDrain(),
  createDatadogDrain(),
  createSentryDrain({ minLevel: 'error' }),  // Sentry only gets errors
  createFsDrain({ dir: '.evlog/logs', maxFiles: 14 }),
)

Then register drain wherever your framework integration takes a drain — nitroApp.hooks.hook('evlog:drain', drain) for Nitro, initLogger({ drain }) for Next.js / standalone, etc.

What you get

  • Parallel dispatch — every batch hits all four destinations concurrently
  • Independent failures — if Datadog's API is down, Axiom and Sentry still receive events; the failed batch is retried for Datadog only
  • Shared backpressure — the buffer is sized once for the whole pipeline; if any drain falls behind, the oldest events drop together (consistent across destinations)
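The first two properties can be modeled in a few lines. This is a hedged sketch of the semantics, not evlog's implementation — the Drain type and string batches are simplified stand-ins:

```typescript
// Parallel dispatch with independent failures, modeled with
// Promise.allSettled: every drain gets the batch concurrently, and a
// rejection from one never blocks or cancels the others.
type Drain = (batch: string[]) => Promise<void>

async function fanout(batch: string[], drains: Drain[]): Promise<boolean[]> {
  const results = await Promise.allSettled(drains.map(d => d(batch)))
  // true = delivered, false = this destination needs a retry
  return results.map(r => r.status === 'fulfilled')
}
```

A false slot marks the batch for retry against that drain only, matching the per-destination retry behavior described above.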

Per-drain filtering

Wrap a drain to filter events before they reach it:

function onlyErrors<Opts, Ctx extends { event?: { level?: string } }>(
  inner: (opts: Opts) => (ctx: Ctx | Ctx[]) => void | Promise<void>,
) {
  return (opts: Opts) => {
    const wrapped = inner(opts)
    return async (ctx: Ctx | Ctx[]) => {
      const filtered = (Array.isArray(ctx) ? ctx : [ctx]).filter(c => c.event?.level === 'error')
      if (filtered.length > 0) await wrapped(filtered)
    }
  }
}

export const drain = pipeline(
  createAxiomDrain(),                                     // all events
  onlyErrors(createSentryDrain)({ dsn: process.env.SENTRY_DSN! }),  // errors only
)

Most built-in drains expose minLevel directly, so you only need this pattern for non-level filters (path, custom field, etc.).
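For those non-level cases, the same wrapper pattern generalizes to an arbitrary predicate. A sketch — the Ctx and Drain shapes here are illustrative assumptions, not evlog's real types:

```typescript
// Filter events by any predicate before they reach a drain.
type Ctx = { event?: { level?: string; path?: string } }
type Drain = (ctx: Ctx | Ctx[]) => void | Promise<void>

function withFilter(predicate: (c: Ctx) => boolean, drain: Drain): Drain {
  return async (ctx) => {
    const kept = (Array.isArray(ctx) ? ctx : [ctx]).filter(predicate)
    // skip the drain entirely when nothing survives the filter
    if (kept.length > 0) await drain(kept)
  }
}

// e.g. only forward events from API routes:
const apiOnly = (drain: Drain) =>
  withFilter(c => c.event?.path?.startsWith('/api') ?? false, drain)
```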

What not to do

  • Don't create one pipeline per drain — defeats the batch / retry sharing
  • Don't forget drain.flush() on shutdown — events buffered for fanout are lost on abrupt exit
  • Don't fan out to a serverless-incompatible target without checking — and don't treat the stream server as a fanout target: it reaches every connected client through the in-process stream and is not a drain
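The flush point can be made concrete with a stand-in: a minimal buffering drain whose flush() delivers anything still queued. Only the flush() name comes from the text above — the buffering logic is an illustrative mock, not evlog's pipeline:

```typescript
// Events accumulate in a buffer; flush() hands whatever is queued to
// send() and empties the buffer, so nothing is lost on shutdown.
type LogEvent = { name: string }
type FlushableDrain = ((e: LogEvent) => void) & { flush: () => Promise<void> }

function bufferedDrain(send: (batch: LogEvent[]) => Promise<void>): FlushableDrain {
  const buffer: LogEvent[] = []
  const drain = ((e: LogEvent) => { buffer.push(e) }) as FlushableDrain
  drain.flush = async () => {
    // splice(0) returns the full buffer and clears it in one step
    if (buffer.length > 0) await send(buffer.splice(0))
  }
  return drain
}
```

Call flush from your shutdown path, e.g. process.once('SIGTERM', () => drain.flush()).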

Going further

  • Drain pipeline — the full reference for createDrainPipeline() options
  • Custom drains — fanout works with any drain, including ones you write