Documentation ¶
Index ¶
- Constants
- Variables
- func CloseAll(cs ...io.Closer) error
- func CopyHealth(dest, src map[string]error)
- func MultiCloser[C io.Closer](cs []C) io.Closer
- func NewTicker(period time.Duration) *timeutil.Ticker
- type Config
- type Engine
- func (e *Engine) ClearHealthCond(condition string)
- func (e *Engine) EmitHealthErr(err error)
- func (e *Engine) Go(fn func(context.Context))
- func (e *Engine) GoTick(ticker *timeutil.Ticker, fn func(context.Context))
- func (e *Engine) IfNotStopped(fn func() error) error
- func (e *Engine) IfStarted(fn func() error) error
- func (e *Engine) NewHealthCond(err error) (clear func())
- func (e *Engine) SetHealthCond(condition string, err error)
- func (e *Engine) Tracer() trace.Tracer
- type ErrorBuffer
- type HealthChecker
- func (c *HealthChecker) Close() error
- func (c *HealthChecker) IsHealthy() (healthy bool, errors map[string]error)
- func (c *HealthChecker) IsReady() (ready bool, errors map[string]error)
- func (c *HealthChecker) Register(service HealthReporter) error
- func (c *HealthChecker) Start() error
- func (c *HealthChecker) Unregister(name string) error
- type HealthReporter
- type MultiStart
- type Service
- type StartClose
- type StateMachine
- func (once *StateMachine) Healthy() error
- func (once *StateMachine) IfNotStopped(f func()) (ok bool)
- func (once *StateMachine) IfStarted(f func()) (ok bool)
- func (once *StateMachine) Ready() error
- func (once *StateMachine) StartOnce(name string, fn func() error) error
- func (once *StateMachine) State() string
- func (once *StateMachine) StopOnce(name string, fn func() error) error
- type StopChan
- func (s StopChan) Ctx(ctx context.Context) (context.Context, context.CancelFunc)
- func (s StopChan) CtxCancel(ctx context.Context, cancel context.CancelFunc) (context.Context, context.CancelFunc)
- func (s StopChan) CtxWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc)
- func (s StopChan) NewCtx() (context.Context, context.CancelFunc)
- type StopRChan
- func (s StopRChan) Ctx(ctx context.Context) (context.Context, context.CancelFunc)
- func (s StopRChan) CtxCancel(ctx context.Context, cancel context.CancelFunc) (context.Context, context.CancelFunc)
- func (s StopRChan) CtxWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc)
- func (s StopRChan) NewCtx() (context.Context, context.CancelFunc)
- type TickerConfig
Examples ¶
Constants ¶
const DefaultJitter timeutil.JitterPct = 0.1
DefaultJitter is +/-10%
Variables ¶
var ( ErrAlreadyStopped = errors.New("already stopped") ErrCannotStopUnstarted = errors.New("cannot stop unstarted service") )
Functions ¶
func CloseAll ¶
CloseAll closes all elements concurrently. Use this when you have various different types of io.Closer.
func CopyHealth ¶
CopyHealth copies health statuses from src to dest. Useful when implementing HealthReporter.HealthReport. If duplicate names are encountered, the errors are joined, unless testing in which case a panic is thrown.
func MultiCloser ¶
MultiCloser returns an io.Closer which closes all elements concurrently. Use this when you have a slice of a type which implements io.Closer. []io.Closer can be cast directly to MultiCloser.
Example ¶
ctx := context.Background() f1 := CloseFailure("f") f2 := CloseFailure("f") cs := []CloseFailure{f1, f2} var ms MultiStart if err := ms.Start(ctx, f1, f2); err != nil { fmt.Println(err) return } mc := MultiCloser(cs) if err := mc.Close(); err != nil { fmt.Println(err) }
Output: f started f started f close failure f close failure failed to close: f failed to close: f
Types ¶
type Config ¶ added in v0.2.1
type Config struct { // Name is required. It will be logged shorthand on Start and Close, and appended to the logger name. // It must be unique among services sharing the same logger, in order to ensure uniqueness of the fully qualified name. Name string // NewSubServices is an optional constructor for dependent Services to Start and Close along with this one. NewSubServices func(logger.Logger) []Service // Start is an optional hook called after starting SubServices. Start func(context.Context) error // Close is an optional hook called before closing SubServices. Close func() error }
Config is a configuration for constructing a Service, typically with an Engine, to be embedded and extended as part of a Service implementation.
func (Config) NewService ¶ added in v0.2.1
NewService returns a new Service defined by Config.
- You *should* embed the Service, in order to inherit the methods.
Example ¶
package main import ( "context" "fmt" "time" "go.opentelemetry.io/otel/attribute" . "github.com/smartcontractkit/chainlink-common/pkg/internal/example" // nolint "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" ) type configured struct { services.Service eng *services.Engine subA services.Service subB services.Service workCh chan func() (name string, err error) } func (c *configured) start(context.Context) error { c.eng.GoTick(services.NewTicker(time.Minute), c.do) return nil } func (c *configured) close() error { close(c.workCh) return nil } // do processes all outstanding work func (c *configured) do(ctx context.Context) { ctx, span := c.eng.Tracer().Start(ctx, "DoWork") defer span.End() var count, errs int defer func() { span.SetAttributes(attribute.Int("count", count)) span.SetAttributes(attribute.Int("errs", errs)) }() for { select { case <-ctx.Done(): return case work, ok := <-c.workCh: if !ok { return } count++ name, err := work() if err != nil { errs++ c.eng.SetHealthCond(name, err) } else { c.eng.ClearHealthCond(name) } default: return } } } func newFakeService(lggr logger.Logger, name string) services.Service { return services.Config{Name: name}.NewService(lggr) } func NewConfigured(lggr logger.Logger) services.Service { e := &configured{ workCh: make(chan func() (string, error)), } e.Service, e.eng = services.Config{ Name: "Configured", Start: e.start, Close: e.close, NewSubServices: func(lggr logger.Logger) []services.Service { e.subA = newFakeService(lggr, "Sub-service-A") e.subB = newFakeService(lggr, "Sub-service-B") return []services.Service{e.subA, e.subB} }, }.NewServiceEngine(lggr) return e } func main() { lggr, err := Logger() if err != nil { fmt.Println("Failed to create logger:", err) return } s := NewConfigured(lggr) if err = s.Start(context.Background()); err != nil { fmt.Println("Failed to start service:", err) return } if err = s.Close(); err != nil { fmt.Println("Failed to close service:", err) return } /* commented out because the log output is non-deterministic // Output: // INFO Configured Starting // INFO Configured Starting 2 sub-services // INFO Configured.Sub-service-A Starting // INFO Configured.Sub-service-A Started // INFO Configured.Sub-service-B Starting // INFO Configured.Sub-service-B Started // INFO Configured Started // INFO Configured Closing // INFO Configured Closing 2 sub-services // INFO Configured.Sub-service-B Closing // INFO Configured.Sub-service-B Closed // INFO Configured.Sub-service-A Closing // INFO Configured.Sub-service-A Closed // INFO Configured Closed */ }
Output:
func (Config) NewServiceEngine ¶ added in v0.2.1
NewServiceEngine returns a new Service defined by Config, and an Engine for managing health, goroutines, and logging.
- You *should* embed the Service, in order to inherit the methods.
- You *should not* embed the Engine. Use an unexported field instead.
For example:
type myType struct { services.Service env *service.Engine } t := myType{} t.Service, t.eng = service.Config{ Name: "MyType", Start: t.start, Close: t.close, }.NewServiceEngine(lggr)
type Engine ¶ added in v0.2.1
type Engine struct { StopChan logger.SugaredLogger // contains filtered or unexported fields }
Engine manages service internals like health, goroutine tracking, and shutdown signals. See Config.NewServiceEngine
func (*Engine) ClearHealthCond ¶ added in v0.2.1
ClearHealthCond removes a condition and error recorded by SetHealthCond.
func (*Engine) EmitHealthErr ¶ added in v0.2.1
EmitHealthErr records an error to be reported via the next call to Healthy().
func (*Engine) Go ¶ added in v0.2.1
Go runs fn in a tracked goroutine that will block closing the service.
If this operation runs continuously in the background, then do not trace it. If this operation will terminate, consider tracing via Tracer:
v.e.Go(func(ctx context.Context) { ctx, span := v.e.Tracer().Start(ctx, "MyOperationName") defer span.End() })
func (*Engine) GoTick ¶ added in v0.2.1
GoTick is like Go but calls fn for each tick.
v.e.GoTick(services.NewTicker(time.Minute), v.method)
Consider tracing each tick via Tracer:
v.e.GoTick(services.NewTicker(time.Minute), func(ctx context.Context) { ctx, span := v.e.Tracer().Start(ctx, "MyOperationName") defer span.End() })
func (*Engine) IfNotStopped ¶ added in v0.4.0
IfNotStopped calls fn only if the service is not stopped.
func (*Engine) NewHealthCond ¶ added in v0.2.1
NewHealthCond causes an unhealthy report, until the returned clear func() is called. Use this for simple cases where the func() can be kept in scope, and prefer to defer it inline if possible:
defer NewHealthCond(fmt.Errorf("foo bar: %i", err))()
See SetHealthCond for an alternative API.
func (*Engine) SetHealthCond ¶ added in v0.2.1
SetHealthCond records a condition key and an error, which causes an unhealthy report, until ClearHealthCond(condition) is called. condition keys are for internal use only, and do not show up in the health report.
type ErrorBuffer ¶
type ErrorBuffer struct {
// contains filtered or unexported fields
}
ErrorBuffer uses joinedErrors interface to join multiple errors into a single error. This is useful to track the most recent N errors in a service and flush them as a single error.
func (*ErrorBuffer) Append ¶
func (eb *ErrorBuffer) Append(incoming error)
func (*ErrorBuffer) Flush ¶
func (eb *ErrorBuffer) Flush() (err error)
func (*ErrorBuffer) SetCap ¶
func (eb *ErrorBuffer) SetCap(cap int)
type HealthChecker ¶
type HealthChecker struct { StateMachine // contains filtered or unexported fields }
HealthChecker is a services.Service which monitors other services and can be probed for system health.
func NewChecker
deprecated
func NewChecker(ver, sha string) *HealthChecker
Deprecated: Use NewHealthChecker
func NewHealthChecker ¶ added in v0.4.0
func NewHealthChecker(ver, sha string) *HealthChecker
func (*HealthChecker) Close ¶
func (c *HealthChecker) Close() error
func (*HealthChecker) IsHealthy ¶
func (c *HealthChecker) IsHealthy() (healthy bool, errors map[string]error)
IsHealthy returns the current health of the system. A system is considered healthy if all checks are passing (no errors)
func (*HealthChecker) IsReady ¶
func (c *HealthChecker) IsReady() (ready bool, errors map[string]error)
IsReady returns the current readiness of the system. A system is considered ready if all checks are passing (no errors)
func (*HealthChecker) Register ¶
func (c *HealthChecker) Register(service HealthReporter) error
Register a service for health checks.
func (*HealthChecker) Start ¶
func (c *HealthChecker) Start() error
func (*HealthChecker) Unregister ¶
func (c *HealthChecker) Unregister(name string) error
Unregister a service.
type HealthReporter ¶
type HealthReporter interface { // Ready should return nil if ready, or an error message otherwise. From the k8s docs: // > ready means it’s initialized and healthy means that it can accept traffic in kubernetes // See: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/ Ready() error // HealthReport returns a full health report of the callee including its dependencies. // Keys are based on Name(), with nil values when healthy or errors otherwise. // Use CopyHealth to collect reports from sub-services. // This should run very fast, so avoid doing computation and instead prefer reporting pre-calculated state. HealthReport() map[string]error // Name returns the fully qualified name of the component. Usually the logger name. Name() string }
HealthReporter should be implemented by any type requiring health checks.
type MultiStart ¶
type MultiStart struct {
// contains filtered or unexported fields
}
MultiStart is a utility for starting multiple services together. The set of started services is tracked internally, so that they can be closed if any single service fails to start.
Example ¶
ctx := context.Background() a := Healthy("a") b := CloseFailure("b") c := WontStart("c") var ms MultiStart if err := ms.Start(ctx, a, b, c); err != nil { fmt.Println(err) }
Output: a started b started c start failure b close failure a closed failed to start: c failed to close: b
func (*MultiStart) Close ¶
func (m *MultiStart) Close() (err error)
Close closes all started services, in reverse order.
func (*MultiStart) CloseBecause ¶
func (m *MultiStart) CloseBecause(reason error) (err error)
CloseBecause calls Close and returns reason along with any additional errors.
func (*MultiStart) Start ¶
func (m *MultiStart) Start(ctx context.Context, srvcs ...StartClose) (err error)
Start attempts to Start all services. If any service fails to start, the previously started services will be Closed, and an error returned.
type Service ¶
type Service interface { // Start the service. // - Must return promptly if the context is cancelled. // - Must not retain the context after returning (only applies to start-up) // - Must not depend on external resources (no blocking network calls) Start(context.Context) error // Close stops the Service. // Invariants: Usually after this call the Service cannot be started // again, you need to build a new Service to do so. // // See MultiCloser Close() error HealthReporter }
Service represents a long-running service inside the Application.
The simplest way to implement a Service is via NewService.
For other cases, consider embedding a services.StateMachine to implement these calls in a safe manner.
Example ¶
package main import ( "context" "fmt" "sync" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" . "github.com/smartcontractkit/chainlink-common/pkg/internal/example" // nolint ) type stateMachine struct { services.StateMachine lggr logger.Logger stop services.StopChan wg sync.WaitGroup } func (f *stateMachine) HealthReport() map[string]error { return map[string]error{f.Name(): f.Healthy()} } func (f *stateMachine) Name() string { return f.lggr.Name() } func NewStateMachine(lggr logger.Logger) services.Service { return &stateMachine{ lggr: logger.Named(lggr, "StateMachine"), stop: make(services.StopChan), } } func (f *stateMachine) Start(ctx context.Context) error { return f.StartOnce("StateMachine", func() error { f.lggr.Info("Starting") f.wg.Add(1) go f.run() return nil }) } func (f *stateMachine) Close() error { return f.StopOnce("StateMachine", func() error { f.lggr.Info("Closing") close(f.stop) // trigger goroutine cleanup f.wg.Wait() // wait for cleanup to complete return nil }) } func (f *stateMachine) run() { defer f.wg.Done() for range f.stop { return // stop the routine } } func main() { lggr, err := Logger() if err != nil { fmt.Println("Failed to create logger:", err) return } s := NewStateMachine(lggr) if err = s.Start(context.Background()); err != nil { fmt.Println("Failed to start service:", err) return } if err = s.Close(); err != nil { fmt.Println("Failed to close service:", err) return } }
Output: INFO StateMachine Starting INFO StateMachine Closing
type StartClose ¶
StartClose is a subset of the Service interface.
type StateMachine ¶
type StateMachine struct { sync.RWMutex // lock is held during startup/shutdown, RLock is held while executing functions dependent on a particular state // SvcErrBuffer is an ErrorBuffer that let service owners track critical errors happening in the service. // // SvcErrBuffer.SetCap(int) Overrides buffer limit from defaultErrorBufferCap // SvcErrBuffer.Append(error) Appends an error to the buffer // SvcErrBuffer.Flush() error returns all tracked errors as a single joined error SvcErrBuffer ErrorBuffer // contains filtered or unexported fields }
StateMachine is a low-level primitive designed to be embedded in Service implementations. Default implementations of Ready() and Healthy() are included, as well as StartOnce and StopOnce helpers for implementing Service.Start and Service.Close.
See Config for a higher-level wrapper that extends StateMachine and that should be preferred when applicable.
func (*StateMachine) Healthy ¶
func (once *StateMachine) Healthy() error
Healthy returns ErrNotStarted if the state is not started. Override this per-service with more specific implementations.
func (*StateMachine) IfNotStopped ¶
func (once *StateMachine) IfNotStopped(f func()) (ok bool)
IfNotStopped runs the func and returns true if in any state other than Stopped
func (*StateMachine) IfStarted ¶
func (once *StateMachine) IfStarted(f func()) (ok bool)
IfStarted runs the func and returns true only if started, otherwise returns false
func (*StateMachine) Ready ¶
func (once *StateMachine) Ready() error
Ready returns ErrNotStarted if the state is not started.
func (*StateMachine) StartOnce ¶
func (once *StateMachine) StartOnce(name string, fn func() error) error
StartOnce sets the state to Started
func (*StateMachine) State ¶
func (once *StateMachine) State() string
State retrieves the current state
type StopChan ¶
type StopChan chan struct{}
A StopChan signals when some work should stop. Use StopChanR if you already have a read only <-chan.
func (StopChan) Ctx ¶
Ctx cancels a context.Context when StopChan is closed.
func (StopChan) CtxCancel ¶
func (s StopChan) CtxCancel(ctx context.Context, cancel context.CancelFunc) (context.Context, context.CancelFunc)
CtxCancel cancels a context.Context when StopChan is closed. Returns ctx and cancel unmodified, for convenience.
func (StopChan) CtxWithTimeout ¶ added in v0.4.0
CtxWithTimeout cancels a context.Context when StopChan is closed.
func (StopChan) NewCtx ¶
func (s StopChan) NewCtx() (context.Context, context.CancelFunc)
NewCtx returns a background context.Context that is cancelled when StopChan is closed.
type StopRChan ¶
type StopRChan <-chan struct{}
A StopRChan signals when some work should stop. This is a receive-only version of StopChan, for casting an existing <-chan.
func (StopRChan) Ctx ¶
Ctx cancels a context.Context when StopChan is closed.
func (StopRChan) CtxCancel ¶
func (s StopRChan) CtxCancel(ctx context.Context, cancel context.CancelFunc) (context.Context, context.CancelFunc)
CtxCancel cancels a context.Context when StopChan is closed. Returns ctx and cancel unmodified, for convenience.
func (StopRChan) CtxWithTimeout ¶ added in v0.4.0
CtxWithTimeout cancels a context.Context when StopChan is closed.
func (StopRChan) NewCtx ¶
func (s StopRChan) NewCtx() (context.Context, context.CancelFunc)
NewCtx returns a background context.Context that is cancelled when StopChan is closed.