Documentation ¶
Overview ¶
Package mgr provides simple management of flow control and logging.
Index ¶
- Constants
- Variables
- func RunModules(ctx context.Context, modules ...Module) error
- type EventCallback
- type EventCallbackFunc
- type EventMgr
- type EventSubscription
- type ExtendedGroup
- type Group
- type Manager
- func (m *Manager) Cancel()
- func (m *Manager) Ctx() context.Context
- func (m *Manager) Debug(msg string, args ...any)
- func (m *Manager) Delay(name string, period time.Duration, fn func(w *WorkerCtx) error) *WorkerMgr
- func (m *Manager) Do(name string, fn func(w *WorkerCtx) error) error
- func (m *Manager) Done() <-chan struct{}
- func (m *Manager) Error(msg string, args ...any)
- func (m *Manager) Go(name string, fn func(w *WorkerCtx) error)
- func (m *Manager) Info(msg string, args ...any)
- func (m *Manager) IsDone() bool
- func (m *Manager) Log(level slog.Level, msg string, args ...any)
- func (m *Manager) LogAttrs(level slog.Level, msg string, attrs ...slog.Attr)
- func (m *Manager) LogEnabled(level slog.Level) bool
- func (m *Manager) Name() string
- func (m *Manager) NewStateMgr() *StateMgr
- func (m *Manager) NewWorkerMgr(name string, fn func(w *WorkerCtx) error, ...) *WorkerMgr
- func (m *Manager) Repeat(name string, period time.Duration, fn func(w *WorkerCtx) error) *WorkerMgr
- func (m *Manager) Reset()
- func (m *Manager) WaitForWorkers(max time.Duration) (done bool)
- func (m *Manager) WaitForWorkersFromStop(max time.Duration) (done bool)
- func (m *Manager) Warn(msg string, args ...any)
- func (m *Manager) WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error)
- type Module
- type SleepyTicker
- type State
- type StateMgr
- func (m *StateMgr) Add(s State)
- func (m *StateMgr) AddCallback(callbackName string, callback EventCallbackFunc[StateUpdate])
- func (m *StateMgr) Clear()
- func (m *StateMgr) Export() StateUpdate
- func (m *StateMgr) Remove(id string)
- func (m *StateMgr) Subscribe(subscriberName string, chanSize int) *EventSubscription[StateUpdate]
- type StateType
- type StateUpdate
- type StatefulModule
- type WorkerCtx
- func (w *WorkerCtx) AddToCtx(ctx context.Context) context.Context
- func (w *WorkerCtx) Cancel()
- func (w *WorkerCtx) Ctx() context.Context
- func (w *WorkerCtx) Debug(msg string, args ...any)
- func (w *WorkerCtx) Done() <-chan struct{}
- func (w *WorkerCtx) Error(msg string, args ...any)
- func (w *WorkerCtx) Info(msg string, args ...any)
- func (w *WorkerCtx) IsDone() bool
- func (w *WorkerCtx) Log(level slog.Level, msg string, args ...any)
- func (w *WorkerCtx) LogAttrs(level slog.Level, msg string, attrs ...slog.Attr)
- func (w *WorkerCtx) LogEnabled(level slog.Level) bool
- func (w *WorkerCtx) Logger() *slog.Logger
- func (w *WorkerCtx) Warn(msg string, args ...any)
- func (w *WorkerCtx) WorkerMgr() *WorkerMgr
- type WorkerInfo
- type WorkerInfoDetail
- type WorkerInfoModule
- type WorkerMgr
Constants ¶
const (
	StateTypeUndefined = ""
	StateTypeHint      = "hint"
	StateTypeWarning   = "warning"
	StateTypeError     = "error"
)
State Types.
Variables ¶
var (
	// ErrUnsuitableGroupState is returned when an operation cannot be executed due to an unsuitable state.
	ErrUnsuitableGroupState = errors.New("unsuitable group state")
	// ErrInvalidGroupState is returned when a group is in an invalid state and cannot be recovered.
	ErrInvalidGroupState = errors.New("invalid group state")
	// ErrExecuteCmdLineOp is returned when modules are created, but request
	// execution of a (somewhere else set) command line operation instead.
	ErrExecuteCmdLineOp = errors.New("command line operation execution requested")
)
var ManagerNameSLogKey = "manager"
ManagerNameSLogKey is used as the logging key for the name of the manager.
var WorkerCtxContextKey = workerContextKey{}
WorkerCtxContextKey is the key used to add the WorkerCtx to a context.
Functions ¶
Types ¶
type EventCallback ¶
type EventCallback[T any] struct {
	// contains filtered or unexported fields
}
EventCallback is a registered callback to an event.
type EventCallbackFunc ¶
EventCallbackFunc defines the event callback function.
type EventMgr ¶
type EventMgr[T any] struct {
	// contains filtered or unexported fields
}
EventMgr is a simple event manager.
func NewEventMgr ¶
NewEventMgr returns a new event manager. It is easiest to use as a public field on a struct, so that others can simply Subscribe() or AddCallback().
func (*EventMgr[T]) AddCallback ¶
func (em *EventMgr[T]) AddCallback(callbackName string, callback EventCallbackFunc[T])
AddCallback adds a callback to be executed on events. The received events are shared among all subscribers and callbacks. Be sure to apply proper concurrency safeguards, if applicable.
type EventSubscription ¶
type EventSubscription[T any] struct {
	// contains filtered or unexported fields
}
EventSubscription is a subscription to an event.
func (*EventSubscription[T]) Cancel ¶
func (es *EventSubscription[T]) Cancel()
Cancel cancels the subscription. The events channel is not closed, but will not receive new events.
func (*EventSubscription[T]) Done ¶
func (es *EventSubscription[T]) Done() bool
Done returns whether the event subscription has been canceled.
func (*EventSubscription[T]) Events ¶
func (es *EventSubscription[T]) Events() <-chan T
Events returns a read channel for the events. The received events are shared among all subscribers and callbacks. Be sure to apply proper concurrency safeguards, if applicable.
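The subscription semantics described above (a buffered events channel, and a Cancel that stops delivery without closing the channel) can be modeled with the standard library alone. This is a sketch under stated assumptions, not the package's implementation: the types subscription and eventMgr and the Submit method are hypothetical, and dropping events when a subscriber's buffer is full is a choice made only for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// subscription is a hypothetical stand-in for EventSubscription[T].
type subscription[T any] struct {
	events   chan T
	canceled atomic.Bool
}

// Events returns the receive side of the buffered events channel.
func (s *subscription[T]) Events() <-chan T { return s.events }

// Cancel marks the subscription as canceled; the channel stays open.
func (s *subscription[T]) Cancel() { s.canceled.Store(true) }

// Done reports whether the subscription has been canceled.
func (s *subscription[T]) Done() bool { return s.canceled.Load() }

// eventMgr is a hypothetical stand-in for EventMgr[T].
type eventMgr[T any] struct {
	mu   sync.Mutex
	subs []*subscription[T]
}

// Subscribe registers a new subscription with a buffer of chanSize events.
func (em *eventMgr[T]) Subscribe(chanSize int) *subscription[T] {
	em.mu.Lock()
	defer em.mu.Unlock()
	s := &subscription[T]{events: make(chan T, chanSize)}
	em.subs = append(em.subs, s)
	return s
}

// Submit delivers event to every active subscription. It drops the event
// for subscribers whose buffer is full (an assumption of this sketch).
func (em *eventMgr[T]) Submit(event T) {
	em.mu.Lock()
	defer em.mu.Unlock()
	for _, s := range em.subs {
		if s.Done() {
			continue
		}
		select {
		case s.events <- event:
		default:
		}
	}
}

func main() {
	em := &eventMgr[string]{}
	sub := em.Subscribe(4)

	em.Submit("first")
	sub.Cancel()
	em.Submit("second") // Not delivered: the subscription is canceled.

	fmt.Println("received:", <-sub.Events())
	fmt.Println("pending:", len(sub.Events()), "canceled:", sub.Done())
}
```

Because the channel is never closed, a consumer that ranges over Events() would block forever after Cancel. A consumer loop would therefore also need to select on its own context or stop signal.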
type ExtendedGroup ¶
type ExtendedGroup struct {
	*Group
	// contains filtered or unexported fields
}
ExtendedGroup extends the group with additional helpful functionality.
func NewExtendedGroup ¶
func NewExtendedGroup(modules ...Module) *ExtendedGroup
NewExtendedGroup returns a new extended group.
func UpgradeGroup ¶
func UpgradeGroup(g *Group) *ExtendedGroup
UpgradeGroup upgrades a regular group to an extended group.
func (*ExtendedGroup) EnsureStartedWorker ¶
func (eg *ExtendedGroup) EnsureStartedWorker(wCtx *WorkerCtx) error
EnsureStartedWorker tries to start the group until it succeeds or fails permanently.
func (*ExtendedGroup) EnsureStoppedWorker ¶
func (eg *ExtendedGroup) EnsureStoppedWorker(wCtx *WorkerCtx) error
EnsureStoppedWorker tries to stop the group until it succeeds or fails permanently.
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
Group describes a group of modules.
func (*Group) Add ¶
Add validates the given module and adds it to the group, if all requirements are met. Not safe for concurrent use with any other method. All modules must be added before anything else is done with the group.
func (*Group) AddStatesCallback ¶
func (g *Group) AddStatesCallback(callbackName string, callback EventCallbackFunc[StateUpdate])
AddStatesCallback adds the given callback function to all group modules that expose a state manager at States().
func (*Group) GetStates ¶
func (g *Group) GetStates() []StateUpdate
GetStates returns the current states of all group modules.
func (*Group) Ready ¶
Ready returns whether all modules in the group have been started and are still running.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages workers.
func (*Manager) Delay ¶
Delay starts the given function delayed in a goroutine (as a "worker"). The worker context has:
- A separate context which is canceled when the function returns.
- Access to named structured logging.
- By default, errors and panics are logged. For custom behavior, supply errorFn; the argument is optional.
- Panic catching.
- Flow control helpers.
func (*Manager) Do ¶
Do directly executes the given function (as a "worker"). The worker context has:
- A separate context which is canceled when the function returns.
- Access to named structured logging.
- Given function is re-run after failure (with backoff).
- Panic catching.
- Flow control helpers.
func (*Manager) Done ¶
func (m *Manager) Done() <-chan struct{}
Done returns the context Done channel.
func (*Manager) Go ¶
Go starts the given function in a goroutine (as a "worker"). The worker context has:
- A separate context which is canceled when the function returns.
- Access to named structured logging.
- Given function is re-run after failure (with backoff).
- Panic catching.
- Flow control helpers.
func (*Manager) Log ¶
Log emits a log record with the current time and the given level and message. The manager context is automatically supplied.
func (*Manager) LogAttrs ¶
LogAttrs is a more efficient version of Log() that accepts only Attrs. The manager context is automatically supplied.
func (*Manager) LogEnabled ¶
LogEnabled reports whether the logger emits log records at the given level. The manager context is automatically supplied.
func (*Manager) NewStateMgr ¶
NewStateMgr returns a new state manager.
func (*Manager) NewWorkerMgr ¶
func (m *Manager) NewWorkerMgr(name string, fn func(w *WorkerCtx) error, errorFn func(c *WorkerCtx, err error, panicInfo string)) *WorkerMgr
NewWorkerMgr creates a new scheduler for the given worker function. By default, errors and panics are only logged. If custom behavior is required, supply an errorFn. If all scheduling has ended, the scheduler will end itself, including all related workers, except if keep-alive is enabled.
func (*Manager) Repeat ¶
Repeat executes the given function periodically in a goroutine (as a "worker"). The worker context has:
- A separate context which is canceled when the function returns.
- Access to named structured logging.
- By default, errors and panics are logged. For custom behavior, supply errorFn; the argument is optional.
- Flow control helpers.
Repeat is intended for long-running tasks that are mostly idle.
func (*Manager) Reset ¶
func (m *Manager) Reset()
Reset resets the manager so that it can be used again. In the process, the current context is canceled. As part of a module (in a group), the module might be stopped and started again. This method is not goroutine-safe. The caller must make sure the manager is not being used in any way during execution.
func (*Manager) WaitForWorkers ¶
WaitForWorkers waits for all workers of this manager to be done. The default maximum waiting time is one minute.
func (*Manager) WaitForWorkersFromStop ¶
WaitForWorkersFromStop is a special version of WaitForWorkers, meant to be called from the stop routine. It waits for all workers of this manager to be done, except for the Stop function. The default maximum waiting time is one minute.
func (*Manager) WorkerInfo ¶ added in v1.6.22
func (m *Manager) WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error)
WorkerInfo returns status information for all running workers of this manager.
type SleepyTicker ¶
type SleepyTicker struct {
// contains filtered or unexported fields
}
SleepyTicker is a wrapper around time.Ticker that respects the sleep mode of the module.
func NewSleepyTicker ¶
func NewSleepyTicker(normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker
NewSleepyTicker returns a new SleepyTicker. It wraps the standard time.Ticker (see https://pkg.go.dev/time#Ticker) but respects the sleep mode of modules.Module. If sleepDuration is 0, the ticker does not tick during sleep.
func (*SleepyTicker) SetSleep ¶
func (st *SleepyTicker) SetSleep(enabled bool)
SetSleep sets the sleep mode of the ticker. If enabled is true, the ticker will tick with sleepDuration. If enabled is false, the ticker will tick with normalDuration.
func (*SleepyTicker) Stop ¶
func (st *SleepyTicker) Stop()
Stop turns off a ticker. After Stop, no more ticks will be sent. Stop does not close the channel, to prevent a concurrent goroutine reading from the channel from seeing an erroneous "tick".
func (*SleepyTicker) Wait ¶
func (st *SleepyTicker) Wait() <-chan time.Time
Wait waits until the module is not in sleep mode and returns the time.Ticker.C channel.
type State ¶
type State struct {
	// ID is a program-unique ID.
	// It must not only be unique within the StateMgr, but for the whole program,
	// as it may be re-used with related systems.
	// Required.
	ID string
	// Name is the name of the state.
	// This may also serve as a notification title.
	// Required.
	Name string
	// Message is a more detailed message about the state.
	// Optional.
	Message string
	// Type defines the type of the state.
	// Optional.
	Type StateType
	// Time is the time when the state was created or the originating incident occurred.
	// Optional, will be set to current time if not set.
	Time time.Time
	// Data can hold any additional data necessary for further processing of connected systems.
	// Optional.
	Data any
}
State describes the state of a manager or module.
type StateMgr ¶
type StateMgr struct {
// contains filtered or unexported fields
}
StateMgr is a simple state manager.
func NewStateMgr ¶
NewStateMgr returns a new state manager.
func (*StateMgr) Add ¶
Add adds a state. If a state with the same ID already exists, it is replaced.
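The replace-by-ID behavior of Add can be pictured with a small store that keeps at most one state per ID. This is a sketch for illustration only: state and stateStore are hypothetical, trimmed-down types, the example IDs are made up, and the real StateMgr additionally notifies subscribers and callbacks, which is omitted here.

```go
package main

import "fmt"

// state is a trimmed-down, hypothetical stand-in for State.
type state struct {
	ID   string
	Name string
}

// stateStore keeps states in insertion order, at most one per ID.
type stateStore struct {
	states []state
}

// Add appends s, or replaces the existing state that has the same ID.
func (st *stateStore) Add(s state) {
	for i := range st.states {
		if st.states[i].ID == s.ID {
			st.states[i] = s
			return
		}
	}
	st.states = append(st.states, s)
}

// Remove deletes the state with the given ID, if present.
func (st *stateStore) Remove(id string) {
	for i := range st.states {
		if st.states[i].ID == id {
			st.states = append(st.states[:i], st.states[i+1:]...)
			return
		}
	}
}

func main() {
	var st stateStore
	st.Add(state{ID: "updates:failed", Name: "Update failed"})
	st.Add(state{ID: "net:offline", Name: "Offline"})
	st.Add(state{ID: "updates:failed", Name: "Update failed again"}) // Replaces the first entry.
	st.Remove("net:offline")

	fmt.Println(len(st.states), st.states[0].Name) // prints: 1 Update failed again
}
```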
func (*StateMgr) AddCallback ¶
func (m *StateMgr) AddCallback(callbackName string, callback EventCallbackFunc[StateUpdate])
AddCallback adds a callback to state update events.
func (*StateMgr) Export ¶
func (m *StateMgr) Export() StateUpdate
Export returns the current states.
func (*StateMgr) Subscribe ¶
func (m *StateMgr) Subscribe(subscriberName string, chanSize int) *EventSubscription[StateUpdate]
Subscribe subscribes to state update events.
type StateUpdate ¶
StateUpdate is used to update others about a state change.
type StatefulModule ¶
type StatefulModule interface {
States() *StateMgr
}
StatefulModule is used for interface checks on modules.
type WorkerCtx ¶
type WorkerCtx struct {
// contains filtered or unexported fields
}
WorkerCtx provides workers with the necessary environment for flow control and logging.
func WorkerFromCtx ¶
WorkerFromCtx returns the WorkerCtx from the given context.
func (*WorkerCtx) Cancel ¶
func (w *WorkerCtx) Cancel()
Cancel cancels the worker context. It is called automatically after the worker stops or returns, regardless of error.
func (*WorkerCtx) Ctx ¶
Ctx returns the worker context. It is canceled automatically after the worker stops or returns, regardless of error.
func (*WorkerCtx) Done ¶
func (w *WorkerCtx) Done() <-chan struct{}
Done returns the context Done channel.
func (*WorkerCtx) Log ¶
Log emits a log record with the current time and the given level and message. The worker context is automatically supplied.
func (*WorkerCtx) LogAttrs ¶
LogAttrs is a more efficient version of Log() that accepts only Attrs. The worker context is automatically supplied.
func (*WorkerCtx) LogEnabled ¶
LogEnabled reports whether the logger emits log records at the given level. The worker context is automatically supplied.
type WorkerInfo ¶ added in v1.6.22
type WorkerInfo struct {
	Running int
	Waiting int
	Other   int
	Missing int
	Workers []*WorkerInfoDetail
}
WorkerInfo holds status information about a manager's workers.
func MergeWorkerInfo ¶ added in v1.6.22
func MergeWorkerInfo(infos ...*WorkerInfo) *WorkerInfo
MergeWorkerInfo merges multiple worker infos into one.
func (*WorkerInfo) Format ¶ added in v1.6.22
func (wi *WorkerInfo) Format() string
Format formats the worker information as a readable table.
type WorkerInfoDetail ¶ added in v1.6.22
type WorkerInfoDetail struct {
	Count       int
	State       string
	Mgr         string
	Name        string
	Func        string
	CurrentLine string
	ExtraInfo   string
}
WorkerInfoDetail holds status information about a single worker.
type WorkerInfoModule ¶ added in v1.6.22
type WorkerInfoModule interface {
WorkerInfo(s *stack.Snapshot) (*WorkerInfo, error)
}
WorkerInfoModule is used for interface checks on modules.
type WorkerMgr ¶
type WorkerMgr struct {
// contains filtered or unexported fields
}
WorkerMgr schedules a worker.
func (*WorkerMgr) Delay ¶
Delay will schedule the worker to run after the given duration. If set, the repeat schedule will continue afterwards. Disable the delay by passing 0.
func (*WorkerMgr) Go ¶
func (s *WorkerMgr) Go()
Go executes the worker immediately. If the worker is currently being executed, the next execution will commence afterwards. Calling Go() implies KeepAlive() if nothing else was specified yet.
func (*WorkerMgr) KeepAlive ¶
KeepAlive instructs the scheduler to not self-destruct, even if all scheduled work is complete.
func (*WorkerMgr) Repeat ¶
Repeat will repeatedly execute the worker using the given interval. Disable repeating by passing 0.