The Transactional Outbox Pattern in Go: Building Reliable Event-Driven Microservices

If you’ve ever built a microservice that needs to both write to a database and publish an event, you’ve hit the dual-write problem. The database transaction commits—but the message broker call fails. Or vice versa. Now your system is in an inconsistent state, and debugging it at 2 AM is nobody’s idea of fun.

The Transactional Outbox Pattern is the industry-standard solution. Instead of writing to the DB and publishing to a message broker as two separate operations, you write the event to an outbox table in the same database transaction. A separate process then reads the outbox and publishes the events. Simple in theory—but the implementation details matter.

In this post, I’ll walk through a production-grade Go implementation using PostgreSQL as both the application database and the outbox store, with a polling-based relay that publishes to NATS JetStream.

The Dual-Write Problem

Here’s the naive approach that will bite you:

// DON'T DO THIS - dual write problem
func CreateOrder(ctx context.Context, order Order) error {
    // Step 1: Save to database
    if err := db.SaveOrder(ctx, &order); err != nil {
        return err
    }
    // Step 2: Publish event - what if this fails?
    // Order is saved but nobody knows about it!
    return nats.Publish("order.created", order)
}

If the NATS publish fails after the DB commit, you’ve lost an event. Reversing the order doesn’t help: if you publish first and the DB write then fails, consumers receive a phantom event for an order that doesn’t exist. Neither scenario is acceptable in production.

The Outbox Table

First, define the outbox table in your PostgreSQL schema:

CREATE TABLE outbox_events (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id TEXT NOT NULL,
    event_type  TEXT NOT NULL,
    payload     JSONB NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    published   BOOLEAN NOT NULL DEFAULT FALSE,
    published_at TIMESTAMPTZ
);

CREATE INDEX idx_outbox_unpublished ON outbox_events (created_at)
    WHERE published = FALSE;

The partial index on WHERE published = FALSE is crucial—it keeps the index small since published events are excluded, making polling fast even with millions of rows.

Writing to the Outbox Atomically

The key insight: write the event to the outbox in the same transaction as your business data. Both succeed or both roll back:

func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) (string, error) {
    orderID := uuid.New().String()
    eventID := uuid.New().String()

    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return "", fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    // Step 1: Insert the order
    _, err = tx.ExecContext(ctx,
        `INSERT INTO orders (id, customer_id, total, status)
         VALUES ($1, $2, $3, 'pending')`,
        orderID, req.CustomerID, req.Total)
    if err != nil {
        return "", fmt.Errorf("insert order: %w", err)
    }

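    // Step 2: Insert the outbox event in the SAME transaction.
    // EventID travels in the payload so consumers can deduplicate on it.
    payload, err := json.Marshal(OrderCreatedEvent{
        EventID:    eventID,
        OrderID:    orderID,
        CustomerID: req.CustomerID,
        Total:      req.Total,
    })
    if err != nil {
        return "", fmt.Errorf("marshal event: %w", err)
    }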
    _, err = tx.ExecContext(ctx,
        `INSERT INTO outbox_events (id, aggregate_id, event_type, payload)
         VALUES ($1, $2, 'order.created', $3)`,
        eventID, orderID, payload)
    if err != nil {
        return "", fmt.Errorf("insert outbox: %w", err)
    }

    if err := tx.Commit(); err != nil {
        return "", fmt.Errorf("commit: %w", err)
    }

    return orderID, nil
}
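The post never shows the OrderCreatedEvent type itself. As a minimal sketch, assuming string IDs, an integer Total in minor units, and snake_case JSON tags (all of which are assumptions, not the original definition), the payload and its round trip could look like this. EncodeOrderCreated and DecodeOrderCreated are hypothetical helper names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderCreatedEvent is a hypothetical payload shape. The field types and
// JSON tags are assumptions made for this sketch.
type OrderCreatedEvent struct {
	EventID    string `json:"event_id"`
	OrderID    string `json:"order_id"`
	CustomerID string `json:"customer_id"`
	Total      int64  `json:"total"` // assumed to be an amount in minor units
}

// EncodeOrderCreated builds the JSON that would be stored in
// outbox_events.payload. It rejects empty IDs so a consumer never
// receives an event it cannot deduplicate.
func EncodeOrderCreated(eventID, orderID, customerID string, total int64) ([]byte, error) {
	if eventID == "" || orderID == "" {
		return nil, fmt.Errorf("event ID and order ID are required")
	}
	return json.Marshal(OrderCreatedEvent{
		EventID:    eventID,
		OrderID:    orderID,
		CustomerID: customerID,
		Total:      total,
	})
}

// DecodeOrderCreated is what a consumer would call on the message body.
func DecodeOrderCreated(data []byte) (OrderCreatedEvent, error) {
	var e OrderCreatedEvent
	err := json.Unmarshal(data, &e)
	return e, err
}

func main() {
	payload, err := EncodeOrderCreated("evt-1", "ord-1", "cust-1", 4200)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
}
```

Carrying the event ID inside the payload keeps deduplication independent of broker-specific headers, at the cost of a few extra bytes per message.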

The Outbox Relay

The relay is a background goroutine that polls the outbox table and publishes unpublished events. Here’s a production-grade implementation with proper concurrency control:

type OutboxRelay struct {
    db        *sql.DB
    nc        *nats.Conn
    logger    *slog.Logger
    batch     int
    interval  time.Duration
}

func (r *OutboxRelay) Run(ctx context.Context) error {
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox relay error", "error", err)
            }
        }
    }
}

func (r *OutboxRelay) processBatch(ctx context.Context) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Fetch unpublished events with FOR UPDATE SKIP LOCKED
    // This prevents concurrent relays from picking the same rows
    rows, err := tx.QueryContext(ctx,
        `SELECT id, aggregate_id, event_type, payload
         FROM outbox_events
         WHERE published = FALSE
         ORDER BY created_at ASC
         LIMIT $1
         FOR UPDATE SKIP LOCKED`, r.batch)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }
    defer rows.Close()

    var events []OutboxEvent
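    for rows.Next() {
        var e OutboxEvent
        if err := rows.Scan(&e.ID, &e.AggregateID, &e.EventType, &e.Payload); err != nil {
            return fmt.Errorf("scan outbox row: %w", err)
        }
        events = append(events, e)
    }
    // rows.Next returns false on both exhaustion and error; tell them apart.
    if err := rows.Err(); err != nil {
        return fmt.Errorf("iterate outbox: %w", err)
    }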

    if len(events) == 0 {
        return nil
    }

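    // Publish via JetStream: js.Publish waits for the server ack, whereas core
    // nc.Publish only buffers. Assumes a stream already captures "events.>".
    js, err := r.nc.JetStream()
    if err != nil {
        return fmt.Errorf("jetstream context: %w", err)
    }
    for _, e := range events {
        subject := fmt.Sprintf("events.%s.%s", e.AggregateID, e.EventType)
        // MsgId lets the stream discard redeliveries inside its dedup window.
        if _, err := js.Publish(subject, e.Payload, nats.MsgId(e.ID)); err != nil {
            return fmt.Errorf("publish %s: %w", e.ID, err)
        }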

        // Mark as published in the same transaction
        _, err := tx.ExecContext(ctx,
            `UPDATE outbox_events
             SET published = TRUE, published_at = now()
             WHERE id = $1`, e.ID)
        if err != nil {
            return fmt.Errorf("mark published %s: %w", e.ID, err)
        }
    }

    return tx.Commit()
}
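The publish loop is easier to reason about, and to test, when the broker sits behind a small interface. The following is a sketch under that assumption: Publisher, PublishBatch, and fakePublisher are hypothetical names, not part of the relay above or of the NATS client. It shows the stop-at-first-failure behaviour that gives the relay its at-least-once guarantee.

```go
package main

import (
	"errors"
	"fmt"
)

// Event mirrors the columns the relay selects from outbox_events.
type Event struct {
	ID, AggregateID, EventType string
	Payload                    []byte
}

// Publisher is a hypothetical seam over the broker client.
type Publisher interface {
	Publish(subject, msgID string, payload []byte) error
}

// PublishBatch publishes events in order and returns the IDs that were
// accepted. It stops at the first failure so the caller can roll back
// and retry the whole batch on the next poll.
func PublishBatch(p Publisher, events []Event) ([]string, error) {
	done := make([]string, 0, len(events))
	for _, e := range events {
		subject := fmt.Sprintf("events.%s.%s", e.AggregateID, e.EventType)
		if err := p.Publish(subject, e.ID, e.Payload); err != nil {
			return done, fmt.Errorf("publish %s: %w", e.ID, err)
		}
		done = append(done, e.ID)
	}
	return done, nil
}

var errBroker = errors.New("broker unavailable")

// fakePublisher records subjects and fails on one chosen event ID.
type fakePublisher struct {
	failOn   string
	subjects []string
}

func (f *fakePublisher) Publish(subject, msgID string, _ []byte) error {
	if msgID == f.failOn {
		return errBroker
	}
	f.subjects = append(f.subjects, subject)
	return nil
}

func main() {
	events := []Event{
		{ID: "e1", AggregateID: "o1", EventType: "order.created"},
		{ID: "e2", AggregateID: "o2", EventType: "order.created"},
		{ID: "e3", AggregateID: "o3", EventType: "order.created"},
	}
	p := &fakePublisher{failOn: "e2"}
	done, err := PublishBatch(p, events)
	fmt.Println(done, err)
}
```

In this run e1 reaches the broker before e2 fails. If the caller then rolls back, e1 is published again on the next poll, which is exactly why consumers need to deduplicate.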

Why FOR UPDATE SKIP LOCKED Matters

If you run multiple instances of your service (and you should for HA), you need to prevent two relays from publishing the same event. FOR UPDATE SKIP LOCKED is PostgreSQL’s built-in solution—it locks the selected rows and skips any rows already locked by another transaction. No external coordination needed.

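This is simpler than using advisory locks or a separate coordination service. While a relay’s transaction is open, the rows it selected stay locked, so each relay works on its own disjoint set of events. The trade-off is ordering: relays working on different batches can publish events for the same aggregate out of created_at order, so consumers that depend on ordering need a sequence number in the payload or a single relay instance.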
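To make the claiming behaviour concrete, here is a toy in-memory model of the query's semantics. It is only an illustration: it is single-threaded, the row type and claim function are hypothetical, and it says nothing about how PostgreSQL implements row locks.

```go
package main

import "fmt"

// row is a toy stand-in for an outbox_events row.
type row struct {
	id        int
	published bool
	lockedBy  string // "" means no open transaction holds the row lock
}

// claim mimics SELECT ... WHERE published = FALSE ORDER BY created_at
// LIMIT n FOR UPDATE SKIP LOCKED. It walks rows in order, skips rows that
// are published or already locked, and locks up to n of the rest.
func claim(table []row, relay string, n int) []int {
	var ids []int
	for i := range table {
		if len(ids) == n {
			break
		}
		if table[i].published || table[i].lockedBy != "" {
			continue
		}
		table[i].lockedBy = relay
		ids = append(ids, table[i].id)
	}
	return ids
}

func main() {
	table := make([]row, 5)
	for i := range table {
		table[i].id = i + 1
	}
	a := claim(table, "relay-a", 2)
	b := claim(table, "relay-b", 2)
	fmt.Println(a, b)
}
```

The second relay does not wait for the first one's locks; it simply moves on to the next unlocked rows, so neither relay blocks the other.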

Handling Failures: At-Least-Once Delivery

This implementation provides at-least-once delivery. If the relay crashes after publishing but before committing, the event will be republished on the next poll. Your consumers must be idempotent:

// Consumer with idempotency check
func (c *OrderConsumer) HandleOrderCreated(ctx context.Context, msg OrderCreatedEvent) error {
    // Check if already processed (use the event ID as dedup key)
    processed, err := c.cache.IsProcessed(ctx, msg.EventID)
    if err != nil {
        return err
    }
    if processed {
        c.logger.Info("skipping duplicate event", "event_id", msg.EventID)
        return nil
    }

    // Process the event
    if err := c.processOrder(ctx, msg); err != nil {
        return err
    }

    // Mark as processed
    return c.cache.MarkProcessed(ctx, msg.EventID, 24*time.Hour)
}
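The c.cache dependency is left undefined above. A minimal sketch of what could sit behind it, assuming an in-process map with expiry times, is shown below. MemoryDedup is a hypothetical name, and a real deployment with several consumer instances would more likely use a shared store such as Redis or a database table.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// MemoryDedup is a hypothetical in-process stand-in for c.cache.
// It remembers event IDs until their TTL expires.
type MemoryDedup struct {
	mu      sync.Mutex
	expires map[string]time.Time
}

func NewMemoryDedup() *MemoryDedup {
	return &MemoryDedup{expires: make(map[string]time.Time)}
}

// IsProcessed reports whether eventID was marked and has not yet expired.
// Expired entries are removed lazily on lookup.
func (m *MemoryDedup) IsProcessed(_ context.Context, eventID string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	exp, ok := m.expires[eventID]
	if !ok {
		return false, nil
	}
	if !time.Now().Before(exp) {
		delete(m.expires, eventID)
		return false, nil
	}
	return true, nil
}

// MarkProcessed records eventID as handled for the given TTL.
func (m *MemoryDedup) MarkProcessed(_ context.Context, eventID string, ttl time.Duration) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.expires[eventID] = time.Now().Add(ttl)
	return nil
}

func main() {
	ctx := context.Background()
	d := NewMemoryDedup()
	before, _ := d.IsProcessed(ctx, "evt-1")
	_ = d.MarkProcessed(ctx, "evt-1", 24*time.Hour)
	after, _ := d.IsProcessed(ctx, "evt-1")
	fmt.Println(before, after)
}
```

Note that checking and marking are two separate steps, so two concurrent deliveries of the same event can both pass the check. Where that matters, an atomic "insert if absent" in the same transaction as the business update is the safer choice.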

When to Use (and Not Use) This Pattern

Use the outbox pattern when:

  • You need every committed DB write to eventually publish its event, and no event for a write that rolled back
  • You’re building event-driven microservices with eventual consistency between services
  • You can tolerate at-least-once delivery (with idempotent consumers)
  • Your event volume fits within polling throughput (roughly batch size divided by poll interval per relay; ~10K events/sec with tuned polling is a rule of thumb, so benchmark your own setup)

Consider alternatives when:

  • You need exactly-once semantics: look at Kafka transactions with read_committed consumers, which cover read-process-write flows inside Kafka
  • Your latency requirements can’t tolerate the polling interval: use PostgreSQL logical replication (CDC) with Debezium instead
  • You’re already running a streaming platform like Kafka: evaluate Kafka transactions, keeping in mind that they do not span your PostgreSQL write
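Whether your volume "fits within polling throughput" can be estimated before benchmarking. The sketch below assumes a single relay driven by a time.Ticker, as in the Run loop above, and a roughly constant time per batch. MaxEventsPerSecond is a hypothetical helper and the numbers are illustrative, not measurements.

```go
package main

import (
	"fmt"
	"time"
)

// MaxEventsPerSecond estimates the ceiling for one polling relay.
// A ticker starts at most one batch per interval, and a batch that runs
// longer than the interval delays the next one, so the effective period
// is the larger of the two durations.
func MaxEventsPerSecond(batch int, interval, batchTime time.Duration) int64 {
	period := interval
	if batchTime > period {
		period = batchTime
	}
	if period <= 0 || batch <= 0 {
		return 0
	}
	return int64(batch) * int64(time.Second) / int64(period)
}

func main() {
	// Batches finish well inside the interval: the interval is the limit.
	fmt.Println(MaxEventsPerSecond(500, 100*time.Millisecond, 40*time.Millisecond))
	// Batches take longer than the interval: batch time is the limit.
	fmt.Println(MaxEventsPerSecond(500, 100*time.Millisecond, 250*time.Millisecond))
}
```

If the estimate lands close to your peak event rate, raising the batch size or running more relay instances are the usual first levers, with CDC as the next step.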

Cleanup: Don’t Let the Outbox Grow Forever

// Run as a daily cleanup job
func (r *OutboxRelay) Cleanup(ctx context.Context, retention time.Duration) error {
    cutoff := time.Now().Add(-retention)
    result, err := r.db.ExecContext(ctx,
        `DELETE FROM outbox_events
         WHERE published = TRUE AND published_at < $1`, cutoff)
    if err != nil {
        return err
    }
    deleted, _ := result.RowsAffected()
    r.logger.Info("outbox cleanup", "deleted", deleted)
    return nil
}

Wrapping Up

The Transactional Outbox Pattern is one of those things that seems obvious once you see it, but the implementation details—atomic transactions, FOR UPDATE SKIP LOCKED, idempotent consumers, and cleanup—are what separate a prototype from production-grade code. If you’re building event-driven services in Go and not using this pattern, you’re almost certainly losing events under failure conditions.

The full code from this post is available on my GitHub. For the CDC-based alternative using PostgreSQL logical replication, check out Debezium—it’s the next evolution when polling throughput becomes a bottleneck.
