README
Watcher Library
Overview
The Watcher Library is an internal component of the Bacalhau project that provides a robust event watching and processing system. It's designed to efficiently store, retrieve, and process events. The library ensures events are stored in a durable, ordered manner, allowing for consistent and reliable event processing. It supports features like checkpointing, filtering, and long-polling, while maintaining the ability to replay events from any point in the event history.
Key Features
- Ordered Event Processing: Events are processed in the exact order they were created, ensuring consistency and predictability in event handling.
- Durability: Events are stored persistently in BoltDB, ensuring they survive system restarts or crashes.
- Replayability: The system allows replaying events from any point in history, facilitating data recovery, debugging, and system reconciliation.
- Concurrency: Multiple watchers can process events concurrently, improving system throughput.
- Filtering: Watchers can filter events based on object types and operations, allowing for targeted event processing.
- Checkpointing: Watchers can save their progress and resume from where they left off, enhancing reliability and efficiency.
- Long-polling: Efficient event retrieval with support for long-polling, reducing unnecessary network traffic and database queries.
- Garbage Collection: Automatic cleanup of old events to manage storage while maintaining the ability to replay from critical points.
- Flexible Event Iteration: Different types of iterators for various use cases, including the ability to start from the oldest event, the latest event, or any specific point in the event history.
Key Components
- Registry: Manages multiple watchers and provides methods to create and manage watchers.
- Watcher: Represents a single event watcher that processes events sequentially.
- EventStore: Responsible for storing and retrieving events, with BoltDB as the default implementation.
- EventHandler: Interface for handling individual events.
- Serializer: Handles the serialization and deserialization of events.
Core Concepts
Event
An Event represents a single occurrence in the system. It has the following properties:
- SeqNum: A unique, sequential identifier for the event.
- Operation: The type of operation (Create, Update, Delete).
- ObjectType: The type of object the event relates to.
- Object: The actual data associated with the event.
- Timestamp: When the event occurred.
EventStore
The EventStore is responsible for persisting events and providing methods to retrieve them. It uses BoltDB as the underlying storage engine and supports features like caching, checkpointing, and garbage collection.
Registry
The Registry manages multiple watchers. It's the main entry point for components that want to subscribe to events.
Watcher
A Watcher represents a single subscriber to events. It processes events sequentially and can be configured with filters and checkpoints.
EventIterator
An EventIterator defines the starting position for reading events. There are four types of iterators:
- TrimHorizonIterator: Starts from the oldest available event.
- LatestIterator: Starts from the latest available event.
- AtSequenceNumberIterator: Starts at a specific sequence number.
- AfterSequenceNumberIterator: Starts after a specific sequence number.
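The difference between starting at and starting after a sequence number is easy to get wrong by one. The following is a minimal sketch, using local stand-in types that mirror the documented EventIterator rather than the library's own code. The helper firstSeqNum is hypothetical, and EventIteratorLatest is deliberately left unresolved because the text above does not say whether it includes the most recent stored event.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types; not the library's code.
type EventIteratorType int

const (
	EventIteratorTrimHorizon EventIteratorType = iota
	EventIteratorLatest
	EventIteratorAtSequenceNumber
	EventIteratorAfterSequenceNumber
)

type EventIterator struct {
	Type           EventIteratorType
	SequenceNumber uint64
}

// firstSeqNum (hypothetical helper) returns the lowest sequence number an
// iterator would yield, given the oldest sequence number still retained.
// The second result is false for iterator types this sketch does not model.
func firstSeqNum(it EventIterator, oldest uint64) (uint64, bool) {
	switch it.Type {
	case EventIteratorTrimHorizon:
		return oldest, true
	case EventIteratorAtSequenceNumber:
		return it.SequenceNumber, true
	case EventIteratorAfterSequenceNumber:
		return it.SequenceNumber + 1, true
	default:
		return 0, false
	}
}

func main() {
	at, _ := firstSeqNum(EventIterator{Type: EventIteratorAtSequenceNumber, SequenceNumber: 3}, 1)
	after, _ := firstSeqNum(EventIterator{Type: EventIteratorAfterSequenceNumber, SequenceNumber: 3}, 1)
	fmt.Println(at, after) // 3 4
}
```

An "after" iterator is the natural choice when resuming from the last event that was fully processed, since that event should not be delivered again.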
Usage
Here's how you typically use the Watcher library within Bacalhau:
- Create an EventStore:

    // Error handling is omitted for brevity; check every returned error in real code.
    db, _ := bbolt.Open("events.db", 0600, nil)
    store, _ := boltdb.NewEventStore(db)

- Create a Registry:

    registry := watcher.NewRegistry(store)

- Implement an EventHandler:

    type MyHandler struct{}

    func (h *MyHandler) HandleEvent(ctx context.Context, event watcher.Event) error {
        // Process the event
        return nil
    }

- Start watching for events:

    // The variable is named w so that it does not shadow the watcher package.
    w, _ := registry.Watch(ctx, "my-watcher", &MyHandler{},
        watcher.WithFilter(watcher.EventFilter{
            ObjectTypes: []string{"Job", "Execution"},
            Operations:  []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
        }),
    )
    defer w.Stop(ctx)

- Store events:

    store.StoreEvent(ctx, watcher.OperationCreate, "Job", jobData)
Configuration
Watch Configuration
When creating a watcher, you can configure it with various options:
- WithInitialEventIterator(iterator EventIterator): Sets the starting position for watching if no checkpoint is found.
- WithFilter(filter EventFilter): Sets the event filter for watching.
- WithBufferSize(size int): Sets the size of the event buffer.
- WithBatchSize(size int): Sets the number of events to fetch in each batch.
- WithInitialBackoff(backoff time.Duration): Sets the initial backoff duration for retries.
- WithMaxBackoff(backoff time.Duration): Sets the maximum backoff duration for retries.
- WithMaxRetries(maxRetries int): Sets the maximum number of retries for event handling.
- WithRetryStrategy(strategy RetryStrategy): Sets the retry strategy for event handling.
Example:
    // The variable is named w so that it does not shadow the watcher package.
    w, err := registry.Watch(ctx, "my-watcher", &MyHandler{},
        watcher.WithInitialEventIterator(watcher.TrimHorizonIterator()),
        watcher.WithFilter(watcher.EventFilter{
            ObjectTypes: []string{"Job", "Execution"},
            Operations:  []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
        }),
        watcher.WithBufferSize(1000),
        watcher.WithBatchSize(100),
        watcher.WithMaxRetries(3),
        watcher.WithRetryStrategy(watcher.RetryStrategyBlock),
    )
EventStore Configuration (BoltDB)
The BoltDB EventStore can be configured with various options:
- WithEventsBucket(name string): Sets the name of the bucket used to store events.
- WithCheckpointBucket(name string): Sets the name of the bucket used to store checkpoints.
- WithEventSerializer(serializer watcher.Serializer): Sets the serializer used for events.
- WithCacheSize(size int): Sets the size of the LRU cache used to store events.
- WithLongPollingTimeout(timeout time.Duration): Sets the timeout duration for long-polling requests.
- WithGCAgeThreshold(threshold time.Duration): Sets the age threshold for event pruning.
- WithGCCadence(cadence time.Duration): Sets the interval at which garbage collection runs.
- WithGCMaxRecordsPerRun(max int): Sets the maximum number of records to process in a single GC run.
- WithGCMaxDuration(duration time.Duration): Sets the maximum duration for a single GC run.
Example:
    store, err := boltdb.NewEventStore(db,
        boltdb.WithEventsBucket("myEvents"),
        boltdb.WithCheckpointBucket("myCheckpoints"),
        boltdb.WithCacheSize(1000),
        boltdb.WithLongPollingTimeout(10*time.Second),
    )
Best Practices
- Use meaningful watcher IDs to easily identify different components subscribing to events.
- Implement error handling in your EventHandler to ensure robust event processing.
- Use appropriate filters to minimize unnecessary event processing.
- Regularly checkpoint your watchers to enable efficient restarts.
- Monitor watcher stats to ensure they're keeping up with event volume.
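One simple way to check whether a watcher is keeping up is to compare the latest sequence number in the store with the watcher's last processed sequence number. The sketch below uses a local stand-in for the documented Stats struct. The lag helper is hypothetical, and in real code the two inputs would come from EventStore.GetLatestEventNum and Watcher.Stats.

```go
package main

import "fmt"

// Stats is a trimmed local stand-in for the documented Stats struct.
type Stats struct {
	ID                  string
	LastProcessedSeqNum uint64
}

// lag (hypothetical helper) returns how many events the watcher still has
// to process. It never underflows if the watcher is already caught up.
func lag(latestEventNum uint64, s Stats) uint64 {
	if s.LastProcessedSeqNum >= latestEventNum {
		return 0
	}
	return latestEventNum - s.LastProcessedSeqNum
}

func main() {
	s := Stats{ID: "my-watcher", LastProcessedSeqNum: 940}
	fmt.Printf("%s is %d events behind\n", s.ID, lag(1000, s))
	// my-watcher is 60 events behind
}
```

Because filtered-out events still advance the store's sequence numbers, this figure is an upper bound on the work remaining for a watcher that uses a filter.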
Troubleshooting
- If a watcher is falling behind, consider increasing the batch size or optimizing the event handling logic.
- For performance issues, check the BoltDB file size and consider tuning the garbage collection parameters.
Future Improvements
- Enhanced monitoring and metrics.
Documentation
Overview ¶
Package watcher is a generated GoMock package.
Index ¶
- Variables
- type CheckpointError
- type DeserializationError
- type Event
- type EventFilter
- type EventHandler
- type EventHandlingError
- type EventIterator
- type EventIteratorType
- type EventStore
- type GetEventsRequest
- type GetEventsResponse
- type JSONSerializer
- type MockEventHandler
- type MockEventHandlerMockRecorder
- type MockEventStore
- func (m *MockEventStore) Close(ctx context.Context) error
- func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
- func (m *MockEventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
- func (m *MockEventStore) GetEvents(ctx context.Context, params GetEventsRequest) (*GetEventsResponse, error)
- func (m *MockEventStore) GetLatestEventNum(ctx context.Context) (uint64, error)
- func (m *MockEventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
- func (m *MockEventStore) StoreEvent(ctx context.Context, operation Operation, objectType string, ...) error
- type MockEventStoreMockRecorder
- func (mr *MockEventStoreMockRecorder) Close(ctx interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetCheckpoint(ctx, watcherID interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetEvents(ctx, params interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetLatestEventNum(ctx interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StoreCheckpoint(ctx, watcherID, eventSeqNum interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StoreEvent(ctx, operation, objectType, object interface{}) *gomock.Call
- type MockRegistry
- type MockRegistryMockRecorder
- type MockSerializer
- type MockSerializerMockRecorder
- type MockWatcher
- func (m *MockWatcher) Checkpoint(ctx context.Context, eventSeqNum uint64) error
- func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder
- func (m *MockWatcher) ID() string
- func (m *MockWatcher) SeekToOffset(ctx context.Context, eventSeqNum uint64) error
- func (m *MockWatcher) Stats() Stats
- func (m *MockWatcher) Stop(ctx context.Context)
- type MockWatcherMockRecorder
- func (mr *MockWatcherMockRecorder) Checkpoint(ctx, eventSeqNum interface{}) *gomock.Call
- func (mr *MockWatcherMockRecorder) ID() *gomock.Call
- func (mr *MockWatcherMockRecorder) SeekToOffset(ctx, eventSeqNum interface{}) *gomock.Call
- func (mr *MockWatcherMockRecorder) Stats() *gomock.Call
- func (mr *MockWatcherMockRecorder) Stop(ctx interface{}) *gomock.Call
- type Operation
- type Registry
- type RetryStrategy
- type SerializationError
- type Serializer
- type State
- type Stats
- type WatchOption
- func WithBatchSize(size int) WatchOption
- func WithBufferSize(size int) WatchOption
- func WithFilter(filter EventFilter) WatchOption
- func WithInitialBackoff(backoff time.Duration) WatchOption
- func WithInitialEventIterator(iterator EventIterator) WatchOption
- func WithMaxBackoff(backoff time.Duration) WatchOption
- func WithMaxRetries(maxRetries int) WatchOption
- func WithRetryStrategy(strategy RetryStrategy) WatchOption
- type Watcher
- type WatcherError
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type CheckpointError ¶
CheckpointError represents an error related to checkpointing
func NewCheckpointError ¶
func NewCheckpointError(watcherID string, err error) *CheckpointError
NewCheckpointError creates a new CheckpointError
func (*CheckpointError) Error ¶
func (e *CheckpointError) Error() string
func (*CheckpointError) Unwrap ¶
func (e *CheckpointError) Unwrap() error
type DeserializationError ¶
type DeserializationError struct {
Err error
}
func NewDeserializationError ¶
func NewDeserializationError(err error) *DeserializationError
func (*DeserializationError) Error ¶
func (e *DeserializationError) Error() string
func (*DeserializationError) Unwrap ¶
func (e *DeserializationError) Unwrap() error
type Event ¶
type Event struct {
	SeqNum     uint64      `json:"seqNum"`
	Operation  Operation   `json:"operation"`
	ObjectType string      `json:"objectType"`
	Object     interface{} `json:"object"`
	Timestamp  time.Time   `json:"timestamp"`
}
Event represents a single event in the event store.
type EventFilter ¶
type EventFilter struct {
	// ObjectTypes is a list of object types to include in the filter.
	ObjectTypes []string
	// Operations is a list of operations to include in the filter.
	Operations []Operation
}
EventFilter defines criteria for filtering events.
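To illustrate how such a filter could be evaluated, the sketch below matches an event against both lists. It assumes that an empty list places no restriction on that field, which is a common convention but is not confirmed by the text here. The types are local stand-ins, and the matches and contains helpers are hypothetical.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types.
type Operation string

type Event struct {
	Operation  Operation
	ObjectType string
}

type EventFilter struct {
	ObjectTypes []string
	Operations  []Operation
}

// contains reports whether v is in list. An empty list is treated as
// "match everything" (an assumption of this sketch).
func contains[T comparable](list []T, v T) bool {
	if len(list) == 0 {
		return true
	}
	for _, item := range list {
		if item == v {
			return true
		}
	}
	return false
}

// matches reports whether the event passes both parts of the filter.
func matches(f EventFilter, e Event) bool {
	return contains(f.ObjectTypes, e.ObjectType) && contains(f.Operations, e.Operation)
}

func main() {
	f := EventFilter{ObjectTypes: []string{"Job"}}
	fmt.Println(matches(f, Event{Operation: "CREATE", ObjectType: "Job"}))       // true
	fmt.Println(matches(f, Event{Operation: "CREATE", ObjectType: "Execution"})) // false
}
```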
type EventHandler ¶
type EventHandler interface {
	// HandleEvent processes a single event.
	// It returns an error if the event processing fails.
	HandleEvent(ctx context.Context, event Event) error
}
EventHandler is an interface for handling events.
type EventHandlingError ¶
EventHandlingError represents an error that occurred during event processing
func NewEventHandlingError ¶
func NewEventHandlingError(watcherID string, eventID uint64, err error) *EventHandlingError
NewEventHandlingError creates a new EventHandlingError
func (*EventHandlingError) Error ¶
func (e *EventHandlingError) Error() string
func (*EventHandlingError) Unwrap ¶
func (e *EventHandlingError) Unwrap() error
type EventIterator ¶
type EventIterator struct {
	Type           EventIteratorType
	SequenceNumber uint64
}
EventIterator defines the starting position for reading events.
func AfterSequenceNumberIterator ¶
func AfterSequenceNumberIterator(seqNum uint64) EventIterator
AfterSequenceNumberIterator creates an EventIterator that starts after a specific sequence number.
func AtSequenceNumberIterator ¶
func AtSequenceNumberIterator(seqNum uint64) EventIterator
AtSequenceNumberIterator creates an EventIterator that starts at a specific sequence number.
func LatestIterator ¶
func LatestIterator() EventIterator
LatestIterator creates an EventIterator that starts from the latest available event.
func TrimHorizonIterator ¶
func TrimHorizonIterator() EventIterator
TrimHorizonIterator creates an EventIterator that starts from the oldest available event.
func (EventIterator) String ¶
func (sp EventIterator) String() string
String returns a string representation of the EventIterator.
type EventIteratorType ¶
type EventIteratorType int
EventIteratorType defines the type of starting position for reading events.
const (
	// EventIteratorTrimHorizon specifies that events should be read from the oldest available event.
	EventIteratorTrimHorizon EventIteratorType = iota
	// EventIteratorLatest specifies that events should be read from the latest available event.
	EventIteratorLatest
	// EventIteratorAtSequenceNumber specifies that events should be read starting from a specific sequence number.
	EventIteratorAtSequenceNumber
	// EventIteratorAfterSequenceNumber specifies that events should be read starting after a specific sequence number.
	EventIteratorAfterSequenceNumber
)
type EventStore ¶
type EventStore interface {
	// StoreEvent stores a new event in the event store.
	StoreEvent(ctx context.Context, operation Operation, objectType string, object interface{}) error
	// GetEvents retrieves events based on the provided query parameters.
	GetEvents(ctx context.Context, params GetEventsRequest) (*GetEventsResponse, error)
	// GetLatestEventNum returns the sequence number of the latest event.
	GetLatestEventNum(ctx context.Context) (uint64, error)
	// StoreCheckpoint saves a checkpoint for a specific watcher.
	StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
	// GetCheckpoint retrieves the checkpoint for a specific watcher.
	GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
	// Close closes the event store.
	Close(ctx context.Context) error
}
EventStore defines the interface for event storage and retrieval.
type GetEventsRequest ¶
type GetEventsRequest struct {
	// EventIterator specifies where to start reading events.
	EventIterator EventIterator
	// Limit is the maximum number of events to return.
	Limit int
	// Filter specifies criteria for filtering events.
	Filter EventFilter
}
GetEventsRequest defines parameters for querying events.
type GetEventsResponse ¶
type GetEventsResponse struct {
	Events            []Event
	NextEventIterator EventIterator
}
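The request and response pair suggests a paging loop: read a batch, then feed NextEventIterator into the next request. The sketch below demonstrates that loop against a small in-memory store. Everything here is a simplified stand-in: the iterator only models "after sequence number" semantics, memStore and drain are hypothetical, and a real store would presumably long-poll instead of returning an empty batch immediately.

```go
package main

import "fmt"

// Simplified local stand-ins for the documented types.
type Event struct{ SeqNum uint64 }

// EventIterator here always means "after SequenceNumber".
type EventIterator struct{ SequenceNumber uint64 }

type GetEventsRequest struct {
	EventIterator EventIterator
	Limit         int
}

type GetEventsResponse struct {
	Events            []Event
	NextEventIterator EventIterator
}

// memStore is a hypothetical in-memory store with events ordered by SeqNum.
type memStore struct{ events []Event }

// GetEvents returns up to Limit events after the iterator position and an
// iterator that points just past the last event returned.
func (s *memStore) GetEvents(req GetEventsRequest) GetEventsResponse {
	resp := GetEventsResponse{NextEventIterator: req.EventIterator}
	for _, e := range s.events {
		if len(resp.Events) >= req.Limit {
			break
		}
		if e.SeqNum > req.EventIterator.SequenceNumber {
			resp.Events = append(resp.Events, e)
			resp.NextEventIterator = EventIterator{SequenceNumber: e.SeqNum}
		}
	}
	return resp
}

// drain pages through the store in batches until no events are left.
func drain(s *memStore, start EventIterator, batch int) []uint64 {
	var seen []uint64
	it := start
	for {
		resp := s.GetEvents(GetEventsRequest{EventIterator: it, Limit: batch})
		if len(resp.Events) == 0 {
			return seen
		}
		for _, e := range resp.Events {
			seen = append(seen, e.SeqNum)
		}
		it = resp.NextEventIterator
	}
}

func main() {
	s := &memStore{events: []Event{{1}, {2}, {3}, {4}, {5}}}
	fmt.Println(drain(s, EventIterator{}, 2)) // [1 2 3 4 5]
}
```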
type JSONSerializer ¶
type JSONSerializer struct {
// contains filtered or unexported fields
}
JSONSerializer implements the Serializer interface for JSON serialization
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new instance of JSONSerializer
func (*JSONSerializer) IsTypeRegistered ¶
func (s *JSONSerializer) IsTypeRegistered(name string) bool
IsTypeRegistered checks if a type is registered in the serializer's type registry
func (*JSONSerializer) Marshal ¶
func (s *JSONSerializer) Marshal(v Event) ([]byte, error)
Marshal serializes an Event into a byte slice
func (*JSONSerializer) RegisterType ¶
func (s *JSONSerializer) RegisterType(name string, t reflect.Type) error
RegisterType adds a new type to the serializer's type registry. It returns an error if the type is already registered or if the provided type is invalid.
type MockEventHandler ¶
type MockEventHandler struct {
// contains filtered or unexported fields
}
MockEventHandler is a mock of EventHandler interface.
func NewMockEventHandler ¶
func NewMockEventHandler(ctrl *gomock.Controller) *MockEventHandler
NewMockEventHandler creates a new mock instance.
func (*MockEventHandler) EXPECT ¶
func (m *MockEventHandler) EXPECT() *MockEventHandlerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEventHandler) HandleEvent ¶
func (m *MockEventHandler) HandleEvent(ctx context.Context, event Event) error
HandleEvent mocks base method.
type MockEventHandlerMockRecorder ¶
type MockEventHandlerMockRecorder struct {
// contains filtered or unexported fields
}
MockEventHandlerMockRecorder is the mock recorder for MockEventHandler.
func (*MockEventHandlerMockRecorder) HandleEvent ¶
func (mr *MockEventHandlerMockRecorder) HandleEvent(ctx, event interface{}) *gomock.Call
HandleEvent indicates an expected call of HandleEvent.
type MockEventStore ¶
type MockEventStore struct {
// contains filtered or unexported fields
}
MockEventStore is a mock of EventStore interface.
func NewMockEventStore ¶
func NewMockEventStore(ctrl *gomock.Controller) *MockEventStore
NewMockEventStore creates a new mock instance.
func (*MockEventStore) Close ¶
func (m *MockEventStore) Close(ctx context.Context) error
Close mocks base method.
func (*MockEventStore) EXPECT ¶
func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEventStore) GetCheckpoint ¶
func (m *MockEventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
GetCheckpoint mocks base method.
func (*MockEventStore) GetEvents ¶
func (m *MockEventStore) GetEvents(ctx context.Context, params GetEventsRequest) (*GetEventsResponse, error)
GetEvents mocks base method.
func (*MockEventStore) GetLatestEventNum ¶
func (m *MockEventStore) GetLatestEventNum(ctx context.Context) (uint64, error)
GetLatestEventNum mocks base method.
func (*MockEventStore) StoreCheckpoint ¶
func (m *MockEventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
StoreCheckpoint mocks base method.
func (*MockEventStore) StoreEvent ¶
func (m *MockEventStore) StoreEvent(ctx context.Context, operation Operation, objectType string, object interface{}) error
StoreEvent mocks base method.
type MockEventStoreMockRecorder ¶
type MockEventStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockEventStoreMockRecorder is the mock recorder for MockEventStore.
func (*MockEventStoreMockRecorder) Close ¶
func (mr *MockEventStoreMockRecorder) Close(ctx interface{}) *gomock.Call
Close indicates an expected call of Close.
func (*MockEventStoreMockRecorder) GetCheckpoint ¶
func (mr *MockEventStoreMockRecorder) GetCheckpoint(ctx, watcherID interface{}) *gomock.Call
GetCheckpoint indicates an expected call of GetCheckpoint.
func (*MockEventStoreMockRecorder) GetEvents ¶
func (mr *MockEventStoreMockRecorder) GetEvents(ctx, params interface{}) *gomock.Call
GetEvents indicates an expected call of GetEvents.
func (*MockEventStoreMockRecorder) GetLatestEventNum ¶
func (mr *MockEventStoreMockRecorder) GetLatestEventNum(ctx interface{}) *gomock.Call
GetLatestEventNum indicates an expected call of GetLatestEventNum.
func (*MockEventStoreMockRecorder) StoreCheckpoint ¶
func (mr *MockEventStoreMockRecorder) StoreCheckpoint(ctx, watcherID, eventSeqNum interface{}) *gomock.Call
StoreCheckpoint indicates an expected call of StoreCheckpoint.
func (*MockEventStoreMockRecorder) StoreEvent ¶
func (mr *MockEventStoreMockRecorder) StoreEvent(ctx, operation, objectType, object interface{}) *gomock.Call
StoreEvent indicates an expected call of StoreEvent.
type MockRegistry ¶
type MockRegistry struct {
// contains filtered or unexported fields
}
MockRegistry is a mock of Registry interface.
func NewMockRegistry ¶
func NewMockRegistry(ctrl *gomock.Controller) *MockRegistry
NewMockRegistry creates a new mock instance.
func (*MockRegistry) EXPECT ¶
func (m *MockRegistry) EXPECT() *MockRegistryMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockRegistry) GetWatcher ¶
func (m *MockRegistry) GetWatcher(watcherID string) (Watcher, error)
GetWatcher mocks base method.
func (*MockRegistry) Stop ¶
func (m *MockRegistry) Stop(ctx context.Context) error
Stop mocks base method.
func (*MockRegistry) Watch ¶
func (m *MockRegistry) Watch(ctx context.Context, watcherID string, handler EventHandler, opts ...WatchOption) (Watcher, error)
Watch mocks base method.
type MockRegistryMockRecorder ¶
type MockRegistryMockRecorder struct {
// contains filtered or unexported fields
}
MockRegistryMockRecorder is the mock recorder for MockRegistry.
func (*MockRegistryMockRecorder) GetWatcher ¶
func (mr *MockRegistryMockRecorder) GetWatcher(watcherID interface{}) *gomock.Call
GetWatcher indicates an expected call of GetWatcher.
func (*MockRegistryMockRecorder) Stop ¶
func (mr *MockRegistryMockRecorder) Stop(ctx interface{}) *gomock.Call
Stop indicates an expected call of Stop.
func (*MockRegistryMockRecorder) Watch ¶
func (mr *MockRegistryMockRecorder) Watch(ctx, watcherID, handler interface{}, opts ...interface{}) *gomock.Call
Watch indicates an expected call of Watch.
type MockSerializer ¶
type MockSerializer struct {
// contains filtered or unexported fields
}
MockSerializer is a mock of Serializer interface.
func NewMockSerializer ¶
func NewMockSerializer(ctrl *gomock.Controller) *MockSerializer
NewMockSerializer creates a new mock instance.
func (*MockSerializer) EXPECT ¶
func (m *MockSerializer) EXPECT() *MockSerializerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockSerializerMockRecorder ¶
type MockSerializerMockRecorder struct {
// contains filtered or unexported fields
}
MockSerializerMockRecorder is the mock recorder for MockSerializer.
func (*MockSerializerMockRecorder) Marshal ¶
func (mr *MockSerializerMockRecorder) Marshal(event interface{}) *gomock.Call
Marshal indicates an expected call of Marshal.
func (*MockSerializerMockRecorder) Unmarshal ¶
func (mr *MockSerializerMockRecorder) Unmarshal(data, event interface{}) *gomock.Call
Unmarshal indicates an expected call of Unmarshal.
type MockWatcher ¶
type MockWatcher struct {
// contains filtered or unexported fields
}
MockWatcher is a mock of Watcher interface.
func NewMockWatcher ¶
func NewMockWatcher(ctrl *gomock.Controller) *MockWatcher
NewMockWatcher creates a new mock instance.
func (*MockWatcher) Checkpoint ¶
func (m *MockWatcher) Checkpoint(ctx context.Context, eventSeqNum uint64) error
Checkpoint mocks base method.
func (*MockWatcher) EXPECT ¶
func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockWatcher) ID ¶
func (m *MockWatcher) ID() string
ID mocks base method.
func (*MockWatcher) SeekToOffset ¶
func (m *MockWatcher) SeekToOffset(ctx context.Context, eventSeqNum uint64) error
SeekToOffset mocks base method.
func (*MockWatcher) Stats ¶
func (m *MockWatcher) Stats() Stats
Stats mocks base method.
func (*MockWatcher) Stop ¶
func (m *MockWatcher) Stop(ctx context.Context)
Stop mocks base method.
type MockWatcherMockRecorder ¶
type MockWatcherMockRecorder struct {
// contains filtered or unexported fields
}
MockWatcherMockRecorder is the mock recorder for MockWatcher.
func (*MockWatcherMockRecorder) Checkpoint ¶
func (mr *MockWatcherMockRecorder) Checkpoint(ctx, eventSeqNum interface{}) *gomock.Call
Checkpoint indicates an expected call of Checkpoint.
func (*MockWatcherMockRecorder) ID ¶
func (mr *MockWatcherMockRecorder) ID() *gomock.Call
ID indicates an expected call of ID.
func (*MockWatcherMockRecorder) SeekToOffset ¶
func (mr *MockWatcherMockRecorder) SeekToOffset(ctx, eventSeqNum interface{}) *gomock.Call
SeekToOffset indicates an expected call of SeekToOffset.
func (*MockWatcherMockRecorder) Stats ¶
func (mr *MockWatcherMockRecorder) Stats() *gomock.Call
Stats indicates an expected call of Stats.
func (*MockWatcherMockRecorder) Stop ¶
func (mr *MockWatcherMockRecorder) Stop(ctx interface{}) *gomock.Call
Stop indicates an expected call of Stop.
type Operation ¶
type Operation string
Operation represents the type of operation performed in an event.
type Registry ¶
type Registry interface {
	// Watch starts watching for events with the given options.
	// It returns a Watcher that can be used to receive events.
	Watch(ctx context.Context, watcherID string, handler EventHandler, opts ...WatchOption) (Watcher, error)
	// GetWatcher retrieves an existing watcher by its ID.
	GetWatcher(watcherID string) (Watcher, error)
	// Stop gracefully shuts down the registry and all its watchers.
	Stop(ctx context.Context) error
}
Registry manages multiple event watchers and provides methods to watch for events.
func NewRegistry ¶
func NewRegistry(store EventStore) Registry
NewRegistry creates a new Registry with the given EventStore.
Example usage:
store := // initialize your event store
registry := NewRegistry(store)
defer registry.Stop(context.Background())

watcher, err := registry.Watch(context.Background(), "myWatcher", myEventHandler)
if err != nil {
	// handle error
}
type RetryStrategy ¶
type RetryStrategy int
const (
	RetryStrategyBlock RetryStrategy = iota
	RetryStrategySkip
)
type SerializationError ¶
func NewSerializationError ¶
func NewSerializationError(event Event, err error) *SerializationError
func (*SerializationError) Error ¶
func (e *SerializationError) Error() string
func (*SerializationError) Unwrap ¶
func (e *SerializationError) Unwrap() error
type Serializer ¶
type Serializer interface {
	// Marshal serializes an Event into a byte slice.
	Marshal(event Event) ([]byte, error)
	// Unmarshal deserializes a byte slice into an Event.
	Unmarshal(data []byte, event *Event) error
}
Serializer defines the interface for event serialization and deserialization.
type Stats ¶
type Stats struct {
	ID                     string
	State                  State
	NextEventIterator      EventIterator // Next event iterator for the watcher
	LastProcessedSeqNum    uint64        // SeqNum of the last event processed by this watcher
	LastProcessedEventTime time.Time     // timestamp of the last processed event
	LastListenTime         time.Time     // timestamp of the last successful listen operation
}
type WatchOption ¶
type WatchOption func(*watchOptions)
WatchOption is a function type for configuring watch options
func WithBatchSize ¶
func WithBatchSize(size int) WatchOption
WithBatchSize sets the number of events to fetch in each batch
func WithBufferSize ¶
func WithBufferSize(size int) WatchOption
WithBufferSize sets the size of the event buffer
func WithFilter ¶
func WithFilter(filter EventFilter) WatchOption
WithFilter sets the event filter for watching
func WithInitialBackoff ¶
func WithInitialBackoff(backoff time.Duration) WatchOption
WithInitialBackoff sets the initial backoff duration for retries
func WithInitialEventIterator ¶
func WithInitialEventIterator(iterator EventIterator) WatchOption
WithInitialEventIterator sets the starting position for watching if no checkpoint is found
func WithMaxBackoff ¶
func WithMaxBackoff(backoff time.Duration) WatchOption
WithMaxBackoff sets the maximum backoff duration for retries
func WithMaxRetries ¶
func WithMaxRetries(maxRetries int) WatchOption
WithMaxRetries sets the maximum number of retries for event handling
func WithRetryStrategy ¶
func WithRetryStrategy(strategy RetryStrategy) WatchOption
WithRetryStrategy sets the retry strategy for event handling
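The WatchOption functions follow Go's functional options pattern: each option is a closure that mutates a private options struct, and the constructor applies them over a set of defaults. The sketch below reproduces the pattern in isolation. The field names, the newWatchOptions helper, and the default values are placeholders chosen for illustration, not the library's actual defaults.

```go
package main

import "fmt"

// watchOptions is a stand-in for the unexported options struct.
type watchOptions struct {
	batchSize  int
	bufferSize int
}

// WatchOption has the same shape as the documented type.
type WatchOption func(*watchOptions)

func WithBatchSize(size int) WatchOption {
	return func(o *watchOptions) { o.batchSize = size }
}

func WithBufferSize(size int) WatchOption {
	return func(o *watchOptions) { o.bufferSize = size }
}

// newWatchOptions (hypothetical) starts from placeholder defaults and then
// applies each option in order, so later options override earlier ones.
func newWatchOptions(opts ...WatchOption) watchOptions {
	o := watchOptions{batchSize: 100, bufferSize: 1000}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := newWatchOptions(WithBatchSize(50))
	fmt.Println(o.batchSize, o.bufferSize) // 50 1000
}
```

The pattern lets new options be added later without changing the signature of Watch, which is why callers only pass the settings they want to override.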
type Watcher ¶
type Watcher interface {
	// ID returns the unique identifier for the watcher.
	ID() string
	// Stats returns the current statistics for the watcher.
	Stats() Stats
	// Stop gracefully stops the watcher.
	Stop(ctx context.Context)
	// Checkpoint saves the current progress of the watcher.
	Checkpoint(ctx context.Context, eventSeqNum uint64) error
	// SeekToOffset moves the watcher to a specific event sequence number.
	SeekToOffset(ctx context.Context, eventSeqNum uint64) error
}
Watcher represents a single event watcher.
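Checkpointing and the initial event iterator work together when a watcher starts: a stored checkpoint takes precedence, and the initial iterator is only used when none is found. The sketch below models that decision with an in-memory checkpoint store. It assumes that a missing checkpoint is signalled by a sentinel error and that a watcher resumes after the checkpointed sequence number; both are assumptions, and checkpointStore, errNoCheckpoint, and startingIterator are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented iterator types.
type EventIteratorType int

const (
	EventIteratorTrimHorizon EventIteratorType = iota
	EventIteratorLatest
	EventIteratorAtSequenceNumber
	EventIteratorAfterSequenceNumber
)

type EventIterator struct {
	Type           EventIteratorType
	SequenceNumber uint64
}

// errNoCheckpoint is a hypothetical sentinel for "nothing stored yet".
var errNoCheckpoint = errors.New("no checkpoint found")

// checkpointStore is a hypothetical in-memory checkpoint table.
type checkpointStore struct{ checkpoints map[string]uint64 }

func (s *checkpointStore) StoreCheckpoint(watcherID string, seq uint64) {
	s.checkpoints[watcherID] = seq
}

func (s *checkpointStore) GetCheckpoint(watcherID string) (uint64, error) {
	seq, ok := s.checkpoints[watcherID]
	if !ok {
		return 0, errNoCheckpoint
	}
	return seq, nil
}

// startingIterator prefers a stored checkpoint and falls back to the
// caller's initial iterator when none exists.
func startingIterator(s *checkpointStore, watcherID string, initial EventIterator) (EventIterator, error) {
	seq, err := s.GetCheckpoint(watcherID)
	if errors.Is(err, errNoCheckpoint) {
		return initial, nil
	}
	if err != nil {
		return EventIterator{}, err
	}
	return EventIterator{Type: EventIteratorAfterSequenceNumber, SequenceNumber: seq}, nil
}

func main() {
	s := &checkpointStore{checkpoints: map[string]uint64{}}
	initial := EventIterator{Type: EventIteratorTrimHorizon}

	it, _ := startingIterator(s, "my-watcher", initial)
	fmt.Println(it.Type == EventIteratorTrimHorizon) // true: no checkpoint yet

	s.StoreCheckpoint("my-watcher", 42)
	it, _ = startingIterator(s, "my-watcher", initial)
	fmt.Println(it.SequenceNumber) // 42, with "after" semantics
}
```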
type WatcherError ¶
WatcherError represents an error related to a specific watcher
func NewWatcherError ¶
func NewWatcherError(watcherID string, err error) *WatcherError
func (*WatcherError) Error ¶
func (e *WatcherError) Error() string
func (*WatcherError) Unwrap ¶
func (e *WatcherError) Unwrap() error