stream

package
v0.0.0-...-dda4347 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2025 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FileNotifier

type FileNotifier interface {
	// Changes returns a channel if a file was created or deleted.
	Changes() (<-chan bool, error)
}

FileNotifyWatcher notifies when a file has been created or deleted within a given directory.

type MetricsCollector

type MetricsCollector interface {
	WatermarkInsertsInc()
	WatermarkRetriesInc()
	ChangesRequestDurationObserve(val float64)
	ChangesCountObserve(val int)
}

MetricsCollector represents the metrics methods called.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream defines a worker that will poll the database for change events.

func New

func New(
	id string,
	db coredatabase.TxnRunner,
	fileNotifier FileNotifier,
	clock clock.Clock,
	metrics MetricsCollector,
	logger logger.Logger,
) *Stream

New creates a new Stream.

func NewInternalStates

func NewInternalStates(
	id string,
	db coredatabase.TxnRunner,
	fileNotifier FileNotifier,
	clock clock.Clock,
	metrics MetricsCollector,
	logger logger.Logger,
	internalStates chan string,
) *Stream

NewInternalStates creates a new Stream with an internal state channel.

func (*Stream) Dying

func (s *Stream) Dying() <-chan struct{}

Dying returns a channel to notify when the stream is dying.

func (*Stream) Kill

func (s *Stream) Kill()

Kill is part of the worker.Worker interface.

func (*Stream) Report

func (s *Stream) Report() map[string]any

Report returns

func (*Stream) Terms

func (s *Stream) Terms() <-chan changestream.Term

Terms returns a channel for a given namespace (database) that returns a set of terms. The notion of terms are a set of changes that can be run one at a time asynchronously. Allowing changes within a given term to be signaled of a change independently from one another. Once a change within a term has been completed, only at that point is another change processed, until all changes are exhausted.

func (*Stream) Wait

func (s *Stream) Wait() error

Wait is part of the worker.Worker interface.

type Term

type Term struct {
	// contains filtered or unexported fields
}

Term represents a set of changes that are bounded by a coalesced set. The notion of a term are a set of changes that can be run one at a time asynchronously. Allowing changes within a given term to be signaled of a change independently from one another. Once a change within a term has been completed, only at that point is another change processed, until all changes are exhausted.

func (*Term) Changes

func (t *Term) Changes() []changestream.ChangeEvent

Changes returns the changes that are part of the term.

func (*Term) Done

func (t *Term) Done(empty bool, abort <-chan struct{})

Done signals that the term has been completed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL