Most applications store data the same way: overwrite the current state and move on. A customer changes their address? Update the row. An order gets cancelled? Set the status column to “cancelled.” Simple, fast, and for the vast majority of systems, exactly right.
But every so often, someone asks a question that the database can’t answer. “What did this order look like last Thursday at 3 PM?” “How many times did this customer change their shipping address?” “Why was this item removed from the cart?” In a CRUD system, those answers are gone — overwritten the moment the next update landed.
Event sourcing is a different approach to persistence. Instead of storing the current state, you store every state-changing event as an immutable record. The current state becomes a derived value — something you compute by replaying the event history. It’s a powerful pattern, but one that comes with genuine complexity. Let’s look at how it works, when it makes sense, and how to implement it without painting yourself into a corner.
The Core Idea: Events as Source of Truth
In a traditional CRUD system, an order in your database might look like a single row with columns for status, total, shipping address, and so on. Each update destroys the previous state. In an event-sourced system, that same order is represented as a sequence of events:
- OrderCreated — the order was placed
- ItemAdded — a product was added to the order
- ItemRemoved — a product was removed
- ShippingAddressUpdated — the customer changed the delivery address
- OrderConfirmed — the order was submitted for processing
- OrderShipped — the order left the warehouse
Each event is an immutable fact that happened at a specific point in time. You never update or delete events. To determine the current state of the order, you replay all events in sequence and apply them to a blank slate — a process called rehydration.
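In code, rehydration is just a fold over the history. Here is a deliberately simplified sketch using plain dicts to show the shape of the idea; the implementation later in this article uses proper event classes:

def rehydrate(events: list) -> dict:
    """Replay the full history to compute current state from a blank slate."""
    state = {"status": None, "items": {}}
    for event in events:
        if event["type"] == "OrderCreated":
            state["status"] = "draft"
        elif event["type"] == "ItemAdded":
            state["items"][event["product_id"]] = event["quantity"]
        elif event["type"] == "ItemRemoved":
            state["items"].pop(event["product_id"], None)
        elif event["type"] == "OrderConfirmed":
            state["status"] = "confirmed"
    return state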
Building Blocks: Aggregates, Event Stores, and Projections
An event-sourced system has three key components:
- Aggregate — The business entity (Order, Customer, Account) whose state evolves over time. The aggregate processes commands, emits events, and applies events to itself to rebuild state.
- Event Store — An append-only log that stores every event. This is your system of record. PostgreSQL, EventStoreDB (now Kurrent), or even Apache Kafka can serve as an event store.
- Projections — Read-optimized views built from the event stream. Since querying the event store directly is expensive (you’d have to replay events for every read), projections pre-compute the current state into queryable tables or materialized views.
This separation naturally leads to CQRS (Command Query Responsibility Segregation) — commands write events to the store, and queries read from projections. The two sides can be optimized independently.
A Practical Implementation in Python
Let’s build a minimal event-sourced order system. We’ll use PostgreSQL as the event store and Python for the domain logic.
Step 1: Define Events and the Aggregate
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List
from enum import Enum
import uuid
# --- Events ---
@dataclass
class Event:
aggregate_id: str
timestamp: str
event_type: str
@dataclass
class OrderCreated(Event):
customer_id: str = ""
@dataclass
class ItemAdded(Event):
product_id: str = ""
quantity: int = 0
price: float = 0.0
@dataclass
class ItemRemoved(Event):
product_id: str = ""
@dataclass
class OrderConfirmed(Event):
pass
@dataclass
class OrderCancelled(Event):
reason: str = ""
# --- Aggregate ---
class OrderStatus(Enum):
DRAFT = "draft"
CONFIRMED = "confirmed"
CANCELLED = "cancelled"
class Order:
def __init__(self, aggregate_id: str):
self.aggregate_id = aggregate_id
self.version = 0
self.status = OrderStatus.DRAFT
self.customer_id = ""
self.items: dict = {} # product_id -> {quantity, price}
self._pending_events: List[Event] = []
@property
def total(self) -> float:
return sum(
item["quantity"] * item["price"]
for item in self.items.values()
)
# --- Command methods (produce events) ---
def create(self, customer_id: str):
event = OrderCreated(
aggregate_id=self.aggregate_id,
timestamp=datetime.now(timezone.utc).isoformat(),
event_type="OrderCreated",
customer_id=customer_id,
)
self._apply(event)
self._pending_events.append(event)
def add_item(self, product_id: str, quantity: int, price: float):
if self.status != OrderStatus.DRAFT:
raise ValueError("Can only add items to a draft order")
event = ItemAdded(
aggregate_id=self.aggregate_id,
timestamp=datetime.now(timezone.utc).isoformat(),
event_type="ItemAdded",
product_id=product_id,
quantity=quantity,
price=price,
)
self._apply(event)
self._pending_events.append(event)
def remove_item(self, product_id: str):
if product_id not in self.items:
raise ValueError(f"Product {product_id} not in order")
event = ItemRemoved(
aggregate_id=self.aggregate_id,
timestamp=datetime.now(timezone.utc).isoformat(),
event_type="ItemRemoved",
product_id=product_id,
)
self._apply(event)
self._pending_events.append(event)
def confirm(self):
if not self.items:
raise ValueError("Cannot confirm an empty order")
event = OrderConfirmed(
aggregate_id=self.aggregate_id,
timestamp=datetime.now(timezone.utc).isoformat(),
event_type="OrderConfirmed",
)
self._apply(event)
self._pending_events.append(event)
    def cancel(self, reason: str = ""):
        if self.status == OrderStatus.CANCELLED:
            raise ValueError("Order is already cancelled")
event = OrderCancelled(
aggregate_id=self.aggregate_id,
timestamp=datetime.now(timezone.utc).isoformat(),
event_type="OrderCancelled",
reason=reason,
)
self._apply(event)
self._pending_events.append(event)
# --- Apply events (mutate state) ---
def _apply(self, event: Event):
if isinstance(event, OrderCreated):
self.customer_id = event.customer_id
self.status = OrderStatus.DRAFT
elif isinstance(event, ItemAdded):
pid = event.product_id
if pid in self.items:
self.items[pid]["quantity"] += event.quantity
else:
self.items[pid] = {
"quantity": event.quantity,
"price": event.price,
}
elif isinstance(event, ItemRemoved):
self.items.pop(event.product_id, None)
elif isinstance(event, OrderConfirmed):
self.status = OrderStatus.CONFIRMED
elif isinstance(event, OrderCancelled):
self.status = OrderStatus.CANCELLED
self.version += 1
# --- Rebuild from event history ---
@classmethod
def from_events(cls, aggregate_id: str, events: List[Event]):
order = cls(aggregate_id)
for event in events:
order._apply(event)
order._pending_events = []
return order
def get_pending_events(self) -> List[Event]:
events = self._pending_events[:]
self._pending_events = []
return events
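Before wiring in persistence, the aggregate can be exercised entirely in memory, which is one of the pattern's quiet benefits: domain logic is testable without a database. A quick sketch of the command flow (the values are illustrative):

import uuid

order = Order(str(uuid.uuid4()))
order.create(customer_id="cust-42")
order.add_item("sku-1", quantity=2, price=10.0)
order.add_item("sku-1", quantity=1, price=10.0)  # quantities merge
order.confirm()

assert order.status == OrderStatus.CONFIRMED
assert order.total == 30.0
assert len(order.get_pending_events()) == 4  # ready for the event store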
Step 2: The Event Store with PostgreSQL
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (aggregate_id, version)
);
CREATE INDEX idx_events_aggregate ON events (aggregate_id, version);
The unique constraint on (aggregate_id, version) gives us optimistic concurrency for free. If two processes try to append events with the same version number, one of them will get a constraint violation — exactly the behavior you want to prevent lost updates.
import json
import psycopg
class PostgresEventStore:
def __init__(self, dsn: str):
self.conn = psycopg.connect(dsn)
def load_events(self, aggregate_id: str) -> list:
with self.conn.cursor() as cur:
cur.execute(
"SELECT event_type, event_data, version "
"FROM events WHERE aggregate_id = %s "
"ORDER BY version ASC",
(aggregate_id,),
)
rows = cur.fetchall()
return [self._deserialize(row) for row in rows]
def append(self, aggregate_id: str, events: list, expected_version: int):
"""Append events with optimistic concurrency check."""
with self.conn.cursor() as cur:
for i, event in enumerate(events):
version = expected_version + i + 1
cur.execute(
"INSERT INTO events "
"(aggregate_id, event_type, event_data, version) "
"VALUES (%s, %s, %s, %s)",
(
aggregate_id,
event.event_type,
                        json.dumps(vars(event)),  # a dataclass keeps only its fields in vars()
version,
),
)
self.conn.commit()
    def _deserialize(self, row):
        # psycopg returns JSONB columns already parsed into Python dicts
        event_type, data, version = row
event_map = {
"OrderCreated": OrderCreated,
"ItemAdded": ItemAdded,
"ItemRemoved": ItemRemoved,
"OrderConfirmed": OrderConfirmed,
"OrderCancelled": OrderCancelled,
}
cls = event_map.get(event_type, Event)
data["aggregate_id"] = data.get("aggregate_id", "")
data["timestamp"] = data.get("timestamp", "")
data["event_type"] = event_type
return cls(**{k: v for k, v in data.items()
if k in cls.__dataclass_fields__})
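Putting the two pieces together: a command handler loads the history, rehydrates the aggregate, executes the command, and appends the new events at the version it started from. A sketch, assuming the classes above and a reachable PostgreSQL instance (the DSN is a placeholder). A concurrent writer surfaces as the unique-constraint violation described earlier, which you can catch and retry:

store = PostgresEventStore("postgresql://localhost/shop")  # placeholder DSN

def add_item_to_order(order_id: str, product_id: str,
                      quantity: int, price: float):
    order = Order.from_events(order_id, store.load_events(order_id))
    expected_version = order.version  # events applied so far
    order.add_item(product_id, quantity, price)
    try:
        store.append(order_id, order.get_pending_events(), expected_version)
    except psycopg.errors.UniqueViolation:
        store.conn.rollback()
        # Another writer appended first: reload, re-run the command, retry.
        raise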
Step 3: Building Projections for Reads
Replaying events on every read doesn’t scale. Instead, build a projection — a denormalized read model that gets updated whenever a new event is persisted.
CREATE TABLE order_summary (
order_id UUID PRIMARY KEY,
customer_id UUID,
status VARCHAR(20),
total NUMERIC(10, 2),
item_count INT,
updated_at TIMESTAMPTZ
);
class OrderProjection:
"""Keeps the order_summary table in sync with events."""
def __init__(self, dsn: str):
self.conn = psycopg.connect(dsn)
def handle(self, event: Event):
if isinstance(event, OrderCreated):
self.conn.execute(
"INSERT INTO order_summary "
"(order_id, customer_id, status, total, item_count, updated_at) "
"VALUES (%s, %s, 'draft', 0, 0, %s)",
(event.aggregate_id, event.customer_id, event.timestamp),
)
elif isinstance(event, ItemAdded):
self.conn.execute(
"UPDATE order_summary SET "
"total = total + %s, item_count = item_count + %s, "
"updated_at = %s WHERE order_id = %s",
(
event.quantity * event.price,
event.quantity,
event.timestamp,
event.aggregate_id,
),
)
        elif isinstance(event, ItemRemoved):
            # The aggregate drops the entire product line, so sum every
            # ItemAdded for this product to know how much to subtract
            # (this assumes a product is not re-added after a removal).
            row = self.conn.execute(
                "SELECT SUM((event_data->>'quantity')::int), "
                "SUM((event_data->>'quantity')::int "
                "* (event_data->>'price')::numeric) "
                "FROM events "
                "WHERE aggregate_id = %s AND event_type = 'ItemAdded' "
                "AND event_data->>'product_id' = %s",
                (event.aggregate_id, event.product_id),
            ).fetchone()
            qty = int(row[0]) if row and row[0] is not None else 0
            item_total = float(row[1]) if row and row[1] is not None else 0.0
            self.conn.execute(
                "UPDATE order_summary SET "
                "total = GREATEST(total - %s, 0), "
                "item_count = GREATEST(item_count - %s, 0), "
                "updated_at = %s WHERE order_id = %s",
                (item_total, qty, event.timestamp, event.aggregate_id),
            )
        elif isinstance(event, OrderConfirmed):
            self.conn.execute(
                "UPDATE order_summary SET status = 'confirmed', "
                "updated_at = %s WHERE order_id = %s",
                (event.timestamp, event.aggregate_id),
            )
        elif isinstance(event, OrderCancelled):
            self.conn.execute(
                "UPDATE order_summary SET status = 'cancelled', "
                "updated_at = %s WHERE order_id = %s",
                (event.timestamp, event.aggregate_id),
            )
        self.conn.commit()
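One detail the handler above leaves open is delivery: something has to feed newly persisted events to the projection. A simple option, sketched here against the schema above, is a catch-up poller that tails the events table by its BIGSERIAL id (the checkpoint is kept in memory for brevity; a production system would persist it):

class ProjectionRunner:
    """Tails the events table and feeds anything new to a projection."""

    def __init__(self, dsn: str, store: PostgresEventStore,
                 projection: OrderProjection):
        self.conn = psycopg.connect(dsn)
        self.store = store          # reused only for its _deserialize
        self.projection = projection
        self.last_seen_id = 0       # in-memory checkpoint; persist in production

    def poll_once(self):
        rows = self.conn.execute(
            "SELECT id, event_type, event_data FROM events "
            "WHERE id > %s ORDER BY id ASC LIMIT 100",
            (self.last_seen_id,),
        ).fetchall()
        for event_id, event_type, data in rows:
            event = self.store._deserialize((event_type, data, 0))
            self.projection.handle(event)
            self.last_seen_id = event_id

This also makes the eventual consistency mentioned earlier concrete: the read model lags the event store by at most one polling interval.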
Snapshots: Speeding Up Rehydration
For aggregates with long event histories (a customer account with thousands of events), replaying everything gets slow. The solution is snapshots — periodic saved copies of the aggregate’s current state. To rehydrate, you load the latest snapshot and only replay events that occurred after it.
CREATE TABLE snapshots (
aggregate_id UUID PRIMARY KEY,
version BIGINT NOT NULL,
state JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
class SnapshotStore:
def __init__(self, dsn: str):
self.conn = psycopg.connect(dsn)
def save(self, aggregate_id: str, version: int, state: dict):
self.conn.execute(
"INSERT INTO snapshots (aggregate_id, version, state) "
"VALUES (%s, %s, %s) "
"ON CONFLICT (aggregate_id) DO UPDATE "
"SET version = EXCLUDED.version, state = EXCLUDED.state",
(aggregate_id, version, json.dumps(state)),
)
self.conn.commit()
def load(self, aggregate_id: str):
cur = self.conn.execute(
"SELECT version, state FROM snapshots "
"WHERE aggregate_id = %s", (aggregate_id,)
)
row = cur.fetchone()
        if row:
            # psycopg hands the JSONB state column back as a dict already
            return row[0], row[1]
        return 0, None
A common strategy is to take a snapshot every 100 or 1,000 events. The rehydration logic becomes: load snapshot → load events with version > snapshot version → apply those events. This keeps read times bounded regardless of how many events an aggregate has accumulated.
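That logic is a thin wrapper around pieces we already have. A sketch, assuming the snapshot state dict mirrors the fields the aggregate tracks (the exact shape is an assumption, not something the listings above pin down):

def load_order(order_id: str, store: PostgresEventStore,
               snapshots: SnapshotStore) -> Order:
    snap_version, state = snapshots.load(order_id)
    order = Order(order_id)
    if state is not None:
        # Restore the snapshotted fields (assumed shape: status,
        # customer_id, items, mirroring Order's attributes).
        order.status = OrderStatus(state["status"])
        order.customer_id = state["customer_id"]
        order.items = state["items"]
        order.version = snap_version
    rows = store.conn.execute(
        "SELECT event_type, event_data, version FROM events "
        "WHERE aggregate_id = %s AND version > %s ORDER BY version ASC",
        (order_id, snap_version),
    ).fetchall()
    for row in rows:
        order._apply(store._deserialize(row))
    return order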
When Event Sourcing Makes Sense (And When It Doesn’t)
Event sourcing is not a default architecture choice. It adds real complexity: event schema evolution, eventual consistency between the event store and projections, and a steeper learning curve for the team. Here’s when it genuinely shines:
- Audit trails are a hard requirement — Financial systems, healthcare records, and compliance-heavy domains where you need to prove exactly what happened and when.
- Temporal queries matter — “What was the state of this entity at this specific point in time?” is a question your system needs to answer regularly.
- Complex business logic with many state transitions — When the rules around how an entity can change are intricate, modeling those transitions as events makes the domain logic explicit and testable.
- You need reliable event publishing — Event sourcing sidesteps the dual-write problem (writing to a database and separately publishing to a message broker) because the event store is the single source of truth: appending the event is the only write, and downstream consumers derive everything from that one log.
And here’s when you should think twice:
- Simple CRUD with no audit needs — If you’re building a blog CMS or a basic inventory tracker, event sourcing is overkill.
- Fast-paced prototyping — The upfront cost of defining events, building projections, and handling schema evolution slows iteration.
- Team unfamiliarity — Event sourcing changes how everyone thinks about data. It’s not something to adopt halfway — the team needs to commit to the mental model shift.
Event Schema Evolution: The Hard Part Nobody Warns You About
Events are immutable, but your understanding of the domain isn’t. Eventually, you’ll need to change an event’s structure — add a field, rename something, split one event into two. Since you can’t go back and rewrite history, you need a strategy.
The most common approaches:
- Versioned events — Append a version suffix (ItemAddedV2) and write an upgrader that transforms old events during rehydration.
- Lazy migration via projections — Keep old events as-is and handle the transformation in the projection layer. New projections understand both old and new event formats.
- Weak schema (JSONB) — Store events as JSON with a flexible schema. New code simply checks for the presence of optional fields. This is the pragmatic choice for PostgreSQL’s JSONB-backed event stores.
The JSONB approach in PostgreSQL is particularly practical. Since events are stored as JSONB, you can add new fields without breaking old events — your deserialization code just uses .get() with sensible defaults for missing fields. The tradeoff is that you lose some type safety, but for most teams that is a worthwhile exchange.
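For example, suppose a later release starts recording a currency on ItemAdded (a hypothetical new field). Old events in the store simply lack the key, and deserialization defaults it:

@dataclass
class ItemAdded(Event):
    product_id: str = ""
    quantity: int = 0
    price: float = 0.0
    currency: str = "USD"  # hypothetical new field; old events omit it

Note that the generic _deserialize shown earlier already copes with this: it only passes through keys present in the stored JSON, so events written before the upgrade fall back to the dataclass default.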
Wrapping Up
Event sourcing is a powerful pattern for systems where auditability, temporal queries, and reliable event publishing are first-class requirements. The core idea is simple — store what happened, not what is — but the implications ripple through your entire architecture: CQRS for separating reads from writes, projections for query performance, snapshots for fast rehydration, and schema evolution strategies for long-lived events.
Start with a bounded context that genuinely benefits from the pattern — a financial ledger, an order management system, or a user permission model. Implement the event store, one aggregate, and one projection. Let the team get comfortable with the mental model before expanding further. And if you’re using PostgreSQL, you already have everything you need — no special database required.