Documentation ¶
Index ¶
- type EventStore
- func NewEventStore(db *bbolt.DB, opts ...EventStoreOption) (*EventStore, error)
- func (s *EventStore) Close(ctx context.Context) error
- func (s *EventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
- func (s *EventStore) GetEvents(ctx context.Context, params watcher.GetEventsRequest) (*watcher.GetEventsResponse, error)
- func (s *EventStore) GetLatestEventNum(ctx context.Context) (uint64, error)
- func (s *EventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
- func (s *EventStore) StoreEvent(ctx context.Context, operation watcher.Operation, objectType string, ...) error
- func (s *EventStore) StoreEventTx(tx *bbolt.Tx, operation watcher.Operation, objectType string, ...) error
- type EventStoreOption
- func WithCacheSize(size int) EventStoreOption
- func WithCheckpointBucket(name string) EventStoreOption
- func WithClock(clock clock.Clock) EventStoreOption
- func WithEventSerializer(serializer watcher.Serializer) EventStoreOption
- func WithEventsBucket(name string) EventStoreOption
- func WithGCAgeThreshold(threshold time.Duration) EventStoreOption
- func WithGCCadence(cadence time.Duration) EventStoreOption
- func WithGCMaxDuration(duration time.Duration) EventStoreOption
- func WithGCMaxRecordsPerRun(max int) EventStoreOption
- func WithLongPollingTimeout(timeout time.Duration) EventStoreOption
Types ¶
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore implements the EventStore interface using BoltDB as the underlying storage. It provides efficient storage and retrieval of events with support for caching, checkpointing, and garbage collection.
func NewEventStore ¶
func NewEventStore(db *bbolt.DB, opts ...EventStoreOption) (*EventStore, error)
NewEventStore creates a new EventStore with the given options.
It initializes the BoltDB buckets, sets up caching, and starts the garbage collection process. The store uses two buckets: one for events and another for checkpoints.
Example usage:
    db, err := bbolt.Open("events.db", 0600, nil)
    if err != nil {
        // handle error
    }
    defer db.Close()

    store, err := NewEventStore(
        db,
        WithEventsBucket("myEvents"),
        WithCheckpointBucket("myCheckpoints"),
        WithEventSerializer(NewJSONSerializer()),
    )
    if err != nil {
        // handle error
    }
func (*EventStore) Close ¶
func (s *EventStore) Close(ctx context.Context) error
Close stops the garbage collection process and purges the cache.
func (*EventStore) GetCheckpoint ¶
func (s *EventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
GetCheckpoint retrieves the checkpoint for a specific watcher. If the checkpoint is not found, it returns a CheckpointError.
func (*EventStore) GetEvents ¶
func (s *EventStore) GetEvents(ctx context.Context, params watcher.GetEventsRequest) (*watcher.GetEventsResponse, error)
GetEvents retrieves events from the store based on the provided query parameters. It supports long-polling: if no events are immediately available, it waits for new events or until the long-polling timeout is reached.
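The long-polling behavior described above can be illustrated with a minimal, standard-library-only sketch. It is not the package's implementation: memLog, add, poll, and demo are hypothetical names, and the real store reads from BoltDB and its cache rather than a slice. The sketch only shows the general technique of returning immediately when events exist and otherwise waiting for a notification, a timeout, or context cancellation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// memLog is a hypothetical in-memory event log used only to illustrate long-polling.
type memLog struct {
	mu     sync.Mutex
	events []string
	notify chan struct{} // closed and replaced whenever an event is added
}

func newMemLog() *memLog {
	return &memLog{notify: make(chan struct{})}
}

// add stores an event and wakes every reader currently waiting in poll.
func (l *memLog) add(e string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, e)
	close(l.notify)
	l.notify = make(chan struct{})
}

// poll returns the events stored after the first `after` events. If there are
// none, it waits until an event is added, the timeout elapses, or ctx is done.
func (l *memLog) poll(ctx context.Context, after int, timeout time.Duration) []string {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		l.mu.Lock()
		if after < len(l.events) {
			out := append([]string(nil), l.events[after:]...)
			l.mu.Unlock()
			return out
		}
		wait := l.notify
		l.mu.Unlock()

		select {
		case <-wait: // a new event arrived; loop and re-check
		case <-timer.C:
			return nil // long-polling timeout reached
		case <-ctx.Done():
			return nil // caller gave up
		}
	}
}

// demo runs one poll that is satisfied by a late write and one that times out.
func demo() (first, second []string) {
	l := newMemLog()
	go func() {
		time.Sleep(20 * time.Millisecond)
		l.add("created job-1")
	}()
	first = l.poll(context.Background(), 0, time.Second)
	second = l.poll(context.Background(), 1, 50*time.Millisecond)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second)
}
```

The first call blocks until the background write lands; the second finds nothing new and returns an empty result once its timeout expires, which is the case WithLongPollingTimeout controls in the real store.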
func (*EventStore) GetLatestEventNum ¶
func (s *EventStore) GetLatestEventNum(ctx context.Context) (uint64, error)
GetLatestEventNum retrieves the sequence number of the latest event in the store.
func (*EventStore) StoreCheckpoint ¶
func (s *EventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
StoreCheckpoint stores a checkpoint for a specific watcher. Checkpoints are used to track which events have been processed by each watcher.
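A watcher typically combines GetCheckpoint and StoreCheckpoint to resume where it left off. The sketch below shows that pattern with an in-memory map standing in for the checkpoint bucket. The names checkpoints, errNoCheckpoint, and resume are hypothetical, and treating a missing checkpoint as "start from the beginning" is an assumption about how a caller might handle the CheckpointError, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoCheckpoint is a hypothetical stand-in for the CheckpointError mentioned above.
var errNoCheckpoint = errors.New("checkpoint not found")

// checkpoints is a hypothetical in-memory stand-in for the checkpoint bucket.
type checkpoints map[string]uint64

func (c checkpoints) get(watcherID string) (uint64, error) {
	seq, ok := c[watcherID]
	if !ok {
		return 0, errNoCheckpoint
	}
	return seq, nil
}

func (c checkpoints) store(watcherID string, seq uint64) {
	c[watcherID] = seq
}

// resume processes every event after the watcher's checkpoint and advances
// the checkpoint as it goes. events[i] holds the event with sequence number i+1.
func resume(c checkpoints, watcherID string, events []string) ([]string, error) {
	last, err := c.get(watcherID)
	if errors.Is(err, errNoCheckpoint) {
		last = 0 // assumption: a first-time watcher starts from the beginning
	} else if err != nil {
		return nil, err
	}

	var processed []string
	for seq := last + 1; seq <= uint64(len(events)); seq++ {
		processed = append(processed, events[seq-1])
		c.store(watcherID, seq) // record progress after each event
	}
	return processed, nil
}

func main() {
	c := checkpoints{}
	first, _ := resume(c, "watcher-1", []string{"e1", "e2"})
	second, _ := resume(c, "watcher-1", []string{"e1", "e2", "e3"})
	fmt.Println(first, second)
}
```

On the second call only the event past the stored checkpoint is processed, so a restarted watcher does not repeat work.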
func (*EventStore) StoreEvent ¶
func (s *EventStore) StoreEvent(ctx context.Context, operation watcher.Operation, objectType string, object interface{}) error
StoreEvent stores a new event in the EventStore. It wraps the storage operation in a BoltDB transaction.
func (*EventStore) StoreEventTx ¶
func (s *EventStore) StoreEventTx(tx *bbolt.Tx, operation watcher.Operation, objectType string, object interface{}) error
StoreEventTx stores a new event within an existing BoltDB transaction. It serializes the event, stores it in the database, adds it to the cache, and notifies watchers of the new event.
type EventStoreOption ¶
type EventStoreOption func(*eventStoreOptions)
EventStoreOption is a function type for configuring an EventStore. It allows for a flexible and extensible way to set options.
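The function-type approach described above is commonly called the functional options pattern. The following standalone sketch shows how such options are usually applied. storeOptions, Option, buildOptions, and the default values are hypothetical; the fields of the real eventStoreOptions struct are unexported and not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// storeOptions is a hypothetical stand-in for the unexported eventStoreOptions struct.
type storeOptions struct {
	eventsBucket string
	cacheSize    int
	gcCadence    time.Duration
}

// Option has the same shape as EventStoreOption: a function that mutates the options.
type Option func(*storeOptions)

func withEventsBucket(name string) Option {
	return func(o *storeOptions) { o.eventsBucket = name }
}

func withCacheSize(size int) Option {
	return func(o *storeOptions) { o.cacheSize = size }
}

// buildOptions starts from illustrative defaults and applies each option in order,
// so later options override earlier ones.
func buildOptions(opts ...Option) storeOptions {
	o := storeOptions{eventsBucket: "events", cacheSize: 100, gcCadence: time.Minute}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := buildOptions(withEventsBucket("myEvents"), withCacheSize(500))
	fmt.Println(o.eventsBucket, o.cacheSize, o.gcCadence)
}
```

Callers set only the options they care about, and new options can be added later without changing the constructor signature.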
func WithCacheSize ¶
func WithCacheSize(size int) EventStoreOption
WithCacheSize sets the size of the LRU cache used to store events. A larger cache can improve performance but uses more memory.
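To make the memory and performance trade-off concrete, here is a small LRU cache sketch built on the standard container/list package. It is illustrative only: lru, entry, add, and get are hypothetical names, and the store's actual cache implementation is not described in this documentation.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry pairs an event sequence number with its (already serialized) event.
type entry struct {
	seq   uint64
	event string
}

// lru is a hypothetical fixed-size cache keyed by event sequence number.
type lru struct {
	size  int
	order *list.List               // front = most recently used
	items map[uint64]*list.Element // seq -> element holding an entry
}

func newLRU(size int) *lru {
	return &lru{size: size, order: list.New(), items: make(map[uint64]*list.Element)}
}

// add inserts or refreshes an event and evicts the least recently used
// entry once the cache grows beyond its configured size.
func (c *lru) add(seq uint64, event string) {
	if el, ok := c.items[seq]; ok {
		el.Value = entry{seq, event}
		c.order.MoveToFront(el)
		return
	}
	c.items[seq] = c.order.PushFront(entry{seq, event})
	if c.order.Len() > c.size {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(entry).seq)
	}
}

// get returns the cached event and marks it as recently used.
func (c *lru) get(seq uint64) (string, bool) {
	el, ok := c.items[seq]
	if !ok {
		return "", false
	}
	c.order.MoveToFront(el)
	return el.Value.(entry).event, true
}

func main() {
	c := newLRU(2)
	c.add(1, "e1")
	c.add(2, "e2")
	c.get(1)       // touch 1 so that 2 becomes the eviction candidate
	c.add(3, "e3") // cache is full: evicts 2
	_, ok := c.get(2)
	fmt.Println("event 2 still cached:", ok)
}
```

A larger size keeps more recent events in memory, so fewer reads fall through to the database, at the cost of holding more entries.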
func WithCheckpointBucket ¶
func WithCheckpointBucket(name string) EventStoreOption
WithCheckpointBucket sets the name of the bucket used to store checkpoints.
func WithClock ¶
func WithClock(clock clock.Clock) EventStoreOption
WithClock sets the clock used for time-based operations. This is useful for testing to provide a mockable clock.
func WithEventSerializer ¶
func WithEventSerializer(serializer watcher.Serializer) EventStoreOption
WithEventSerializer sets the serializer used for events. This allows for custom serialization formats if needed.
func WithEventsBucket ¶
func WithEventsBucket(name string) EventStoreOption
WithEventsBucket sets the name of the bucket used to store events.
func WithGCAgeThreshold ¶
func WithGCAgeThreshold(threshold time.Duration) EventStoreOption
WithGCAgeThreshold sets the age threshold for event pruning. Events older than this will be considered for pruning during garbage collection.
func WithGCCadence ¶
func WithGCCadence(cadence time.Duration) EventStoreOption
WithGCCadence sets the interval at which garbage collection runs. More frequent GC can keep the database size down but may impact performance.
func WithGCMaxDuration ¶
func WithGCMaxDuration(duration time.Duration) EventStoreOption
WithGCMaxDuration sets the maximum duration of a single GC run. Like WithGCMaxRecordsPerRun, it bounds each run so that garbage collection does not hold a long-running transaction.
func WithGCMaxRecordsPerRun ¶
func WithGCMaxRecordsPerRun(max int) EventStoreOption
WithGCMaxRecordsPerRun sets the maximum number of records to process in a single GC run. This helps limit the duration of GC operations to avoid long-running transactions.
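The GC options above (age threshold, maximum records per run, maximum duration) interact as limits on a single pruning pass. The sketch below shows one plausible way such limits combine, using an in-memory slice instead of a BoltDB bucket. record, gcLimits, prune, and demo are hypothetical, and the assumption that events are visited oldest first is not confirmed by this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// record is a hypothetical stored event with its creation time.
type record struct {
	seq     uint64
	created time.Time
}

// gcLimits groups hypothetical counterparts of the GC options.
type gcLimits struct {
	ageThreshold time.Duration // cf. WithGCAgeThreshold
	maxRecords   int           // cf. WithGCMaxRecordsPerRun
	maxDuration  time.Duration // cf. WithGCMaxDuration
}

// prune drops records older than the age threshold from the front of an
// oldest-first slice, stopping early when a per-run limit is reached.
func prune(records []record, now time.Time, lim gcLimits) (kept []record, removed int) {
	started := time.Now()
	cutoff := now.Add(-lim.ageThreshold)
	for removed < len(records) {
		if removed >= lim.maxRecords || time.Since(started) > lim.maxDuration {
			break // per-run budget exhausted; a later run continues from here
		}
		if !records[removed].created.Before(cutoff) {
			break // records are ordered by age, so the rest are newer
		}
		removed++
	}
	return records[removed:], removed
}

// demo runs two GC passes over four records with a limit of two records per run.
func demo() (firstRun, secondRun, remaining int) {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	records := []record{
		{1, now.Add(-3 * time.Hour)},
		{2, now.Add(-2 * time.Hour)},
		{3, now.Add(-90 * time.Minute)},
		{4, now.Add(-10 * time.Minute)},
	}
	lim := gcLimits{ageThreshold: time.Hour, maxRecords: 2, maxDuration: time.Second}
	records, firstRun = prune(records, now, lim)
	records, secondRun = prune(records, now, lim)
	return firstRun, secondRun, len(records)
}

func main() {
	fmt.Println(demo())
}
```

With a one-hour threshold and a two-record cap, the first pass stops at the cap even though a third record is eligible; the second pass removes it and leaves the recent record in place. Spreading work across runs this way keeps each pass short.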
func WithLongPollingTimeout ¶
func WithLongPollingTimeout(timeout time.Duration) EventStoreOption
WithLongPollingTimeout sets the timeout duration for long-polling requests. This determines how long a client will wait for new events before the request times out.