common

package
v4.0.0-...-13a3402 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2023 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateLogger

func CreateLogger(c *cli.Context, conf config.Type, streamsMode bool) (logger log.Modular, err error)

CreateLogger from a CLI context and a stream config.

func DelayShutdown

func DelayShutdown(ctx context.Context, duration time.Duration) error

DelayShutdown attempts to block until either: - The delay period ends - The provided context is cancelled - The process receives an interrupt or sigterm

func ReadConfig

func ReadConfig(c *cli.Context, streamsMode bool) (mainPath string, inferred bool, conf *config.Reader)

ReadConfig attempts to read a general service wide config via a returned config.Reader based on input CLI flags. This includes applying any config overrides expressed by the --set flag.

func RunManagerUntilStopped

func RunManagerUntilStopped(
	c *cli.Context,
	conf config.Type,
	stopMgr *StoppableManager,
	stopStrm Stoppable,
	dataStreamClosedChan chan struct{},
) int

RunManagerUntilStopped will run the provided HTTP server and block until either a provided stream stoppable is gracefully terminated (via the dataStreamClosedChan) or a signal is given to the process to terminate, at which point the provided HTTP server, the manager, and the stoppable is stopped according to the configured shutdown timeout.

func RunService

func RunService(c *cli.Context, version, dateBuilt string, streamsMode bool) int

RunService runs a service command (either the default or the streams subcommand).

Types

type Stoppable

type Stoppable interface {
	Stop(ctx context.Context) error
}

Stoppable represents a resource (a Benthos stream) that can be stopped.

func CombineStoppables

func CombineStoppables(stoppables ...Stoppable) Stoppable

CombineStoppables returns a single Stoppable that will call each provided Stoppable in the order they are specified on a Stop. If any stoppable returns an error all subsequent stoppables will still be called before an error is returned.

type StoppableManager

type StoppableManager struct {
	// contains filtered or unexported fields
}

StoppableManager wraps a manager and API type that potentially outlives one or more dependent streams and encapsulates the logic for shutting them down within the deadline of a given context.

func CreateManager

func CreateManager(
	c *cli.Context,
	logger log.Modular,
	streamsMode bool,
	version, dateBuilt string,
	conf config.Type,
	mgrOpts ...manager.OptFunc,
) (stoppableMgr *StoppableManager, err error)

CreateManager from a CLI context and a stream config.

func (*StoppableManager) API

func (s *StoppableManager) API() *api.Type

API returns the underlying api type.

func (*StoppableManager) Manager

func (s *StoppableManager) Manager() *manager.Type

Manager returns the underlying manager type.

func (*StoppableManager) Stop

func (s *StoppableManager) Stop(ctx context.Context) error

Stop the manager and the API server, gracefully if possible. If the context has a deadline then this will be used as a mechanism for pre-emptively attempting ungraceful stopping when nearing the deadline.

type SwappableStopper

type SwappableStopper struct {
	// contains filtered or unexported fields
}

SwappableStopper wraps an active Stoppable resource in a mechanism that allows changing the resource for something else after stopping it.

func NewSwappableStopper

func NewSwappableStopper(s Stoppable) *SwappableStopper

NewSwappableStopper creates a new swappable stopper resource around an initial stoppable.

func (*SwappableStopper) Replace

func (s *SwappableStopper) Replace(ctx context.Context, fn func() (Stoppable, error)) error

Replace the resource with something new only once the existing one is stopped. In order to avoid unnecessary start up of the swapping resource we accept a closure that constructs it and is only called when we're ready.

func (*SwappableStopper) Stop

func (s *SwappableStopper) Stop(ctx context.Context) error

Stop the wrapped resource.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL