Drain Pipeline
In production, sending one HTTP request per log event is wasteful. The drain pipeline buffers events and sends them in batches, retries on transient failures, and drops the oldest events when the buffer overflows.
Quick Start
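The pipeline wraps any drain. The wiring depends on your framework: the examples below cover Nitro, Next.js, middleware-based frameworks (Hono, Express, Elysia, Fastify, NestJS), and standalone scripts. Every other example on this page uses the same shape.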
// server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>()
  const drain = pipeline(createAxiomDrain())
  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})
// lib/evlog.ts
import type { DrainContext } from 'evlog'
import { createEvlog } from 'evlog/next'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
export const { withEvlog, useLogger, log, createError } = createEvlog({
  service: 'my-app',
  drain,
})
// Flush before shutdown (e.g. from your custom server or a teardown hook)
export const flushEvlog = () => drain.flush()
// Same pattern — pass `drain` to the framework's evlog middleware/module
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
app.use(evlog({ drain })) // Hono / Express / Elysia
// await app.register(evlog, { drain }) // Fastify
// EvlogModule.forRoot({ drain }) // NestJS
// Flush on shutdown
process.on('SIGTERM', () => drain.flush())
// index.ts — plain TypeScript / Bun / Node script
import type { DrainContext } from 'evlog'
import { initLogger } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
initLogger({ drain })
// Flush before exit
await drain.flush()
Flush buffered events before the process stops (drain.flush()). On Nitro use the close hook; on standalone scripts call it before process.exit; on serverless runtimes use waitUntil(drain.flush()).
How It Works
Events are buffered as they arrive on evlog:drain. A batch flushes when either batch.size is reached or batch.intervalMs expires (whichever comes first). On failure, the same batch is retried with the configured backoff; once retry.maxAttempts is exhausted, onDropped is called with the lost events. The buffer is bounded by maxBufferSize — once full, the oldest events are dropped to keep memory flat.
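The size, interval, and overflow rules above can be modelled in a few lines. This is a minimal sketch, not evlog's actual implementation: the BatchBuffer class, its method names, and the explicit `now` timestamps are hypothetical and exist only to make the rules concrete.

```typescript
// Illustrative model of the buffering rules; all names here are hypothetical.
interface BufferOptions {
  batchSize: number     // mirrors batch.size
  intervalMs: number    // mirrors batch.intervalMs
  maxBufferSize: number // mirrors maxBufferSize
}

class BatchBuffer<T> {
  private events: T[] = []
  private windowStart: number | undefined
  private readonly options: BufferOptions

  constructor(options: BufferOptions) {
    this.options = options
  }

  get pending(): number {
    return this.events.length
  }

  // Adds an event and returns any events evicted to stay within maxBufferSize.
  push(event: T, now: number): T[] {
    const dropped: T[] = []
    if (this.events.length >= this.options.maxBufferSize) {
      dropped.push(...this.events.splice(0, 1)) // evict the oldest event
    }
    if (this.events.length === 0) this.windowStart = now
    this.events.push(event)
    return dropped
  }

  // Returns a batch when the size or the interval condition is met.
  takeBatchIfDue(now: number): T[] | undefined {
    if (this.events.length === 0 || this.windowStart === undefined) return undefined
    const full = this.events.length >= this.options.batchSize
    const expired = now - this.windowStart >= this.options.intervalMs
    if (!full && !expired) return undefined
    const batch = this.events.splice(0, this.options.batchSize)
    this.windowStart = this.events.length > 0 ? now : undefined
    return batch
  }
}
```

In this model a sender would call takeBatchIfDue whenever it is idle. While it is busy retrying a failed batch, push keeps accepting events and evicts the oldest ones once maxBufferSize is reached, which is what keeps memory bounded.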
Configuration
The options below apply to any framework — wire the resulting drain the same way you did in Quick Start.
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>({
  batch: {
    size: 50, // Flush every 50 events
    intervalMs: 5000, // Or every 5 seconds, whichever comes first
  },
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',
    initialDelayMs: 1000,
    maxDelayMs: 30000,
  },
  maxBufferSize: 1000,
  onDropped: (events, error) => {
    console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
  },
})
export const drain = pipeline(createAxiomDrain())
// Then wire `drain` to your framework — see Quick Start above.
Options Reference
| Option | Default | Description |
|---|---|---|
| batch.size | 50 | Maximum events per batch |
| batch.intervalMs | 5000 | Max time (ms) before flushing a partial batch |
| retry.maxAttempts | 3 | Total attempts including the initial one |
| retry.backoff | 'exponential' | One of 'exponential', 'linear', 'fixed' |
| retry.initialDelayMs | 1000 | Base delay (ms) for the first retry |
| retry.maxDelayMs | 30000 | Upper bound (ms) for any retry delay |
| maxBufferSize | 1000 | Max buffered events before dropping oldest |
| onDropped | - | Callback when events are dropped (overflow or retry exhaustion) |
Backoff Strategies
| Strategy | Delay Pattern | Use Case |
|---|---|---|
| exponential | 1s, 2s, 4s, 8s... | Default. Best for transient failures that may need time to recover |
| linear | 1s, 2s, 3s, 4s... | Predictable delay growth |
| fixed | 1s, 1s, 1s, 1s... | Same delay every time. Useful for rate-limited APIs |
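The delay patterns in the table can be expressed as a small function. This is a sketch under assumptions: the doubling factor and the linear step are inferred from the patterns listed above, the cap comes from the retry.maxDelayMs description, and retryDelayMs is a hypothetical helper rather than an evlog export. Whether the library adds jitter is not stated here, so none is modelled.

```typescript
type Backoff = 'exponential' | 'linear' | 'fixed'

// Delay before the nth retry (1-based), capped at maxDelayMs.
// Hypothetical helper that mirrors the documented patterns.
function retryDelayMs(
  strategy: Backoff,
  retry: number,
  initialDelayMs = 1000,
  maxDelayMs = 30000,
): number {
  const raw =
    strategy === 'exponential' ? initialDelayMs * 2 ** (retry - 1)
    : strategy === 'linear' ? initialDelayMs * retry
    : initialDelayMs
  return Math.min(raw, maxDelayMs)
}

const exponentialDelays = [1, 2, 3, 4, 5, 6].map(n => retryDelayMs('exponential', n))
// [1000, 2000, 4000, 8000, 16000, 30000] (the sixth delay is capped by maxDelayMs)
```

Because retry.maxAttempts counts the initial attempt, the default of 3 allows two retries, so only the first two delays of a pattern apply unless you raise it.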
Returned Drain Function
The function returned by pipeline(drain) is hook-compatible and exposes:
| Property | Type | Description |
|---|---|---|
| drain(ctx) | (ctx: T) => void | Push a single event into the buffer |
| drain.flush() | () => Promise<void> | Force-flush all buffered events |
| drain.pending | number | Number of events currently buffered |
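The flush() promise and the pending counter are enough to build a bounded shutdown step, so a slow destination cannot hold the process open indefinitely. A minimal sketch that assumes only the shape listed in the table: FlushableDrain and flushWithTimeout are hypothetical names, not evlog exports, and the rejection branch is defensive because this page does not say whether flush() can reject.

```typescript
// Structural type covering only the members used here (hypothetical name).
interface FlushableDrain {
  flush(): Promise<void>
  readonly pending: number
}

// Resolves to true if the flush finished in time,
// false if it timed out or the flush promise rejected.
function flushWithTimeout(drain: FlushableDrain, timeoutMs: number): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const timer = setTimeout(() => resolve(false), timeoutMs)
    drain.flush().then(
      () => { clearTimeout(timer); resolve(true) },
      () => { clearTimeout(timer); resolve(false) },
    )
  })
}
```

In a SIGTERM handler you could await flushWithTimeout(drain, 5000) and, when it returns false, log drain.pending to record how many events were still buffered before exiting.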
Multiple Destinations
Wrap multiple adapters with a single pipeline (one batch flushed in parallel to every destination):
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createOTLPDrain } from 'evlog/otlp'
const axiom = createAxiomDrain()
const otlp = createOTLPDrain()
const pipeline = createDrainPipeline<DrainContext>()
export const drain = pipeline(async (batch) => {
  await Promise.allSettled([axiom(batch), otlp(batch)])
})
// Wire `drain` exactly like in Quick Start — Nitro hook, framework middleware, or initLogger.
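Promise.allSettled never rejects, so in the example above a failing destination is silently ignored and the batch is not retried. If you would rather have the pipeline retry, one option is to rethrow when any destination fails. The sketch below assumes the pipeline treats a rejected promise from the drain function as a failure, which this page implies but does not spell out; fanOut and BatchDrain are hypothetical names.

```typescript
type BatchDrain<T> = (batch: T[]) => Promise<void>

// Sends the batch to every destination in parallel and rejects if any of them failed.
function fanOut<T>(...destinations: BatchDrain<T>[]): BatchDrain<T> {
  return async (batch) => {
    const results = await Promise.allSettled(destinations.map(send => send(batch)))
    const failures = results.filter(result => result.status === 'rejected')
    if (failures.length > 0) {
      throw new Error(`${failures.length} of ${destinations.length} destinations failed`)
    }
  }
}
```

With this helper the wiring would become pipeline(fanOut(axiom, otlp)). The trade-off is that a retry resends the whole batch to every destination, including the ones that already accepted it, so healthy destinations may receive duplicates.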
Custom Drain Function
You don't need an adapter. Pass any async function that accepts a batch:
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 100 } })
export const drain = pipeline(async (batch) => {
  await fetch('https://your-service.com/logs', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(batch.map(ctx => ctx.event)),
  })
})
// Wire `drain` to your framework — see Quick Start above.
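One detail worth handling in a custom drain: fetch only rejects on network errors, so an HTTP 500 response resolves normally and would look like a success. The sketch below throws on non-2xx responses so that the failure is visible to the caller. It assumes the pipeline treats a rejected promise as a failed batch; createHttpDrain, FetchLike, and LogContext are hypothetical names, and the fetch function is injected only to keep the example self-contained.

```typescript
// Minimal structural types so the sketch stands alone (hypothetical names).
interface LogContext { event: unknown }
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number }>

// Builds a batch drain that rejects when the endpoint answers with a non-2xx status.
function createHttpDrain(url: string, fetchImpl: FetchLike) {
  return async (batch: LogContext[]): Promise<void> => {
    const response = await fetchImpl(url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(batch.map(ctx => ctx.event)),
    })
    if (!response.ok) {
      throw new Error(`Log endpoint responded with HTTP ${response.status}`)
    }
  }
}
```

You would then pass createHttpDrain('https://your-service.com/logs', fetch) to pipeline(...) in place of the inline function.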
Next Steps
- Adapters Overview - Available built-in adapters
- Custom Adapters - Build your own drain function
- Best Practices - Security and production tips