watcher

package
v1.6.0-rc2
Warning

This package is not in the latest version of its module.

Published: Dec 11, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

README

Watcher Library

Overview

The Watcher Library is an internal component of the Bacalhau project that provides a robust event watching and processing system. It's designed to efficiently store, retrieve, and process events. The library ensures events are stored in a durable, ordered manner, allowing for consistent and reliable event processing. It supports features like checkpointing, filtering, and long-polling, while maintaining the ability to replay events from any point in the event history.

Key Features

  1. Ordered Event Processing: Events are processed in the exact order they were created, ensuring consistency and predictability in event handling.
  2. Durability: Events are stored persistently in BoltDB, ensuring they survive system restarts or crashes.
  3. Replayability: The system allows replaying events from any point in history, facilitating data recovery, debugging, and system reconciliation.
  4. Concurrency: Multiple watchers can process events concurrently, improving system throughput.
  5. Filtering: Watchers can filter events based on object types and operations, allowing for targeted event processing.
  6. Checkpointing: Watchers can save their progress and resume from where they left off, enhancing reliability and efficiency.
  7. Long-polling: Efficient event retrieval with support for long-polling, reducing unnecessary network traffic and database queries.
  8. Garbage Collection: Automatic cleanup of old events to manage storage while maintaining the ability to replay from critical points.
  9. Flexible Event Iteration: Different types of iterators for various use cases, including the ability to start from the oldest event, the latest event, or any specific point in the event history.

Key Components

  1. Manager: Manages multiple watchers and provides methods to create, look up, and stop watchers.
  2. Watcher: Represents a single event watcher that processes events sequentially.
  3. EventStore: Responsible for storing and retrieving events, with BoltDB as the default implementation.
  4. EventHandler: Interface for handling individual events.
  5. Serializer: Handles the serialization and deserialization of events.

Core Concepts

Event

An Event represents a single occurrence in the system. It has the following properties:

  • SeqNum: A unique, sequential identifier for the event.
  • Operation: The type of operation (Create, Update, Delete).
  • ObjectType: The type of object the event relates to.
  • Object: The actual data associated with the event.
  • Timestamp: When the event occurred.

EventStore

The EventStore is responsible for persisting events and providing methods to retrieve them. It uses BoltDB as the underlying storage engine and supports features like caching, checkpointing, and garbage collection.

Manager

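The Manager handles the lifecycle of multiple watchers that share one EventStore. It provides methods to create a watcher, look up an existing watcher by its ID, and stop all of its watchers at once.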

Watcher

A Watcher represents a single subscriber to events. It processes events sequentially and can be configured with filters and checkpoints.

EventIterator

An EventIterator defines the starting position for reading events. There are four types of iterators:

  1. TrimHorizonIterator: Starts from the oldest available event.
  2. LatestIterator: Starts from the latest available event.
  3. AtSequenceNumberIterator: Starts at a specific sequence number.
  4. AfterSequenceNumberIterator: Starts after a specific sequence number.
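
The difference between the four starting positions can be sketched with a small, self-contained example. The types below are local stand-ins rather than the library's own, and `firstSeqNum` is a hypothetical helper. It reads "starts from the latest available event" literally; the real store may resolve that position differently.

```go
package main

import "fmt"

// iteratorType mirrors the four starting positions described above.
// These are local stand-ins, not the library's own types.
type iteratorType int

const (
	trimHorizon iteratorType = iota
	latest
	atSequenceNumber
	afterSequenceNumber
)

type iterator struct {
	typ    iteratorType
	seqNum uint64
}

// firstSeqNum returns the first sequence number a reader would see, given
// the oldest and newest sequence numbers currently held in the store.
// Assumption: "latest" is taken to mean the newest stored event itself.
func firstSeqNum(it iterator, oldest, newest uint64) uint64 {
	switch it.typ {
	case latest:
		return newest
	case atSequenceNumber:
		return it.seqNum
	case afterSequenceNumber:
		return it.seqNum + 1
	default: // trimHorizon
		return oldest
	}
}

func main() {
	const oldest, newest = 10, 50
	fmt.Println(firstSeqNum(iterator{typ: trimHorizon}, oldest, newest))                     // 10
	fmt.Println(firstSeqNum(iterator{typ: latest}, oldest, newest))                          // 50
	fmt.Println(firstSeqNum(iterator{typ: atSequenceNumber, seqNum: 42}, oldest, newest))    // 42
	fmt.Println(firstSeqNum(iterator{typ: afterSequenceNumber, seqNum: 42}, oldest, newest)) // 43
}
```

The At/After pair matters most when resuming: a reader that has already processed event 42 would continue with an "after" position, while a reader that wants to reprocess event 42 would use an "at" position.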

Usage

Here's how you typically use the Watcher library within Bacalhau:

  1. Create an EventStore:
db, _ := bbolt.Open("events.db", 0600, nil)
store, _ := boltdb.NewEventStore(db)
  2. Create a manager:
manager := watcher.NewManager(store)
  3. Implement an EventHandler:
type MyHandler struct{}

func (h *MyHandler) HandleEvent(ctx context.Context, event watcher.Event) error {
    // Process the event
    return nil
}
  4. Create a watcher and set handler:

There are two main approaches to create and configure a watcher with a handler:

a. Two-Step Creation (Handler After Creation):

// Create watcher
w, err := manager.Create(ctx, "my-watcher",
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
)

// Set handler
err = w.SetHandler(&MyHandler{})

// Start watching
err = w.Start(ctx)

b. One-Step Creation (With Auto-Start):

w, _ := manager.Create(ctx, "my-watcher",
    watcher.WithHandler(&MyHandler{}),
    watcher.WithAutoStart(),
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
)
  5. Store events:
req := watcher.StoreEventRequest{Operation: watcher.OperationCreate, ObjectType: "Job", Object: jobData}
err := store.StoreEvent(ctx, req)

Configuration

Watch Configuration

When creating a watcher, you can configure it with various options:

  • WithInitialEventIterator(iterator EventIterator): Sets the starting position for watching if no checkpoint is found.
  • WithHandler(handler EventHandler): Sets the event handler for the watcher.
  • WithAutoStart(): Enables automatic start of the watcher after creation.
  • WithFilter(filter EventFilter): Sets the event filter for watching.
  • WithEphemeral(): Makes the watcher ephemeral, meaning it does not checkpoint its progress.
  • WithBatchSize(size int): Sets the number of events to fetch in each batch.
  • WithInitialBackoff(backoff time.Duration): Sets the initial backoff duration for retries.
  • WithMaxBackoff(backoff time.Duration): Sets the maximum backoff duration for retries.
  • WithMaxRetries(maxRetries int): Sets the maximum number of retries for event handling.
  • WithRetryStrategy(strategy RetryStrategy): Sets the retry strategy for event handling.

Example:

w, err := manager.Create(ctx, "my-watcher",
    watcher.WithInitialEventIterator(watcher.TrimHorizonIterator()),
    watcher.WithHandler(&MyHandler{}),
    watcher.WithAutoStart(),
    watcher.WithFilter(watcher.EventFilter{
        ObjectTypes: []string{"Job", "Execution"},
        Operations: []watcher.Operation{watcher.OperationCreate, watcher.OperationUpdate},
    }),
    watcher.WithBatchSize(100),
    watcher.WithMaxRetries(3),
    watcher.WithRetryStrategy(watcher.RetryStrategyBlock),
)

EventStore Configuration (BoltDB)

The BoltDB EventStore can be configured with various options:

  • WithEventsBucket(name string): Sets the name of the bucket used to store events.
  • WithCheckpointBucket(name string): Sets the name of the bucket used to store checkpoints.
  • WithEventSerializer(serializer watcher.Serializer): Sets the serializer used for events.
  • WithCacheSize(size int): Sets the size of the LRU cache used to store events.
  • WithLongPollingTimeout(timeout time.Duration): Sets the timeout duration for long-polling requests.
  • WithGCAgeThreshold(threshold time.Duration): Sets the age threshold for event pruning.
  • WithGCCadence(cadence time.Duration): Sets the interval at which garbage collection runs.
  • WithGCMaxRecordsPerRun(max int): Sets the maximum number of records to process in a single GC run.
  • WithGCMaxDuration(duration time.Duration): Sets the maximum duration for a single GC run.

Example:

store, err := boltdb.NewEventStore(db,
    boltdb.WithEventsBucket("myEvents"),
    boltdb.WithCheckpointBucket("myCheckpoints"),
    boltdb.WithCacheSize(1000),
    boltdb.WithLongPollingTimeout(10*time.Second),
)

Best Practices

  1. Use meaningful watcher IDs to easily identify different components subscribing to events.
  2. Implement error handling in your EventHandler to ensure robust event processing.
  3. Use appropriate filters to minimize unnecessary event processing.
  4. Regularly checkpoint your watchers to enable efficient restarts.
  5. Monitor watcher stats to ensure they're keeping up with event volume.

Troubleshooting

  1. If a watcher is falling behind, consider increasing the batch size or optimizing the event handling logic.
  2. For performance issues, check the BoltDB file size and consider tuning the garbage collection parameters.

Future Improvements

  1. Enhanced monitoring and metrics.

Documentation

Overview

Package watcher is a generated GoMock package.

Constants

const (
	DefaultShutdownTimeout = 30 * time.Second
)

Variables

var (
	ErrWatcherAlreadyExists = errors.New("watcher already exists")
	ErrWatcherNotFound      = errors.New("watcher not found")
	ErrCheckpointNotFound   = errors.New("checkpoint not found")
	ErrNoHandler            = errors.New("no handler configured")
	ErrHandlerExists        = errors.New("handler already exists")
)

Functions

This section is empty.

Types

type CheckpointError

type CheckpointError struct {
	WatcherID string
	Err       error
}

CheckpointError represents an error related to checkpointing

func NewCheckpointError

func NewCheckpointError(watcherID string, err error) *CheckpointError

NewCheckpointError creates a new CheckpointError

func (*CheckpointError) Error

func (e *CheckpointError) Error() string

func (*CheckpointError) Unwrap

func (e *CheckpointError) Unwrap() error

type DeserializationError

type DeserializationError struct {
	Err error
}

func NewDeserializationError

func NewDeserializationError(err error) *DeserializationError

func (*DeserializationError) Error

func (e *DeserializationError) Error() string

func (*DeserializationError) Unwrap

func (e *DeserializationError) Unwrap() error

type Event

type Event struct {
	SeqNum     uint64      `json:"seqNum"`
	Operation  Operation   `json:"operation"`
	ObjectType string      `json:"objectType"`
	Object     interface{} `json:"object"`
	Timestamp  time.Time   `json:"timestamp"`
}

Event represents a single event in the event store.

type EventFilter

type EventFilter struct {
	// ObjectTypes is a list of object types to include in the filter.
	ObjectTypes []string

	// Operations is a list of operations to include in the filter.
	Operations []Operation
}

EventFilter defines criteria for filtering events.
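
How such a filter could be evaluated is sketched below with local stand-in types. The rule that an empty list places no restriction on its field is an assumption of this sketch, as are the `matches` and `contains` helpers.

```go
package main

import "fmt"

type operation string

// eventFilter and event are local stand-ins for EventFilter and Event.
type eventFilter struct {
	ObjectTypes []string
	Operations  []operation
}

type event struct {
	Operation  operation
	ObjectType string
}

// matches reports whether ev passes the filter.
// Assumption: an empty list places no restriction on that field.
func (f eventFilter) matches(ev event) bool {
	return contains(f.ObjectTypes, ev.ObjectType) && contains(f.Operations, ev.Operation)
}

// contains reports whether v is in allowed, treating an empty list as "allow all".
func contains[T comparable](allowed []T, v T) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, a := range allowed {
		if a == v {
			return true
		}
	}
	return false
}

func main() {
	f := eventFilter{
		ObjectTypes: []string{"Job", "Execution"},
		Operations:  []operation{"CREATE", "UPDATE"},
	}
	fmt.Println(f.matches(event{Operation: "CREATE", ObjectType: "Job"})) // true
	fmt.Println(f.matches(event{Operation: "DELETE", ObjectType: "Job"})) // false
	fmt.Println(f.matches(event{Operation: "CREATE", ObjectType: "Node"})) // false
}
```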

type EventHandler

type EventHandler interface {
	// HandleEvent processes a single event.
	// It returns an error if the event processing fails.
	// Implementations MUST honor context cancellation and return immediately when ctx.Done() is closed.
	HandleEvent(ctx context.Context, event Event) error
}

EventHandler is an interface for handling events.

type EventHandlingError

type EventHandlingError struct {
	WatcherID string
	EventID   uint64
	Err       error
}

EventHandlingError represents an error that occurred during event processing

func NewEventHandlingError

func NewEventHandlingError(watcherID string, eventID uint64, err error) *EventHandlingError

NewEventHandlingError creates a new EventHandlingError

func (*EventHandlingError) Error

func (e *EventHandlingError) Error() string

func (*EventHandlingError) Unwrap

func (e *EventHandlingError) Unwrap() error

type EventIterator

type EventIterator struct {
	Type           EventIteratorType
	SequenceNumber uint64
}

EventIterator defines the starting position for reading events.

func AfterSequenceNumberIterator

func AfterSequenceNumberIterator(seqNum uint64) EventIterator

AfterSequenceNumberIterator creates an EventIterator that starts after a specific sequence number.

func AtSequenceNumberIterator

func AtSequenceNumberIterator(seqNum uint64) EventIterator

AtSequenceNumberIterator creates an EventIterator that starts at a specific sequence number.

func LatestIterator

func LatestIterator() EventIterator

LatestIterator creates an EventIterator that starts from the latest available event.

func TrimHorizonIterator

func TrimHorizonIterator() EventIterator

TrimHorizonIterator creates an EventIterator that starts from the oldest available event.

func (EventIterator) String

func (sp EventIterator) String() string

String returns a string representation of the EventIterator.

type EventIteratorType

type EventIteratorType int

EventIteratorType defines the type of starting position for reading events.

const (
	// EventIteratorTrimHorizon specifies that events should be read from the oldest available event.
	EventIteratorTrimHorizon EventIteratorType = iota
	// EventIteratorLatest specifies that events should be read from the latest available event.
	EventIteratorLatest
	// EventIteratorAtSequenceNumber specifies that events should be read starting from a specific sequence number.
	EventIteratorAtSequenceNumber
	// EventIteratorAfterSequenceNumber specifies that events should be read starting after a specific sequence number.
	EventIteratorAfterSequenceNumber
)

type EventStore

type EventStore interface {
	// StoreEvent stores a new event in the event store.
	StoreEvent(ctx context.Context, request StoreEventRequest) error

	// GetEvents retrieves events based on the provided query parameters.
	GetEvents(ctx context.Context, request GetEventsRequest) (*GetEventsResponse, error)

	// GetLatestEventNum returns the sequence number of the latest event.
	GetLatestEventNum(ctx context.Context) (uint64, error)

	// StoreCheckpoint saves a checkpoint for a specific watcher.
	StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error

	// GetCheckpoint retrieves the checkpoint for a specific watcher.
	GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)

	// Close closes the event store.
	Close(ctx context.Context) error
}

EventStore defines the interface for event storage and retrieval.

type GetEventsRequest

type GetEventsRequest struct {
	// WatcherID optionally specifies the watcher ID for logging and tracking purposes.
	WatcherID string

	// EventIterator specifies where to start reading events.
	EventIterator EventIterator

	// Limit is the maximum number of events to return.
	Limit int

	// Filter specifies criteria for filtering events.
	Filter EventFilter
}

GetEventsRequest defines parameters for querying events.

type GetEventsResponse

type GetEventsResponse struct {
	Events            []Event
	NextEventIterator EventIterator
}

type JSONSerializer

type JSONSerializer struct {
	// contains filtered or unexported fields
}

JSONSerializer implements the Serializer interface for JSON serialization

func NewJSONSerializer

func NewJSONSerializer() *JSONSerializer

NewJSONSerializer creates a new instance of JSONSerializer

func (*JSONSerializer) IsTypeRegistered

func (s *JSONSerializer) IsTypeRegistered(name string) bool

IsTypeRegistered checks if a type is registered in the serializer's type manager

func (*JSONSerializer) Marshal

func (s *JSONSerializer) Marshal(v Event) ([]byte, error)

Marshal serializes an Event into a byte slice

func (*JSONSerializer) RegisterType

func (s *JSONSerializer) RegisterType(name string, t reflect.Type) error

RegisterType adds a new type to the serializer's type manager. It returns an error if the type is already registered or if the provided type is invalid.

func (*JSONSerializer) Unmarshal

func (s *JSONSerializer) Unmarshal(data []byte, event *Event) error

Unmarshal deserializes a byte slice into an Event

type Manager added in v1.5.2

type Manager interface {
	// Create creates a new not-started watcher with the given ID and options.
	// The watcher must be configured with a handler before it can start watching.
	Create(ctx context.Context, watcherID string, opts ...WatchOption) (Watcher, error)

	// Lookup retrieves an existing watcher by its ID.
	Lookup(ctx context.Context, watcherID string) (Watcher, error)

	// Stop gracefully shuts down the manager and all its watchers.
	Stop(ctx context.Context) error
}

Manager handles lifecycle of multiple watchers with shared resources

func NewManager added in v1.5.2

func NewManager(store EventStore) Manager

NewManager creates a new Manager with the given EventStore.

Example usage:

var store EventStore // initialize your event store
manager := NewManager(store)
defer manager.Stop(context.Background())

watcher, err := manager.Create(context.Background(), "myWatcher")
if err != nil {
    // handle error
}

type MockEventHandler

type MockEventHandler struct {
	// contains filtered or unexported fields
}

MockEventHandler is a mock of EventHandler interface.

func NewMockEventHandler

func NewMockEventHandler(ctrl *gomock.Controller) *MockEventHandler

NewMockEventHandler creates a new mock instance.

func (*MockEventHandler) EXPECT

func (m *MockEventHandler) EXPECT() *MockEventHandlerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockEventHandler) HandleEvent

func (m *MockEventHandler) HandleEvent(ctx context.Context, event Event) error

HandleEvent mocks base method.

type MockEventHandlerMockRecorder

type MockEventHandlerMockRecorder struct {
	// contains filtered or unexported fields
}

MockEventHandlerMockRecorder is the mock recorder for MockEventHandler.

func (*MockEventHandlerMockRecorder) HandleEvent

func (mr *MockEventHandlerMockRecorder) HandleEvent(ctx, event interface{}) *gomock.Call

HandleEvent indicates an expected call of HandleEvent.

type MockEventStore

type MockEventStore struct {
	// contains filtered or unexported fields
}

MockEventStore is a mock of EventStore interface.

func NewMockEventStore

func NewMockEventStore(ctrl *gomock.Controller) *MockEventStore

NewMockEventStore creates a new mock instance.

func (*MockEventStore) Close

func (m *MockEventStore) Close(ctx context.Context) error

Close mocks base method.

func (*MockEventStore) EXPECT

func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockEventStore) GetCheckpoint

func (m *MockEventStore) GetCheckpoint(ctx context.Context, watcherID string) (uint64, error)

GetCheckpoint mocks base method.

func (*MockEventStore) GetEvents

func (m *MockEventStore) GetEvents(ctx context.Context, request GetEventsRequest) (*GetEventsResponse, error)

GetEvents mocks base method.

func (*MockEventStore) GetLatestEventNum

func (m *MockEventStore) GetLatestEventNum(ctx context.Context) (uint64, error)

GetLatestEventNum mocks base method.

func (*MockEventStore) StoreCheckpoint

func (m *MockEventStore) StoreCheckpoint(ctx context.Context, watcherID string, eventSeqNum uint64) error

StoreCheckpoint mocks base method.

func (*MockEventStore) StoreEvent

func (m *MockEventStore) StoreEvent(ctx context.Context, request StoreEventRequest) error

StoreEvent mocks base method.

type MockEventStoreMockRecorder

type MockEventStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockEventStoreMockRecorder is the mock recorder for MockEventStore.

func (*MockEventStoreMockRecorder) Close

func (mr *MockEventStoreMockRecorder) Close(ctx interface{}) *gomock.Call

Close indicates an expected call of Close.

func (*MockEventStoreMockRecorder) GetCheckpoint

func (mr *MockEventStoreMockRecorder) GetCheckpoint(ctx, watcherID interface{}) *gomock.Call

GetCheckpoint indicates an expected call of GetCheckpoint.

func (*MockEventStoreMockRecorder) GetEvents

func (mr *MockEventStoreMockRecorder) GetEvents(ctx, request interface{}) *gomock.Call

GetEvents indicates an expected call of GetEvents.

func (*MockEventStoreMockRecorder) GetLatestEventNum

func (mr *MockEventStoreMockRecorder) GetLatestEventNum(ctx interface{}) *gomock.Call

GetLatestEventNum indicates an expected call of GetLatestEventNum.

func (*MockEventStoreMockRecorder) StoreCheckpoint

func (mr *MockEventStoreMockRecorder) StoreCheckpoint(ctx, watcherID, eventSeqNum interface{}) *gomock.Call

StoreCheckpoint indicates an expected call of StoreCheckpoint.

func (*MockEventStoreMockRecorder) StoreEvent

func (mr *MockEventStoreMockRecorder) StoreEvent(ctx, request interface{}) *gomock.Call

StoreEvent indicates an expected call of StoreEvent.

type MockManager added in v1.5.2

type MockManager struct {
	// contains filtered or unexported fields
}

MockManager is a mock of Manager interface.

func NewMockManager added in v1.5.2

func NewMockManager(ctrl *gomock.Controller) *MockManager

NewMockManager creates a new mock instance.

func (*MockManager) Create added in v1.5.2

func (m *MockManager) Create(ctx context.Context, watcherID string, opts ...WatchOption) (Watcher, error)

Create mocks base method.

func (*MockManager) EXPECT added in v1.5.2

func (m *MockManager) EXPECT() *MockManagerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockManager) Lookup added in v1.5.2

func (m *MockManager) Lookup(ctx context.Context, watcherID string) (Watcher, error)

Lookup mocks base method.

func (*MockManager) Stop added in v1.5.2

func (m *MockManager) Stop(ctx context.Context) error

Stop mocks base method.

type MockManagerMockRecorder added in v1.5.2

type MockManagerMockRecorder struct {
	// contains filtered or unexported fields
}

MockManagerMockRecorder is the mock recorder for MockManager.

func (*MockManagerMockRecorder) Create added in v1.5.2

func (mr *MockManagerMockRecorder) Create(ctx, watcherID interface{}, opts ...interface{}) *gomock.Call

Create indicates an expected call of Create.

func (*MockManagerMockRecorder) Lookup added in v1.5.2

func (mr *MockManagerMockRecorder) Lookup(ctx, watcherID interface{}) *gomock.Call

Lookup indicates an expected call of Lookup.

func (*MockManagerMockRecorder) Stop added in v1.5.2

func (mr *MockManagerMockRecorder) Stop(ctx interface{}) *gomock.Call

Stop indicates an expected call of Stop.

type MockSerializer

type MockSerializer struct {
	// contains filtered or unexported fields
}

MockSerializer is a mock of Serializer interface.

func NewMockSerializer

func NewMockSerializer(ctrl *gomock.Controller) *MockSerializer

NewMockSerializer creates a new mock instance.

func (*MockSerializer) EXPECT

func (m *MockSerializer) EXPECT() *MockSerializerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockSerializer) Marshal

func (m *MockSerializer) Marshal(event Event) ([]byte, error)

Marshal mocks base method.

func (*MockSerializer) Unmarshal

func (m *MockSerializer) Unmarshal(data []byte, event *Event) error

Unmarshal mocks base method.

type MockSerializerMockRecorder

type MockSerializerMockRecorder struct {
	// contains filtered or unexported fields
}

MockSerializerMockRecorder is the mock recorder for MockSerializer.

func (*MockSerializerMockRecorder) Marshal

func (mr *MockSerializerMockRecorder) Marshal(event interface{}) *gomock.Call

Marshal indicates an expected call of Marshal.

func (*MockSerializerMockRecorder) Unmarshal

func (mr *MockSerializerMockRecorder) Unmarshal(data, event interface{}) *gomock.Call

Unmarshal indicates an expected call of Unmarshal.

type MockWatcher

type MockWatcher struct {
	// contains filtered or unexported fields
}

MockWatcher is a mock of Watcher interface.

func NewMockWatcher

func NewMockWatcher(ctrl *gomock.Controller) *MockWatcher

NewMockWatcher creates a new mock instance.

func (*MockWatcher) Checkpoint

func (m *MockWatcher) Checkpoint(ctx context.Context, eventSeqNum uint64) error

Checkpoint mocks base method.

func (*MockWatcher) EXPECT

func (m *MockWatcher) EXPECT() *MockWatcherMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockWatcher) ID

func (m *MockWatcher) ID() string

ID mocks base method.

func (*MockWatcher) SeekToOffset

func (m *MockWatcher) SeekToOffset(ctx context.Context, eventSeqNum uint64) error

SeekToOffset mocks base method.

func (*MockWatcher) SetHandler added in v1.5.2

func (m *MockWatcher) SetHandler(handler EventHandler) error

SetHandler mocks base method.

func (*MockWatcher) Start added in v1.5.2

func (m *MockWatcher) Start(ctx context.Context) error

Start mocks base method.

func (*MockWatcher) Stats

func (m *MockWatcher) Stats() Stats

Stats mocks base method.

func (*MockWatcher) Stop

func (m *MockWatcher) Stop(ctx context.Context)

Stop mocks base method.

type MockWatcherMockRecorder

type MockWatcherMockRecorder struct {
	// contains filtered or unexported fields
}

MockWatcherMockRecorder is the mock recorder for MockWatcher.

func (*MockWatcherMockRecorder) Checkpoint

func (mr *MockWatcherMockRecorder) Checkpoint(ctx, eventSeqNum interface{}) *gomock.Call

Checkpoint indicates an expected call of Checkpoint.

func (*MockWatcherMockRecorder) ID

func (mr *MockWatcherMockRecorder) ID() *gomock.Call

ID indicates an expected call of ID.

func (*MockWatcherMockRecorder) SeekToOffset

func (mr *MockWatcherMockRecorder) SeekToOffset(ctx, eventSeqNum interface{}) *gomock.Call

SeekToOffset indicates an expected call of SeekToOffset.

func (*MockWatcherMockRecorder) SetHandler added in v1.5.2

func (mr *MockWatcherMockRecorder) SetHandler(handler interface{}) *gomock.Call

SetHandler indicates an expected call of SetHandler.

func (*MockWatcherMockRecorder) Start added in v1.5.2

func (mr *MockWatcherMockRecorder) Start(ctx interface{}) *gomock.Call

Start indicates an expected call of Start.

func (*MockWatcherMockRecorder) Stats

func (mr *MockWatcherMockRecorder) Stats() *gomock.Call

Stats indicates an expected call of Stats.

func (*MockWatcherMockRecorder) Stop

func (mr *MockWatcherMockRecorder) Stop(ctx interface{}) *gomock.Call

Stop indicates an expected call of Stop.

type Operation

type Operation string

Operation represents the type of operation performed in an event.

const (
	// OperationCreate represents a creation event.
	OperationCreate Operation = "CREATE"
	// OperationUpdate represents an update event.
	OperationUpdate Operation = "UPDATE"
	// OperationDelete represents a deletion event.
	OperationDelete Operation = "DELETE"
)

type RetryStrategy

type RetryStrategy int

const (
	RetryStrategyBlock RetryStrategy = iota
	RetryStrategySkip
)

type SerializationError

type SerializationError struct {
	Event Event
	Err   error
}

func NewSerializationError

func NewSerializationError(event Event, err error) *SerializationError

func (*SerializationError) Error

func (e *SerializationError) Error() string

func (*SerializationError) Unwrap

func (e *SerializationError) Unwrap() error

type Serializer

type Serializer interface {
	// Marshal serializes an Event into a byte slice.
	Marshal(event Event) ([]byte, error)

	// Unmarshal deserializes a byte slice into an Event.
	Unmarshal(data []byte, event *Event) error
}

Serializer defines the interface for event serialization and deserialization.
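
A minimal implementation of an interface with this shape, built only on encoding/json, might look like the sketch below. The `event` type is a local copy of the Event fields, and `plainJSONSerializer` and `job` are hypothetical. The example also shows a known property of encoding/json: an interface{} field holding a JSON object decodes as a generic map rather than the original struct, which is presumably the gap that JSONSerializer's type registration addresses.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// event is a local copy of the Event struct's fields and JSON tags.
type event struct {
	SeqNum     uint64      `json:"seqNum"`
	Operation  string      `json:"operation"`
	ObjectType string      `json:"objectType"`
	Object     interface{} `json:"object"`
	Timestamp  time.Time   `json:"timestamp"`
}

// plainJSONSerializer is a hypothetical serializer without a type registry.
type plainJSONSerializer struct{}

// Marshal serializes an event into JSON bytes.
func (plainJSONSerializer) Marshal(ev event) ([]byte, error) {
	return json.Marshal(ev)
}

// Unmarshal deserializes JSON bytes into an event.
func (plainJSONSerializer) Unmarshal(data []byte, ev *event) error {
	return json.Unmarshal(data, ev)
}

// job is an example payload type.
type job struct {
	ID string `json:"id"`
}

func main() {
	s := plainJSONSerializer{}

	data, err := s.Marshal(event{
		SeqNum:     1,
		Operation:  "CREATE",
		ObjectType: "Job",
		Object:     job{ID: "j-1"},
		Timestamp:  time.Unix(0, 0).UTC(),
	})
	if err != nil {
		panic(err)
	}

	var out event
	if err := s.Unmarshal(data, &out); err != nil {
		panic(err)
	}

	// Without type information, Object comes back as a generic map, not a job.
	_, isMap := out.Object.(map[string]interface{})
	fmt.Println(out.SeqNum, isMap) // prints "1 true"
}
```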

type State

type State string

const (
	StateIdle     State = "idle"
	StateRunning  State = "running"
	StateStopping State = "stopping"
	StateStopped  State = "stopped"
)

type Stats

type Stats struct {
	ID                     string
	State                  State
	NextEventIterator      EventIterator // Next event iterator for the watcher
	CheckpointIterator     EventIterator // Checkpoint iterator for the watcher
	LastProcessedSeqNum    uint64        // SeqNum of the last event processed by this watcher
	LastProcessedEventTime time.Time     // timestamp of the last processed event
	LastListenTime         time.Time     // timestamp of the last successful listen operation
}
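
These fields can feed a simple lag check, as in the sketch below. It compares LastProcessedSeqNum with the store's latest sequence number (as returned by GetLatestEventNum). The `stats` type is a trimmed local copy and `lag` is a hypothetical helper. For a filtered watcher the result is only an upper bound, since some of the pending events may not match its filter.

```go
package main

import "fmt"

// stats is a trimmed local copy of the Stats fields used here.
type stats struct {
	ID                  string
	LastProcessedSeqNum uint64
}

// lag returns how many stored events lie beyond the watcher's last processed
// sequence number. It never underflows if the watcher is fully caught up.
func lag(s stats, latestSeqNum uint64) uint64 {
	if latestSeqNum <= s.LastProcessedSeqNum {
		return 0
	}
	return latestSeqNum - s.LastProcessedSeqNum
}

func main() {
	s := stats{ID: "my-watcher", LastProcessedSeqNum: 950}
	fmt.Printf("%s is %d events behind\n", s.ID, lag(s, 1000))
	// prints "my-watcher is 50 events behind"
}
```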

type StoreEventRequest added in v1.5.2

type StoreEventRequest struct {
	Operation  Operation   `json:"operation"`
	ObjectType string      `json:"objectType"`
	Object     interface{} `json:"object"`
}

StoreEventRequest represents the input for creating an event.

type WatchOption

type WatchOption func(*watchOptions)

WatchOption is a function type for configuring watch options
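
This is the functional options pattern: each With... function returns a closure that mutates an options struct, and the constructor applies the closures in order over its defaults. The sketch below reproduces the pattern with local types. The fields of `watchOptions` and the default values are assumptions, since the real struct is unexported.

```go
package main

import "fmt"

// watchOptions is a local stand-in; the real struct is unexported and its
// fields are not documented here.
type watchOptions struct {
	batchSize  int
	maxRetries int
	autoStart  bool
}

// watchOption mutates a watchOptions value.
type watchOption func(*watchOptions)

func withBatchSize(size int) watchOption {
	return func(o *watchOptions) { o.batchSize = size }
}

func withMaxRetries(maxRetries int) watchOption {
	return func(o *watchOptions) { o.maxRetries = maxRetries }
}

func withAutoStart() watchOption {
	return func(o *watchOptions) { o.autoStart = true }
}

// buildOptions starts from hypothetical defaults and applies each option in
// order, so later options override earlier ones.
func buildOptions(opts ...watchOption) watchOptions {
	o := watchOptions{batchSize: 100, maxRetries: 3}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := buildOptions(withBatchSize(500), withAutoStart())
	fmt.Println(o.batchSize, o.maxRetries, o.autoStart) // prints "500 3 true"
}
```

The pattern keeps the constructor signature stable as options are added, which fits the way Create and New accept a variadic list of WatchOption values.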

func WithAutoStart added in v1.5.2

func WithAutoStart() WatchOption

WithAutoStart enables auto-start for the watcher right after creation

func WithBatchSize

func WithBatchSize(size int) WatchOption

WithBatchSize sets the number of events to fetch in each batch

func WithEphemeral added in v1.6.0

func WithEphemeral() WatchOption

WithEphemeral sets the watcher to be ephemeral (non-checkpointing)

func WithFilter

func WithFilter(filter EventFilter) WatchOption

WithFilter sets the event filter for watching

func WithHandler added in v1.5.2

func WithHandler(handler EventHandler) WatchOption

WithHandler sets the event handler for watching

func WithInitialBackoff

func WithInitialBackoff(backoff time.Duration) WatchOption

WithInitialBackoff sets the initial backoff duration for retries

func WithInitialEventIterator

func WithInitialEventIterator(iterator EventIterator) WatchOption

WithInitialEventIterator sets the starting position for watching if no checkpoint is found

func WithMaxBackoff

func WithMaxBackoff(backoff time.Duration) WatchOption

WithMaxBackoff sets the maximum backoff duration for retries

func WithMaxRetries

func WithMaxRetries(maxRetries int) WatchOption

WithMaxRetries sets the maximum number of retries for event handling

func WithRetryStrategy

func WithRetryStrategy(strategy RetryStrategy) WatchOption

WithRetryStrategy sets the retry strategy for event handling

type Watcher

type Watcher interface {
	// ID returns the unique identifier for the watcher.
	ID() string

	// Stats returns the current statistics for the watcher.
	Stats() Stats

	// SetHandler sets the event handler for the watcher. Must be set before calling Start.
	// Will fail if the handler is already set.
	// Returns error if handler is nil or already configured.
	SetHandler(handler EventHandler) error

	// Start begins processing events.
	// Returns error if no handler configured or already running.
	Start(ctx context.Context) error

	// Stop gracefully stops the watcher.
	Stop(ctx context.Context)

	// Checkpoint saves the current progress of the watcher.
	Checkpoint(ctx context.Context, eventSeqNum uint64) error

	// SeekToOffset moves the watcher to a specific event sequence number.
	// Will stop and restart the watcher if running.
	SeekToOffset(ctx context.Context, eventSeqNum uint64) error
}

Watcher represents a single event watcher.

func New added in v1.5.2

func New(ctx context.Context, id string, store EventStore, opts ...WatchOption) (Watcher, error)

New creates a new watcher with the given parameters

type WatcherError

type WatcherError struct {
	WatcherID string
	Err       error
}

WatcherError represents an error related to a specific watcher

func NewWatcherError

func NewWatcherError(watcherID string, err error) *WatcherError

func (*WatcherError) Error

func (e *WatcherError) Error() string

func (*WatcherError) Unwrap

func (e *WatcherError) Unwrap() error
