mgr

package
v1.6.23
Warning

This package is not in the latest version of its module.

Published: Aug 30, 2024 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Overview

Package mgr provides simple management of flow control and logging.

Constants

const (
	StateTypeUndefined = ""
	StateTypeHint      = "hint"
	StateTypeWarning   = "warning"
	StateTypeError     = "error"
)

State Types.

Variables

var (
	// ErrUnsuitableGroupState is returned when an operation cannot be executed due to an unsuitable state.
	ErrUnsuitableGroupState = errors.New("unsuitable group state")

	// ErrInvalidGroupState is returned when a group is in an invalid state and cannot be recovered.
	ErrInvalidGroupState = errors.New("invalid group state")

	// ErrExecuteCmdLineOp is returned when modules are created, but request
	// execution of a (somewhere else set) command line operation instead.
	ErrExecuteCmdLineOp = errors.New("command line operation execution requested")
)
var ManagerNameSLogKey = "manager"

ManagerNameSLogKey is used as the logging key for the name of the manager.
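As a rough illustration of how such a logging key is typically used with log/slog, the following sketch attaches a manager name to every record under the key "manager". It uses only the standard library; newManagerLogger and logLine are hypothetical helpers, not part of this package, and the package's actual wiring may differ.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
)

// managerNameSLogKey mirrors the documented value of ManagerNameSLogKey.
const managerNameSLogKey = "manager"

// newManagerLogger is a hypothetical helper: it returns a logger that tags
// every record with the manager name under the documented key.
func newManagerLogger(h slog.Handler, name string) *slog.Logger {
	return slog.New(h).With(managerNameSLogKey, name)
}

// logLine writes one info record for the named manager and returns the text.
func logLine(name, msg string) string {
	var buf bytes.Buffer
	h := slog.NewTextHandler(&buf, &slog.HandlerOptions{
		// Drop the timestamp so the output is stable.
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if a.Key == slog.TimeKey && len(groups) == 0 {
				return slog.Attr{}
			}
			return a
		},
	})
	newManagerLogger(h, name).Info(msg)
	return buf.String()
}

func main() {
	fmt.Print(logLine("updater", "started"))
	// prints: level=INFO msg=started manager=updater
}
```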

var WorkerCtxContextKey = workerContextKey{}

WorkerCtxContextKey is the key used to add the WorkerCtx to a context.

Functions

func RunModules

func RunModules(ctx context.Context, modules ...Module) error

RunModules is a simple wrapper function to start modules and stop them again when the given context is canceled.
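The behavior described for RunModules can be approximated with the standard library alone. The sketch below is not the package's implementation: module is a reduced stand-in for the Module interface (without Manager()), recorder is a hypothetical module, and stopping in reverse order is an assumption borrowed from the Group.Stop description.

```go
package main

import (
	"context"
	"fmt"
)

// module is a reduced stand-in for the documented Module interface
// (the real one also has a Manager() method).
type module interface {
	Start() error
	Stop() error
}

// recorder is a hypothetical module that records its lifecycle calls.
type recorder struct {
	name string
	log  *[]string
}

func (r recorder) Start() error { *r.log = append(*r.log, "start "+r.name); return nil }
func (r recorder) Stop() error  { *r.log = append(*r.log, "stop "+r.name); return nil }

// runUntilCanceled starts the modules, blocks until ctx is canceled, and then
// stops them again. Reverse stop order is an assumption of this sketch.
func runUntilCanceled(ctx context.Context, modules ...module) error {
	for _, m := range modules {
		if err := m.Start(); err != nil {
			return fmt.Errorf("start: %w", err)
		}
	}
	<-ctx.Done()
	for i := len(modules) - 1; i >= 0; i-- {
		if err := modules[i].Stop(); err != nil {
			return fmt.Errorf("stop: %w", err)
		}
	}
	return nil
}

// demo runs two modules with an already canceled context and returns the call log.
func demo() []string {
	var log []string
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel up front so the demo returns immediately
	_ = runUntilCanceled(ctx, recorder{"a", &log}, recorder{"b", &log})
	return log
}

func main() {
	fmt.Println(demo()) // prints: [start a start b stop b stop a]
}
```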

Types

type EventCallback

type EventCallback[T any] struct {
	// contains filtered or unexported fields
}

EventCallback is a registered callback to an event.

type EventCallbackFunc

type EventCallbackFunc[T any] func(*WorkerCtx, T) (cancel bool, err error)

EventCallbackFunc defines the event callback function.

type EventMgr

type EventMgr[T any] struct {
	// contains filtered or unexported fields
}

EventMgr is a simple event manager.

func NewEventMgr

func NewEventMgr[T any](eventName string, mgr *Manager) *EventMgr[T]

NewEventMgr returns a new event manager. It is easiest to use as a public field on a struct, so that others can simply call Subscribe() or AddCallback().

func (*EventMgr[T]) AddCallback

func (em *EventMgr[T]) AddCallback(callbackName string, callback EventCallbackFunc[T])

AddCallback adds a callback to be executed on events. The received events are shared among all subscribers and callbacks. Be sure to apply proper concurrency safeguards, if applicable.

func (*EventMgr[T]) Submit

func (em *EventMgr[T]) Submit(event T)

Submit submits a new event.

func (*EventMgr[T]) Subscribe

func (em *EventMgr[T]) Subscribe(subscriberName string, chanSize int) *EventSubscription[T]

Subscribe subscribes to events. The received events are shared among all subscribers and callbacks. Be sure to apply proper concurrency safeguards, if applicable.

type EventSubscription

type EventSubscription[T any] struct {
	// contains filtered or unexported fields
}

EventSubscription is a subscription to an event.

func (*EventSubscription[T]) Cancel

func (es *EventSubscription[T]) Cancel()

Cancel cancels the subscription. The events channel is not closed, but will not receive new events.
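Because the events channel is not closed on Cancel, a consumer that ranges over it would never terminate. One way to handle this is to select on a separate cancellation signal. The sketch below uses a hypothetical subscription type with a done channel as a stand-in for that signal (for example a worker context's Done channel); it does not use the real EventSubscription type.

```go
package main

import "fmt"

// subscription is a hypothetical stand-in for EventSubscription[T]:
// an events channel that is never closed, plus a separate done signal.
type subscription[T any] struct {
	events chan T
	done   chan struct{}
}

// consume reads events until the done signal fires. Ranging over events
// instead would block forever, because the channel is never closed.
func consume[T any](s *subscription[T]) []T {
	var got []T
	for {
		select {
		case ev := <-s.events:
			got = append(got, ev)
		case <-s.done:
			return got
		}
	}
}

// demo sends two events over an unbuffered channel, then signals done.
func demo() []int {
	s := &subscription[int]{events: make(chan int), done: make(chan struct{})}
	result := make(chan []int)
	go func() { result <- consume(s) }()
	s.events <- 1 // unbuffered: returns once the consumer has received it
	s.events <- 2
	close(s.done)
	return <-result
}

func main() {
	fmt.Println(demo()) // prints: [1 2]
}
```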

func (*EventSubscription[T]) Done

func (es *EventSubscription[T]) Done() bool

Done returns whether the event subscription has been canceled.

func (*EventSubscription[T]) Events

func (es *EventSubscription[T]) Events() <-chan T

Events returns a read channel for the events. The received events are shared among all subscribers and callbacks. Be sure to apply proper concurrency safeguards, if applicable.

type ExtendedGroup

type ExtendedGroup struct {
	*Group
	// contains filtered or unexported fields
}

ExtendedGroup extends the group with additional helpful functionality.

func NewExtendedGroup

func NewExtendedGroup(modules ...Module) *ExtendedGroup

NewExtendedGroup returns a new extended group.

func UpgradeGroup

func UpgradeGroup(g *Group) *ExtendedGroup

UpgradeGroup upgrades a regular group to an extended group.

func (*ExtendedGroup) EnsureStartedWorker

func (eg *ExtendedGroup) EnsureStartedWorker(wCtx *WorkerCtx) error

EnsureStartedWorker tries to start the group until it succeeds or fails permanently.

func (*ExtendedGroup) EnsureStoppedWorker

func (eg *ExtendedGroup) EnsureStoppedWorker(wCtx *WorkerCtx) error

EnsureStoppedWorker tries to stop the group until it succeeds or fails permanently.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group describes a group of modules.

func NewGroup

func NewGroup(modules ...Module) *Group

NewGroup returns a new group of modules.

func (*Group) Add

func (g *Group) Add(m Module)

Add validates the given module and adds it to the group, if all requirements are met. Not safe for concurrent use with any other method. All modules must be added before anything else is done with the group.

func (*Group) AddStatesCallback

func (g *Group) AddStatesCallback(callbackName string, callback EventCallbackFunc[StateUpdate])

AddStatesCallback adds the given callback function to all group modules that expose a state manager at States().

func (*Group) GetStates

func (g *Group) GetStates() []StateUpdate

GetStates returns the current states of all group modules.

func (*Group) Modules

func (g *Group) Modules() []Module

Modules returns a copy of the modules.

func (*Group) Ready

func (g *Group) Ready() bool

Ready returns whether all modules in the group have been started and are still running.

func (*Group) Start

func (g *Group) Start() error

Start starts all modules in the group in the defined order. If a module fails to start, itself and all previous modules will be stopped in the reverse order.

func (*Group) Stop

func (g *Group) Stop() error

Stop stops all modules in the group in the reverse order.
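The start-in-order and roll-back-in-reverse behavior described for Start can be sketched as follows. This is an illustration under assumptions, not the Group implementation: module is a reduced stand-in interface, step is a hypothetical module, and errors from Stop during rollback are simply ignored here.

```go
package main

import (
	"errors"
	"fmt"
)

// module is a reduced stand-in for the documented Module interface.
type module interface {
	Start() error
	Stop() error
}

// step is a hypothetical module that records calls and can fail on start.
type step struct {
	name string
	fail bool
	log  *[]string
}

func (s step) Start() error {
	*s.log = append(*s.log, "start "+s.name)
	if s.fail {
		return errors.New(s.name + " failed to start")
	}
	return nil
}

func (s step) Stop() error {
	*s.log = append(*s.log, "stop "+s.name)
	return nil
}

// startAll starts modules in order. If one fails, that module and all
// previously started modules are stopped in reverse order.
func startAll(modules []module) error {
	for i, m := range modules {
		if err := m.Start(); err != nil {
			for j := i; j >= 0; j-- {
				_ = modules[j].Stop() // best effort; stop errors ignored in this sketch
			}
			return err
		}
	}
	return nil
}

// demo starts three modules, the last of which fails.
func demo() ([]string, error) {
	var log []string
	err := startAll([]module{
		step{"a", false, &log},
		step{"b", false, &log},
		step{"c", true, &log},
	})
	return log, err
}

func main() {
	log, err := demo()
	fmt.Println(log, err)
}
```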

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages workers.

func New

func New(name string) *Manager

New returns a new manager.

func (*Manager) Cancel

func (m *Manager) Cancel()

Cancel cancels the worker context.

func (*Manager) Ctx

func (m *Manager) Ctx() context.Context

Ctx returns the worker context.

func (*Manager) Debug

func (m *Manager) Debug(msg string, args ...any)

Debug logs at LevelDebug. The manager context is automatically supplied.

func (*Manager) Delay

func (m *Manager) Delay(name string, period time.Duration, fn func(w *WorkerCtx) error) *WorkerMgr

Delay starts the given function in a goroutine (as a "worker") after the given delay. The worker context has:
- A separate context, which is canceled when the function returns.
- Access to named structured logging.
- By default, errors and panics are logged. For custom behavior, supply errorFn; the argument is optional.
- Panic catching.
- Flow control helpers.

func (*Manager) Do

func (m *Manager) Do(name string, fn func(w *WorkerCtx) error) error

Do directly executes the given function (as a "worker"). The worker context has:
- A separate context, which is canceled when the function returns.
- Access to named structured logging.
- The given function is re-run after failure (with backoff).
- Panic catching.
- Flow control helpers.

func (*Manager) Done

func (m *Manager) Done() <-chan struct{}

Done returns the context Done channel.

func (*Manager) Error

func (m *Manager) Error(msg string, args ...any)

Error logs at LevelError. The manager context is automatically supplied.

func (*Manager) Go

func (m *Manager) Go(name string, fn func(w *WorkerCtx) error)

Go starts the given function in a goroutine (as a "worker"). The worker context has:
- A separate context, which is canceled when the function returns.
- Access to named structured logging.
- The given function is re-run after failure (with backoff).
- Panic catching.
- Flow control helpers.
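To make "re-run after failure (with backoff)" and "panic catching" concrete, here is a minimal standard-library sketch. It is not the package's worker loop: runOnce and runWithBackoff are hypothetical names, the doubling backoff policy and its limits are assumptions, and treating a recovered panic as an ordinary failure is also an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runOnce calls fn and converts a panic into an error.
func runOnce(ctx context.Context, fn func(context.Context) error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return fn(ctx)
}

// runWithBackoff re-runs fn after each failure, doubling the wait up to limit.
// It returns the number of attempts once fn succeeds or ctx is canceled.
func runWithBackoff(ctx context.Context, initial, limit time.Duration, fn func(context.Context) error) int {
	wait := initial
	for attempt := 1; ; attempt++ {
		if err := runOnce(ctx, fn); err == nil {
			return attempt
		}
		select {
		case <-ctx.Done():
			return attempt
		case <-time.After(wait):
		}
		wait *= 2
		if wait > limit {
			wait = limit
		}
	}
}

// demo fails once, panics once, and then succeeds on the third attempt.
func demo() int {
	calls := 0
	return runWithBackoff(context.Background(), time.Millisecond, 4*time.Millisecond,
		func(context.Context) error {
			calls++
			switch calls {
			case 1:
				return errors.New("transient failure")
			case 2:
				panic("unexpected state")
			default:
				return nil
			}
		})
}

func main() {
	fmt.Println(demo()) // prints: 3
}
```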

func (*Manager) Info

func (m *Manager) Info(msg string, args ...any)

Info logs at LevelInfo. The manager context is automatically supplied.

func (*Manager) IsDone

func (m *Manager) IsDone() bool

IsDone checks whether the manager context is done.

func (*Manager) Log

func (m *Manager) Log(level slog.Level, msg string, args ...any)

Log emits a log record with the current time and the given level and message. The manager context is automatically supplied.

func (*Manager) LogAttrs

func (m *Manager) LogAttrs(level slog.Level, msg string, attrs ...slog.Attr)

LogAttrs is a more efficient version of Log() that accepts only Attrs. The manager context is automatically supplied.

func (*Manager) LogEnabled

func (m *Manager) LogEnabled(level slog.Level) bool

LogEnabled reports whether the logger emits log records at the given level. The manager context is automatically supplied.

func (*Manager) Name

func (m *Manager) Name() string

Name returns the manager name.

func (*Manager) NewStateMgr

func (m *Manager) NewStateMgr() *StateMgr

NewStateMgr returns a new state manager.

func (*Manager) NewWorkerMgr

func (m *Manager) NewWorkerMgr(name string, fn func(w *WorkerCtx) error, errorFn func(c *WorkerCtx, err error, panicInfo string)) *WorkerMgr

NewWorkerMgr creates a new scheduler for the given worker function. By default, errors and panics are only logged. If custom behavior is required, supply an errorFn. Once all scheduling has ended, the scheduler ends itself, including all related workers, unless keep-alive is enabled.

func (*Manager) Repeat

func (m *Manager) Repeat(name string, period time.Duration, fn func(w *WorkerCtx) error) *WorkerMgr

Repeat executes the given function periodically in a goroutine (as a "worker"). The worker context has:
- A separate context, which is canceled when the function returns.
- Access to named structured logging.
- By default, errors and panics are logged. For custom behavior, supply errorFn; the argument is optional.
- Flow control helpers.
Repeat is intended for long-running tasks that are mostly idle.

func (*Manager) Reset

func (m *Manager) Reset()

Reset resets the manager in order to be able to be used again. In the process, the current context is canceled. As part of a module (in a group), the module might be stopped and started again. This method is not goroutine-safe. The caller must make sure the manager is not being used in any way during execution.

func (*Manager) WaitForWorkers

func (m *Manager) WaitForWorkers(max time.Duration) (done bool)

WaitForWorkers waits for all workers of this manager to be done. The default maximum waiting time is one minute.
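A bounded wait that reports whether all workers finished can be built from sync.WaitGroup and a timer. The sketch below only illustrates that pattern; waitTimeout is a hypothetical helper, and how the package tracks its workers internally is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg up to limit and reports whether all workers finished.
func waitTimeout(wg *sync.WaitGroup, limit time.Duration) (done bool) {
	finished := make(chan struct{})
	go func() {
		wg.Wait()
		close(finished)
	}()
	select {
	case <-finished:
		return true
	case <-time.After(limit):
		return false
	}
}

// demo waits once for a worker that finishes and once for one that does not.
func demo() (fast, slow bool) {
	var quick sync.WaitGroup
	quick.Add(1)
	go quick.Done()
	fast = waitTimeout(&quick, time.Second)

	var stuck sync.WaitGroup
	stuck.Add(1)
	slow = waitTimeout(&stuck, 10*time.Millisecond)
	stuck.Done() // release the helper goroutine started by waitTimeout
	return fast, slow
}

func main() {
	fmt.Println(demo()) // prints: true false
}
```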

func (*Manager) WaitForWorkersFromStop

func (m *Manager) WaitForWorkersFromStop(max time.Duration) (done bool)

WaitForWorkersFromStop is a special version of WaitForWorkers, meant to be called from the stop routine. It waits for all workers of this manager to be done, except for the Stop function. The default maximum waiting time is one minute.

func (*Manager) Warn

func (m *Manager) Warn(msg string, args ...any)

Warn logs at LevelWarn. The manager context is automatically supplied.

func (*Manager) WorkerInfo added in v1.6.22

func (m *Manager) WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error)

WorkerInfo returns status information for all running workers of this manager.

type Module

type Module interface {
	Manager() *Manager
	Start() error
	Stop() error
}

Module is a manageable instance of some component.

type SleepyTicker

type SleepyTicker struct {
	// contains filtered or unexported fields
}

SleepyTicker is a wrapper around time.Ticker that respects the sleep mode of the module.

func NewSleepyTicker

func NewSleepyTicker(normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker

NewSleepyTicker returns a new SleepyTicker. This is a wrapper around the standard time.Ticker, but it respects the modules.Module sleep mode. See https://pkg.go.dev/time#Ticker. If sleepDuration is set to 0, the ticker will not tick during sleep.

func (*SleepyTicker) SetSleep

func (st *SleepyTicker) SetSleep(enabled bool)

SetSleep sets the sleep mode of the ticker. If enabled is true, the ticker will tick with sleepDuration. If enabled is false, the ticker will tick with normalDuration.

func (*SleepyTicker) Stop

func (st *SleepyTicker) Stop()

Stop turns off a ticker. After Stop, no more ticks will be sent. Stop does not close the channel, to prevent a concurrent goroutine reading from the channel from seeing an erroneous "tick".

func (*SleepyTicker) Wait

func (st *SleepyTicker) Wait() <-chan time.Time

Wait waits until the module is not in sleep mode and returns the time.Ticker.C channel.

type State

type State struct {
	// ID is a program-unique ID.
	// It must not only be unique within the StateMgr, but for the whole program,
	// as it may be re-used with related systems.
	// Required.
	ID string

	// Name is the name of the state.
	// This may also serve as a notification title.
	// Required.
	Name string

	// Message is a more detailed message about the state.
	// Optional.
	Message string

	// Type defines the type of the state.
	// Optional.
	Type StateType

	// Time is the time when the state was created or the originating incident occurred.
	// Optional, will be set to current time if not set.
	Time time.Time

	// Data can hold any additional data necessary for further processing of connected systems.
	// Optional.
	Data any
}

State describes the state of a manager or module.

type StateMgr

type StateMgr struct {
	// contains filtered or unexported fields
}

StateMgr is a simple state manager.

func NewStateMgr

func NewStateMgr(mgr *Manager) *StateMgr

NewStateMgr returns a new state manager.

func (*StateMgr) Add

func (m *StateMgr) Add(s State)

Add adds a state. If a state with the same ID already exists, it is replaced.
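The replace-on-same-ID behavior can be pictured with a small stand-in container. This is a sketch only: state is a reduced copy of the State struct, stateSet is hypothetical, and keeping the replaced entry at its original position is an assumption.

```go
package main

import "fmt"

// state is a reduced stand-in for the documented State struct.
type state struct {
	ID      string
	Name    string
	Message string
}

// stateSet is a hypothetical container that mimics the documented Add behavior.
type stateSet struct {
	states []state
}

// add appends s, or replaces the existing entry that has the same ID.
func (ss *stateSet) add(s state) {
	for i := range ss.states {
		if ss.states[i].ID == s.ID {
			ss.states[i] = s
			return
		}
	}
	ss.states = append(ss.states, s)
}

// demo adds two states with the same ID and reports what is left.
func demo() (int, string) {
	var ss stateSet
	ss.add(state{ID: "updates:failed", Name: "Update failed", Message: "first attempt"})
	ss.add(state{ID: "updates:failed", Name: "Update failed", Message: "second attempt"})
	return len(ss.states), ss.states[0].Message
}

func main() {
	fmt.Println(demo()) // prints: 1 second attempt
}
```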

func (*StateMgr) AddCallback

func (m *StateMgr) AddCallback(callbackName string, callback EventCallbackFunc[StateUpdate])

AddCallback adds a callback to state update events.

func (*StateMgr) Clear

func (m *StateMgr) Clear()

Clear removes all states.

func (*StateMgr) Export

func (m *StateMgr) Export() StateUpdate

Export returns the current states.

func (*StateMgr) Remove

func (m *StateMgr) Remove(id string)

Remove removes the state with the given ID.

func (*StateMgr) Subscribe

func (m *StateMgr) Subscribe(subscriberName string, chanSize int) *EventSubscription[StateUpdate]

Subscribe subscribes to state update events.

type StateType

type StateType string

StateType defines commonly used states.

func (StateType) Severity

func (st StateType) Severity() int

Severity returns a number representing the gravity of the state for ordering.
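A severity number is mainly useful for sorting. The sketch below orders state types from most to least severe. The concrete ranks (0 to 3) are hypothetical; the documentation only says that the number represents the gravity of the state for ordering. The lowercase names are local stand-ins for StateType and its constants.

```go
package main

import (
	"fmt"
	"sort"
)

// stateType is a local stand-in for the documented StateType.
type stateType string

const (
	typeUndefined stateType = ""
	typeHint      stateType = "hint"
	typeWarning   stateType = "warning"
	typeError     stateType = "error"
)

// severity returns a rank for ordering. The numbers are assumptions.
func (st stateType) severity() int {
	switch st {
	case typeError:
		return 3
	case typeWarning:
		return 2
	case typeHint:
		return 1
	default:
		return 0
	}
}

// sortBySeverity orders types from most to least severe, keeping ties stable.
func sortBySeverity(types []stateType) {
	sort.SliceStable(types, func(i, j int) bool {
		return types[i].severity() > types[j].severity()
	})
}

// demo sorts a mixed list of state types.
func demo() []stateType {
	types := []stateType{typeHint, typeError, typeUndefined, typeWarning}
	sortBySeverity(types)
	return types
}

func main() {
	fmt.Printf("%q\n", demo()) // prints: ["error" "warning" "hint" ""]
}
```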

type StateUpdate

type StateUpdate struct {
	Module string
	States []State
}

StateUpdate is used to update others about a state change.

type StatefulModule

type StatefulModule interface {
	States() *StateMgr
}

StatefulModule is used for interface checks on modules.

type WorkerCtx

type WorkerCtx struct {
	// contains filtered or unexported fields
}

WorkerCtx provides workers with the necessary environment for flow control and logging.

func WorkerFromCtx

func WorkerFromCtx(ctx context.Context) *WorkerCtx

WorkerFromCtx returns the WorkerCtx from the given context.

func (*WorkerCtx) AddToCtx

func (w *WorkerCtx) AddToCtx(ctx context.Context) context.Context

AddToCtx adds the WorkerCtx to the given context.
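AddToCtx and WorkerFromCtx follow the usual context-value pattern with an unexported empty struct as the key (compare WorkerCtxContextKey). The following standard-library sketch shows that pattern with stand-in types. workerCtx and ctxKey are hypothetical, and returning nil when no value is stored is an assumption about the real function.

```go
package main

import (
	"context"
	"fmt"
)

// workerCtx is a stand-in for WorkerCtx; only a name is stored here.
type workerCtx struct{ name string }

// ctxKey is an unexported empty struct type, so the key cannot collide
// with context keys defined by other packages.
type ctxKey struct{}

// addToCtx returns a child context that carries w.
func (w *workerCtx) addToCtx(ctx context.Context) context.Context {
	return context.WithValue(ctx, ctxKey{}, w)
}

// workerFromCtx returns the stored worker, or nil if none is present.
func workerFromCtx(ctx context.Context) *workerCtx {
	w, _ := ctx.Value(ctxKey{}).(*workerCtx)
	return w
}

// demo stores a worker in a context, reads it back, and probes an empty context.
func demo() (name string, missingIsNil bool) {
	w := &workerCtx{name: "resolver"}
	ctx := w.addToCtx(context.Background())
	return workerFromCtx(ctx).name, workerFromCtx(context.Background()) == nil
}

func main() {
	fmt.Println(demo()) // prints: resolver true
}
```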

func (*WorkerCtx) Cancel

func (w *WorkerCtx) Cancel()

Cancel cancels the worker context. It is called automatically after the worker stops or returns, regardless of error.

func (*WorkerCtx) Ctx

func (w *WorkerCtx) Ctx() context.Context

Ctx returns the worker context. It is canceled automatically after the worker stops or returns, regardless of error.

func (*WorkerCtx) Debug

func (w *WorkerCtx) Debug(msg string, args ...any)

Debug logs at LevelDebug. The worker context is automatically supplied.

func (*WorkerCtx) Done

func (w *WorkerCtx) Done() <-chan struct{}

Done returns the context Done channel.

func (*WorkerCtx) Error

func (w *WorkerCtx) Error(msg string, args ...any)

Error logs at LevelError. The worker context is automatically supplied.

func (*WorkerCtx) Info

func (w *WorkerCtx) Info(msg string, args ...any)

Info logs at LevelInfo. The worker context is automatically supplied.

func (*WorkerCtx) IsDone

func (w *WorkerCtx) IsDone() bool

IsDone checks whether the worker context is done.

func (*WorkerCtx) Log

func (w *WorkerCtx) Log(level slog.Level, msg string, args ...any)

Log emits a log record with the current time and the given level and message. The worker context is automatically supplied.

func (*WorkerCtx) LogAttrs

func (w *WorkerCtx) LogAttrs(level slog.Level, msg string, attrs ...slog.Attr)

LogAttrs is a more efficient version of Log() that accepts only Attrs. The worker context is automatically supplied.

func (*WorkerCtx) LogEnabled

func (w *WorkerCtx) LogEnabled(level slog.Level) bool

LogEnabled reports whether the logger emits log records at the given level. The worker context is automatically supplied.

func (*WorkerCtx) Logger

func (w *WorkerCtx) Logger() *slog.Logger

Logger returns the logger used by the worker context.

func (*WorkerCtx) Warn

func (w *WorkerCtx) Warn(msg string, args ...any)

Warn logs at LevelWarn. The worker context is automatically supplied.

func (*WorkerCtx) WorkerMgr

func (w *WorkerCtx) WorkerMgr() *WorkerMgr

WorkerMgr returns the worker manager the worker was started from. Returns nil if the worker is not associated with a scheduler.

type WorkerInfo added in v1.6.22

type WorkerInfo struct {
	Running int
	Waiting int

	Other   int
	Missing int

	Workers []*WorkerInfoDetail
}

WorkerInfo holds status information about a manager's workers.

func MergeWorkerInfo added in v1.6.22

func MergeWorkerInfo(infos ...*WorkerInfo) *WorkerInfo

MergeWorkerInfo merges multiple worker infos into one.

func (*WorkerInfo) Format added in v1.6.22

func (wi *WorkerInfo) Format() string

Format formats the worker information as a readable table.

type WorkerInfoDetail added in v1.6.22

type WorkerInfoDetail struct {
	Count       int
	State       string
	Mgr         string
	Name        string
	Func        string
	CurrentLine string
	ExtraInfo   string
}

WorkerInfoDetail holds status information about a single worker.

type WorkerInfoModule added in v1.6.22

type WorkerInfoModule interface {
	WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error)
}

WorkerInfoModule is used for interface checks on modules.

type WorkerMgr

type WorkerMgr struct {
	// contains filtered or unexported fields
}

WorkerMgr schedules a worker.

func (*WorkerMgr) Delay

func (s *WorkerMgr) Delay(duration time.Duration) *WorkerMgr

Delay will schedule the worker to run after the given duration. If set, the repeat schedule will continue afterwards. Disable the delay by passing 0.

func (*WorkerMgr) Go

func (s *WorkerMgr) Go()

Go executes the worker immediately. If the worker is currently being executed, the next execution will commence afterwards. Calling Go() implies KeepAlive() if nothing else was specified yet.

func (*WorkerMgr) KeepAlive

func (s *WorkerMgr) KeepAlive() *WorkerMgr

KeepAlive instructs the scheduler to not self-destruct, even if all scheduled work is complete.

func (*WorkerMgr) Repeat

func (s *WorkerMgr) Repeat(interval time.Duration) *WorkerMgr

Repeat will repeatedly execute the worker using the given interval. Disable repeating by passing 0.

func (*WorkerMgr) Status added in v1.6.22

func (s *WorkerMgr) Status() string

Status returns the current status of the worker manager.

func (*WorkerMgr) Stop

func (s *WorkerMgr) Stop()

Stop immediately stops the scheduler and all related workers.
