Class InMemoryOutboxStore
All Implemented Interfaces:
OutboxStore

In-memory OutboxStore implementation for unit testing without JDBC.
All events are stored in a ConcurrentHashMap keyed by event ID.
Supports all outbox store operations including poll, claim, mark transitions,
and dead event management.
The Connection parameter is ignored in all methods since no
database is involved.
Example
var store = new InMemoryOutboxStore();
var txContext = new StubTxContext();
var writer = new DefaultOutboxWriter(txContext, store);
writer.write(EventEnvelope.ofJson("OrderPlaced", "{}"));
assertEquals(1, store.size());
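The storage model described above (a ConcurrentHashMap keyed by event ID, with the Connection argument accepted but never read) can be sketched as follows. This is a minimal illustration, not the library's source: MapBackedStoreSketch, its String payload, and the overwrite-on-duplicate-ID behaviour are all assumptions.

```java
import java.sql.Connection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for illustration; not the real InMemoryOutboxStore.
final class MapBackedStoreSketch {
    // Events keyed by event ID; a String payload stands in for the real event type.
    private final Map<String, String> payloadById = new ConcurrentHashMap<>();

    // The Connection is part of the signature but is never read, so tests can pass null.
    void insertNew(Connection conn, String eventId, String payload) {
        payloadById.put(eventId, payload); // assumption: a duplicate ID overwrites
    }

    int size() {
        return payloadById.size();
    }

    void clear() {
        payloadById.clear();
    }
}
```

Because nothing touches the connection, a test against such a store needs no DataSource, driver, or schema setup.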

Constructor Summary
Constructors
InMemoryOutboxStore()

Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| | all() | Returns all stored events as a list. |
| List<OutboxEvent> | claimPending(Connection conn, String ownerId, Instant now, Instant lockExpiry, Duration skipRecent, int limit) | Claims and returns pending events with owner-based locking for multi-instance deployments. |
| void | clear() | Clears all stored events. |
| int | countDead(Connection conn, String eventType) | Counts events in DEAD status, optionally filtered by event type. |
| void | insertNew(Connection conn, EventEnvelope event) | Inserts a new event with status NEW. |
| int | markDead(Connection conn, String eventId, String error) | Marks an event as DEAD (permanently failed, no more retries). |
| int | markDeferred(Connection conn, String eventId, Instant nextAt) | Marks an event as deferred (handler requested retry-after) without incrementing the attempt count or recording an error. |
| int | markDone(Connection conn, String eventId) | Marks an event as DONE (successfully processed). |
| int | markRetry(Connection conn, String eventId, Instant nextAt, String error) | Marks an event for retry with a scheduled next-attempt time. |
| | pollPending(Connection conn, Instant now, Duration skipRecent, int limit) | Retrieves pending events eligible for processing (no locking). |
| List<OutboxEvent> | queryDead(Connection conn, String eventType, String aggregateType, int limit) | Queries events in DEAD status with optional filters. |
| int | replayDead(Connection conn, String eventId) | Replays a DEAD event by resetting it to NEW status with zero attempts. |
| int | size() | Returns the total number of stored events. |
| | statusOf | Returns the status of a specific event. |

Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface io.outbox.spi.OutboxStore
insertBatch

Constructor Details

InMemoryOutboxStore
public InMemoryOutboxStore()

Method Details

insertNew
public void insertNew(Connection conn, EventEnvelope event)
Description copied from interface: OutboxStore
Inserts a new event with status NEW.
Specified by:
insertNew in interface OutboxStore
Parameters:
conn - the JDBC connection (typically within a transaction)
event - the event envelope to persist

markDone
public int markDone(Connection conn, String eventId)
Description copied from interface: OutboxStore
Marks an event as DONE (successfully processed).
Specified by:
markDone in interface OutboxStore
Parameters:
conn - the JDBC connection
eventId - the event ID to update
Returns:
the number of rows updated (0 or 1)

markRetry
public int markRetry(Connection conn, String eventId, Instant nextAt, String error)
Description copied from interface: OutboxStore
Marks an event for retry with a scheduled next-attempt time.
Implementations must increment the event's attempts column as part of this operation. The dispatcher relies on the stored attempt count to determine when maxAttempts has been reached.
Specified by:
markRetry in interface OutboxStore
Parameters:
conn - the JDBC connection
eventId - the event ID to update
nextAt - earliest time for the next attempt
error - error message from the failed attempt (may be null)
Returns:
the number of rows updated (0 or 1)

markDead
public int markDead(Connection conn, String eventId, String error)
Description copied from interface: OutboxStore
Marks an event as DEAD (permanently failed, no more retries).
Specified by:
markDead in interface OutboxStore
Parameters:
conn - the JDBC connection
eventId - the event ID to update
error - error message describing the failure (may be null)
Returns:
the number of rows updated (0 or 1)

markDeferred
public int markDeferred(Connection conn, String eventId, Instant nextAt)
Description copied from interface: OutboxStore
Marks an event as deferred (handler requested retry-after) without incrementing the attempt count or recording an error.
This is used when a handler returns DispatchResult.RetryAfter to reschedule delivery without penalising the event's retry budget.
The default implementation falls back to OutboxStore.markRetry(java.sql.Connection, java.lang.String, java.time.Instant, java.lang.String), which does increment attempts. JDBC implementations should override this with a proper implementation that preserves the attempt count.
Specified by:
markDeferred in interface OutboxStore
Parameters:
conn - the JDBC connection
eventId - the event ID to update
nextAt - earliest time for the next delivery attempt
Returns:
the number of rows updated (0 or 1)
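
The difference between markRetry and markDeferred comes down to what each one does to the attempt count and the stored error. The following is a minimal sketch of that contract, assuming a hypothetical Row record and RetryBudgetSketch class; it is not the library's implementation, and the field names are illustrative only.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model for illustration; field and class names are assumptions.
final class RetryBudgetSketch {
    record Row(int attempts, Instant nextAt, String lastError) {}

    private final Map<String, Row> rows = new ConcurrentHashMap<>();

    void insert(String eventId) {
        rows.put(eventId, new Row(0, null, null));
    }

    // Failed attempt: reschedule, record the error, and consume one attempt.
    int markRetry(String eventId, Instant nextAt, String error) {
        Row updated = rows.computeIfPresent(eventId,
                (id, row) -> new Row(row.attempts() + 1, nextAt, error));
        return updated != null ? 1 : 0;
    }

    // Handler asked for a later delivery: reschedule only, attempts and error untouched.
    int markDeferred(String eventId, Instant nextAt) {
        Row updated = rows.computeIfPresent(eventId,
                (id, row) -> new Row(row.attempts(), nextAt, row.lastError()));
        return updated != null ? 1 : 0;
    }

    // Returns -1 for an unknown event ID.
    int attemptsOf(String eventId) {
        Row row = rows.get(eventId);
        return row == null ? -1 : row.attempts();
    }
}
```

Both methods return 0 for an unknown event ID and 1 otherwise, mirroring the "rows updated" convention documented above.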

pollPending
Description copied from interface: OutboxStore
Retrieves pending events eligible for processing (no locking).
Specified by:
pollPending in interface OutboxStore
Parameters:
conn - the JDBC connection
now - current timestamp for evaluating retry delays
skipRecent - duration to skip recently-created events (avoids racing with in-flight hot-path)
limit - maximum number of events to return
Returns:
list of pending events, oldest first
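
The pollPending parameters suggest a filter over three conditions followed by an ordering and a cap. The sketch below shows one way such a selection could be written over an in-memory list. The Row record, its pending flag, and the availableAt field are assumptions made for the example; the real OutboxEvent fields are not shown on this page.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch; Row and its fields are illustrative assumptions.
final class PollPendingSketch {
    record Row(String id, boolean pending, Instant createdAt, Instant availableAt) {}

    static List<Row> pollPending(List<Row> rows, Instant now, Duration skipRecent, int limit) {
        // Events created after this cutoff are considered too recent to poll.
        Instant createdCutoff = now.minus(skipRecent);
        return rows.stream()
                .filter(Row::pending)                                // still awaiting processing
                .filter(r -> !r.availableAt().isAfter(now))          // retry delay has elapsed
                .filter(r -> !r.createdAt().isAfter(createdCutoff))  // not recently created
                .sorted(Comparator.comparing(Row::createdAt))        // oldest first
                .limit(limit)
                .collect(Collectors.toList());
    }
}
```

No lock is taken here, so two pollers running this at the same time could return the same rows, which is the case claimPending is meant to address.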

claimPending
public List<OutboxEvent> claimPending(Connection conn, String ownerId, Instant now, Instant lockExpiry, Duration skipRecent, int limit)
Description copied from interface: OutboxStore
Claims and returns pending events with owner-based locking for multi-instance deployments.
The default falls back to OutboxStore.pollPending(java.sql.Connection, java.time.Instant, java.time.Duration, int) (no locking). Database-specific subclasses override this with row-level locking (e.g. FOR UPDATE SKIP LOCKED).
Specified by:
claimPending in interface OutboxStore
Parameters:
conn - the JDBC connection
ownerId - unique identifier for the claiming poller instance
now - current timestamp
lockExpiry - timestamp before which existing claims are considered expired
skipRecent - duration to skip recently-created events
limit - maximum number of events to claim
Returns:
list of claimed events
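
Owner-based claiming with a lockExpiry cutoff can be illustrated without a database. In the sketch below, a row is claimable when it has no owner or when its claim was taken before lockExpiry. The Row class, the lockedBy and lockedAt fields, and the use of a synchronized method in place of row-level locking are assumptions for the example; the pending-status and skipRecent checks are omitted for brevity.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; lock fields and class names are illustrative assumptions.
final class ClaimSketch {
    static final class Row {
        final String id;
        String lockedBy;  // null when unclaimed
        Instant lockedAt; // when the current claim was taken

        Row(String id) {
            this.id = id;
        }
    }

    // synchronized stands in for the row-level locking a database would provide.
    static synchronized List<Row> claim(List<Row> rows, String ownerId,
                                        Instant now, Instant lockExpiry, int limit) {
        List<Row> claimed = new ArrayList<>();
        for (Row row : rows) {
            if (claimed.size() >= limit) {
                break;
            }
            boolean free = row.lockedBy == null;
            // Claims taken before lockExpiry are treated as abandoned.
            boolean expired = row.lockedAt != null && row.lockedAt.isBefore(lockExpiry);
            if (free || expired) {
                row.lockedBy = ownerId;
                row.lockedAt = now;
                claimed.add(row);
            }
        }
        return claimed;
    }
}
```

A caller would typically compute lockExpiry as now minus some lock timeout, so that a crashed poller's claims become available again once the timeout has passed.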

queryDead
public List<OutboxEvent> queryDead(Connection conn, String eventType, String aggregateType, int limit)
Description copied from interface: OutboxStore
Queries events in DEAD status with optional filters.
Specified by:
queryDead in interface OutboxStore
Parameters:
conn - the JDBC connection
eventType - optional event type filter (null for all)
aggregateType - optional aggregate type filter (null for all)
limit - maximum number of events to return
Returns:
list of dead events, oldest first

replayDead
public int replayDead(Connection conn, String eventId)
Description copied from interface: OutboxStore
Replays a DEAD event by resetting it to NEW status with zero attempts.
Only events currently in DEAD status are affected. Returns 0 if the event does not exist or is not DEAD (idempotent).
Specified by:
replayDead in interface OutboxStore
Parameters:
conn - the JDBC connection
eventId - the event ID to replay
Returns:
the number of rows updated (0 or 1)
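
The replayDead contract (only DEAD events are reset, everything else returns 0) maps naturally onto an atomic conditional update. The following sketch assumes a hypothetical Row record holding a status string and an attempt count; it illustrates the documented behaviour and is not taken from the library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; Row and the String-typed status are illustrative assumptions.
final class ReplayDeadSketch {
    record Row(String status, int attempts) {}

    private final Map<String, Row> rows = new ConcurrentHashMap<>();

    void put(String eventId, String status, int attempts) {
        rows.put(eventId, new Row(status, attempts));
    }

    Row get(String eventId) {
        return rows.get(eventId);
    }

    int replayDead(String eventId) {
        boolean[] updated = {false};
        // computeIfPresent runs the check and the reset as one atomic step per key.
        rows.computeIfPresent(eventId, (id, row) -> {
            if (!"DEAD".equals(row.status())) {
                return row; // not DEAD: leave untouched
            }
            updated[0] = true;
            return new Row("NEW", 0); // back to NEW with zero attempts
        });
        return updated[0] ? 1 : 0;
    }
}
```

Calling replayDead twice on the same event returns 1 and then 0, since the second call finds the event already in NEW status.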

countDead
public int countDead(Connection conn, String eventType)
Description copied from interface: OutboxStore
Counts events in DEAD status, optionally filtered by event type.
Specified by:
countDead in interface OutboxStore
Parameters:
conn - the JDBC connection
eventType - optional event type filter (null for all)
Returns:
the number of dead events matching the filter
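
The "null for all" convention used by countDead and queryDead can be expressed as a filter that is skipped when the argument is null. The sketch below assumes a hypothetical Row record with a status and an event type; the names are illustrative and not part of the documented API.

```java
import java.util.List;

// Hypothetical sketch; Row is an illustrative assumption.
final class DeadFilterSketch {
    record Row(String status, String eventType) {}

    static int countDead(List<Row> rows, String eventType) {
        return (int) rows.stream()
                .filter(r -> "DEAD".equals(r.status()))
                // A null eventType disables the filter and matches every DEAD row.
                .filter(r -> eventType == null || eventType.equals(r.eventType()))
                .count();
    }
}
```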

size
public int size()
Returns the total number of stored events.
Returns:
event count

clear
public void clear()
Clears all stored events.

all
Returns all stored events as a list.
Returns:
list of all events

statusOf
Returns the status of a specific event.
Parameters:
eventId - the event ID
Returns:
the event status, or null if not found