dispatcher

package
v1.6.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 16, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Constants

const (
	KeySeqNum = "Bacalhau-SeqNum"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// CheckpointInterval determines how often the dispatcher saves its progress.
	// Lower values provide better durability at the cost of more IO operations.
	// Negative values disable checkpointing.
	// Default: 5 seconds
	CheckpointInterval time.Duration

	// CheckpointTimeout is the maximum time allowed for a checkpoint operation.
	// If exceeded, the checkpoint is abandoned and will be retried next interval.
	// Default: 5 seconds
	CheckpointTimeout time.Duration

	// StallTimeout is the duration after which a pending message is considered stalled.
	// When a message is stalled, recovery mechanisms may be triggered.
	// Should be significantly longer than expected network latency and processing time.
	// Default: 5 minutes
	StallTimeout time.Duration

	// StallCheckInterval defines how often to check for stalled messages.
	// More frequent checks provide quicker detection but increase CPU usage.
	// Should be shorter than StallTimeout.
	// Default: 30 seconds
	StallCheckInterval time.Duration

	// ProcessInterval controls how frequently the dispatcher processes publish results.
	// Lower values reduce latency but increase CPU usage.
	// Default: 10 milliseconds
	ProcessInterval time.Duration

	// SeekTimeout is the maximum time allowed for seeking to a position in the event stream.
	// Exceeding this timeout indicates potential issues with the event source.
	// Default: 30 seconds
	SeekTimeout time.Duration

	// BaseRetryInterval is the initial delay between retry attempts after a failure.
	// This interval increases exponentially up to MaxRetryInterval.
	// Default: 5 seconds
	BaseRetryInterval time.Duration

	// MaxRetryInterval caps the maximum delay between retry attempts.
	// Prevents exponential backoff from growing too large.
	// Default: 5 minutes
	MaxRetryInterval time.Duration
}

Config defines the configuration settings for the dispatcher. It controls various timeouts, intervals and retry behavior for the event dispatch process.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config initialized with reasonable default values. These defaults are designed to work well for most use cases but can be overridden based on specific requirements.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration values are valid and returns an error if not. This helps catch configuration issues early before they cause problems at runtime.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher handles reliable delivery of events from a watcher to NATS. It maintains sequence ordering, handles retries, and provides checkpointing for resuming after restarts.

func New

func New(publisher ncl.OrderedPublisher,
	watcher watcher.Watcher,
	messageCreator nclprotocol.MessageCreator, config Config) (*Dispatcher, error)

New creates a new Dispatcher with the given configuration and dependencies. The provided publisher will be used to publish messages to NATS. The watcher provides the source of events. The messageCreator determines how events are converted to messages. Returns an error if any dependencies are nil or if config validation fails.

func (*Dispatcher) Start

func (d *Dispatcher) Start(ctx context.Context) error

Start begins processing events and managing async publish results. It launches background goroutines for processing publish results, checking for stalled messages, and checkpointing progress. Returns an error if the dispatcher is already running or if the watcher fails to start.

func (*Dispatcher) State

func (d *Dispatcher) State() State

func (*Dispatcher) Stop

func (d *Dispatcher) Stop(ctx context.Context) error

Stop gracefully shuts down the dispatcher and its background goroutines.

type ErrDispatcher

type ErrDispatcher struct {
	Op  string // Operation that failed
	Err error  // Underlying error
}

func (*ErrDispatcher) Error

func (e *ErrDispatcher) Error() string

type State

type State struct {
	LastAckedSeqNum    uint64
	LastObservedSeqNum uint64
	LastCheckpoint     uint64
}
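
The State fields carry no descriptions here, so the following is only a sketch of how a caller might use them for monitoring. It assumes, from the names alone, that LastObservedSeqNum is the newest event seen, LastAckedSeqNum the newest acknowledged publish, and LastCheckpoint the last persisted position; backlog is a hypothetical helper.

```go
package main

import "fmt"

// State is a local stand-in mirroring the documented struct.
type State struct {
	LastAckedSeqNum    uint64
	LastObservedSeqNum uint64
	LastCheckpoint     uint64
}

// backlog reports how many observed events are not yet acknowledged and
// how many acknowledged events are not yet checkpointed. The comparisons
// guard against unsigned underflow if the counters are briefly out of order.
func backlog(s State) (unacked, uncheckpointed uint64) {
	if s.LastObservedSeqNum > s.LastAckedSeqNum {
		unacked = s.LastObservedSeqNum - s.LastAckedSeqNum
	}
	if s.LastAckedSeqNum > s.LastCheckpoint {
		uncheckpointed = s.LastAckedSeqNum - s.LastCheckpoint
	}
	return unacked, uncheckpointed
}

func main() {
	s := State{LastAckedSeqNum: 120, LastObservedSeqNum: 125, LastCheckpoint: 100}
	unacked, uncheckpointed := backlog(s)
	fmt.Printf("unacked=%d uncheckpointed=%d\n", unacked, uncheckpointed)
}
```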
