Class InMemoryOutboxStore

java.lang.Object
io.outbox.testing.InMemoryOutboxStore
All Implemented Interfaces:
OutboxStore

public class InMemoryOutboxStore extends Object implements OutboxStore
In-memory OutboxStore for unit testing without JDBC.

All events are stored in a ConcurrentHashMap keyed by event ID. The store supports every OutboxStore operation: polling, claiming, status transitions (the mark* methods), and dead-event management.

The Connection parameter is ignored in all methods since no database is involved.

Example


 var store = new InMemoryOutboxStore();
 var txContext = new StubTxContext();
 var writer = new DefaultOutboxWriter(txContext, store);
 writer.write(EventEnvelope.ofJson("OrderPlaced", "{}"));
 assertEquals(1, store.size());
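
The storage model described above (a ConcurrentHashMap keyed by event ID, with the Connection argument accepted but unused) can be pictured with the following minimal sketch. It is not the library's source: KeyedStoreSketch, its Status enum, and the use of a plain String in place of EventEnvelope are hypothetical simplifications, and passing null for the connection is shown only for this stand-in class.

```java
import java.sql.Connection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in, not the library's source: shows a map keyed by event ID
// and a Connection argument that is accepted but never used.
final class KeyedStoreSketch {
    enum Status { NEW, DONE }

    private final Map<String, Status> statusById = new ConcurrentHashMap<>();

    // conn is ignored by this sketch, so the demo below passes null.
    void insertNew(Connection conn, String eventId) {
        statusById.put(eventId, Status.NEW);
    }

    // Returns 1 if the event existed and was updated, 0 otherwise.
    int markDone(Connection conn, String eventId) {
        return statusById.computeIfPresent(eventId, (id, s) -> Status.DONE) != null ? 1 : 0;
    }

    // Returns null when the ID is unknown.
    Status statusOf(String eventId) {
        return statusById.get(eventId);
    }

    int size() {
        return statusById.size();
    }

    public static void main(String[] args) {
        KeyedStoreSketch store = new KeyedStoreSketch();
        store.insertNew(null, "evt-1");
        System.out.println(store.size() + " " + store.statusOf("evt-1"));
        System.out.println(store.markDone(null, "evt-1") + " " + store.markDone(null, "missing"));
    }
}
```

Because every update goes through a single map entry, the "rows updated (0 or 1)" return values used throughout this class map naturally onto "entry present" versus "entry absent".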
 
  • Constructor Details

    • InMemoryOutboxStore

      public InMemoryOutboxStore()
  • Method Details

    • insertNew

      public void insertNew(Connection conn, EventEnvelope event)
      Description copied from interface: OutboxStore
      Inserts a new event with status NEW.
      Specified by:
      insertNew in interface OutboxStore
      Parameters:
      conn - the JDBC connection (typically within a transaction)
      event - the event envelope to persist
    • markDone

      public int markDone(Connection conn, String eventId)
      Description copied from interface: OutboxStore
      Marks an event as DONE (successfully processed).
      Specified by:
      markDone in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      eventId - the event ID to update
      Returns:
      the number of rows updated (0 or 1)
    • markRetry

      public int markRetry(Connection conn, String eventId, Instant nextAt, String error)
      Description copied from interface: OutboxStore
      Marks an event for retry with a scheduled next-attempt time.

      Implementations must increment the event's attempts column as part of this operation. The dispatcher relies on the stored attempt count to determine when maxAttempts has been reached.

      Specified by:
      markRetry in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      eventId - the event ID to update
      nextAt - earliest time for the next attempt
      error - error message from the failed attempt (may be null)
      Returns:
      the number of rows updated (0 or 1)
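
The attempt-counting contract above can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: RetryBudgetSketch, Row, attemptsOf, and exhausted are hypothetical names, the Connection parameter is omitted, and the rule "exhausted once stored attempts reach maxAttempts" is an assumed reading of how a dispatcher might use the count.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: names and the exhaustion rule are assumptions, not the library's code.
final class RetryBudgetSketch {
    // Minimal per-event state: attempts so far, next eligible time, last error.
    record Row(int attempts, Instant nextAt, String lastError) {}

    private final Map<String, Row> rows = new ConcurrentHashMap<>();

    void insertNew(String eventId) {
        rows.put(eventId, new Row(0, null, null));
    }

    // Increments attempts as the contract requires; returns rows updated (0 or 1).
    int markRetry(String eventId, Instant nextAt, String error) {
        Row updated = rows.computeIfPresent(eventId,
                (id, r) -> new Row(r.attempts() + 1, nextAt, error));
        return updated != null ? 1 : 0;
    }

    // Returns -1 for an unknown event ID.
    int attemptsOf(String eventId) {
        Row r = rows.get(eventId);
        return r == null ? -1 : r.attempts();
    }

    // Assumed dispatcher-side check: the budget is spent once the stored count reaches maxAttempts.
    boolean exhausted(String eventId, int maxAttempts) {
        return attemptsOf(eventId) >= maxAttempts;
    }

    public static void main(String[] args) {
        RetryBudgetSketch store = new RetryBudgetSketch();
        store.insertNew("evt-1");
        int maxAttempts = 3; // illustrative value only
        while (!store.exhausted("evt-1", maxAttempts)) {
            store.markRetry("evt-1", Instant.now().plusSeconds(5), "simulated failure");
        }
        System.out.println("attempts=" + store.attemptsOf("evt-1"));
    }
}
```

If markRetry did not increment the counter, the loop in main would never terminate, which is the practical reason the contract makes the increment mandatory.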
    • markDead

      public int markDead(Connection conn, String eventId, String error)
      Description copied from interface: OutboxStore
      Marks an event as DEAD (permanently failed, no more retries).
      Specified by:
      markDead in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      eventId - the event ID to update
      error - error message describing the failure (may be null)
      Returns:
      the number of rows updated (0 or 1)
    • markDeferred

      public int markDeferred(Connection conn, String eventId, Instant nextAt)
      Description copied from interface: OutboxStore
      Marks an event as deferred (handler requested retry-after) without incrementing the attempt count or recording an error.

      This is used when a handler returns DispatchResult.RetryAfter to reschedule delivery without penalising the event's retry budget.

      The default implementation falls back to OutboxStore.markRetry(Connection, String, Instant, String), which does increment attempts. JDBC implementations should override this method with an implementation that preserves the attempt count.

      Specified by:
      markDeferred in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      eventId - the event ID to update
      nextAt - earliest time for the next delivery attempt
      Returns:
      the number of rows updated (0 or 1)
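
The difference between the default fallback and an attempt-preserving override, as described in the interface contract above, can be sketched in miniature. StoreSketch, FallbackStore, and PreservingStore are hypothetical types written for this illustration; they drop the Connection parameter and do not claim to mirror how any real OutboxStore implementation (including this class) is written.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of the contract; StoreSketch is not the real OutboxStore.
interface StoreSketch {
    int markRetry(String eventId, Instant nextAt, String error);

    // Default fallback: delegates to markRetry, so the attempt count IS incremented.
    default int markDeferred(String eventId, Instant nextAt) {
        return markRetry(eventId, nextAt, null);
    }

    int attemptsOf(String eventId);
}

// Relies on the default: deferring costs one attempt.
class FallbackStore implements StoreSketch {
    protected final Map<String, Integer> attempts = new HashMap<>();
    protected final Map<String, Instant> nextAt = new HashMap<>();

    FallbackStore(String... ids) {
        for (String id : ids) attempts.put(id, 0);
    }

    @Override
    public int markRetry(String eventId, Instant next, String error) {
        if (!attempts.containsKey(eventId)) return 0;
        attempts.merge(eventId, 1, Integer::sum); // attempts = attempts + 1
        nextAt.put(eventId, next);
        return 1;
    }

    @Override
    public int attemptsOf(String eventId) {
        return attempts.getOrDefault(eventId, -1);
    }
}

// Overrides markDeferred to reschedule without touching the attempt count.
class PreservingStore extends FallbackStore {
    PreservingStore(String... ids) {
        super(ids);
    }

    @Override
    public int markDeferred(String eventId, Instant next) {
        if (!attempts.containsKey(eventId)) return 0;
        nextAt.put(eventId, next);
        return 1;
    }
}

final class DeferredSketch {
    public static void main(String[] args) {
        Instant later = Instant.now().plusSeconds(30);
        StoreSketch fallback = new FallbackStore("evt-1");
        StoreSketch preserving = new PreservingStore("evt-1");
        fallback.markDeferred("evt-1", later);
        preserving.markDeferred("evt-1", later);
        System.out.println(fallback.attemptsOf("evt-1") + " vs " + preserving.attemptsOf("evt-1"));
    }
}
```

With the fallback, a handler that repeatedly asks for a later delivery would eventually consume the whole retry budget; the override avoids that.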
    • pollPending

      public List<OutboxEvent> pollPending(Connection conn, Instant now, Duration skipRecent, int limit)
      Description copied from interface: OutboxStore
      Retrieves pending events eligible for processing (no locking).
      Specified by:
      pollPending in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      now - current timestamp for evaluating retry delays
      skipRecent - events created within this duration before now are skipped (avoids racing with the in-flight hot path)
      limit - maximum number of events to return
      Returns:
      list of pending events, oldest first
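
One plausible reading of the parameters above, expressed as an in-memory filter, is sketched below. The Row record and its field names (pending, createdAt, availableAt) are assumptions made for illustration, as is the choice to order "oldest first" by creation time; the actual selection logic of the store may differ.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of pending-event selection; not the library's code.
final class PollPendingSketch {
    // Assumed row shape: availableAt is the earliest time the event may be attempted.
    record Row(String id, boolean pending, Instant createdAt, Instant availableAt) {}

    static List<String> pollPending(List<Row> rows, Instant now, Duration skipRecent, int limit) {
        Instant recentCutoff = now.minus(skipRecent);
        return rows.stream()
                .filter(Row::pending)                                // only events still awaiting delivery
                .filter(r -> !r.availableAt().isAfter(now))          // retry delay has elapsed
                .filter(r -> !r.createdAt().isAfter(recentCutoff))   // skip recently created events
                .sorted(Comparator.comparing(Row::createdAt))        // oldest first
                .limit(limit)
                .map(Row::id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T00:00:00Z");
        List<Row> rows = List.of(
                new Row("older", true, now.minusSeconds(90), now.minusSeconds(90)),
                new Row("too-recent", true, now.minusSeconds(1), now.minusSeconds(1)),
                new Row("retry-later", true, now.minusSeconds(120), now.plusSeconds(30)));
        System.out.println(pollPending(rows, now, Duration.ofSeconds(5), 10));
    }
}
```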
    • claimPending

      public List<OutboxEvent> claimPending(Connection conn, String ownerId, Instant now, Instant lockExpiry, Duration skipRecent, int limit)
      Description copied from interface: OutboxStore
      Claims and returns pending events with owner-based locking for multi-instance deployments.

      The default implementation falls back to OutboxStore.pollPending(Connection, Instant, Duration, int), which performs no locking. Database-specific implementations override this method with row-level locking (e.g. FOR UPDATE SKIP LOCKED).

      Specified by:
      claimPending in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      ownerId - unique identifier for the claiming poller instance
      now - current timestamp
      lockExpiry - timestamp before which existing claims are considered expired
      skipRecent - duration to skip recently-created events
      limit - maximum number of events to claim
      Returns:
      list of claimed events
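
Owner-based claiming with an expiry cutoff can be sketched in memory as shown below. This is a simplified illustration, not the library's algorithm: ClaimSketch, the lockedBy and lockedAt fields, and the use of a synchronized method in place of database row locks are all assumptions, and the pending-status and skipRecent checks are left out to keep the example short.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of owner-based claiming; not the library's code.
final class ClaimSketch {
    // Assumed mutable row: lockedBy and lockedAt are null while the event is unclaimed.
    static final class Row {
        final String id;
        String lockedBy;
        Instant lockedAt;

        Row(String id) {
            this.id = id;
        }
    }

    private final List<Row> rows = new ArrayList<>();

    synchronized void add(String id) {
        rows.add(new Row(id));
    }

    // synchronized stands in for the row-level locking a database would provide.
    synchronized List<String> claimPending(String ownerId, Instant now, Instant lockExpiry, int limit) {
        List<String> claimed = new ArrayList<>();
        for (Row r : rows) {
            if (claimed.size() >= limit) break;
            // A row is claimable if nobody holds it or its claim predates lockExpiry.
            boolean free = r.lockedBy == null || r.lockedAt.isBefore(lockExpiry);
            if (free) {
                r.lockedBy = ownerId;
                r.lockedAt = now;
                claimed.add(r.id);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        ClaimSketch store = new ClaimSketch();
        store.add("evt-1");
        store.add("evt-2");
        Instant now = Instant.now();
        Instant lockExpiry = now.minusSeconds(60); // assumed lock timeout of 60 seconds
        System.out.println(store.claimPending("poller-a", now, lockExpiry, 10));
        System.out.println(store.claimPending("poller-b", now, lockExpiry, 10));
    }
}
```

In this sketch a second poller sees nothing to claim until the first poller's claims are older than lockExpiry, which is how a crashed instance's events eventually become available again.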
    • queryDead

      public List<OutboxEvent> queryDead(Connection conn, String eventType, String aggregateType, int limit)
      Description copied from interface: OutboxStore
      Queries events in DEAD status with optional filters.
      Specified by:
      queryDead in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      eventType - optional event type filter (null for all)
      aggregateType - optional aggregate type filter (null for all)
      limit - maximum number of events to return
      Returns:
      list of dead events, oldest first
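
The "null means no filter" convention for eventType and aggregateType can be sketched as below. DeadQuerySketch and its Row record are hypothetical, and ordering by creation time is an assumed interpretation of "oldest first".

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of optional-filter querying; not the library's code.
final class DeadQuerySketch {
    record Row(String id, String eventType, String aggregateType, boolean dead, Instant createdAt) {}

    // A null filter matches everything; a non-null filter must equal the row's value.
    private static boolean matches(String filter, String value) {
        return filter == null || filter.equals(value);
    }

    static List<String> queryDead(List<Row> rows, String eventType, String aggregateType, int limit) {
        return rows.stream()
                .filter(Row::dead)
                .filter(r -> matches(eventType, r.eventType()))
                .filter(r -> matches(aggregateType, r.aggregateType()))
                .sorted(Comparator.comparing(Row::createdAt)) // oldest first
                .limit(limit)
                .map(Row::id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant base = Instant.parse("2024-01-01T00:00:00Z");
        List<Row> rows = List.of(
                new Row("d2", "OrderPlaced", "Order", true, base.plusSeconds(2)),
                new Row("d1", "OrderPlaced", "Order", true, base.plusSeconds(1)),
                new Row("d3", "UserCreated", "User", true, base.plusSeconds(3)));
        System.out.println(queryDead(rows, "OrderPlaced", null, 10));
    }
}
```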
    • replayDead

      public int replayDead(Connection conn, String eventId)
      Description copied from interface: OutboxStore
      Replays a DEAD event by resetting it to NEW status with zero attempts.

      Only events currently in DEAD status are affected. Returns 0 if the event does not exist or is not DEAD (idempotent).

      Specified by:
      replayDead in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      eventId - the event ID to replay
      Returns:
      the number of rows updated (0 or 1)
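
The idempotent replay behaviour described above (reset only DEAD events, report 0 otherwise) can be sketched with an atomic map update. ReplaySketch, its Status enum, and the Row record are hypothetical names for this illustration and omit the Connection parameter; the real store may track more state per event.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of replaying a dead event; not the library's code.
final class ReplaySketch {
    enum Status { NEW, DONE, DEAD }

    record Row(Status status, int attempts) {}

    private final Map<String, Row> rows = new ConcurrentHashMap<>();

    void put(String id, Status status, int attempts) {
        rows.put(id, new Row(status, attempts));
    }

    Row get(String id) {
        return rows.get(id);
    }

    // Resets a DEAD event to NEW with zero attempts; returns rows updated (0 or 1).
    int replayDead(String eventId) {
        int[] updated = {0};
        rows.computeIfPresent(eventId, (id, row) -> {
            if (row.status() != Status.DEAD) {
                return row; // leave non-DEAD events untouched
            }
            updated[0] = 1;
            return new Row(Status.NEW, 0);
        });
        return updated[0];
    }

    public static void main(String[] args) {
        ReplaySketch store = new ReplaySketch();
        store.put("evt-1", Status.DEAD, 5);
        System.out.println(store.replayDead("evt-1") + " then " + store.replayDead("evt-1"));
        System.out.println(store.get("evt-1"));
    }
}
```

Calling replayDead twice on the same ID is safe in this sketch: the second call finds the event already in NEW status and reports 0 without changing it.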
    • countDead

      public int countDead(Connection conn, String eventType)
      Description copied from interface: OutboxStore
      Counts events in DEAD status, optionally filtered by event type.
      Specified by:
      countDead in interface OutboxStore
      Parameters:
      conn - the JDBC connection
      eventType - optional event type filter (null for all)
      Returns:
      the number of dead events matching the filter
    • size

      public int size()
      Returns the total number of stored events.
      Returns:
      event count
    • clear

      public void clear()
      Clears all stored events.
    • all

      public List<OutboxEvent> all()
      Returns all stored events as a list.
      Returns:
      list of all events
    • statusOf

      public EventStatus statusOf(String eventId)
      Returns the status of a specific event.
      Parameters:
      eventId - the event ID
      Returns:
      the event status, or null if not found