services

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Jul 16, 2024 License: MIT Imports: 13 Imported by: 103

Documentation

Constants

const DefaultJitter timeutil.JitterPct = 0.1

DefaultJitter is +/-10%

Variables

var (
	ErrAlreadyStopped      = errors.New("already stopped")
	ErrCannotStopUnstarted = errors.New("cannot stop unstarted service")
)

Functions

func CloseAll

func CloseAll(cs ...io.Closer) error

CloseAll closes all elements concurrently. Use this when you have various different types of io.Closer.

func CopyHealth

func CopyHealth(dest, src map[string]error)

CopyHealth copies health statuses from src to dest. It is useful when implementing HealthReporter.HealthReport. If duplicate names are encountered, the errors are joined, except under test, in which case it panics.

func MultiCloser

func MultiCloser[C io.Closer](cs []C) io.Closer

MultiCloser returns an io.Closer which closes all elements concurrently. Use this when you have a slice of a type which implements io.Closer. []io.Closer can be cast directly to MultiCloser.

Example
ctx := context.Background()

f1 := CloseFailure("f")
f2 := CloseFailure("f")
cs := []CloseFailure{f1, f2}

var ms MultiStart
if err := ms.Start(ctx, f1, f2); err != nil {
	fmt.Println(err)
	return
}
mc := MultiCloser(cs)
if err := mc.Close(); err != nil {
	fmt.Println(err)
}
Output:

f started
f started
f close failure
f close failure
failed to close: f
failed to close: f

func NewTicker

func NewTicker(period time.Duration) *timeutil.Ticker

NewTicker returns a new timeutil.Ticker configured to:
  - fire the first tick immediately
  - apply DefaultJitter to each period

Types

type ErrorBuffer

type ErrorBuffer struct {
	// contains filtered or unexported fields
}

ErrorBuffer uses the joinedErrors interface to join multiple errors into a single error. This is useful for tracking the most recent N errors in a service and flushing them as a single error.

func (*ErrorBuffer) Append

func (eb *ErrorBuffer) Append(incoming error)

func (*ErrorBuffer) Flush

func (eb *ErrorBuffer) Flush() (err error)

func (*ErrorBuffer) SetCap

func (eb *ErrorBuffer) SetCap(cap int)

type HealthChecker

type HealthChecker struct {
	StateMachine
	// contains filtered or unexported fields
}

HealthChecker is a services.Service which monitors other services and can be probed for system health.

func NewChecker

func NewChecker(ver, sha string) *HealthChecker

func (*HealthChecker) Close

func (c *HealthChecker) Close() error

func (*HealthChecker) IsHealthy

func (c *HealthChecker) IsHealthy() (healthy bool, errors map[string]error)

IsHealthy returns the current health of the system. A system is considered healthy if all checks are passing (no errors).

func (*HealthChecker) IsReady

func (c *HealthChecker) IsReady() (ready bool, errors map[string]error)

IsReady returns the current readiness of the system. A system is considered ready if all checks are passing (no errors).

func (*HealthChecker) Register

func (c *HealthChecker) Register(service HealthReporter) error

Register a service for health checks.

func (*HealthChecker) Start

func (c *HealthChecker) Start() error

func (*HealthChecker) Unregister

func (c *HealthChecker) Unregister(name string) error

Unregister a service.

type HealthReporter

type HealthReporter interface {
	// Ready should return nil if ready, or an error message otherwise. From the k8s docs:
	// > ready means it’s initialized and healthy means that it can accept traffic in kubernetes
	// See: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
	Ready() error
	// HealthReport returns a full health report of the callee including its dependencies.
	// key is the dep name, value is nil if healthy, or error message otherwise.
	// Use CopyHealth to collect reports from sub-services.
	HealthReport() map[string]error
	// Name returns the fully qualified name of the component. Usually the logger name.
	Name() string
}

HealthReporter should be implemented by any type requiring health checks.

type MultiStart

type MultiStart struct {
	// contains filtered or unexported fields
}

MultiStart is a utility for starting multiple services together. The set of started services is tracked internally, so that they can be closed if any single service fails to start.

Example
ctx := context.Background()

a := Healthy("a")
b := CloseFailure("b")
c := WontStart("c")

var ms MultiStart
if err := ms.Start(ctx, a, b, c); err != nil {
	fmt.Println(err)
}
Output:

a started
b started
c start failure
b close failure
a closed
failed to start: c
failed to close: b

func (*MultiStart) Close

func (m *MultiStart) Close() (err error)

Close closes all started services, in reverse order.

func (*MultiStart) CloseBecause

func (m *MultiStart) CloseBecause(reason error) (err error)

CloseBecause calls Close and returns reason along with any additional errors.

func (*MultiStart) Start

func (m *MultiStart) Start(ctx context.Context, srvcs ...StartClose) (err error)

Start attempts to Start all services. If any service fails to start, the previously started services will be Closed, and an error returned.

type Service

type Service interface {
	// Start the service.
	//  - Must return promptly if the context is cancelled.
	//  - Must not retain the context after returning (only applies to start-up)
	//  - Must not depend on external resources (no blocking network calls)
	Start(context.Context) error
	// Close stops the service.
	// Invariants: Usually after this call the Service cannot be started
	// again, you need to build a new Service to do so.
	Close() error

	HealthReporter
}

Service represents a long-running service inside the Application.

Typically, a Service will leverage utils.StateMachine to implement these calls in a safe manner.

Template

Mockable Foo service with a run loop

type (
	// Expose a public interface so we can mock the service.
	Foo interface {
		service.Service

		// ...
	}

	foo struct {
		// ...

		stop chan struct{}
		done chan struct{}

		utils.StartStopOnce
	}
)

var _ Foo = (*foo)(nil)

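func NewFoo() Foo {
	f := &foo{
		// ...

		// Close and run depend on these channels being non-nil.
		stop: make(chan struct{}),
		done: make(chan struct{}),
	}

	return f
}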

func (f *foo) Start(ctx context.Context) error {
	return f.StartOnce("Foo", func() error {
		go f.run()

		return nil
	})
}

func (f *foo) Close() error {
	return f.StopOnce("Foo", func() error {
		// trigger goroutine cleanup
		close(f.stop)
		// wait for cleanup to complete
		<-f.done
		return nil
	})
}

func (f *foo) run() {
	// signal cleanup completion
	defer close(f.done)

	for {
		select {
		// ...
		case <-f.stop:
			// stop the routine
			return
		}
	}

}

type StartClose

type StartClose interface {
	Start(context.Context) error
	Close() error
}

StartClose is a subset of the Service interface.

type StateMachine

type StateMachine struct {
	sync.RWMutex // lock is held during startup/shutdown, RLock is held while executing functions dependent on a particular state

	// SvcErrBuffer is an ErrorBuffer that let service owners track critical errors happening in the service.
	//
	// SvcErrBuffer.SetCap(int) Overrides buffer limit from defaultErrorBufferCap
	// SvcErrBuffer.Append(error) Appends an error to the buffer
	// SvcErrBuffer.Flush() error returns all tracked errors as a single joined error
	SvcErrBuffer ErrorBuffer
	// contains filtered or unexported fields
}

StateMachine contains a state integer

func (*StateMachine) Healthy

func (once *StateMachine) Healthy() error

Healthy returns ErrNotStarted if the state is not started. Override this per-service with more specific implementations.

func (*StateMachine) IfNotStopped

func (once *StateMachine) IfNotStopped(f func()) (ok bool)

IfNotStopped runs the func and returns true if in any state other than Stopped

func (*StateMachine) IfStarted

func (once *StateMachine) IfStarted(f func()) (ok bool)

IfStarted runs the func and returns true only if started, otherwise returns false

func (*StateMachine) Ready

func (once *StateMachine) Ready() error

Ready returns ErrNotStarted if the state is not started.

func (*StateMachine) StartOnce

func (once *StateMachine) StartOnce(name string, fn func() error) error

StartOnce sets the state to Started

func (*StateMachine) State

func (once *StateMachine) State() string

State retrieves the current state

func (*StateMachine) StopOnce

func (once *StateMachine) StopOnce(name string, fn func() error) error

StopOnce sets the state to Stopped

type StopChan

type StopChan chan struct{}

A StopChan signals when some work should stop. Use StopRChan if you already have a read-only <-chan.

func (StopChan) Ctx

Ctx cancels a context.Context when StopChan is closed.

func (StopChan) CtxCancel

CtxCancel cancels a context.Context when StopChan is closed. Returns ctx and cancel unmodified, for convenience.

func (StopChan) NewCtx

func (s StopChan) NewCtx() (context.Context, context.CancelFunc)

NewCtx returns a background context.Context that is cancelled when StopChan is closed.

type StopRChan

type StopRChan <-chan struct{}

A StopRChan signals when some work should stop. This is a receive-only version of StopChan, for casting an existing <-chan.

func (StopRChan) Ctx

Ctx cancels a context.Context when StopRChan is closed.

func (StopRChan) CtxCancel

CtxCancel cancels a context.Context when StopRChan is closed. Returns ctx and cancel unmodified, for convenience.

func (StopRChan) NewCtx

func (s StopRChan) NewCtx() (context.Context, context.CancelFunc)

NewCtx returns a background context.Context that is cancelled when StopRChan is closed.

type TickerConfig

type TickerConfig struct {
	// Initial delay before the first tick.
	Initial time.Duration
	// JitterPct to apply to each period.
	JitterPct timeutil.JitterPct
}

func (TickerConfig) NewTicker

func (c TickerConfig) NewTicker(period time.Duration) *timeutil.Ticker
