watcher

package
v1.5.0-alpha14
Published: Oct 7, 2024 | License: Apache-2.0 | Imports: 13 | Imported by: 0

README

Watcher Library

Overview

The Watcher Library is an internal component of the Bacalhau project that provides a robust event watching and processing system. It's designed to efficiently store, retrieve, and process events. The library ensures events are stored in a durable, ordered manner, allowing for consistent and reliable event processing. It supports features like checkpointing, filtering, and long-polling, while maintaining the ability to replay events from any point in the event history.

Key Features

  1. Ordered Event Processing: Events are processed in the exact order they were created, ensuring consistency and predictability in event handling.
  2. Durability: Events are stored persistently in BoltDB, ensuring they survive system restarts or crashes.
  3. Replayability: The system allows replaying events from any point in history, facilitating data recovery, debugging, and system reconciliation.
  4. Concurrency: Multiple watchers can process events concurrently, improving system throughput.
  5. Filtering: Watchers can filter events based on object types and operations, allowing for targeted event processing.
  6. Checkpointing: Watchers can save their progress and resume from where they left off, enhancing reliability and efficiency.
  7. Long-polling: Efficient event retrieval with support for long-polling, reducing unnecessary network traffic and database queries.
  8. Garbage Collection: Automatic cleanup of old events to manage storage while maintaining the ability to replay from critical points.
  9. Flexible Event Iteration: Different types of iterators for various use cases, including the ability to start from the oldest event, the latest event, or any specific point in the event history.

Key Components

  1. Registry: Manages multiple watchers and provides methods to create and manage watchers.
  2. Watcher: Represents a single event watcher that processes events sequentially.
  3. EventStore: Responsible for storing and retrieving events, with BoltDB as the default implementation.
  4. EventHandler: Interface for handling individual events.
  5. Serializer: Handles the serialization and deserialization of events.

Core Concepts

Event

An Event represents a single occurrence in the system. It has the following properties:

  • SeqNum: A unique, sequential identifier for the event.
  • Operation: The type of operation (Create, Update, Delete).
  • ObjectType: The type of object the event relates to.
  • Object: The actual data associated with the event.
  • Timestamp: When the event occurred.

EventStore

The EventStore is responsible for persisting events and providing methods to retrieve them. It uses BoltDB as the underlying storage engine and supports features like caching, checkpointing, and garbage collection.

Registry

The Registry manages multiple watchers. It's the main entry point for components that want to subscribe to events.

Watcher

A Watcher represents a single subscriber to events. It processes events sequentially and can be configured with filters and checkpoints.

EventIterator

An EventIterator defines the starting position for reading events. There are four types of iterators:

  1. TrimHorizonIterator: Starts from the oldest available event.
  2. LatestIterator: Starts from the latest available event.
  3. AtSequenceNumberIterator: Starts at a specific sequence number.
  4. AfterSequenceNumberIterator: Starts after a specific sequence number.
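
The difference between the four starting positions is easiest to see as the sequence number of the first event a reader would receive. The following is a minimal sketch, not the library's code: Iterator, IteratorType and firstSeqNum are hypothetical stand-ins that mirror the documented EventIterator, and the behaviour shown for Latest (skip what is already stored and wait for the next event) is an assumption that the README does not spell out.

```go
package main

import "fmt"

// IteratorType and Iterator are local stand-ins that mirror the documented
// EventIteratorType and EventIterator. They are not the library's types.
type IteratorType int

const (
	TrimHorizon IteratorType = iota
	Latest
	AtSequenceNumber
	AfterSequenceNumber
)

type Iterator struct {
	Type           IteratorType
	SequenceNumber uint64
}

// firstSeqNum returns the sequence number of the first event a reader would
// receive, given the oldest and latest sequence numbers currently stored.
// Assumption: Latest skips existing events and waits for the next new one.
func firstSeqNum(it Iterator, oldest, latest uint64) uint64 {
	switch it.Type {
	case TrimHorizon:
		return oldest
	case Latest:
		return latest + 1
	case AtSequenceNumber:
		return it.SequenceNumber
	case AfterSequenceNumber:
		return it.SequenceNumber + 1
	default:
		return oldest
	}
}

func main() {
	oldest, latest := uint64(5), uint64(20)
	fmt.Println("trim horizon:", firstSeqNum(Iterator{Type: TrimHorizon}, oldest, latest))
	fmt.Println("at 10:", firstSeqNum(Iterator{Type: AtSequenceNumber, SequenceNumber: 10}, oldest, latest))
	fmt.Println("after 10:", firstSeqNum(Iterator{Type: AfterSequenceNumber, SequenceNumber: 10}, oldest, latest))
}
```

The At/After pair differs by exactly one event, which matters when resuming from a checkpoint: resuming "at" the checkpointed number would reprocess that event, while resuming "after" it would not.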

Usage

Here's how you typically use the Watcher library within Bacalhau:

  1. Create an EventStore:
db, err := bbolt.Open("events.db", 0600, nil)
if err != nil {
    // handle error
}
store, err := boltdb.NewEventStore(db)
if err != nil {
    // handle error
}
  2. Create a Registry:
registry := watcher.NewRegistry(store)
  3. Implement an EventHandler:
type MyHandler struct{}

func (h *MyHandler) HandleEvent(ctx context.Context, event watcher.Event) error {
    // Process the event
    return nil
}
  4. Start watching for events:
// Named w rather than watcher so the variable does not shadow the package.
w, err := registry.Watch(ctx, "my-watcher", &MyHandler{},
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
)
if err != nil {
    // handle error
}
  5. Store events:
if err := store.StoreEvent(ctx, watcher.OperationCreate, "Job", jobData); err != nil {
    // handle error
}

Configuration

Watch Configuration

When creating a watcher, you can configure it with various options:

  • WithInitialEventIterator(iterator EventIterator): Sets the starting position for watching if no checkpoint is found.
  • WithFilter(filter EventFilter): Sets the event filter for watching.
  • WithBufferSize(size int): Sets the size of the event buffer.
  • WithBatchSize(size int): Sets the number of events to fetch in each batch.
  • WithInitialBackoff(backoff time.Duration): Sets the initial backoff duration for retries.
  • WithMaxBackoff(backoff time.Duration): Sets the maximum backoff duration for retries.
  • WithMaxRetries(maxRetries int): Sets the maximum number of retries for event handling.
  • WithRetryStrategy(strategy RetryStrategy): Sets the retry strategy for event handling.

Example:

w, err := registry.Watch(ctx, "my-watcher", &MyHandler{},
    watcher.WithInitialEventIterator(watcher.TrimHorizonIterator()),
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
    watcher.WithBufferSize(1000),
    watcher.WithBatchSize(100),
    watcher.WithMaxRetries(3),
    watcher.WithRetryStrategy(watcher.RetryStrategyBlock),
)
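
The retry options above (initial backoff, maximum backoff, maximum retries) can be read as the parameters of a capped backoff loop. The sketch below illustrates one plausible interpretation and is not the library's implementation: the doubling schedule is an assumption, and handleWithRetry, nextBackoff and delayLog are hypothetical helpers. The sleep function is injected so the example runs instantly.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// delayLog records requested delays in milliseconds instead of sleeping.
type delayLog struct{ ms []int64 }

func (l *delayLog) sleep(d time.Duration) { l.ms = append(l.ms, d.Milliseconds()) }

// nextBackoff doubles the delay and caps it at maxDelay.
// Assumption: the doubling schedule is illustrative only.
func nextBackoff(current, maxDelay time.Duration) time.Duration {
	next := current * 2
	if next > maxDelay {
		return maxDelay
	}
	return next
}

// handleWithRetry calls handle once and then retries up to maxRetries times,
// waiting between attempts. It returns nil on the first success.
func handleWithRetry(handle func() error, initial, maxDelay time.Duration, maxRetries int, sleep func(time.Duration)) error {
	backoff := initial
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = handle(); err == nil {
			return nil
		}
		if attempt < maxRetries {
			sleep(backoff)
			backoff = nextBackoff(backoff, maxDelay)
		}
	}
	return fmt.Errorf("giving up after %d retries: %w", maxRetries, err)
}

func main() {
	log := &delayLog{}
	calls := 0
	// The handler fails twice and then succeeds.
	err := handleWithRetry(func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}, 100*time.Millisecond, time.Second, 3, log.sleep)
	fmt.Println("calls:", calls, "delays(ms):", log.ms, "err:", err)
}
```

What happens once retries are exhausted is governed by the retry strategy; the names RetryStrategyBlock and RetryStrategySkip suggest either holding the watcher at the failing event or moving past it, but the page does not document their exact behaviour.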

EventStore Configuration (BoltDB)

The BoltDB EventStore can be configured with various options:

  • WithEventsBucket(name string): Sets the name of the bucket used to store events.
  • WithCheckpointBucket(name string): Sets the name of the bucket used to store checkpoints.
  • WithEventSerializer(serializer watcher.Serializer): Sets the serializer used for events.
  • WithCacheSize(size int): Sets the size of the LRU cache used to store events.
  • WithLongPollingTimeout(timeout time.Duration): Sets the timeout duration for long-polling requests.
  • WithGCAgeThreshold(threshold time.Duration): Sets the age threshold for event pruning.
  • WithGCCadence(cadence time.Duration): Sets the interval at which garbage collection runs.
  • WithGCMaxRecordsPerRun(max int): Sets the maximum number of records to process in a single GC run.
  • WithGCMaxDuration(duration time.Duration): Sets the maximum duration for a single GC run.

Example:

store, err := boltdb.NewEventStore(db,
    boltdb.WithEventsBucket("myEvents"),
    boltdb.WithCheckpointBucket("myCheckpoints"),
    boltdb.WithCacheSize(1000),
    boltdb.WithLongPollingTimeout(10*time.Second),
)
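
To make the garbage collection options more concrete, here is a minimal sketch of how an age threshold and a per-run record limit could interact. It is an illustration under stated assumptions, not the BoltDB store's algorithm: storedEvent, pruneOld and runDemo are hypothetical, and the sketch ignores checkpoints, which the real store may take into account when deciding what is safe to delete.

```go
package main

import (
	"fmt"
	"time"
)

// storedEvent is a hypothetical, reduced view of a persisted event.
type storedEvent struct {
	SeqNum    uint64
	Timestamp time.Time
}

// pruneOld drops events older than ageThreshold from the front of a slice
// ordered by sequence number, removing at most maxRecords in one run.
// It returns the remaining events and the number removed.
func pruneOld(events []storedEvent, now time.Time, ageThreshold time.Duration, maxRecords int) ([]storedEvent, int) {
	cutoff := now.Add(-ageThreshold)
	removed := 0
	for removed < len(events) && removed < maxRecords && events[removed].Timestamp.Before(cutoff) {
		removed++
	}
	return events[removed:], removed
}

// runDemo builds five events aged 5h, 4h, 3h, 2h and 1h, prunes with a
// 150-minute threshold, and returns the surviving sequence numbers.
func runDemo(maxRecords int) ([]uint64, int) {
	now := time.Date(2024, 10, 7, 12, 0, 0, 0, time.UTC)
	var events []storedEvent
	for i := 0; i < 5; i++ {
		events = append(events, storedEvent{
			SeqNum:    uint64(i + 1),
			Timestamp: now.Add(-time.Duration(5-i) * time.Hour),
		})
	}
	remaining, removed := pruneOld(events, now, 150*time.Minute, maxRecords)
	kept := make([]uint64, 0, len(remaining))
	for _, e := range remaining {
		kept = append(kept, e.SeqNum)
	}
	return kept, removed
}

func main() {
	kept, removed := runDemo(2)
	fmt.Println("removed:", removed, "kept:", kept)
}
```

Capping the records per run keeps each pass short, at the cost of needing several passes to clear a large backlog.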

Best Practices

  1. Use meaningful watcher IDs to easily identify different components subscribing to events.
  2. Implement error handling in your EventHandler to ensure robust event processing.
  3. Use appropriate filters to minimize unnecessary event processing.
  4. Regularly checkpoint your watchers to enable efficient restarts.
  5. Monitor watcher stats to ensure they're keeping up with event volume.
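
For the monitoring advice in item 5, one simple signal is the gap between the store's latest sequence number (GetLatestEventNum) and the watcher's Stats().LastProcessedSeqNum. The sketch below shows only the arithmetic, using a hypothetical watcherStats stand-in for the documented Stats struct.

```go
package main

import "fmt"

// watcherStats is a stand-in holding the two Stats fields used here.
type watcherStats struct {
	ID                  string
	LastProcessedSeqNum uint64
}

// lag returns how many sequence numbers separate the watcher from the
// newest event in the store. It never underflows.
func lag(latest uint64, s watcherStats) uint64 {
	if s.LastProcessedSeqNum >= latest {
		return 0
	}
	return latest - s.LastProcessedSeqNum
}

func main() {
	s := watcherStats{ID: "my-watcher", LastProcessedSeqNum: 940}
	fmt.Printf("%s is %d events behind\n", s.ID, lag(1000, s))
}
```

For a watcher with a filter this number is likely an upper bound, since some of the outstanding events may not match the filter at all.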

Troubleshooting

  1. If a watcher is falling behind, consider increasing the batch size or optimizing the event handling logic.
  2. For performance issues, check the BoltDB file size and consider tuning the garbage collection parameters.

Future Improvements

  1. Enhanced monitoring and metrics.

Documentation

Overview

Package watcher is a generated GoMock package.

Constants

This section is empty.

Variables

var (
	ErrWatcherAlreadyExists = errors.New("watcher already exists")
	ErrWatcherNotFound      = errors.New("watcher not found")
	ErrCheckpointNotFound   = errors.New("checkpoint not found")
)

Functions

This section is empty.

Types

type CheckpointError

type CheckpointError struct {
	WatcherID string
	Err       error
}

CheckpointError represents an error related to checkpointing

func NewCheckpointError

func NewCheckpointError(watcherID string, err error) *CheckpointError

NewCheckpointError creates a new CheckpointError

func (*CheckpointError) Error

func (e *CheckpointError) Error() string

func (*CheckpointError) Unwrap

func (e *CheckpointError) Unwrap() error
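
Because the error types on this page expose Unwrap, callers can inspect them with the standard errors.Is and errors.As helpers. The sketch below uses local stand-ins (checkpointError, errCheckpointNotFound, loadCheckpoint) rather than the real package; the message format and the idea that a missing checkpoint arrives wrapped in a CheckpointError are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for ErrCheckpointNotFound and CheckpointError.
var errCheckpointNotFound = errors.New("checkpoint not found")

type checkpointError struct {
	WatcherID string
	Err       error
}

// Error uses an illustrative message format, not the library's.
func (e *checkpointError) Error() string {
	return fmt.Sprintf("checkpoint error for watcher %s: %v", e.WatcherID, e.Err)
}

// Unwrap exposes the underlying cause to errors.Is and errors.As.
func (e *checkpointError) Unwrap() error { return e.Err }

// loadCheckpoint is a hypothetical call that fails with a wrapped error.
func loadCheckpoint(watcherID string) error {
	return fmt.Errorf("starting watcher: %w",
		&checkpointError{WatcherID: watcherID, Err: errCheckpointNotFound})
}

// isMissingCheckpoint reports whether the sentinel is anywhere in the chain.
func isMissingCheckpoint(err error) bool {
	return errors.Is(err, errCheckpointNotFound)
}

// watcherIDOf extracts the watcher ID if a checkpointError is in the chain.
func watcherIDOf(err error) (string, bool) {
	var ce *checkpointError
	if errors.As(err, &ce) {
		return ce.WatcherID, true
	}
	return "", false
}

func main() {
	err := loadCheckpoint("my-watcher")
	id, ok := watcherIDOf(err)
	fmt.Println("missing checkpoint:", isMissingCheckpoint(err), "watcher:", id, ok)
}
```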

type DeserializationError

type DeserializationError struct {
	Err error
}

func NewDeserializationError

func NewDeserializationError(err error) *DeserializationError

func (*DeserializationError) Error

func (e *DeserializationError) Error() string

func (*DeserializationError) Unwrap

func (e *DeserializationError) Unwrap() error

type Event

type Event struct {
	SeqNum     uint64      `json:"seqNum"`
	Operation  Operation   `json:"operation"`
	ObjectType string      `json:"objectType"`
	Object     interface{} `json:"object"`
	Timestamp  time.Time   `json:"timestamp"`
}

Event represents a single event in the event store.

type EventFilter

type EventFilter struct {
	// ObjectTypes is a list of object types to include in the filter.
	ObjectTypes []string

	// Operations is a list of operations to include in the filter.
	Operations []Operation
}

EventFilter defines criteria for filtering events.
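
A filter with both lists populated can be pictured as a pair of membership tests. The following is a minimal sketch with local copies of the types; two details are assumptions rather than documented behaviour: that an empty list matches everything, and that the two criteria are combined with AND.

```go
package main

import "fmt"

// Local copies of the documented Operation and EventFilter shapes.
type Operation string

const (
	OpCreate Operation = "CREATE"
	OpUpdate Operation = "UPDATE"
	OpDelete Operation = "DELETE"
)

type EventFilter struct {
	ObjectTypes []string
	Operations  []Operation
}

// containsOrEmpty reports whether v is in allowed.
// Assumption: an empty list places no restriction.
func containsOrEmpty[T comparable](allowed []T, v T) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, a := range allowed {
		if a == v {
			return true
		}
	}
	return false
}

// matches requires both the object type and the operation to be allowed.
func matches(f EventFilter, objectType string, op Operation) bool {
	return containsOrEmpty(f.ObjectTypes, objectType) && containsOrEmpty(f.Operations, op)
}

func main() {
	f := EventFilter{
		ObjectTypes: []string{"Job", "Execution"},
		Operations:  []Operation{OpCreate, OpUpdate},
	}
	fmt.Println(matches(f, "Job", OpCreate))
	fmt.Println(matches(f, "Job", OpDelete))
	fmt.Println(matches(f, "Node", OpCreate))
}
```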

type EventHandler

type EventHandler interface {
	// HandleEvent processes a single event.
	// It returns an error if the event processing fails.
	HandleEvent(ctx context.Context, event Event) error
}

EventHandler is an interface for handling events.

type EventHandlingError

type EventHandlingError struct {
	WatcherID string
	EventID   uint64
	Err       error
}

EventHandlingError represents an error that occurred during event processing

func NewEventHandlingError

func NewEventHandlingError(watcherID string, eventID uint64, err error) *EventHandlingError

NewEventHandlingError creates a new EventHandlingError

func (*EventHandlingError) Error

func (e *EventHandlingError) Error() string

func (*EventHandlingError) Unwrap

func (e *EventHandlingError) Unwrap() error

type EventIterator

type EventIterator struct {
	Type           EventIteratorType
	SequenceNumber uint64
}

EventIterator defines the starting position for reading events.

func AfterSequenceNumberIterator

func AfterSequenceNumberIterator(seqNum uint64) EventIterator

AfterSequenceNumberIterator creates an EventIterator that starts after a specific sequence number.

func AtSequenceNumberIterator

func AtSequenceNumberIterator(seqNum uint64) EventIterator

AtSequenceNumberIterator creates an EventIterator that starts at a specific sequence number.

func LatestIterator

func LatestIterator() EventIterator

LatestIterator creates an EventIterator that starts from the latest available event.

func TrimHorizonIterator

func TrimHorizonIterator() EventIterator

TrimHorizonIterator creates an EventIterator that starts from the oldest available event.

func (EventIterator) String

func (sp EventIterator) String() string

String returns a string representation of the EventIterator.

type EventIteratorType

type EventIteratorType int

EventIteratorType defines the type of starting position for reading events.

const (
	// EventIteratorTrimHorizon specifies that events should be read from the oldest available event.
	EventIteratorTrimHorizon EventIteratorType = iota
	// EventIteratorLatest specifies that events should be read from the latest available event.
	EventIteratorLatest
	// EventIteratorAtSequenceNumber specifies that events should be read starting from a specific sequence number.
	EventIteratorAtSequenceNumber
	// EventIteratorAfterSequenceNumber specifies that events should be read starting after a specific sequence number.
	EventIteratorAfterSequenceNumber
)

type EventStore

type EventStore interface {
	// StoreEvent stores a new event in the event store.
	StoreEvent(ctx context.Context, operation Operation, objectType string, object interface{}) error

	// GetEvents retrieves events based on the provided query parameters.
	GetEvents(ctx context.Context, params GetEventsRequest) (*GetEventsResponse, error)

	// GetLatestEventNum returns the sequence number of the latest event.
	GetLatestEventNum(ctx context.Context) (uint64, error)

	// StoreCheckpoint saves a checkpoint for a specific watcher.
	StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error

	// GetCheckpoint retrieves the checkpoint for a specific watcher.
	GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)

	// Close closes the event store.
	Close(ctx context.Context) error
}

EventStore defines the interface for event storage and retrieval.

type GetEventsRequest

type GetEventsRequest struct {
	// EventIterator specifies where to start reading events.
	EventIterator EventIterator

	// Limit is the maximum number of events to return.
	Limit int

	// Filter specifies criteria for filtering events.
	Filter EventFilter
}

GetEventsRequest defines parameters for querying events.

type GetEventsResponse

type GetEventsResponse struct {
	Events            []Event
	NextEventIterator EventIterator
}
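
The NextEventIterator field suggests a cursor-style read loop: fetch a batch, process it, then pass the returned iterator into the next request. The sketch below demonstrates that pattern against a hypothetical in-memory store; request, response and memStore are simplified stand-ins in which the iterator is reduced to an "after this sequence number" cursor.

```go
package main

import "fmt"

type event struct{ SeqNum uint64 }

// request and response loosely mirror GetEventsRequest and GetEventsResponse.
type request struct {
	After uint64 // return events with SeqNum greater than this
	Limit int
}

type response struct {
	Events    []event
	NextAfter uint64 // cursor to use in the next request
}

// memStore holds events ordered by sequence number.
type memStore struct{ events []event }

func newStore(n int) *memStore {
	s := &memStore{}
	for i := 1; i <= n; i++ {
		s.events = append(s.events, event{SeqNum: uint64(i)})
	}
	return s
}

// getEvents returns up to Limit events after the cursor and advances it.
func (s *memStore) getEvents(req request) response {
	resp := response{NextAfter: req.After}
	for _, e := range s.events {
		if e.SeqNum <= req.After {
			continue
		}
		if len(resp.Events) == req.Limit {
			break
		}
		resp.Events = append(resp.Events, e)
		resp.NextAfter = e.SeqNum
	}
	return resp
}

// readAll pages through the store until a batch comes back empty.
func readAll(s *memStore, batch int) []uint64 {
	var seen []uint64
	req := request{Limit: batch}
	for {
		resp := s.getEvents(req)
		if len(resp.Events) == 0 {
			return seen
		}
		for _, e := range resp.Events {
			seen = append(seen, e.SeqNum)
		}
		req.After = resp.NextAfter
	}
}

func main() {
	fmt.Println(readAll(newStore(7), 3))
}
```

A long-polling store would block on an empty batch until new events arrive or the timeout expires instead of returning immediately, so a real consumer loop would keep going rather than stop at the first empty response.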

type JSONSerializer

type JSONSerializer struct {
	// contains filtered or unexported fields
}

JSONSerializer implements the Serializer interface for JSON serialization

func NewJSONSerializer

func NewJSONSerializer() *JSONSerializer

NewJSONSerializer creates a new instance of JSONSerializer

func (*JSONSerializer) IsTypeRegistered

func (s *JSONSerializer) IsTypeRegistered(name string) bool

IsTypeRegistered checks if a type is registered in the serializer's type registry

func (*JSONSerializer) Marshal

func (s *JSONSerializer) Marshal(v Event) ([]byte, error)

Marshal serializes an Event into a byte slice

func (*JSONSerializer) RegisterType

func (s *JSONSerializer) RegisterType(name string, t reflect.Type) error

RegisterType adds a new type to the serializer's type registry. It returns an error if the type is already registered or if the provided type is invalid.

func (*JSONSerializer) Unmarshal

func (s *JSONSerializer) Unmarshal(data []byte, event *Event) error

Unmarshal deserializes a byte slice into an Event
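
RegisterType takes a name and a reflect.Type, which points to a name-to-type registry used to rebuild concrete objects when decoding. The sketch below shows that general technique with the standard library only; registry, register, decode and the Job type are hypothetical, and the real JSONSerializer may store and resolve types differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Job is an example payload type.
type Job struct {
	ID string `json:"id"`
}

// registry maps a type name to the Go type used when decoding.
type registry struct{ types map[string]reflect.Type }

func newRegistry() *registry { return &registry{types: map[string]reflect.Type{}} }

// register rejects nil types and duplicate names.
func (r *registry) register(name string, t reflect.Type) error {
	if t == nil {
		return fmt.Errorf("nil type for %q", name)
	}
	if _, ok := r.types[name]; ok {
		return fmt.Errorf("type %q already registered", name)
	}
	r.types[name] = t
	return nil
}

// registerJob registers the Job type under the name "Job".
func registerJob(r *registry) error { return r.register("Job", reflect.TypeOf(Job{})) }

// decode allocates a new value of the registered type and fills it from JSON.
func (r *registry) decode(name string, raw []byte) (interface{}, error) {
	t, ok := r.types[name]
	if !ok {
		return nil, fmt.Errorf("type %q not registered", name)
	}
	ptr := reflect.New(t) // pointer to a zero value of the registered type
	if err := json.Unmarshal(raw, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil
}

func main() {
	r := newRegistry()
	if err := registerJob(r); err != nil {
		fmt.Println("register:", err)
		return
	}
	v, err := r.decode("Job", []byte(`{"id":"j-1"}`))
	fmt.Printf("%T %+v %v\n", v, v, err)
}
```

Without such a registry, decoding JSON into an interface{} field yields generic maps rather than the original struct type.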

type MockEventHandler

type MockEventHandler struct {
	// contains filtered or unexported fields
}

MockEventHandler is a mock of EventHandler interface.

func NewMockEventHandler

func NewMockEventHandler(ctrl *gomock.Controller) *MockEventHandler

NewMockEventHandler creates a new mock instance.

func (*MockEventHandler) EXPECT

func (m *MockEventHandler) EXPECT() *MockEventHandlerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockEventHandler) HandleEvent

func (m *MockEventHandler) HandleEvent(ctx context.Context, event Event) error

HandleEvent mocks base method.

type MockEventHandlerMockRecorder

type MockEventHandlerMockRecorder struct {
	// contains filtered or unexported fields
}

MockEventHandlerMockRecorder is the mock recorder for MockEventHandler.

func (*MockEventHandlerMockRecorder) HandleEvent

func (mr *MockEventHandlerMockRecorder) HandleEvent(ctx, event interface{}) *gomock.Call

HandleEvent indicates an expected call of HandleEvent.

type MockEventStore

type MockEventStore struct {
	// contains filtered or unexported fields
}

MockEventStore is a mock of EventStore interface.

func NewMockEventStore

func NewMockEventStore(ctrl *gomock.Controller) *MockEventStore

NewMockEventStore creates a new mock instance.

func (*MockEventStore) Close

func (m *MockEventStore) Close(ctx context.Context) error

Close mocks base method.

func (*MockEventStore) EXPECT

func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockEventStore) GetCheckpoint

func (m *MockEventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)

GetCheckpoint mocks base method.

func (*MockEventStore) GetEvents

func (m *MockEventStore) GetEvents(ctx context.Context, params GetEventsRequest) (*GetEventsResponse, error)

GetEvents mocks base method.

func (*MockEventStore) GetLatestEventNum

func (m *MockEventStore) GetLatestEventNum(ctx context.Context) (uint64, error)

GetLatestEventNum mocks base method.

func (*MockEventStore) StoreCheckpoint

func (m *MockEventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error

StoreCheckpoint mocks base method.

func (*MockEventStore) StoreEvent

func (m *MockEventStore) StoreEvent(ctx context.Context, operation Operation, objectType string, object interface{}) error

StoreEvent mocks base method.

type MockEventStoreMockRecorder

type MockEventStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockEventStoreMockRecorder is the mock recorder for MockEventStore.

func (*MockEventStoreMockRecorder) Close

func (mr *MockEventStoreMockRecorder) Close(ctx interface{}) *gomock.Call

Close indicates an expected call of Close.

func (*MockEventStoreMockRecorder) GetCheckpoint

func (mr *MockEventStoreMockRecorder) GetCheckpoint(ctx, watcherID interface{}) *gomock.Call

GetCheckpoint indicates an expected call of GetCheckpoint.

func (*MockEventStoreMockRecorder) GetEvents

func (mr *MockEventStoreMockRecorder) GetEvents(ctx, params interface{}) *gomock.Call

GetEvents indicates an expected call of GetEvents.

func (*MockEventStoreMockRecorder) GetLatestEventNum

func (mr *MockEventStoreMockRecorder) GetLatestEventNum(ctx interface{}) *gomock.Call

GetLatestEventNum indicates an expected call of GetLatestEventNum.

func (*MockEventStoreMockRecorder) StoreCheckpoint

func (mr *MockEventStoreMockRecorder) StoreCheckpoint(ctx, watcherID, eventSeqNum interface{}) *gomock.Call

StoreCheckpoint indicates an expected call of StoreCheckpoint.

func (*MockEventStoreMockRecorder) StoreEvent

func (mr *MockEventStoreMockRecorder) StoreEvent(ctx, operation, objectType, object interface{}) *gomock.Call

StoreEvent indicates an expected call of StoreEvent.

type MockRegistry

type MockRegistry struct {
	// contains filtered or unexported fields
}

MockRegistry is a mock of Registry interface.

func NewMockRegistry

func NewMockRegistry(ctrl *gomock.Controller) *MockRegistry

NewMockRegistry creates a new mock instance.

func (*MockRegistry) EXPECT

func (m *MockRegistry) EXPECT() *MockRegistryMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockRegistry) GetWatcher

func (m *MockRegistry) GetWatcher(watcherID string) (Watcher, error)

GetWatcher mocks base method.

func (*MockRegistry) Stop

func (m *MockRegistry) Stop(ctx context.Context) error

Stop mocks base method.

func (*MockRegistry) Watch

func (m *MockRegistry) Watch(ctx context.Context, watcherID string, handler EventHandler, opts ...WatchOption) (Watcher, error)

Watch mocks base method.

type MockRegistryMockRecorder

type MockRegistryMockRecorder struct {
	// contains filtered or unexported fields
}

MockRegistryMockRecorder is the mock recorder for MockRegistry.

func (*MockRegistryMockRecorder) GetWatcher

func (mr *MockRegistryMockRecorder) GetWatcher(watcherID interface{}) *gomock.Call

GetWatcher indicates an expected call of GetWatcher.

func (*MockRegistryMockRecorder) Stop

func (mr *MockRegistryMockRecorder) Stop(ctx interface{}) *gomock.Call

Stop indicates an expected call of Stop.

func (*MockRegistryMockRecorder) Watch

func (mr *MockRegistryMockRecorder) Watch(ctx, watcherID, handler interface{}, opts ...interface{}) *gomock.Call

Watch indicates an expected call of Watch.

type MockSerializer

type MockSerializer struct {
	// contains filtered or unexported fields
}

MockSerializer is a mock of Serializer interface.

func NewMockSerializer

func NewMockSerializer(ctrl *gomock.Controller) *MockSerializer

NewMockSerializer creates a new mock instance.

func (*MockSerializer) EXPECT

func (m *MockSerializer) EXPECT() *MockSerializerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSerializer) Marshal

func (m *MockSerializer) Marshal(event Event) ([]byte, error)

Marshal mocks base method.

func (*MockSerializer) Unmarshal

func (m *MockSerializer) Unmarshal(data []byte, event *Event) error

Unmarshal mocks base method.

type MockSerializerMockRecorder

type MockSerializerMockRecorder struct {
	// contains filtered or unexported fields
}

MockSerializerMockRecorder is the mock recorder for MockSerializer.

func (*MockSerializerMockRecorder) Marshal

func (mr *MockSerializerMockRecorder) Marshal(event interface{}) *gomock.Call

Marshal indicates an expected call of Marshal.

func (*MockSerializerMockRecorder) Unmarshal

func (mr *MockSerializerMockRecorder) Unmarshal(data, event interface{}) *gomock.Call

Unmarshal indicates an expected call of Unmarshal.

type MockWatcher

type MockWatcher struct {
	// contains filtered or unexported fields
}

MockWatcher is a mock of Watcher interface.

func NewMockWatcher

func NewMockWatcher(ctrl *gomock.Controller) *MockWatcher

NewMockWatcher creates a new mock instance.

func (*MockWatcher) Checkpoint

func (m *MockWatcher) Checkpoint(ctx context.Context, eventSeqNum uint64) error

Checkpoint mocks base method.

func (*MockWatcher) EXPECT

func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockWatcher) ID

func (m *MockWatcher) ID() string

ID mocks base method.

func (*MockWatcher) SeekToOffset

func (m *MockWatcher) SeekToOffset(ctx context.Context, eventSeqNum uint64) error

SeekToOffset mocks base method.

func (*MockWatcher) Stats

func (m *MockWatcher) Stats() Stats

Stats mocks base method.

func (*MockWatcher) Stop

func (m *MockWatcher) Stop(ctx context.Context)

Stop mocks base method.

type MockWatcherMockRecorder

type MockWatcherMockRecorder struct {
	// contains filtered or unexported fields
}

MockWatcherMockRecorder is the mock recorder for MockWatcher.

func (*MockWatcherMockRecorder) Checkpoint

func (mr *MockWatcherMockRecorder) Checkpoint(ctx, eventSeqNum interface{}) *gomock.Call

Checkpoint indicates an expected call of Checkpoint.

func (*MockWatcherMockRecorder) ID

func (mr *MockWatcherMockRecorder) ID() *gomock.Call

ID indicates an expected call of ID.

func (*MockWatcherMockRecorder) SeekToOffset

func (mr *MockWatcherMockRecorder) SeekToOffset(ctx, eventSeqNum interface{}) *gomock.Call

SeekToOffset indicates an expected call of SeekToOffset.

func (*MockWatcherMockRecorder) Stats

func (mr *MockWatcherMockRecorder) Stats() *gomock.Call

Stats indicates an expected call of Stats.

func (*MockWatcherMockRecorder) Stop

func (mr *MockWatcherMockRecorder) Stop(ctx interface{}) *gomock.Call

Stop indicates an expected call of Stop.

type Operation

type Operation string

Operation represents the type of operation performed in an event.

const (
	// OperationCreate represents a creation event.
	OperationCreate Operation = "CREATE"
	// OperationUpdate represents an update event.
	OperationUpdate Operation = "UPDATE"
	// OperationDelete represents a deletion event.
	OperationDelete Operation = "DELETE"
)

type Registry

type Registry interface {
	// Watch starts watching for events with the given options.
	// It returns a Watcher that can be used to receive events.
	Watch(ctx context.Context, watcherID string, handler EventHandler, opts ...WatchOption) (Watcher, error)

	// GetWatcher retrieves an existing watcher by its ID.
	GetWatcher(watcherID string) (Watcher, error)

	// Stop gracefully shuts down the registry and all its watchers.
	Stop(ctx context.Context) error
}

Registry manages multiple event watchers and provides methods to watch for events.

func NewRegistry

func NewRegistry(store EventStore) Registry

NewRegistry creates a new Registry with the given EventStore.

Example usage:

var store EventStore // initialize your event store
registry := NewRegistry(store)
defer registry.Stop(context.Background())

watcher, err := registry.Watch(context.Background(), "myWatcher", myEventHandler)
if err != nil {
    // handle error
}

type RetryStrategy

type RetryStrategy int

const (
	RetryStrategyBlock RetryStrategy = iota
	RetryStrategySkip
)

type SerializationError

type SerializationError struct {
	Event Event
	Err   error
}

func NewSerializationError

func NewSerializationError(event Event, err error) *SerializationError

func (*SerializationError) Error

func (e *SerializationError) Error() string

func (*SerializationError) Unwrap

func (e *SerializationError) Unwrap() error

type Serializer

type Serializer interface {
	// Marshal serializes an Event into a byte slice.
	Marshal(event Event) ([]byte, error)

	// Unmarshal deserializes a byte slice into an Event.
	Unmarshal(data []byte, event *Event) error
}

Serializer defines the interface for event serialization and deserialization.

type State

type State string

const (
	StateIdle     State = "idle"
	StateRunning  State = "running"
	StateStopping State = "stopping"
	StateStopped  State = "stopped"
)

type Stats

type Stats struct {
	ID                     string
	State                  State
	NextEventIterator      EventIterator // Next event iterator for the watcher
	LastProcessedSeqNum    uint64        // SeqNum of the last event processed by this watcher
	LastProcessedEventTime time.Time     // timestamp of the last processed event
	LastListenTime         time.Time     // timestamp of the last successful listen operation
}

type WatchOption

type WatchOption func(*watchOptions)

WatchOption is a function type for configuring watch options

func WithBatchSize

func WithBatchSize(size int) WatchOption

WithBatchSize sets the number of events to fetch in each batch

func WithBufferSize

func WithBufferSize(size int) WatchOption

WithBufferSize sets the size of the event buffer

func WithFilter

func WithFilter(filter EventFilter) WatchOption

WithFilter sets the event filter for watching

func WithInitialBackoff

func WithInitialBackoff(backoff time.Duration) WatchOption

WithInitialBackoff sets the initial backoff duration for retries

func WithInitialEventIterator

func WithInitialEventIterator(iterator EventIterator) WatchOption

WithInitialEventIterator sets the starting position for watching if no checkpoint is found

func WithMaxBackoff

func WithMaxBackoff(backoff time.Duration) WatchOption

WithMaxBackoff sets the maximum backoff duration for retries

func WithMaxRetries

func WithMaxRetries(maxRetries int) WatchOption

WithMaxRetries sets the maximum number of retries for event handling

func WithRetryStrategy

func WithRetryStrategy(strategy RetryStrategy) WatchOption

WithRetryStrategy sets the retry strategy for event handling

type Watcher

type Watcher interface {
	// ID returns the unique identifier for the watcher.
	ID() string

	// Stats returns the current statistics for the watcher.
	Stats() Stats

	// Stop gracefully stops the watcher.
	Stop(ctx context.Context)

	// Checkpoint saves the current progress of the watcher.
	Checkpoint(ctx context.Context, eventSeqNum uint64) error

	// SeekToOffset moves the watcher to a specific event sequence number.
	SeekToOffset(ctx context.Context, eventSeqNum uint64) error
}

Watcher represents a single event watcher.

type WatcherError

type WatcherError struct {
	WatcherID string
	Err       error
}

WatcherError represents an error related to a specific watcher

func NewWatcherError

func NewWatcherError(watcherID string, err error) *WatcherError

func (*WatcherError) Error

func (e *WatcherError) Error() string

func (*WatcherError) Unwrap

func (e *WatcherError) Unwrap() error
