syncz

package
v17.5.4
Published: Dec 10, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunLayers added in v17.4.0

func RunLayers(ctx context.Context, layers ...Layer) error

Types

type EventCallback

type EventCallback[E any] func(ctx context.Context, e E)

type Layer added in v17.4.0

type Layer interface {
	ToStageFuncs() ([]stager.StageFunc, error)
}

Layer is a layer in a startup and shutdown stack. Each layer can expand into zero or more stages that are run first to last and shut down in reverse order.

Example

|---------|
| Layer A |
|---------|
| Layer B |
|---------|

Let's say the above layers expand into the following:

|----------|
| Stage A1 |
|----------|
| Stage A2 |
|----------|
| Stage B1 |
|----------|
| Stage B2 |
|----------|

Stages are shut down in bottom up order: B2, B1, A2, A1.

Although stages are started in the order given, the start order matters little: all stages start concurrently in independent goroutines, so no real start ordering is achieved. The purpose of this mechanism is to guarantee that stages shut down in reverse order. This is ensured by the underlying github.com/ash2k/stager library. Layers add flexibility and composability on top of that library.

type LayerFunc added in v17.4.0

type LayerFunc stager.StageFunc

func (LayerFunc) ToStageFuncs added in v17.4.0

func (f LayerFunc) ToStageFuncs() ([]stager.StageFunc, error)

type LayerGoFunc added in v17.4.0

type LayerGoFunc func(ctx context.Context) error

func (LayerGoFunc) ToStageFuncs added in v17.4.0

func (f LayerGoFunc) ToStageFuncs() ([]stager.StageFunc, error)

type Listen added in v17.1.0

type Listen[E any] func(cb EventCallback[E])

type LogWhenDoneLayer added in v17.4.0

type LogWhenDoneLayer struct {
	Log *slog.Logger
	Msg string
}

func (*LogWhenDoneLayer) ToStageFuncs added in v17.4.0

func (l *LogWhenDoneLayer) ToStageFuncs() ([]stager.StageFunc, error)

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex is a non-reentrant mutex (like sync.Mutex) that, unlike sync.Mutex, lets the caller abandon an attempt to acquire it early when a context signals done.

A buffered channel of size 1 is used as the mutex. Think of it as a box: the party that has put something into it has acquired the mutex. To unlock it, remove the contents from the box so that someone else can use it. The NewMutex() constructor creates an empty box.

TryLock, Lock, and Unlock provide memory access ordering guarantees by piggybacking on the channel's "happens before" guarantees. See https://golang.org/ref/mem

func NewMutex

func NewMutex() Mutex

func (Mutex) Lock

func (m Mutex) Lock(ctx context.Context) bool

func (Mutex) TryLock

func (m Mutex) TryLock() bool

func (Mutex) Unlock

func (m Mutex) Unlock()

type SiblingLayers added in v17.4.0

type SiblingLayers []Layer

SiblingLayers composes multiple layers as siblings rather than stacking them on top of each other.

Example

A is a simple layer and B and C are sibling layers.

|---------|---------|
|      Layer A      |
|---------|---------|
| Layer B | Layer C |
|---------|---------|

Can expand into:

|----------|----------|
|      Stage A1       |
|----------|----------|
| Stage B1 | Stage C1 |
|          |----------|
|          | Stage C2 |
|----------|----------|

Shutdown order: (B1 and (C2 then C1)) then A1. Where "and" means things happen concurrently and "then" means an ordering dependency.

func (SiblingLayers) ToStageFuncs added in v17.4.0

func (l SiblingLayers) ToStageFuncs() ([]stager.StageFunc, error)

type Subscriptions

type Subscriptions[E any] struct {
	// contains filtered or unexported fields
}

func (*Subscriptions[E]) Dispatch

func (s *Subscriptions[E]) Dispatch(ctx context.Context, e E)

Dispatch dispatches the given event to all added subscriptions.

func (*Subscriptions[E]) Len added in v17.1.0

func (s *Subscriptions[E]) Len() int

Len returns the number of subscriptions.

func (*Subscriptions[E]) On

func (s *Subscriptions[E]) On(ctx context.Context, cb EventCallback[E])

func (*Subscriptions[E]) Subscribe added in v17.1.0

func (s *Subscriptions[E]) Subscribe(ctx context.Context) Listen[E]

Subscribe subscribes to events but, unlike On, does not start listening immediately. It returns a function that must be called to listen to events. Make sure the function is always called as it performs cleanup once the passed context is done. The returned function must only be called once.

type ValueHolder added in v17.2.0

type ValueHolder[T comparable] struct {
	// contains filtered or unexported fields
}

ValueHolder holds a value of type T, for example the agent ID of this agentk.

func NewValueHolder added in v17.2.0

func NewValueHolder[T comparable]() *ValueHolder[T]

func (*ValueHolder[T]) Get added in v17.2.0

func (h *ValueHolder[T]) Get(ctx context.Context) (T, error)

func (*ValueHolder[T]) Set added in v17.2.0

func (h *ValueHolder[T]) Set(value T) error

Set is not safe for concurrent use. It's ok since we don't need that.

func (*ValueHolder[T]) TryGet added in v17.2.0

func (h *ValueHolder[T]) TryGet() (T, bool)

type WaitGroup added in v17.5.0

type WaitGroup struct {
	// contains filtered or unexported fields
}

func NewWaitGroup added in v17.5.0

func NewWaitGroup() *WaitGroup

func (*WaitGroup) Add added in v17.5.0

func (wg *WaitGroup) Add()

func (*WaitGroup) Done added in v17.5.0

func (wg *WaitGroup) Done()

func (*WaitGroup) Wait added in v17.5.0

func (wg *WaitGroup) Wait(timeout time.Duration) bool

Wait waits until the internal counter reaches zero or the timeout occurs. Returns false on timeout and true otherwise. Returns false immediately if timeout is non-positive.

type WorkSource

type WorkSource[ID comparable, C any] struct {
	ID            ID
	Configuration C
}

type Worker

type Worker interface {
	Run(context.Context)
}

type WorkerFactory

type WorkerFactory[ID comparable, C any] interface {
	New(WorkSource[ID, C]) Worker
}

type WorkerFunc

type WorkerFunc func(context.Context)

func (WorkerFunc) Run

func (wf WorkerFunc) Run(ctx context.Context)

type WorkerHolder

type WorkerHolder[C any] struct {
	// contains filtered or unexported fields
}

WorkerHolder holds a worker and restarts it when configuration changes.

func NewComparableWorkerHolder

func NewComparableWorkerHolder[C comparable](factory func(C) Worker) *WorkerHolder[C]

func NewProtoWorkerHolder

func NewProtoWorkerHolder[C proto.Message](factory func(C) Worker) *WorkerHolder[C]

func NewWorkerHolder

func NewWorkerHolder[C any](factory func(C) Worker, isEqual func(config1, config2 C) bool) *WorkerHolder[C]

func (*WorkerHolder[C]) ApplyConfig

func (w *WorkerHolder[C]) ApplyConfig(ctx context.Context, config C) bool

ApplyConfig ensures a worker is running with the provided or equal config.

This method starts a worker if one is not already running. If a worker is running and the config is not equal, that worker is stopped and a new worker is started with the new config.

func (*WorkerHolder[C]) StopAndWait

func (w *WorkerHolder[C]) StopAndWait()

type WorkerManager

type WorkerManager[ID comparable, C any] struct {
	// contains filtered or unexported fields
}

func NewProtoWorkerManager

func NewProtoWorkerManager[ID comparable, C proto.Message](log *slog.Logger, fld func(ID) slog.Attr, workerFactory WorkerFactory[ID, C]) *WorkerManager[ID, C]

func NewWorkerManager

func NewWorkerManager[ID comparable, C any](log *slog.Logger, fld func(ID) slog.Attr, workerFactory WorkerFactory[ID, C], equal func(c1, c2 C) bool) *WorkerManager[ID, C]

func (*WorkerManager[ID, C]) ApplyConfiguration

func (m *WorkerManager[ID, C]) ApplyConfiguration(sources []WorkSource[ID, C]) error

func (*WorkerManager[ID, C]) Stop added in v17.4.0

func (m *WorkerManager[ID, C]) Stop()

Stop tells all workers to stop. It does not wait for workers to stop. Use Wait() for that.

func (*WorkerManager[ID, C]) StopAndWait added in v17.4.0

func (m *WorkerManager[ID, C]) StopAndWait()

func (*WorkerManager[ID, C]) Wait added in v17.4.0

func (m *WorkerManager[ID, C]) Wait()

Wait waits for all workers to stop.
