Documentation ¶
Overview ¶
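Package watcher provides event watchers that read events from an EventStore, with support for checkpointing, filtering, and retries. It also contains generated GoMock mocks for its interfaces.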
Index ¶
- Constants
- Variables
- type CheckpointError
- type DeserializationError
- type Event
- type EventFilter
- type EventHandler
- type EventHandlingError
- type EventIterator
- type EventIteratorType
- type EventStore
- type GetEventsRequest
- type GetEventsResponse
- type JSONSerializer
- type Manager
- type MockEventHandler
- type MockEventHandlerMockRecorder
- type MockEventStore
- func (m *MockEventStore) Close(ctx context.Context) error
- func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
- func (m *MockEventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
- func (m *MockEventStore) GetEvents(ctx context.Context, request GetEventsRequest) (*GetEventsResponse, error)
- func (m *MockEventStore) GetLatestEventNum(ctx context.Context) (uint64, error)
- func (m *MockEventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
- func (m *MockEventStore) StoreEvent(ctx context.Context, request StoreEventRequest) error
- type MockEventStoreMockRecorder
- func (mr *MockEventStoreMockRecorder) Close(ctx interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetCheckpoint(ctx, watcherID interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetEvents(ctx, request interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) GetLatestEventNum(ctx interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StoreCheckpoint(ctx, watcherID, eventSeqNum interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StoreEvent(ctx, request interface{}) *gomock.Call
- type MockManager
- func (m *MockManager) Create(ctx context.Context, watcherID string, opts ...WatchOption) (Watcher, error)
- func (m *MockManager) EXPECT() *MockManagerMockRecorder
- func (m *MockManager) Lookup(ctx context.Context, watcherID string) (Watcher, error)
- func (m *MockManager) Stop(ctx context.Context) error
- type MockManagerMockRecorder
- type MockSerializer
- type MockSerializerMockRecorder
- type MockWatcher
- func (m *MockWatcher) Checkpoint(ctx context.Context, eventSeqNum uint64) error
- func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder
- func (m *MockWatcher) ID() string
- func (m *MockWatcher) SeekToOffset(ctx context.Context, eventSeqNum uint64) error
- func (m *MockWatcher) SetHandler(handler EventHandler) error
- func (m *MockWatcher) Start(ctx context.Context) error
- func (m *MockWatcher) Stats() Stats
- func (m *MockWatcher) Stop(ctx context.Context)
- type MockWatcherMockRecorder
- func (mr *MockWatcherMockRecorder) Checkpoint(ctx, eventSeqNum interface{}) *gomock.Call
- func (mr *MockWatcherMockRecorder) ID() *gomock.Call
- func (mr *MockWatcherMockRecorder) SeekToOffset(ctx, eventSeqNum interface{}) *gomock.Call
- func (mr *MockWatcherMockRecorder) SetHandler(handler interface{}) *gomock.Call
- func (mr *MockWatcherMockRecorder) Start(ctx interface{}) *gomock.Call
- func (mr *MockWatcherMockRecorder) Stats() *gomock.Call
- func (mr *MockWatcherMockRecorder) Stop(ctx interface{}) *gomock.Call
- type Operation
- type RetryStrategy
- type SerializationError
- type Serializer
- type State
- type Stats
- type StoreEventRequest
- type WatchOption
- func WithAutoStart() WatchOption
- func WithBatchSize(size int) WatchOption
- func WithEphemeral() WatchOption
- func WithFilter(filter EventFilter) WatchOption
- func WithHandler(handler EventHandler) WatchOption
- func WithInitialBackoff(backoff time.Duration) WatchOption
- func WithInitialEventIterator(iterator EventIterator) WatchOption
- func WithMaxBackoff(backoff time.Duration) WatchOption
- func WithMaxRetries(maxRetries int) WatchOption
- func WithRetryStrategy(strategy RetryStrategy) WatchOption
- type Watcher
- type WatcherError
Constants ¶
const (
DefaultShutdownTimeout = 30 * time.Second
)
Variables ¶
var (
	ErrWatcherAlreadyExists = errors.New("watcher already exists")
	ErrWatcherNotFound      = errors.New("watcher not found")
	ErrCheckpointNotFound   = errors.New("checkpoint not found")
	ErrNoHandler            = errors.New("no handler configured")
	ErrHandlerExists        = errors.New("handler already exists")
)
Functions ¶
This section is empty.
Types ¶
type CheckpointError ¶
CheckpointError represents an error related to checkpointing
func NewCheckpointError ¶
func NewCheckpointError(watcherID string, err error) *CheckpointError
NewCheckpointError creates a new CheckpointError
func (*CheckpointError) Error ¶
func (e *CheckpointError) Error() string
func (*CheckpointError) Unwrap ¶
func (e *CheckpointError) Unwrap() error
type DeserializationError ¶
type DeserializationError struct {
Err error
}
func NewDeserializationError ¶
func NewDeserializationError(err error) *DeserializationError
func (*DeserializationError) Error ¶
func (e *DeserializationError) Error() string
func (*DeserializationError) Unwrap ¶
func (e *DeserializationError) Unwrap() error
type Event ¶
type Event struct {
	SeqNum     uint64      `json:"seqNum"`
	Operation  Operation   `json:"operation"`
	ObjectType string      `json:"objectType"`
	Object     interface{} `json:"object"`
	Timestamp  time.Time   `json:"timestamp"`
}
Event represents a single event in the event store.
type EventFilter ¶
type EventFilter struct {
	// ObjectTypes is a list of object types to include in the filter.
	ObjectTypes []string
	// Operations is a list of operations to include in the filter.
	Operations []Operation
}
EventFilter defines criteria for filtering events.
type EventHandler ¶
type EventHandler interface {
	// HandleEvent processes a single event.
	// It returns an error if the event processing fails.
	// Implementations MUST honor context cancellation and return immediately when ctx.Done() is closed.
	HandleEvent(ctx context.Context, event Event) error
}
EventHandler is an interface for handling events.
type EventHandlingError ¶
EventHandlingError represents an error that occurred during event processing
func NewEventHandlingError ¶
func NewEventHandlingError(watcherID string, eventID uint64, err error) *EventHandlingError
NewEventHandlingError creates a new EventHandlingError
func (*EventHandlingError) Error ¶
func (e *EventHandlingError) Error() string
func (*EventHandlingError) Unwrap ¶
func (e *EventHandlingError) Unwrap() error
type EventIterator ¶
type EventIterator struct {
	Type           EventIteratorType
	SequenceNumber uint64
}
EventIterator defines the starting position for reading events.
func AfterSequenceNumberIterator ¶
func AfterSequenceNumberIterator(seqNum uint64) EventIterator
AfterSequenceNumberIterator creates an EventIterator that starts after a specific sequence number.
func AtSequenceNumberIterator ¶
func AtSequenceNumberIterator(seqNum uint64) EventIterator
AtSequenceNumberIterator creates an EventIterator that starts at a specific sequence number.
func LatestIterator ¶
func LatestIterator() EventIterator
LatestIterator creates an EventIterator that starts from the latest available event.
func TrimHorizonIterator ¶
func TrimHorizonIterator() EventIterator
TrimHorizonIterator creates an EventIterator that starts from the oldest available event.
func (EventIterator) String ¶
func (sp EventIterator) String() string
String returns a string representation of the EventIterator.
type EventIteratorType ¶
type EventIteratorType int
EventIteratorType defines the type of starting position for reading events.
const (
	// EventIteratorTrimHorizon specifies that events should be read from the oldest available event.
	EventIteratorTrimHorizon EventIteratorType = iota
	// EventIteratorLatest specifies that events should be read from the latest available event.
	EventIteratorLatest
	// EventIteratorAtSequenceNumber specifies that events should be read starting from a specific sequence number.
	EventIteratorAtSequenceNumber
	// EventIteratorAfterSequenceNumber specifies that events should be read starting after a specific sequence number.
	EventIteratorAfterSequenceNumber
)
type EventStore ¶
type EventStore interface {
	// StoreEvent stores a new event in the event store.
	StoreEvent(ctx context.Context, request StoreEventRequest) error
	// GetEvents retrieves events based on the provided query parameters.
	GetEvents(ctx context.Context, request GetEventsRequest) (*GetEventsResponse, error)
	// GetLatestEventNum returns the sequence number of the latest event.
	GetLatestEventNum(ctx context.Context) (uint64, error)
	// StoreCheckpoint saves a checkpoint for a specific watcher.
	StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
	// GetCheckpoint retrieves the checkpoint for a specific watcher.
	GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
	// Close closes the event store.
	Close(ctx context.Context) error
}
EventStore defines the interface for event storage and retrieval.
type GetEventsRequest ¶
type GetEventsRequest struct {
	// WatcherID optionally specifies the watcher ID for logging and tracking purposes.
	WatcherID string
	// EventIterator specifies where to start reading events.
	EventIterator EventIterator
	// Limit is the maximum number of events to return.
	Limit int
	// Filter specifies criteria for filtering events.
	Filter EventFilter
}
GetEventsRequest defines parameters for querying events.
type GetEventsResponse ¶
type GetEventsResponse struct {
	Events            []Event
	NextEventIterator EventIterator
}
type JSONSerializer ¶
type JSONSerializer struct {
// contains filtered or unexported fields
}
JSONSerializer implements the Serializer interface for JSON serialization
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new instance of JSONSerializer
func (*JSONSerializer) IsTypeRegistered ¶
func (s *JSONSerializer) IsTypeRegistered(name string) bool
IsTypeRegistered checks if a type is registered in the serializer's type manager
func (*JSONSerializer) Marshal ¶
func (s *JSONSerializer) Marshal(v Event) ([]byte, error)
Marshal serializes an Event into a byte slice
func (*JSONSerializer) RegisterType ¶
func (s *JSONSerializer) RegisterType(name string, t reflect.Type) error
RegisterType adds a new type to the serializer's type manager. It returns an error if the type is already registered or if the provided type is invalid.
type Manager ¶ added in v1.5.2
type Manager interface {
	// Create creates a new not-started watcher with the given ID and options.
	// The watcher must be configured with a handler before it can start watching.
	Create(ctx context.Context, watcherID string, opts ...WatchOption) (Watcher, error)
	// Lookup retrieves an existing watcher by its ID.
	Lookup(ctx context.Context, watcherID string) (Watcher, error)
	// Stop gracefully shuts down the manager and all its watchers.
	Stop(ctx context.Context) error
}
Manager handles lifecycle of multiple watchers with shared resources
func NewManager ¶ added in v1.5.2
func NewManager(store EventStore) Manager
NewManager creates a new Manager with the given EventStore.
Example usage:
store := // initialize your event store
manager := NewManager(store)
defer manager.Stop(context.Background())

watcher, err := manager.Create(context.Background(), "myWatcher")
if err != nil {
	// handle error
}
type MockEventHandler ¶
type MockEventHandler struct {
// contains filtered or unexported fields
}
MockEventHandler is a mock of EventHandler interface.
func NewMockEventHandler ¶
func NewMockEventHandler(ctrl *gomock.Controller) *MockEventHandler
NewMockEventHandler creates a new mock instance.
func (*MockEventHandler) EXPECT ¶
func (m *MockEventHandler) EXPECT() *MockEventHandlerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEventHandler) HandleEvent ¶
func (m *MockEventHandler) HandleEvent(ctx context.Context, event Event) error
HandleEvent mocks base method.
type MockEventHandlerMockRecorder ¶
type MockEventHandlerMockRecorder struct {
// contains filtered or unexported fields
}
MockEventHandlerMockRecorder is the mock recorder for MockEventHandler.
func (*MockEventHandlerMockRecorder) HandleEvent ¶
func (mr *MockEventHandlerMockRecorder) HandleEvent(ctx, event interface{}) *gomock.Call
HandleEvent indicates an expected call of HandleEvent.
type MockEventStore ¶
type MockEventStore struct {
// contains filtered or unexported fields
}
MockEventStore is a mock of EventStore interface.
func NewMockEventStore ¶
func NewMockEventStore(ctrl *gomock.Controller) *MockEventStore
NewMockEventStore creates a new mock instance.
func (*MockEventStore) Close ¶
func (m *MockEventStore) Close(ctx context.Context) error
Close mocks base method.
func (*MockEventStore) EXPECT ¶
func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEventStore) GetCheckpoint ¶
func (m *MockEventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)
GetCheckpoint mocks base method.
func (*MockEventStore) GetEvents ¶
func (m *MockEventStore) GetEvents(ctx context.Context, request GetEventsRequest) (*GetEventsResponse, error)
GetEvents mocks base method.
func (*MockEventStore) GetLatestEventNum ¶
func (m *MockEventStore) GetLatestEventNum(ctx context.Context) (uint64, error)
GetLatestEventNum mocks base method.
func (*MockEventStore) StoreCheckpoint ¶
func (m *MockEventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error
StoreCheckpoint mocks base method.
func (*MockEventStore) StoreEvent ¶
func (m *MockEventStore) StoreEvent(ctx context.Context, request StoreEventRequest) error
StoreEvent mocks base method.
type MockEventStoreMockRecorder ¶
type MockEventStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockEventStoreMockRecorder is the mock recorder for MockEventStore.
func (*MockEventStoreMockRecorder) Close ¶
func (mr *MockEventStoreMockRecorder) Close(ctx interface{}) *gomock.Call
Close indicates an expected call of Close.
func (*MockEventStoreMockRecorder) GetCheckpoint ¶
func (mr *MockEventStoreMockRecorder) GetCheckpoint(ctx, watcherID interface{}) *gomock.Call
GetCheckpoint indicates an expected call of GetCheckpoint.
func (*MockEventStoreMockRecorder) GetEvents ¶
func (mr *MockEventStoreMockRecorder) GetEvents(ctx, request interface{}) *gomock.Call
GetEvents indicates an expected call of GetEvents.
func (*MockEventStoreMockRecorder) GetLatestEventNum ¶
func (mr *MockEventStoreMockRecorder) GetLatestEventNum(ctx interface{}) *gomock.Call
GetLatestEventNum indicates an expected call of GetLatestEventNum.
func (*MockEventStoreMockRecorder) StoreCheckpoint ¶
func (mr *MockEventStoreMockRecorder) StoreCheckpoint(ctx, watcherID, eventSeqNum interface{}) *gomock.Call
StoreCheckpoint indicates an expected call of StoreCheckpoint.
func (*MockEventStoreMockRecorder) StoreEvent ¶
func (mr *MockEventStoreMockRecorder) StoreEvent(ctx, request interface{}) *gomock.Call
StoreEvent indicates an expected call of StoreEvent.
type MockManager ¶ added in v1.5.2
type MockManager struct {
// contains filtered or unexported fields
}
MockManager is a mock of Manager interface.
func NewMockManager ¶ added in v1.5.2
func NewMockManager(ctrl *gomock.Controller) *MockManager
NewMockManager creates a new mock instance.
func (*MockManager) Create ¶ added in v1.5.2
func (m *MockManager) Create(ctx context.Context, watcherID string, opts ...WatchOption) (Watcher, error)
Create mocks base method.
func (*MockManager) EXPECT ¶ added in v1.5.2
func (m *MockManager) EXPECT() *MockManagerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockManagerMockRecorder ¶ added in v1.5.2
type MockManagerMockRecorder struct {
// contains filtered or unexported fields
}
MockManagerMockRecorder is the mock recorder for MockManager.
func (*MockManagerMockRecorder) Create ¶ added in v1.5.2
func (mr *MockManagerMockRecorder) Create(ctx, watcherID interface{}, opts ...interface{}) *gomock.Call
Create indicates an expected call of Create.
func (*MockManagerMockRecorder) Lookup ¶ added in v1.5.2
func (mr *MockManagerMockRecorder) Lookup(ctx, watcherID interface{}) *gomock.Call
Lookup indicates an expected call of Lookup.
func (*MockManagerMockRecorder) Stop ¶ added in v1.5.2
func (mr *MockManagerMockRecorder) Stop(ctx interface{}) *gomock.Call
Stop indicates an expected call of Stop.
type MockSerializer ¶
type MockSerializer struct {
// contains filtered or unexported fields
}
MockSerializer is a mock of Serializer interface.
func NewMockSerializer ¶
func NewMockSerializer(ctrl *gomock.Controller) *MockSerializer
NewMockSerializer creates a new mock instance.
func (*MockSerializer) EXPECT ¶
func (m *MockSerializer) EXPECT() *MockSerializerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockSerializerMockRecorder ¶
type MockSerializerMockRecorder struct {
// contains filtered or unexported fields
}
MockSerializerMockRecorder is the mock recorder for MockSerializer.
func (*MockSerializerMockRecorder) Marshal ¶
func (mr *MockSerializerMockRecorder) Marshal(event interface{}) *gomock.Call
Marshal indicates an expected call of Marshal.
func (*MockSerializerMockRecorder) Unmarshal ¶
func (mr *MockSerializerMockRecorder) Unmarshal(data, event interface{}) *gomock.Call
Unmarshal indicates an expected call of Unmarshal.
type MockWatcher ¶
type MockWatcher struct {
// contains filtered or unexported fields
}
MockWatcher is a mock of Watcher interface.
func NewMockWatcher ¶
func NewMockWatcher(ctrl *gomock.Controller) *MockWatcher
NewMockWatcher creates a new mock instance.
func (*MockWatcher) Checkpoint ¶
func (m *MockWatcher) Checkpoint(ctx context.Context, eventSeqNum uint64) error
Checkpoint mocks base method.
func (*MockWatcher) EXPECT ¶
func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockWatcher) SeekToOffset ¶
func (m *MockWatcher) SeekToOffset(ctx context.Context, eventSeqNum uint64) error
SeekToOffset mocks base method.
func (*MockWatcher) SetHandler ¶ added in v1.5.2
func (m *MockWatcher) SetHandler(handler EventHandler) error
SetHandler mocks base method.
type MockWatcherMockRecorder ¶
type MockWatcherMockRecorder struct {
// contains filtered or unexported fields
}
MockWatcherMockRecorder is the mock recorder for MockWatcher.
func (*MockWatcherMockRecorder) Checkpoint ¶
func (mr *MockWatcherMockRecorder) Checkpoint(ctx, eventSeqNum interface{}) *gomock.Call
Checkpoint indicates an expected call of Checkpoint.
func (*MockWatcherMockRecorder) ID ¶
func (mr *MockWatcherMockRecorder) ID() *gomock.Call
ID indicates an expected call of ID.
func (*MockWatcherMockRecorder) SeekToOffset ¶
func (mr *MockWatcherMockRecorder) SeekToOffset(ctx, eventSeqNum interface{}) *gomock.Call
SeekToOffset indicates an expected call of SeekToOffset.
func (*MockWatcherMockRecorder) SetHandler ¶ added in v1.5.2
func (mr *MockWatcherMockRecorder) SetHandler(handler interface{}) *gomock.Call
SetHandler indicates an expected call of SetHandler.
func (*MockWatcherMockRecorder) Start ¶ added in v1.5.2
func (mr *MockWatcherMockRecorder) Start(ctx interface{}) *gomock.Call
Start indicates an expected call of Start.
func (*MockWatcherMockRecorder) Stats ¶
func (mr *MockWatcherMockRecorder) Stats() *gomock.Call
Stats indicates an expected call of Stats.
func (*MockWatcherMockRecorder) Stop ¶
func (mr *MockWatcherMockRecorder) Stop(ctx interface{}) *gomock.Call
Stop indicates an expected call of Stop.
type Operation ¶
type Operation string
Operation represents the type of operation performed in an event.
type RetryStrategy ¶
type RetryStrategy int
const (
	RetryStrategyBlock RetryStrategy = iota
	RetryStrategySkip
)
type SerializationError ¶
func NewSerializationError ¶
func NewSerializationError(event Event, err error) *SerializationError
func (*SerializationError) Error ¶
func (e *SerializationError) Error() string
func (*SerializationError) Unwrap ¶
func (e *SerializationError) Unwrap() error
type Serializer ¶
type Serializer interface {
	// Marshal serializes an Event into a byte slice.
	Marshal(event Event) ([]byte, error)
	// Unmarshal deserializes a byte slice into an Event.
	Unmarshal(data []byte, event *Event) error
}
Serializer defines the interface for event serialization and deserialization.
type Stats ¶
type Stats struct {
	ID                     string
	State                  State
	NextEventIterator      EventIterator // Next event iterator for the watcher
	CheckpointIterator     EventIterator // Checkpoint iterator for the watcher
	LastProcessedSeqNum    uint64        // SeqNum of the last event processed by this watcher
	LastProcessedEventTime time.Time     // timestamp of the last processed event
	LastListenTime         time.Time     // timestamp of the last successful listen operation
}
type StoreEventRequest ¶ added in v1.5.2
type StoreEventRequest struct {
	Operation  Operation   `json:"operation"`
	ObjectType string      `json:"objectType"`
	Object     interface{} `json:"object"`
}
StoreEventRequest represents the input for creating an event.
type WatchOption ¶
type WatchOption func(*watchOptions)
WatchOption is a function type for configuring watch options
func WithAutoStart ¶ added in v1.5.2
func WithAutoStart() WatchOption
WithAutoStart enables auto-start for the watcher right after creation
func WithBatchSize ¶
func WithBatchSize(size int) WatchOption
WithBatchSize sets the number of events to fetch in each batch
func WithEphemeral ¶ added in v1.6.0
func WithEphemeral() WatchOption
WithEphemeral sets the watcher to be ephemeral (non-checkpointing)
func WithFilter ¶
func WithFilter(filter EventFilter) WatchOption
WithFilter sets the event filter for watching
func WithHandler ¶ added in v1.5.2
func WithHandler(handler EventHandler) WatchOption
WithHandler sets the event handler for watching
func WithInitialBackoff ¶
func WithInitialBackoff(backoff time.Duration) WatchOption
WithInitialBackoff sets the initial backoff duration for retries
func WithInitialEventIterator ¶
func WithInitialEventIterator(iterator EventIterator) WatchOption
WithInitialEventIterator sets the starting position for watching if no checkpoint is found
func WithMaxBackoff ¶
func WithMaxBackoff(backoff time.Duration) WatchOption
WithMaxBackoff sets the maximum backoff duration for retries
func WithMaxRetries ¶
func WithMaxRetries(maxRetries int) WatchOption
WithMaxRetries sets the maximum number of retries for event handling
func WithRetryStrategy ¶
func WithRetryStrategy(strategy RetryStrategy) WatchOption
WithRetryStrategy sets the retry strategy for event handling
type Watcher ¶
type Watcher interface {
	// ID returns the unique identifier for the watcher.
	ID() string
	// Stats returns the current statistics for the watcher.
	Stats() Stats
	// SetHandler sets the event handler for the watcher. Must be set before calling Start.
	// Will fail if the handler is already set.
	// Returns error if handler is nil or already configured.
	SetHandler(handler EventHandler) error
	// Start begins processing events.
	// Returns error if no handler configured or already running.
	Start(ctx context.Context) error
	// Stop gracefully stops the watcher.
	Stop(ctx context.Context)
	// Checkpoint saves the current progress of the watcher.
	Checkpoint(ctx context.Context, eventSeqNum uint64) error
	// SeekToOffset moves the watcher to a specific event sequence number.
	// Will stop and restart the watcher if running.
	SeekToOffset(ctx context.Context, eventSeqNum uint64) error
}
Watcher represents a single event watcher.
func New ¶ added in v1.5.2
func New(ctx context.Context, id string, store EventStore, opts ...WatchOption) (Watcher, error)
New creates a new watcher with the given parameters
type WatcherError ¶
WatcherError represents an error related to a specific watcher
func NewWatcherError ¶
func NewWatcherError(watcherID string, err error) *WatcherError
func (*WatcherError) Error ¶
func (e *WatcherError) Error() string
func (*WatcherError) Unwrap ¶
func (e *WatcherError) Unwrap() error