Documentation ¶
Index ¶
Constants ¶
const (
KeySeqNum = "Bacalhau-SeqNum"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // CheckpointInterval determines how often the dispatcher saves its progress. // Lower values provide better durability at the cost of more IO operations. // Negative values disable checkpointing. // Default: 5 seconds CheckpointInterval time.Duration // CheckpointTimeout is the maximum time allowed for a checkpoint operation. // If exceeded, the checkpoint is abandoned and will be retried next interval. // Default: 5 seconds CheckpointTimeout time.Duration // StallTimeout is the duration after which a pending message is considered stalled. // When a message is stalled, recovery mechanisms may be triggered. // Should be significantly longer than expected network latency and processing time. // Default: 5 minutes StallTimeout time.Duration // StallCheckInterval defines how often to check for stalled messages. // More frequent checks provide quicker detection but increase CPU usage. // Should be shorter than StallTimeout. // Default: 30 seconds StallCheckInterval time.Duration // ProcessInterval controls how frequently the dispatcher processes publish results. // Lower values reduce latency but increase CPU usage. // Default: 10 milliseconds ProcessInterval time.Duration // SeekTimeout is the maximum time allowed for seeking to a position in the event stream. // Exceeding this timeout indicates potential issues with the event source. // Default: 30 seconds SeekTimeout time.Duration // BaseRetryInterval is the initial delay between retry attempts after a failure. // This interval increases exponentially up to MaxRetryInterval. // Default: 5 seconds BaseRetryInterval time.Duration // MaxRetryInterval caps the maximum delay between retry attempts. // Prevents exponential backoff from growing too large. // Default: 5 minutes MaxRetryInterval time.Duration }
Config defines the configuration settings for the dispatcher. It controls various timeouts, intervals and retry behavior for the event dispatch process.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config initialized with reasonable default values. These defaults are designed to work well for most use cases but can be overridden based on specific requirements.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher handles reliable delivery of events from a watcher to NATS. It maintains sequence ordering, handles retries, and provides checkpointing for resuming after restarts.
func New ¶
func New(publisher ncl.OrderedPublisher, watcher watcher.Watcher, messageCreator nclprotocol.MessageCreator, config Config) (*Dispatcher, error)
New creates a new Dispatcher with the given configuration and dependencies. The provided publisher will be used to publish messages to NATS. The watcher provides the source of events. The messageCreator determines how events are converted to messages. Returns an error if any dependencies are nil or if config validation fails.
func (*Dispatcher) Start ¶
func (d *Dispatcher) Start(ctx context.Context) error
Start begins processing events and managing async publish results. It launches background goroutines for processing publish results, checking for stalled messages, and checkpointing progress. Returns an error if the dispatcher is already running or if the watcher fails to start.
func (*Dispatcher) State ¶
func (d *Dispatcher) State() State
type ErrDispatcher ¶
func (*ErrDispatcher) Error ¶
func (e *ErrDispatcher) Error() string