services

package
v0.3.1-beholder Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2024 License: MIT Imports: 18 Imported by: 112

Documentation

Index

Examples

Constants

View Source
const DefaultJitter timeutil.JitterPct = 0.1

DefaultJitter is +/-10%

Variables

View Source
var (
	ErrAlreadyStopped      = errors.New("already stopped")
	ErrCannotStopUnstarted = errors.New("cannot stop unstarted service")
)

Functions

func CloseAll

func CloseAll(cs ...io.Closer) error

CloseAll closes all elements concurrently. Use this when you have various different types of io.Closer.

func CopyHealth

func CopyHealth(dest, src map[string]error)

CopyHealth copies health statuses from src to dest. Useful when implementing HealthReporter.HealthReport. If duplicate names are encountered, the errors are joined, unless testing in which case a panic is thrown.

func MultiCloser

func MultiCloser[C io.Closer](cs []C) io.Closer

MultiCloser returns an io.Closer which closes all elements concurrently. Use this when you have a slice of a type which implements io.Closer. []io.Closer can be cast directly to MultiCloser.

Example
ctx := context.Background()

f1 := CloseFailure("f")
f2 := CloseFailure("f")
cs := []CloseFailure{f1, f2}

var ms MultiStart
if err := ms.Start(ctx, f1, f2); err != nil {
	fmt.Println(err)
	return
}
mc := MultiCloser(cs)
if err := mc.Close(); err != nil {
	fmt.Println(err)
}
Output:

f started
f started
f close failure
f close failure
failed to close: f
failed to close: f

func NewTicker

func NewTicker(period time.Duration) *timeutil.Ticker

NewTicker returns a new timeutil.Ticker configured to: - fire the first tick immediately - apply DefaultJitter to each period Ticker.Stop should be called to prevent goroutine leaks.

Types

type Config added in v0.2.1

type Config struct {
	// Name is required. It will be logged shorthand on Start and Close, and appended to the logger name.
	// It must be unique among services sharing the same logger, in order to ensure uniqueness of the fully qualified name.
	Name string
	// NewSubServices is an optional constructor for dependent Services to Start and Close along with this one.
	NewSubServices func(logger.Logger) []Service
	// Start is an optional hook called after starting SubServices.
	Start func(context.Context) error
	// Close is an optional hook called before closing SubServices.
	Close func() error
}

Config is a configuration for constructing a Service, typically with an Engine, to be embedded and extended as part of a Service implementation.

func (Config) NewService added in v0.2.1

func (c Config) NewService(lggr logger.Logger) Service

NewService returns a new Service defined by Config.

  • You *should* embed the Service, in order to inherit the methods.
Example
package main

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/otel/attribute"

	. "github.com/smartcontractkit/chainlink-common/pkg/internal/example" // nolint
	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/services"
)

type configured struct {
	services.Service
	eng *services.Engine

	subA services.Service
	subB services.Service

	workCh chan func() (name string, err error)
}

func (c *configured) start(context.Context) error {
	c.eng.GoTick(services.NewTicker(time.Minute), c.do)
	return nil
}

func (c *configured) close() error {
	close(c.workCh)
	return nil
}

// do processes all outstanding work
func (c *configured) do(ctx context.Context) {
	ctx, span := c.eng.Tracer().Start(ctx, "DoWork")
	defer span.End()
	var count, errs int
	defer func() {
		span.SetAttributes(attribute.Int("count", count))
		span.SetAttributes(attribute.Int("errs", errs))
	}()
	for {
		select {
		case <-ctx.Done():
			return
		case work, ok := <-c.workCh:
			if !ok {
				return
			}
			count++
			name, err := work()
			if err != nil {
				errs++
				c.eng.SetHealthCond(name, err)
			} else {
				c.eng.ClearHealthCond(name)
			}
		default:
			return
		}
	}
}

func newFakeService(lggr logger.Logger, name string) services.Service {
	return services.Config{Name: name}.NewService(lggr)
}

func NewConfigured(lggr logger.Logger) services.Service {
	e := &configured{
		workCh: make(chan func() (string, error)),
	}
	e.Service, e.eng = services.Config{
		Name:  "Configured",
		Start: e.start,
		Close: e.close,
		NewSubServices: func(lggr logger.Logger) []services.Service {
			e.subA = newFakeService(lggr, "Sub-service-A")
			e.subB = newFakeService(lggr, "Sub-service-B")
			return []services.Service{e.subA, e.subB}
		},
	}.NewServiceEngine(lggr)
	return e
}

func main() {
	lggr, err := Logger()
	if err != nil {
		fmt.Println("Failed to create logger:", err)
		return
	}
	s := NewConfigured(lggr)
	if err = s.Start(context.Background()); err != nil {
		fmt.Println("Failed to start service:", err)
		return
	}
	if err = s.Close(); err != nil {
		fmt.Println("Failed to close service:", err)
		return
	}
	/* commented out because the log output is non-deterministic
	// Output:
	// INFO	Configured	Starting
	// INFO	Configured	Starting 2 sub-services
	// INFO	Configured.Sub-service-A	Starting
	// INFO	Configured.Sub-service-A	Started
	// INFO	Configured.Sub-service-B	Starting
	// INFO	Configured.Sub-service-B	Started
	// INFO	Configured	Started
	// INFO	Configured	Closing
	// INFO	Configured	Closing 2 sub-services
	// INFO	Configured.Sub-service-B	Closing
	// INFO	Configured.Sub-service-B	Closed
	// INFO	Configured.Sub-service-A	Closing
	// INFO	Configured.Sub-service-A	Closed
	// INFO	Configured	Closed
	*/
}
Output:

func (Config) NewServiceEngine added in v0.2.1

func (c Config) NewServiceEngine(lggr logger.Logger) (Service, *Engine)

NewServiceEngine returns a new Service defined by Config, and an Engine for managing health, goroutines, and logging.

  • You *should* embed the Service, in order to inherit the methods.
  • You *should not* embed the Engine. Use an unexported field instead.

For example:

type myType struct {
	services.Service
	env *service.Engine
}
t := myType{}
t.Service, t.eng = service.Config{
	Name: "MyType",
	Start: t.start,
	Close: t.close,
}.NewServiceEngine(lggr)

type Engine added in v0.2.1

type Engine struct {
	StopChan
	logger.SugaredLogger
	// contains filtered or unexported fields
}

Engine manages service internals like health, goroutine tracking, and shutdown signals. See Config.NewServiceEngine

func (*Engine) ClearHealthCond added in v0.2.1

func (e *Engine) ClearHealthCond(condition string)

ClearHealthCond removes a condition and error recorded by SetHealthCond.

func (*Engine) EmitHealthErr added in v0.2.1

func (e *Engine) EmitHealthErr(err error)

EmitHealthErr records an error to be reported via the next call to Healthy().

func (*Engine) Go added in v0.2.1

func (e *Engine) Go(fn func(context.Context))

Go runs fn in a tracked goroutine that will block closing the service.

If this operation runs continuously in the background, then do not trace it. If this operation will terminate, consider tracing via Tracer:

v.e.Go(func(ctx context.Context) {
	ctx, span := v.e.Tracer().Start(ctx, "MyOperationName")
	defer span.End()
})

func (*Engine) GoTick added in v0.2.1

func (e *Engine) GoTick(ticker *timeutil.Ticker, fn func(context.Context))

GoTick is like Go but calls fn for each tick.

v.e.GoTick(services.NewTicker(time.Minute), v.method)

Consider tracing each tick via Tracer:

v.e.GoTick(services.NewTicker(time.Minute), func(ctx context.Context) {
	ctx, span := v.e.Tracer().Start(ctx, "MyOperationName")
	defer span.End()
})

func (*Engine) IfNotStopped added in v0.4.0

func (e *Engine) IfNotStopped(fn func() error) error

IfNotStopped calls fn only if the service is not stopped.

func (*Engine) IfStarted added in v0.4.0

func (e *Engine) IfStarted(fn func() error) error

IfStarted calls fn only if the service is started.

func (*Engine) NewHealthCond added in v0.2.1

func (e *Engine) NewHealthCond(err error) (clear func())

NewHealthCond causes an unhealthy report, until the returned clear func() is called. Use this for simple cases where the func() can be kept in scope, and prefer to defer it inline if possible:

defer NewHealthCond(fmt.Errorf("foo bar: %i", err))()

See SetHealthCond for an alternative API.

func (*Engine) SetHealthCond added in v0.2.1

func (e *Engine) SetHealthCond(condition string, err error)

SetHealthCond records a condition key and an error, which causes an unhealthy report, until ClearHealthCond(condition) is called. condition keys are for internal use only, and do not show up in the health report.

func (*Engine) Tracer added in v0.4.0

func (e *Engine) Tracer() trace.Tracer

Tracer returns the otel tracer with service attributes included.

type ErrorBuffer

type ErrorBuffer struct {
	// contains filtered or unexported fields
}

ErrorBuffer uses joinedErrors interface to join multiple errors into a single error. This is useful to track the most recent N errors in a service and flush them as a single error.

func (*ErrorBuffer) Append

func (eb *ErrorBuffer) Append(incoming error)

func (*ErrorBuffer) Flush

func (eb *ErrorBuffer) Flush() (err error)

func (*ErrorBuffer) SetCap

func (eb *ErrorBuffer) SetCap(cap int)

type HealthChecker

type HealthChecker struct {
	StateMachine
	// contains filtered or unexported fields
}

HealthChecker is a services.Service which monitors other services and can be probed for system health.

func NewChecker deprecated

func NewChecker(ver, sha string) *HealthChecker

Deprecated: Use NewHealthChecker

func NewHealthChecker added in v0.4.0

func NewHealthChecker(ver, sha string) *HealthChecker

func (*HealthChecker) Close

func (c *HealthChecker) Close() error

func (*HealthChecker) IsHealthy

func (c *HealthChecker) IsHealthy() (healthy bool, errors map[string]error)

IsHealthy returns the current health of the system. A system is considered healthy if all checks are passing (no errors)

func (*HealthChecker) IsReady

func (c *HealthChecker) IsReady() (ready bool, errors map[string]error)

IsReady returns the current readiness of the system. A system is considered ready if all checks are passing (no errors)

func (*HealthChecker) Register

func (c *HealthChecker) Register(service HealthReporter) error

Register a service for health checks.

func (*HealthChecker) Start

func (c *HealthChecker) Start() error

func (*HealthChecker) Unregister

func (c *HealthChecker) Unregister(name string) error

Unregister a service.

type HealthReporter

type HealthReporter interface {
	// Ready should return nil if ready, or an error message otherwise. From the k8s docs:
	// > ready means it’s initialized and healthy means that it can accept traffic in kubernetes
	// See: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
	Ready() error
	// HealthReport returns a full health report of the callee including its dependencies.
	// Keys are based on Name(), with nil values when healthy or errors otherwise.
	// Use CopyHealth to collect reports from sub-services.
	// This should run very fast, so avoid doing computation and instead prefer reporting pre-calculated state.
	HealthReport() map[string]error
	// Name returns the fully qualified name of the component. Usually the logger name.
	Name() string
}

HealthReporter should be implemented by any type requiring health checks.

type MultiStart

type MultiStart struct {
	// contains filtered or unexported fields
}

MultiStart is a utility for starting multiple services together. The set of started services is tracked internally, so that they can be closed if any single service fails to start.

Example
ctx := context.Background()

a := Healthy("a")
b := CloseFailure("b")
c := WontStart("c")

var ms MultiStart
if err := ms.Start(ctx, a, b, c); err != nil {
	fmt.Println(err)
}
Output:

a started
b started
c start failure
b close failure
a closed
failed to start: c
failed to close: b

func (*MultiStart) Close

func (m *MultiStart) Close() (err error)

Close closes all started services, in reverse order.

func (*MultiStart) CloseBecause

func (m *MultiStart) CloseBecause(reason error) (err error)

CloseBecause calls Close and returns reason along with any additional errors.

func (*MultiStart) Start

func (m *MultiStart) Start(ctx context.Context, srvcs ...StartClose) (err error)

Start attempts to Start all services. If any service fails to start, the previously started services will be Closed, and an error returned.

type Service

type Service interface {
	// Start the service.
	//  - Must return promptly if the context is cancelled.
	//  - Must not retain the context after returning (only applies to start-up)
	//  - Must not depend on external resources (no blocking network calls)
	Start(context.Context) error
	// Close stops the Service.
	// Invariants: Usually after this call the Service cannot be started
	// again, you need to build a new Service to do so.
	//
	// See MultiCloser
	Close() error

	HealthReporter
}

Service represents a long-running service inside the Application.

The simplest way to implement a Service is via NewService.

For other cases, consider embedding a services.StateMachine to implement these calls in a safe manner.

Example
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/services"

	. "github.com/smartcontractkit/chainlink-common/pkg/internal/example" // nolint
)

type stateMachine struct {
	services.StateMachine

	lggr logger.Logger

	stop services.StopChan
	wg   sync.WaitGroup
}

func (f *stateMachine) HealthReport() map[string]error {
	return map[string]error{f.Name(): f.Healthy()}
}

func (f *stateMachine) Name() string { return f.lggr.Name() }

func NewStateMachine(lggr logger.Logger) services.Service {
	return &stateMachine{
		lggr: logger.Named(lggr, "StateMachine"),
		stop: make(services.StopChan),
	}
}

func (f *stateMachine) Start(ctx context.Context) error {
	return f.StartOnce("StateMachine", func() error {
		f.lggr.Info("Starting")
		f.wg.Add(1)
		go f.run()
		return nil
	})
}

func (f *stateMachine) Close() error {
	return f.StopOnce("StateMachine", func() error {
		f.lggr.Info("Closing")
		close(f.stop) // trigger goroutine cleanup
		f.wg.Wait()   // wait for cleanup to complete
		return nil
	})
}

func (f *stateMachine) run() {
	defer f.wg.Done()

	for range f.stop {
		return // stop the routine
	}
}

func main() {
	lggr, err := Logger()
	if err != nil {
		fmt.Println("Failed to create logger:", err)
		return
	}
	s := NewStateMachine(lggr)
	if err = s.Start(context.Background()); err != nil {
		fmt.Println("Failed to start service:", err)
		return
	}
	if err = s.Close(); err != nil {
		fmt.Println("Failed to close service:", err)
		return
	}

}
Output:

INFO	StateMachine	Starting
INFO	StateMachine	Closing

type StartClose

type StartClose interface {
	Start(context.Context) error
	Close() error
}

StartClose is a subset of the Service interface.

type StateMachine

type StateMachine struct {
	sync.RWMutex // lock is held during startup/shutdown, RLock is held while executing functions dependent on a particular state

	// SvcErrBuffer is an ErrorBuffer that let service owners track critical errors happening in the service.
	//
	// SvcErrBuffer.SetCap(int) Overrides buffer limit from defaultErrorBufferCap
	// SvcErrBuffer.Append(error) Appends an error to the buffer
	// SvcErrBuffer.Flush() error returns all tracked errors as a single joined error
	SvcErrBuffer ErrorBuffer
	// contains filtered or unexported fields
}

StateMachine is a low-level primitive designed to be embedded in Service implementations. Default implementations of Ready() and Healthy() are included, as well as StartOnce and StopOnce helpers for implementing Service.Start and Service.Close.

See Config for a higher-level wrapper that extends StateMachine and that should be preferred when applicable.

func (*StateMachine) Healthy

func (once *StateMachine) Healthy() error

Healthy returns ErrNotStarted if the state is not started. Override this per-service with more specific implementations.

func (*StateMachine) IfNotStopped

func (once *StateMachine) IfNotStopped(f func()) (ok bool)

IfNotStopped runs the func and returns true if in any state other than Stopped

func (*StateMachine) IfStarted

func (once *StateMachine) IfStarted(f func()) (ok bool)

IfStarted runs the func and returns true only if started, otherwise returns false

func (*StateMachine) Ready

func (once *StateMachine) Ready() error

Ready returns ErrNotStarted if the state is not started.

func (*StateMachine) StartOnce

func (once *StateMachine) StartOnce(name string, fn func() error) error

StartOnce sets the state to Started

func (*StateMachine) State

func (once *StateMachine) State() string

State retrieves the current state

func (*StateMachine) StopOnce

func (once *StateMachine) StopOnce(name string, fn func() error) error

StopOnce sets the state to Stopped

type StopChan

type StopChan chan struct{}

A StopChan signals when some work should stop. Use StopChanR if you already have a read only <-chan.

func (StopChan) Ctx

Ctx cancels a context.Context when StopChan is closed.

func (StopChan) CtxCancel

CtxCancel cancels a context.Context when StopChan is closed. Returns ctx and cancel unmodified, for convenience.

func (StopChan) CtxWithTimeout added in v0.4.0

func (s StopChan) CtxWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc)

CtxWithTimeout cancels a context.Context when StopChan is closed.

func (StopChan) NewCtx

func (s StopChan) NewCtx() (context.Context, context.CancelFunc)

NewCtx returns a background context.Context that is cancelled when StopChan is closed.

type StopRChan

type StopRChan <-chan struct{}

A StopRChan signals when some work should stop. This is a receive-only version of StopChan, for casting an existing <-chan.

func (StopRChan) Ctx

Ctx cancels a context.Context when StopChan is closed.

func (StopRChan) CtxCancel

CtxCancel cancels a context.Context when StopChan is closed. Returns ctx and cancel unmodified, for convenience.

func (StopRChan) CtxWithTimeout added in v0.4.0

func (s StopRChan) CtxWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc)

CtxWithTimeout cancels a context.Context when StopChan is closed.

func (StopRChan) NewCtx

func (s StopRChan) NewCtx() (context.Context, context.CancelFunc)

NewCtx returns a background context.Context that is cancelled when StopChan is closed.

type TickerConfig

type TickerConfig struct {
	// Initial delay before the first tick.
	Initial time.Duration
	// JitterPct to apply to each period.
	JitterPct timeutil.JitterPct
}

func (TickerConfig) NewTicker

func (c TickerConfig) NewTicker(period time.Duration) *timeutil.Ticker

NewTicker returns a new timeutil.Ticker for the given configuration. Ticker.Stop should be called to prevent goroutine leaks.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL