Documentation ¶
Overview ¶
Package watcher is a generated GoMock package.
Index ¶
- Variables
- type CheckpointError
- type DeserializationError
- type Event
- type EventFilter
- type EventHandler
- type EventHandlingError
- type EventIterator
- type EventIteratorType
- type EventStore
- type GetEventsRequest
- type GetEventsResponse
- type JSONSerializer
- type MockEventHandler
- type MockEventHandlerMockRecorder
- type MockEventStore
- func (m *MockEventStore) Close(ctx context.Context) error
- func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
- func (m *MockEventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
- func (m *MockEventStore) GetEvents(ctx context.Context, params GetEventsRequest) (*GetEventsResponse, error)
- func (m *MockEventStore) GetLatestEventNum(ctx context.Context) (uint64, error)
- func (m *MockEventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
- func (m *MockEventStore) StoreEvent(ctx context.Context, operation Operation, objectType string, ...) error
- type MockEventStoreMockRecorder
- func (mr *MockEventStoreMockRecorder) Close(ctx interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetCheckpoint(ctx, watcherID interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetEvents(ctx, params interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetLatestEventNum(ctx interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StoreCheckpoint(ctx, watcherID, eventSeqNum interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StoreEvent(ctx, operation, objectType, object interface{}) *gomock.Call
- type MockRegistry
- type MockRegistryMockRecorder
- type MockSerializer
- type MockSerializerMockRecorder
- type MockWatcher
- func (m *MockWatcher) Checkpoint(ctx context.Context, eventSeqNum uint64) error
- func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder
- func (m *MockWatcher) ID() string
- func (m *MockWatcher) SeekToOffset(ctx context.Context, eventSeqNum uint64) error
- func (m *MockWatcher) Stats() Stats
- func (m *MockWatcher) Stop(ctx context.Context)
- type MockWatcherMockRecorder
- func (mr *MockWatcherMockRecorder) Checkpoint(ctx, eventSeqNum interface{}) *gomock.Call
- func (mr *MockWatcherMockRecorder) ID() *gomock.Call
- func (mr *MockWatcherMockRecorder) SeekToOffset(ctx, eventSeqNum interface{}) *gomock.Call
- func (mr *MockWatcherMockRecorder) Stats() *gomock.Call
- func (mr *MockWatcherMockRecorder) Stop(ctx interface{}) *gomock.Call
- type Operation
- type Registry
- type RetryStrategy
- type SerializationError
- type Serializer
- type State
- type Stats
- type WatchOption
- func WithBatchSize(size int) WatchOption
- func WithBufferSize(size int) WatchOption
- func WithFilter(filter EventFilter) WatchOption
- func WithInitialBackoff(backoff time.Duration) WatchOption
- func WithInitialEventIterator(iterator EventIterator) WatchOption
- func WithMaxBackoff(backoff time.Duration) WatchOption
- func WithMaxRetries(maxRetries int) WatchOption
- func WithRetryStrategy(strategy RetryStrategy) WatchOption
- type Watcher
- type WatcherError
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type CheckpointError ¶
CheckpointError represents an error related to checkpointing
func NewCheckpointError ¶
func NewCheckpointError(watcherID string, err error) *CheckpointError
NewCheckpointError creates a new CheckpointError
func (*CheckpointError) Error ¶
func (e *CheckpointError) Error() string
func (*CheckpointError) Unwrap ¶
func (e *CheckpointError) Unwrap() error
type DeserializationError ¶
type DeserializationError struct {
Err error
}
func NewDeserializationError ¶
func NewDeserializationError(err error) *DeserializationError
func (*DeserializationError) Error ¶
func (e *DeserializationError) Error() string
func (*DeserializationError) Unwrap ¶
func (e *DeserializationError) Unwrap() error
type Event ¶
type Event struct {
	SeqNum     uint64      `json:"seqNum"`
	Operation  Operation   `json:"operation"`
	ObjectType string      `json:"objectType"`
	Object     interface{} `json:"object"`
	Timestamp  time.Time   `json:"timestamp"`
}
Event represents a single event in the event store.
type EventFilter ¶
type EventFilter struct {
	// ObjectTypes is a list of object types to include in the filter.
	ObjectTypes []string
	// Operations is a list of operations to include in the filter.
	Operations []Operation
}
EventFilter defines criteria for filtering events.
type EventHandler ¶
type EventHandler interface {
	// HandleEvent processes a single event.
	// It returns an error if the event processing fails.
	HandleEvent(ctx context.Context, event Event) error
}
EventHandler is an interface for handling events.
type EventHandlingError ¶
EventHandlingError represents an error that occurred during event processing
func NewEventHandlingError ¶
func NewEventHandlingError(watcherID string, eventID uint64, err error) *EventHandlingError
NewEventHandlingError creates a new EventHandlingError
func (*EventHandlingError) Error ¶
func (e *EventHandlingError) Error() string
func (*EventHandlingError) Unwrap ¶
func (e *EventHandlingError) Unwrap() error
type EventIterator ¶
type EventIterator struct {
	Type           EventIteratorType
	SequenceNumber uint64
}
EventIterator defines the starting position for reading events.
func AfterSequenceNumberIterator ¶
func AfterSequenceNumberIterator(seqNum uint64) EventIterator
AfterSequenceNumberIterator creates an EventIterator that starts after a specific sequence number.
func AtSequenceNumberIterator ¶
func AtSequenceNumberIterator(seqNum uint64) EventIterator
AtSequenceNumberIterator creates an EventIterator that starts at a specific sequence number.
func LatestIterator ¶
func LatestIterator() EventIterator
LatestIterator creates an EventIterator that starts from the latest available event.
func TrimHorizonIterator ¶
func TrimHorizonIterator() EventIterator
TrimHorizonIterator creates an EventIterator that starts from the oldest available event.
func (EventIterator) String ¶
func (sp EventIterator) String() string
String returns a string representation of the EventIterator.
type EventIteratorType ¶
type EventIteratorType int
EventIteratorType defines the type of starting position for reading events.
const (
	// EventIteratorTrimHorizon specifies that events should be read from the oldest available event.
	EventIteratorTrimHorizon EventIteratorType = iota
	// EventIteratorLatest specifies that events should be read from the latest available event.
	EventIteratorLatest
	// EventIteratorAtSequenceNumber specifies that events should be read starting from a specific sequence number.
	EventIteratorAtSequenceNumber
	// EventIteratorAfterSequenceNumber specifies that events should be read starting after a specific sequence number.
	EventIteratorAfterSequenceNumber
)
type EventStore ¶
type EventStore interface {
	// StoreEvent stores a new event in the event store.
	StoreEvent(ctx context.Context, operation Operation, objectType string, object interface{}) error
	// GetEvents retrieves events based on the provided query parameters.
	GetEvents(ctx context.Context, params GetEventsRequest) (*GetEventsResponse, error)
	// GetLatestEventNum returns the sequence number of the latest event.
	GetLatestEventNum(ctx context.Context) (uint64, error)
	// StoreCheckpoint saves a checkpoint for a specific watcher.
	StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
	// GetCheckpoint retrieves the checkpoint for a specific watcher.
	GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
	// Close closes the event store.
	Close(ctx context.Context) error
}
EventStore defines the interface for event storage and retrieval.
type GetEventsRequest ¶
type GetEventsRequest struct {
	// EventIterator specifies where to start reading events.
	EventIterator EventIterator
	// Limit is the maximum number of events to return.
	Limit int
	// Filter specifies criteria for filtering events.
	Filter EventFilter
}
GetEventsRequest defines parameters for querying events.
type GetEventsResponse ¶
type GetEventsResponse struct {
	Events            []Event
	NextEventIterator EventIterator
}
type JSONSerializer ¶
type JSONSerializer struct {
// contains filtered or unexported fields
}
JSONSerializer implements the Serializer interface for JSON serialization
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new instance of JSONSerializer
func (*JSONSerializer) IsTypeRegistered ¶
func (s *JSONSerializer) IsTypeRegistered(name string) bool
IsTypeRegistered checks if a type is registered in the serializer's type registry
func (*JSONSerializer) Marshal ¶
func (s *JSONSerializer) Marshal(v Event) ([]byte, error)
Marshal serializes an Event into a byte slice
func (*JSONSerializer) RegisterType ¶
func (s *JSONSerializer) RegisterType(name string, t reflect.Type) error
RegisterType adds a new type to the serializer's type registry. It returns an error if the type is already registered or if the provided type is invalid.
type MockEventHandler ¶
type MockEventHandler struct {
// contains filtered or unexported fields
}
MockEventHandler is a mock of EventHandler interface.
func NewMockEventHandler ¶
func NewMockEventHandler(ctrl *gomock.Controller) *MockEventHandler
NewMockEventHandler creates a new mock instance.
func (*MockEventHandler) EXPECT ¶
func (m *MockEventHandler) EXPECT() *MockEventHandlerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEventHandler) HandleEvent ¶
func (m *MockEventHandler) HandleEvent(ctx context.Context, event Event) error
HandleEvent mocks base method.
type MockEventHandlerMockRecorder ¶
type MockEventHandlerMockRecorder struct {
// contains filtered or unexported fields
}
MockEventHandlerMockRecorder is the mock recorder for MockEventHandler.
func (*MockEventHandlerMockRecorder) HandleEvent ¶
func (mr *MockEventHandlerMockRecorder) HandleEvent(ctx, event interface{}) *gomock.Call
HandleEvent indicates an expected call of HandleEvent.
type MockEventStore ¶
type MockEventStore struct {
// contains filtered or unexported fields
}
MockEventStore is a mock of EventStore interface.
func NewMockEventStore ¶
func NewMockEventStore(ctrl *gomock.Controller) *MockEventStore
NewMockEventStore creates a new mock instance.
func (*MockEventStore) Close ¶
func (m *MockEventStore) Close(ctx context.Context) error
Close mocks base method.
func (*MockEventStore) EXPECT ¶
func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEventStore) GetCheckpoint ¶
func (m *MockEventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
GetCheckpoint mocks base method.
func (*MockEventStore) GetEvents ¶
func (m *MockEventStore) GetEvents(ctx context.Context, params GetEventsRequest) (*GetEventsResponse, error)
GetEvents mocks base method.
func (*MockEventStore) GetLatestEventNum ¶
func (m *MockEventStore) GetLatestEventNum(ctx context.Context) (uint64, error)
GetLatestEventNum mocks base method.
func (*MockEventStore) StoreCheckpoint ¶
func (m *MockEventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
StoreCheckpoint mocks base method.
func (*MockEventStore) StoreEvent ¶
func (m *MockEventStore) StoreEvent(ctx context.Context, operation Operation, objectType string, object interface{}) error
StoreEvent mocks base method.
type MockEventStoreMockRecorder ¶
type MockEventStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockEventStoreMockRecorder is the mock recorder for MockEventStore.
func (*MockEventStoreMockRecorder) Close ¶
func (mr *MockEventStoreMockRecorder) Close(ctx interface{}) *gomock.Call
Close indicates an expected call of Close.
func (*MockEventStoreMockRecorder) GetCheckpoint ¶
func (mr *MockEventStoreMockRecorder) GetCheckpoint(ctx, watcherID interface{}) *gomock.Call
GetCheckpoint indicates an expected call of GetCheckpoint.
func (*MockEventStoreMockRecorder) GetEvents ¶
func (mr *MockEventStoreMockRecorder) GetEvents(ctx, params interface{}) *gomock.Call
GetEvents indicates an expected call of GetEvents.
func (*MockEventStoreMockRecorder) GetLatestEventNum ¶
func (mr *MockEventStoreMockRecorder) GetLatestEventNum(ctx interface{}) *gomock.Call
GetLatestEventNum indicates an expected call of GetLatestEventNum.
func (*MockEventStoreMockRecorder) StoreCheckpoint ¶
func (mr *MockEventStoreMockRecorder) StoreCheckpoint(ctx, watcherID, eventSeqNum interface{}) *gomock.Call
StoreCheckpoint indicates an expected call of StoreCheckpoint.
func (*MockEventStoreMockRecorder) StoreEvent ¶
func (mr *MockEventStoreMockRecorder) StoreEvent(ctx, operation, objectType, object interface{}) *gomock.Call
StoreEvent indicates an expected call of StoreEvent.
type MockRegistry ¶
type MockRegistry struct {
// contains filtered or unexported fields
}
MockRegistry is a mock of Registry interface.
func NewMockRegistry ¶
func NewMockRegistry(ctrl *gomock.Controller) *MockRegistry
NewMockRegistry creates a new mock instance.
func (*MockRegistry) EXPECT ¶
func (m *MockRegistry) EXPECT() *MockRegistryMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockRegistry) GetWatcher ¶
func (m *MockRegistry) GetWatcher(watcherID string) (Watcher, error)
GetWatcher mocks base method.
func (*MockRegistry) Stop ¶
func (m *MockRegistry) Stop(ctx context.Context) error
Stop mocks base method.
func (*MockRegistry) Watch ¶
func (m *MockRegistry) Watch(ctx context.Context, watcherID string, handler EventHandler, opts ...WatchOption) (Watcher, error)
Watch mocks base method.
type MockRegistryMockRecorder ¶
type MockRegistryMockRecorder struct {
// contains filtered or unexported fields
}
MockRegistryMockRecorder is the mock recorder for MockRegistry.
func (*MockRegistryMockRecorder) GetWatcher ¶
func (mr *MockRegistryMockRecorder) GetWatcher(watcherID interface{}) *gomock.Call
GetWatcher indicates an expected call of GetWatcher.
func (*MockRegistryMockRecorder) Stop ¶
func (mr *MockRegistryMockRecorder) Stop(ctx interface{}) *gomock.Call
Stop indicates an expected call of Stop.
func (*MockRegistryMockRecorder) Watch ¶
func (mr *MockRegistryMockRecorder) Watch(ctx, watcherID, handler interface{}, opts ...interface{}) *gomock.Call
Watch indicates an expected call of Watch.
type MockSerializer ¶
type MockSerializer struct {
// contains filtered or unexported fields
}
MockSerializer is a mock of Serializer interface.
func NewMockSerializer ¶
func NewMockSerializer(ctrl *gomock.Controller) *MockSerializer
NewMockSerializer creates a new mock instance.
func (*MockSerializer) EXPECT ¶
func (m *MockSerializer) EXPECT() *MockSerializerMockRecorder
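EXPECT returns an object that allows the caller to indicate expected use.
func (*MockSerializer) Marshal ¶
func (m *MockSerializer) Marshal(event Event) ([]byte, error)
Marshal mocks base method.
func (*MockSerializer) Unmarshal ¶
func (m *MockSerializer) Unmarshal(data []byte, event *Event) error
Unmarshal mocks base method.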
type MockSerializerMockRecorder ¶
type MockSerializerMockRecorder struct {
// contains filtered or unexported fields
}
MockSerializerMockRecorder is the mock recorder for MockSerializer.
func (*MockSerializerMockRecorder) Marshal ¶
func (mr *MockSerializerMockRecorder) Marshal(event interface{}) *gomock.Call
Marshal indicates an expected call of Marshal.
func (*MockSerializerMockRecorder) Unmarshal ¶
func (mr *MockSerializerMockRecorder) Unmarshal(data, event interface{}) *gomock.Call
Unmarshal indicates an expected call of Unmarshal.
type MockWatcher ¶
type MockWatcher struct {
// contains filtered or unexported fields
}
MockWatcher is a mock of Watcher interface.
func NewMockWatcher ¶
func NewMockWatcher(ctrl *gomock.Controller) *MockWatcher
NewMockWatcher creates a new mock instance.
func (*MockWatcher) Checkpoint ¶
func (m *MockWatcher) Checkpoint(ctx context.Context, eventSeqNum uint64) error
Checkpoint mocks base method.
func (*MockWatcher) EXPECT ¶
func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
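func (*MockWatcher) ID ¶
func (m *MockWatcher) ID() string
ID mocks base method.
func (*MockWatcher) SeekToOffset ¶
func (m *MockWatcher) SeekToOffset(ctx context.Context, eventSeqNum uint64) error
SeekToOffset mocks base method.
func (*MockWatcher) Stats ¶
func (m *MockWatcher) Stats() Stats
Stats mocks base method.
func (*MockWatcher) Stop ¶
func (m *MockWatcher) Stop(ctx context.Context)
Stop mocks base method.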
type MockWatcherMockRecorder ¶
type MockWatcherMockRecorder struct {
// contains filtered or unexported fields
}
MockWatcherMockRecorder is the mock recorder for MockWatcher.
func (*MockWatcherMockRecorder) Checkpoint ¶
func (mr *MockWatcherMockRecorder) Checkpoint(ctx, eventSeqNum interface{}) *gomock.Call
Checkpoint indicates an expected call of Checkpoint.
func (*MockWatcherMockRecorder) ID ¶
func (mr *MockWatcherMockRecorder) ID() *gomock.Call
ID indicates an expected call of ID.
func (*MockWatcherMockRecorder) SeekToOffset ¶
func (mr *MockWatcherMockRecorder) SeekToOffset(ctx, eventSeqNum interface{}) *gomock.Call
SeekToOffset indicates an expected call of SeekToOffset.
func (*MockWatcherMockRecorder) Stats ¶
func (mr *MockWatcherMockRecorder) Stats() *gomock.Call
Stats indicates an expected call of Stats.
func (*MockWatcherMockRecorder) Stop ¶
func (mr *MockWatcherMockRecorder) Stop(ctx interface{}) *gomock.Call
Stop indicates an expected call of Stop.
type Operation ¶
type Operation string
Operation represents the type of operation performed in an event.
type Registry ¶
type Registry interface {
	// Watch starts watching for events with the given options.
	// It returns a Watcher that can be used to receive events.
	Watch(ctx context.Context, watcherID string, handler EventHandler, opts ...WatchOption) (Watcher, error)
	// GetWatcher retrieves an existing watcher by its ID.
	GetWatcher(watcherID string) (Watcher, error)
	// Stop gracefully shuts down the registry and all its watchers.
	Stop(ctx context.Context) error
}
Registry manages multiple event watchers and provides methods to watch for events.
func NewRegistry ¶
func NewRegistry(store EventStore) Registry
NewRegistry creates a new Registry with the given EventStore.
Example usage:
store := // initialize your event store
registry := NewRegistry(store)
defer registry.Stop(context.Background())

watcher, err := registry.Watch(context.Background(), "myWatcher", myEventHandler)
if err != nil {
	// handle error
}
type RetryStrategy ¶
type RetryStrategy int
const (
	RetryStrategyBlock RetryStrategy = iota
	RetryStrategySkip
)
type SerializationError ¶
func NewSerializationError ¶
func NewSerializationError(event Event, err error) *SerializationError
func (*SerializationError) Error ¶
func (e *SerializationError) Error() string
func (*SerializationError) Unwrap ¶
func (e *SerializationError) Unwrap() error
type Serializer ¶
type Serializer interface {
	// Marshal serializes an Event into a byte slice.
	Marshal(event Event) ([]byte, error)
	// Unmarshal deserializes a byte slice into an Event.
	Unmarshal(data []byte, event *Event) error
}
Serializer defines the interface for event serialization and deserialization.
type Stats ¶
type Stats struct {
	ID                     string
	State                  State
	NextEventIterator      EventIterator // Next event iterator for the watcher
	LastProcessedSeqNum    uint64        // SeqNum of the last event processed by this watcher
	LastProcessedEventTime time.Time     // timestamp of the last processed event
	LastListenTime         time.Time     // timestamp of the last successful listen operation
}
type WatchOption ¶
type WatchOption func(*watchOptions)
WatchOption is a function type for configuring watch options
func WithBatchSize ¶
func WithBatchSize(size int) WatchOption
WithBatchSize sets the number of events to fetch in each batch
func WithBufferSize ¶
func WithBufferSize(size int) WatchOption
WithBufferSize sets the size of the event buffer
func WithFilter ¶
func WithFilter(filter EventFilter) WatchOption
WithFilter sets the event filter for watching
func WithInitialBackoff ¶
func WithInitialBackoff(backoff time.Duration) WatchOption
WithInitialBackoff sets the initial backoff duration for retries
func WithInitialEventIterator ¶
func WithInitialEventIterator(iterator EventIterator) WatchOption
WithInitialEventIterator sets the starting position for watching if no checkpoint is found
func WithMaxBackoff ¶
func WithMaxBackoff(backoff time.Duration) WatchOption
WithMaxBackoff sets the maximum backoff duration for retries
func WithMaxRetries ¶
func WithMaxRetries(maxRetries int) WatchOption
WithMaxRetries sets the maximum number of retries for event handling
func WithRetryStrategy ¶
func WithRetryStrategy(strategy RetryStrategy) WatchOption
WithRetryStrategy sets the retry strategy for event handling
type Watcher ¶
type Watcher interface {
	// ID returns the unique identifier for the watcher.
	ID() string
	// Stats returns the current statistics for the watcher.
	Stats() Stats
	// Stop gracefully stops the watcher.
	Stop(ctx context.Context)
	// Checkpoint saves the current progress of the watcher.
	Checkpoint(ctx context.Context, eventSeqNum uint64) error
	// SeekToOffset moves the watcher to a specific event sequence number.
	SeekToOffset(ctx context.Context, eventSeqNum uint64) error
}
Watcher represents a single event watcher.
type WatcherError ¶
WatcherError represents an error related to a specific watcher
func NewWatcherError ¶
func NewWatcherError(watcherID string, err error) *WatcherError
func (*WatcherError) Error ¶
func (e *WatcherError) Error() string
func (*WatcherError) Unwrap ¶
func (e *WatcherError) Unwrap() error