bus

package
v0.46.3
Published: Jun 14, 2024 License: MIT Imports: 16 Imported by: 63

Documentation

Constants

This section is empty.

Variables

var ErrDirectiveDisposed = errors.New("directive disposed unexpectedly")

ErrDirectiveDisposed is returned when the directive was unexpectedly disposed.

Functions

func ExecCollectValues

func ExecCollectValues[T directive.Value](
	ctx context.Context,
	bus Bus,
	dir directive.Directive,
	waitOne bool,
	valDisposeCb func(),
) ([]T, directive.Instance, directive.Reference, error)

ExecCollectValues collects one or more values while ctx is active and directive is not idle.

Returns the collected values, the directive instance, the reference, and a nil error once the directive is idle. Returns err if any resolver returns an error. valDisposeCb is called if any of the values are no longer valid, and might be called multiple times. If err != nil, ref == nil. If waitOne is true, waits for at least one value before returning.

func ExecOneOff

func ExecOneOff(
	ctx context.Context,
	bus Bus,
	dir directive.Directive,
	idleCb ExecIdleCallback,
	valDisposeCallback func(),
) (directive.AttachedValue, directive.Instance, directive.Reference, error)

ExecOneOff executes a one-off directive.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.

If any resolver returns an error, returns that error. valDisposeCallback is called if the value is no longer valid, and might be called multiple times. If err != nil, ref == nil.

func ExecOneOffTyped added in v0.36.0

func ExecOneOffTyped[T comparable](
	ctx context.Context,
	bus Bus,
	dir directive.Directive,
	idleCb ExecIdleCallback,
	valDisposeCallback func(),
) (directive.TypedAttachedValue[T], directive.Instance, directive.Reference, error)

ExecOneOffTyped executes a one-off directive with a value type.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.

If any resolver returns an error, returns that error. valDisposeCallback is called if the value is no longer valid, and might be called multiple times. If err != nil, ref == nil.

func ExecOneOffWatchCb added in v0.34.0

ExecOneOffWatchCb executes a one-off directive and watches for changes. Selects one value from the result. Calls the callback when the selected value changes. Calls with nil when the value becomes unset. If the callback returns false, the value is rejected, and the next value will be used instead.

func ExecOneOffWatchCh added in v0.8.7

ExecOneOffWatchCh executes a one-off directive and watches for changes. Returns a channel with size 1 which will hold the latest value. Continues to watch for changes until the directive is released. Returns a nil value if the value becomes unset.
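The "channel with size 1 which will hold the latest value" idea can be illustrated with plain Go channels. This is a minimal sketch of the general pattern, not the package's implementation; pushLatest is a hypothetical helper name and it assumes a single writer.

```go
package main

import "fmt"

// pushLatest stores v in a channel of capacity 1, replacing any value that
// has not been read yet. It assumes a single writer (hypothetical helper).
func pushLatest[T any](ch chan T, v T) {
	for {
		select {
		case ch <- v:
			return
		default:
		}
		// The buffer is full: discard the stale value and retry.
		select {
		case <-ch:
		default:
		}
	}
}

func main() {
	ch := make(chan string, 1)
	pushLatest(ch, "first")
	pushLatest(ch, "second") // replaces "first" because nobody read it yet
	fmt.Println(<-ch)        // second
}
```

A reader of such a channel only ever sees the most recent value, so slow readers skip intermediate updates instead of blocking the writer.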

func ExecOneOffWatchCtr added in v0.28.0

ExecOneOffWatchCtr executes a one-off directive and watches for changes. Stores the latest value into the ccontainer. Continues to watch for changes until the directive is released.

func ExecOneOffWatchEffect added in v0.36.2

func ExecOneOffWatchEffect[T directive.ComparableValue](
	cb func(val directive.TypedAttachedValue[T]) func(),
	b Bus,
	dir directive.Directive,
) (directive.Instance, directive.Reference, error)

ExecOneOffWatchEffect calls a callback for a single value resolved for a directive.

The callback can return an optional function to call when the value was removed. The callback will continue to be called until the ref is removed.

func ExecOneOffWatchLatestCb added in v0.46.1

func ExecOneOffWatchLatestCb[T directive.ComparableValue](
	ctx context.Context,
	b Bus,
	dir directive.Directive,
	initCb func(di directive.Instance, ref directive.Reference) error,
	valCb func(val directive.TypedAttachedValue[T]) error,
) error

ExecOneOffWatchLatestCb executes a one-off directive and watches for changes. initCb is called with the directive instance and reference once they are set. valCb is called with the latest value, or with a nil value if the value is removed or becomes unset. valCb is never called concurrently (only one call runs at a time), so values may be dropped if a value is replaced before the callback handling the previous one has returned. Continues to watch for changes until the directive is released. Both callbacks are optional.

func ExecOneOffWatchRoutine added in v0.22.4

ExecOneOffWatchRoutine executes a one-off directive and watches for changes.

Calls SetState on the routine container. The routine will be restarted when the value changes. Note: the routine will not start until SetContext and SetStateRoutine are called on the routine container.

func ExecOneOffWatchTransformEffect added in v0.36.3

func ExecOneOffWatchTransformEffect[T, E directive.ComparableValue](
	ctx context.Context,
	transform func(ctx context.Context, val directive.TypedAttachedValue[T]) (E, bool, error),
	selectValue func(vals []directive.TransformedAttachedValue[T, E]) int,
	effect func(val directive.TransformedAttachedValue[T, E]) func(),
	b Bus,
	dir directive.Directive,
	keyedOpts ...keyed.Option[uint32, directive.TypedAttachedValue[T]],
) (directive.Instance, directive.Reference, error)

ExecOneOffWatchTransformEffect calls a callback for a single value resolved for a directive.

The transform function will be called concurrently for each value resolved by the directive. The transform callback can return nil, false to reject the value. The transform function can return a transformed value.

The select function is called with all values produced by the transform functions so far, and should select which value to emit to the effect based on a stable sort. It can return an index in the slice or -1 to select none.

If selectValue is nil, the first returned value (lowest value id) will be used.

The callback will be called with the most recently selected value. The callback can return an optional function to call when the value was removed or changed. The callback will continue to be called until the ref is removed.
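As an illustration of the selectValue contract described above (return an index into the slice, or -1 to select none, using a stable ordering), here is a minimal sketch. The transformed struct is a hypothetical stand-in for directive.TransformedAttachedValue[T, E], and the score field is invented for the example.

```go
package main

import "fmt"

// transformed is a hypothetical stand-in for
// directive.TransformedAttachedValue[T, E]; it is NOT the library type.
type transformed struct {
	id    uint32 // value id assigned by the directive
	score int    // transformed value produced by the transform callback
}

// selectHighestScore returns the index of the value with the highest score,
// breaking ties by the lowest value id so that the choice is stable.
// It returns -1 (select none) when there are no values.
func selectHighestScore(vals []transformed) int {
	best := -1
	for i, v := range vals {
		if best == -1 {
			best = i
			continue
		}
		b := vals[best]
		if v.score > b.score || (v.score == b.score && v.id < b.id) {
			best = i
		}
	}
	return best
}

func main() {
	vals := []transformed{{id: 3, score: 10}, {id: 1, score: 10}, {id: 2, score: 5}}
	fmt.Println(selectHighestScore(vals)) // 1: score 10 with the lowest id
	fmt.Println(selectHighestScore(nil))  // -1: nothing to select
}
```

Breaking ties on the value id keeps the selection from flapping between equally ranked values as new values arrive.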

func ExecOneOffWithFilter added in v0.20.8

func ExecOneOffWithFilter(
	ctx context.Context,
	bus Bus,
	dir directive.Directive,
	idleCb ExecIdleCallback,
	valDisposeCallback func(),
	filterCb func(val directive.AttachedValue) (bool, error),
) (directive.AttachedValue, directive.Instance, directive.Reference, error)

ExecOneOffWithFilter executes a one-off directive with a filter cb.

Waits until filterCb returns true before returning a value. valDisposeCallback is called if the value is no longer valid, and might be called multiple times.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil and the directive becomes idle after any resolvers failed, the resolver error is returned.

If err != nil, ref == nil.

func ExecOneOffWithFilterTyped added in v0.36.0

func ExecOneOffWithFilterTyped[T comparable](
	ctx context.Context,
	bus Bus,
	dir directive.Directive,
	idleCb ExecIdleCallback,
	valDisposeCallback func(),
	filterCb func(val directive.TypedAttachedValue[T]) (bool, error),
) (directive.TypedAttachedValue[T], directive.Instance, directive.Reference, error)

ExecOneOffWithFilterTyped executes a one-off directive with a filter cb.

Waits until filterCb returns true before returning a value. valDisposeCallback is called if the value is no longer valid, and might be called multiple times.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.

If err != nil, ref == nil.

func ExecOneOffWithXfrm added in v0.38.0

func ExecOneOffWithXfrm(
	ctx context.Context,
	bus Bus,
	dir directive.Directive,
	idleCb ExecIdleCallback,
	valDisposeCallback func(),
	xfrmCb func(val directive.AttachedValue) (directive.Value, bool, error),
) (directive.Value, directive.AttachedValue, directive.Instance, directive.Reference, error)

ExecOneOffWithXfrm executes a one-off directive with a transformation filter cb.

Waits until xfrmCb returns true and a nil error before returning. valDisposeCallback is called if the value is no longer valid, and might be called multiple times.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.

If err != nil, ref == nil.

func ExecOneOffWithXfrmTyped added in v0.38.0

func ExecOneOffWithXfrmTyped[T, R directive.ComparableValue](
	ctx context.Context,
	bus Bus,
	dir directive.Directive,
	idleCb ExecIdleCallback,
	valDisposeCallback func(),
	xfrmCb func(val directive.TypedAttachedValue[T]) (R, bool, error),
) (R, directive.TypedAttachedValue[T], directive.Instance, directive.Reference, error)

ExecOneOffWithXfrmTyped executes a one-off directive with a transformation filter cb.

Waits until xfrmCb returns true and a nil error before returning. valDisposeCallback is called if the value is no longer valid, and might be called multiple times.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.

If err != nil, ref == nil.

func ExecWaitValue added in v0.15.0

func ExecWaitValue[T directive.Value](
	ctx context.Context,
	b Bus,
	dir directive.Directive,
	idleCb func(isIdle bool, errs []error) (bool, error),
	valDisposeCallback func(),
	checkCb func(val T) (bool, error),
) (T, directive.Instance, directive.Reference, error)

ExecWaitValue executes a directive and waits for a value matching the cb.

valDisposeCallback can be nil; if set, it is called when the value is disposed. If checkCb returns true, nil, the value is returned. If checkCb is nil, the first value is returned.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.

func ExecWatchEffect added in v0.35.1

func ExecWatchEffect[T directive.ComparableValue](
	cb func(val directive.TypedAttachedValue[T]) func(),
	b Bus,
	dir directive.Directive,
) (directive.Instance, directive.Reference, error)

ExecWatchEffect calls a callback for each value resolved for a directive.

NOTE: The callbacks are run as part of the Directive Value callback and should return as quickly as possible to avoid blocking the other directive value callbacks.

The callback can return an optional function to call when the value was removed. The callback will continue to be called until the ref is removed.
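The effect-with-cleanup contract (the callback may return a function that runs when the value is removed) can be sketched without the bus. Everything below is a local illustration: effectRunner, valueAdded and valueRemoved are hypothetical names, and string stands in for directive.TypedAttachedValue[T].

```go
package main

import "fmt"

// effectFn mirrors the shape of the documented callback: it receives a value
// and may return a cleanup function (or nil). The string value type is a
// simplification for this sketch.
type effectFn func(val string) func()

// effectRunner is a hypothetical helper that tracks cleanups by value id.
type effectRunner struct {
	cb       effectFn
	cleanups map[uint32]func()
}

func newEffectRunner(cb effectFn) *effectRunner {
	return &effectRunner{cb: cb, cleanups: make(map[uint32]func())}
}

// valueAdded runs the effect and remembers its cleanup, if any.
func (r *effectRunner) valueAdded(id uint32, val string) {
	if cleanup := r.cb(val); cleanup != nil {
		r.cleanups[id] = cleanup
	}
}

// valueRemoved runs the cleanup registered for the value, if any.
func (r *effectRunner) valueRemoved(id uint32) {
	if cleanup, ok := r.cleanups[id]; ok {
		delete(r.cleanups, id)
		cleanup()
	}
}

func main() {
	r := newEffectRunner(func(val string) func() {
		fmt.Println("added:", val)
		return func() { fmt.Println("removed:", val) }
	})
	r.valueAdded(1, "peer-a") // added: peer-a
	r.valueRemoved(1)         // removed: peer-a
}
```

Because the effect runs inside the value callback, it should only do quick work, such as starting a goroutine, and leave the heavy lifting to that goroutine.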

func ExecWatchTransformEffect added in v0.36.3

func ExecWatchTransformEffect[T, E directive.ComparableValue](
	ctx context.Context,
	transform func(ctx context.Context, val directive.TypedAttachedValue[T]) (E, bool, error),
	effect func(val directive.TransformedAttachedValue[T, E]) func(),
	b Bus,
	dir directive.Directive,
	keyedOpts ...keyed.Option[uint32, directive.TypedAttachedValue[T]],
) (directive.Instance, directive.Reference, error)

ExecWatchTransformEffect calls a callback for each value resolved for a directive.

The transform function will be called concurrently for each value resolved by the directive. The transform callback can return nil, false to reject the value. The transform function can return a transformed value.

The callback will be called with the transformed values. The callback can return an optional function to call when the value was removed or changed. The callback will continue to be called until the ref is removed.

func NewCallbackHandler

func NewCallbackHandler(
	valCb func(directive.AttachedValue),
	removedCb func(directive.AttachedValue),
	disposeCb func(),
) directive.ReferenceHandler

NewCallbackHandler wraps callback functions into a handler object.

func NewOneOffRefCount added in v0.36.0

func NewOneOffRefCount[T comparable](
	ctx context.Context,
	bus Bus,
	dir directive.Directive,
	idleCb ExecIdleCallback,
	filterCb func(val directive.TypedAttachedValue[T]) (bool, error),
	keepUnref bool,
	target *ccontainer.CContainer[T],
	targetErr *ccontainer.CContainer[*error],
) *refcount.RefCount[T]

NewOneOffRefCount builds a RefCount which executes a one-off directive.

When at least one reference to the container is present, and ctx is set to an active context, the directive will be added to the bus, and the first valid value returned matching the type parameter and optional filter will be stored in the refcount container.

ctx, target and targetErr can be empty. keepUnref sets whether the value should be kept when there are zero references.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.

If err != nil, ref == nil.
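The reference-counting behavior described above (resolve on the first reference, optionally keep the value at zero references) can be sketched in a few lines. This is a single-goroutine illustration only: lazyValue and its fields are hypothetical and do not reproduce refcount.RefCount, and the resolve function stands in for executing the directive.

```go
package main

import "fmt"

// lazyValue is a hypothetical, single-goroutine illustration of the
// reference-counting idea; it is not the refcount.RefCount type.
type lazyValue struct {
	refs      int
	keepUnref bool
	resolve   func() string // stands in for executing the directive
	value     *string
}

// addRef registers a reference, resolving the value on first use, and
// returns the value together with a release function.
func (l *lazyValue) addRef() (string, func()) {
	l.refs++
	if l.value == nil {
		v := l.resolve()
		l.value = &v
	}
	released := false
	return *l.value, func() {
		if released {
			return
		}
		released = true
		l.refs--
		if l.refs == 0 && !l.keepUnref {
			l.value = nil // drop the value once nobody references it
		}
	}
}

func main() {
	calls := 0
	l := &lazyValue{resolve: func() string { calls++; return "resolved" }}
	v1, rel1 := l.addRef()
	_, rel2 := l.addRef()
	rel1()
	rel2()
	_, rel3 := l.addRef() // resolves again because keepUnref is false
	rel3()
	fmt.Println(v1, calls) // resolved 2
}
```

With keepUnref set to true in this sketch, the third addRef would reuse the stored value and resolve would run only once.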

func NewOneOffRefCountResolver added in v0.36.0

func NewOneOffRefCountResolver[T comparable](
	bus Bus,
	dir directive.Directive,
	idleCb ExecIdleCallback,
	filterCb func(val directive.TypedAttachedValue[T]) (bool, error),
) refcount.RefCountResolver[T]

NewOneOffRefCountResolver builds a RefCount resolver which executes a one-off directive.

When at least one reference to the container is present, and ctx is set to an active context, the directive will be added to the bus, and the first valid value returned matching the type parameter and optional filter will be stored in the refcount container.

idleCb is called when the directive becomes idle, with the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.

If err != nil, ref == nil.

func NewPassThruHandler

func NewPassThruHandler(
	passthru directive.ResolverHandler,
	disposeCb func(),
) directive.ReferenceHandler

NewPassThruHandler builds a new pass-through handler.

func NewTransformHandler added in v0.29.2

func NewTransformHandler(
	passthru directive.ResolverHandler,
	xfrm func(val directive.AttachedValue) (directive.Value, bool),
	disposeCb func(),
) directive.ReferenceHandler

NewTransformHandler builds a new transform handler.

Types

type Bus

type Bus interface {
	// Controller is the directive controller.
	directive.Controller

	// GetControllers returns a list of all currently active controllers.
	GetControllers() []controller.Controller

	// AddController adds a controller to the bus and calls Execute().
	// The controller will exit if ctx is canceled.
	// Returns a release function for the controller reference.
	// The controller will receive directive callbacks until removed.
	// Any fatal error in the controller is written to cb.
	// If the controller is released, cb will be called with nil.
	// cb can be nil
	AddController(ctx context.Context, ctrl controller.Controller, cb func(exitErr error)) (func(), error)

	// ExecuteController adds a controller to the bus and calls Execute().
	// The controller will exit if ctx is canceled.
	// Any fatal error in the controller is returned.
	// The controller will receive directive callbacks.
	// If this function returns nil, call RemoveController to remove the controller.
	ExecuteController(context.Context, controller.Controller) error

	// RemoveController removes the controller from the bus.
	// The controller will no longer receive callbacks.
	// Note: this might not cancel the Execute() context automatically.
	RemoveController(controller.Controller)
}

Bus manages running controllers. It has an attached directive controller, which is used to build declarative state requests between controllers.

type BusController added in v0.15.1

type BusController[T config.Config] struct {
	// contains filtered or unexported fields
}

BusController implements a base Controller with an attached Bus for directives. It has stubs for the functions implementing all required interfaces. Usually you should extend this type, overriding functions as needed.

func NewBusController added in v0.15.1

func NewBusController[T config.Config](
	le *logrus.Entry,
	b Bus,
	conf T,
	controllerID string,
	controllerVersion semver.Version,
	controllerDescrip string,
) *BusController[T]

NewBusController constructs a new Controller with details.

func (*BusController[T]) Close added in v0.15.1

func (c *BusController[T]) Close() error

Close releases any resources used by the controller.

func (*BusController[T]) Execute added in v0.15.1

func (c *BusController[T]) Execute(ctx context.Context) error

Execute executes the controller goroutine.

func (*BusController[T]) GetBus added in v0.15.1

func (c *BusController[T]) GetBus() Bus

GetBus returns the bus.

func (*BusController[T]) GetConfig added in v0.15.1

func (c *BusController[T]) GetConfig() T

GetConfig returns the config. Note: don't modify this object!

func (*BusController[T]) GetControllerInfo added in v0.15.1

func (c *BusController[T]) GetControllerInfo() *controller.Info

GetControllerInfo returns information about the controller.

func (*BusController[T]) GetLogger added in v0.15.1

func (c *BusController[T]) GetLogger() *logrus.Entry

GetLogger returns the root logger.

func (*BusController[T]) HandleDirective added in v0.15.1

func (c *BusController[T]) HandleDirective(ctx context.Context, di directive.Instance) ([]directive.Resolver, error)

HandleDirective asks if the handler can resolve the directive.

type BusFactory added in v0.15.0

type BusFactory[T config.Config, C controller.Controller] struct {
	// contains filtered or unexported fields
}

BusFactory implements Factory with an attached Bus for directives.

func NewBusControllerFactory added in v0.15.1

func NewBusControllerFactory[T config.Config, C controller.Controller](
	b Bus,
	configID string,
	controllerID string,
	version semver.Version,
	controllerDescription string,
	newConfig func() T,
	newController func(base *BusController[T]) (C, error),
) *BusFactory[T, C]

NewBusControllerFactory builds a Factory for a type wrapping BusController.

func NewBusFactory added in v0.23.2

func NewBusFactory[T config.Config, C controller.Controller](
	b Bus,
	configID string,
	version semver.Version,
	newConfig func() T,
	newController func(le *logrus.Entry, b Bus, conf T) (C, error),
) *BusFactory[T, C]

NewBusFactory constructs a new Factory with details.

func (*BusFactory[T, C]) Construct added in v0.15.0

func (f *BusFactory[T, C]) Construct(
	ctx context.Context,
	cc config.Config,
	opts controller.ConstructOpts,
) (controller.Controller, error)

Construct constructs the associated controller given configuration.

func (*BusFactory[T, C]) ConstructConfig added in v0.15.0

func (f *BusFactory[T, C]) ConstructConfig() config.Config

ConstructConfig constructs an instance of the controller configuration.

func (*BusFactory[T, C]) GetConfigID added in v0.15.0

func (f *BusFactory[T, C]) GetConfigID() string

GetConfigID returns the unique config ID for the controller.

func (*BusFactory[T, C]) GetVersion added in v0.15.0

func (f *BusFactory[T, C]) GetVersion() semver.Version

GetVersion returns the version of this controller.

type CallbackHandler

type CallbackHandler = directive.CallbackHandler

CallbackHandler is a reference handler that uses function callbacks.

type ExecIdleCallback added in v0.27.0

type ExecIdleCallback func(isIdle bool, errs []error) (bool, error)

ExecIdleCallback is an idle callback for ExecOneOffWithFilter.

idleCb is called with the idle state and the list of resolver errors (if any) in errs. idleCb should return (wait, error): if wait is true, the call continues to wait. If idleCb is nil, the call continues to wait when the directive becomes idle.
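A custom idle callback only needs to match the signature shown above. The sketch below declares a local idleCallback type with the same signature so that it compiles on its own; waitUnlessFatal and errNotFound are hypothetical names, and the policy (keep waiting unless a specific error appears) is just one possible choice.

```go
package main

import (
	"errors"
	"fmt"
)

// idleCallback is a local copy of the documented ExecIdleCallback signature.
type idleCallback func(isIdle bool, errs []error) (bool, error)

// errNotFound is a hypothetical error used only in this example.
var errNotFound = errors.New("not found")

// waitUnlessFatal is a hypothetical callback builder: it keeps waiting on
// resolver errors, except for errors matching fatal, which are returned.
func waitUnlessFatal(fatal error) idleCallback {
	return func(isIdle bool, errs []error) (bool, error) {
		for _, err := range errs {
			if err != nil && errors.Is(err, fatal) {
				return false, err
			}
		}
		return true, nil
	}
}

func main() {
	cb := waitUnlessFatal(errNotFound)
	wait, err := cb(true, []error{errors.New("temporary")})
	fmt.Println(wait, err) // true <nil>
	wait, err = cb(true, []error{fmt.Errorf("lookup: %w", errNotFound)})
	fmt.Println(wait, err) // false lookup: not found
}
```

Since ExecIdleCallback is a plain function type, a function value with this signature could be converted to it when calling the Exec helpers.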

func ReturnIfIdle added in v0.27.1

func ReturnIfIdle(returnIfIdle bool) ExecIdleCallback

ReturnIfIdle returns an ExecIdleCallback that returns when the directive becomes idle if returnIfIdle is true.

func ReturnWhenIdle added in v0.27.0

func ReturnWhenIdle() ExecIdleCallback

ReturnWhenIdle returns an ExecIdleCallback that returns when the directive becomes idle.

func WaitWhenIdle added in v0.27.0

func WaitWhenIdle(ignoreErrors bool) ExecIdleCallback

WaitWhenIdle returns an ExecIdleCallback that waits when the directive becomes idle.

If ignoreErrors is false, returns any non-nil resolver error that occurs.

type PassThruHandler

type PassThruHandler struct {
	// contains filtered or unexported fields
}

PassThruHandler is a reference handler that passes through to a resolver handler.

func (*PassThruHandler) HandleInstanceDisposed

func (h *PassThruHandler) HandleInstanceDisposed(di directive.Instance)

HandleInstanceDisposed is called when a directive instance is disposed. This will occur if Close() is called on the directive instance.

func (*PassThruHandler) HandleValueAdded

func (h *PassThruHandler) HandleValueAdded(
	_ directive.Instance,
	v directive.AttachedValue,
)

HandleValueAdded is called when a value is added to the directive.

func (*PassThruHandler) HandleValueRemoved

func (h *PassThruHandler) HandleValueRemoved(
	_ directive.Instance,
	v directive.AttachedValue,
)

HandleValueRemoved is called when a value is removed from the directive.

type TransformHandler added in v0.29.2

type TransformHandler struct {
	// contains filtered or unexported fields
}

TransformHandler is a reference handler that transforms values before passing them to a resolver handler. xfrm is the transformation callback, called with each AttachedValue. xfrm should return the Value to add to the ResolverHandler. If xfrm returns nil, false, the value is ignored. If xfrm is nil, the handler acts identically to PassThruHandler.
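The xfrm rules above (return a value to forward it, return false to ignore it, nil xfrm passes everything through) can be sketched on a plain slice. The forward function and xfrmFunc type are hypothetical, and string stands in for both directive.AttachedValue and directive.Value.

```go
package main

import (
	"fmt"
	"strings"
)

// xfrmFunc mirrors the documented shape of xfrm, with string standing in
// for both directive.AttachedValue and directive.Value.
type xfrmFunc func(val string) (string, bool)

// forward applies xfrm to each value and collects what would be passed on.
// A nil xfrm passes every value through unchanged (hypothetical helper).
func forward(vals []string, xfrm xfrmFunc) []string {
	out := make([]string, 0, len(vals))
	for _, v := range vals {
		if xfrm == nil {
			out = append(out, v)
			continue
		}
		if nv, ok := xfrm(v); ok {
			out = append(out, nv)
		}
	}
	return out
}

func main() {
	upperNonEmpty := func(val string) (string, bool) {
		if val == "" {
			return "", false // ignored
		}
		return strings.ToUpper(val), true
	}
	fmt.Println(forward([]string{"a", "", "b"}, upperNonEmpty)) // [A B]
	fmt.Println(len(forward([]string{"a", ""}, nil)))           // 2
}
```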

func (*TransformHandler) HandleInstanceDisposed added in v0.29.2

func (h *TransformHandler) HandleInstanceDisposed(di directive.Instance)

HandleInstanceDisposed is called when a directive instance is disposed. This will occur if Close() is called on the directive instance.

func (*TransformHandler) HandleValueAdded added in v0.29.2

func (h *TransformHandler) HandleValueAdded(
	_ directive.Instance,
	v directive.AttachedValue,
)

HandleValueAdded is called when a value is added to the directive.

func (*TransformHandler) HandleValueRemoved added in v0.29.2

func (h *TransformHandler) HandleValueRemoved(
	_ directive.Instance,
	v directive.AttachedValue,
)

HandleValueRemoved is called when a value is removed from the directive.

Directories

Path Synopsis
api
