manager

package
v4.0.0-rc1
Warning

This package is not in the latest version of its module.

Published: Mar 20, 2022 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

Package manager creates and manages multiple streams, providing an API for performing CRUD operations.

Constants

This section is empty.

Variables

var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)

Errors specifically returned by a stream manager.
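A minimal sketch of how a caller might tell the two sentinel errors apart with errors.Is. The variables are redeclared locally as stand-ins so the snippet compiles without the module, and describe is a hypothetical helper. Whether the manager ever wraps these errors is not stated here; errors.Is handles both the wrapped and unwrapped case.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the package's sentinel errors, so that this
// sketch compiles without importing the module.
var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)

// describe is a hypothetical helper that maps an error to a short label.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrStreamExists):
		return "conflict"
	case errors.Is(err, ErrStreamDoesNotExist):
		return "not found"
	default:
		return "other"
	}
}

func main() {
	// Wrapping with %w keeps the sentinel discoverable via errors.Is.
	wrapped := fmt.Errorf("create %q: %w", "foo", ErrStreamExists)
	fmt.Println(describe(wrapped))
	fmt.Println(describe(ErrStreamDoesNotExist))
	fmt.Println(describe(nil))
}
```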

Functions

func LoadStreamConfigsFromDirectory deprecated

func LoadStreamConfigsFromDirectory(replaceEnvVars bool, dir string) (map[string]stream.Config, error)

LoadStreamConfigsFromDirectory reads a map of stream ids to configurations by walking a directory of .json and .yaml files.

Deprecated: The streams builder is using ./internal/config now.

func LoadStreamConfigsFromPath deprecated

func LoadStreamConfigsFromPath(target, testSuffix string, streamMap map[string]stream.Config) ([]string, error)

LoadStreamConfigsFromPath reads a map of stream ids to configurations by either walking a directory of .json and .yaml files or by reading a file directly. Returns linting errors prefixed with their path.

Deprecated: The streams builder is using ./internal/config now.

func OptAPIEnabled

func OptAPIEnabled(b bool) func(*Type)

OptAPIEnabled sets whether the stream manager registers API endpoints for CRUD operations on streams. This is enabled by default.

func OptSetAPITimeout

func OptSetAPITimeout(tout time.Duration) func(*Type)

OptSetAPITimeout sets the default timeout for HTTP API requests.
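Both functions above return a func(*Type), the functional options pattern consumed by New. The sketch below shows how such options are typically applied. Type here is a local stand-in, its two fields are assumptions (the real struct's fields are unexported), newType is hypothetical and omits the bundle.NewManagement argument, and the initial timeout is an arbitrary placeholder rather than the package's real default.

```go
package main

import (
	"fmt"
	"time"
)

// Type is a local stand-in for the manager type. The two fields are
// assumptions made for illustration only.
type Type struct {
	apiEnabled bool
	apiTimeout time.Duration
}

// OptAPIEnabled mirrors the documented signature.
func OptAPIEnabled(b bool) func(*Type) {
	return func(t *Type) { t.apiEnabled = b }
}

// OptSetAPITimeout mirrors the documented signature.
func OptSetAPITimeout(tout time.Duration) func(*Type) {
	return func(t *Type) { t.apiTimeout = tout }
}

// newType is a hypothetical constructor: it sets defaults first and then
// applies each option in order, so later options win.
func newType(opts ...func(*Type)) *Type {
	t := &Type{
		apiEnabled: true,            // documented default
		apiTimeout: 5 * time.Second, // placeholder, not the real default
	}
	for _, opt := range opts {
		opt(t)
	}
	return t
}

func main() {
	t := newType(OptAPIEnabled(false), OptSetAPITimeout(10*time.Second))
	fmt.Println(t.apiEnabled, t.apiTimeout)
}
```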

Types

type ConfigSet

type ConfigSet map[string]stream.Config

ConfigSet is a map of stream configurations mapped by ID, which can be YAML parsed without losing default values inside the stream configs.

func (ConfigSet) UnmarshalYAML

func (c ConfigSet) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML ensures that default values are still applied when the configs being parsed are held in a map or slice.
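The idea of keeping defaults while decoding a map of configs can be sketched with the standard library. This is an analogy using encoding/json rather than YAML, not the package's implementation: conf, newConf, confSet and the default values are all hypothetical. Each entry starts from a defaults-populated value, so fields missing from the input keep their defaults.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// conf is a hypothetical stand-in for stream.Config.
type conf struct {
	Threads int    `json:"threads"`
	Label   string `json:"label"`
}

// newConf returns a conf populated with made-up defaults.
func newConf() conf { return conf{Threads: 1, Label: "default"} }

// confSet is a hypothetical stand-in for ConfigSet.
type confSet map[string]conf

// UnmarshalJSON uses a value receiver, as ConfigSet.UnmarshalYAML does.
// A map is a reference type, so writes through c reach the caller's map,
// provided that map was initialised before decoding.
func (c confSet) UnmarshalJSON(data []byte) error {
	raw := map[string]json.RawMessage{}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	for id, msg := range raw {
		v := newConf() // start from defaults, then overlay the input
		if err := json.Unmarshal(msg, &v); err != nil {
			return fmt.Errorf("stream %q: %w", id, err)
		}
		c[id] = v
	}
	return nil
}

func main() {
	set := confSet{} // must be non-nil in this sketch
	input := []byte(`{"a":{"threads":4},"b":{}}`)
	if err := json.Unmarshal(input, &set); err != nil {
		panic(err)
	}
	fmt.Println(set["a"], set["b"])
}
```

Decoding directly into a plain map[string]conf would instead leave omitted fields at their zero values, which is the problem a custom unmarshaller of this kind avoids.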

type StreamProcConstructorFunc

type StreamProcConstructorFunc func(streamID string) (processor.V1, error)

StreamProcConstructorFunc is a closure type that constructs a processor for a new stream. The ID of the stream is provided as an argument.

type StreamStatus

type StreamStatus struct {
	// contains filtered or unexported fields
}

StreamStatus tracks a stream along with information regarding its internals.

func NewStreamStatus

func NewStreamStatus(
	conf stream.Config,
	strm *stream.Type,
	logger log.Modular,
	stats *metrics.Local,
) *StreamStatus

NewStreamStatus creates a new StreamStatus.

func (*StreamStatus) Config

func (s *StreamStatus) Config() stream.Config

Config returns the configuration of the stream.

func (*StreamStatus) IsReady

func (s *StreamStatus) IsReady() bool

IsReady returns a boolean indicating whether the stream is connected at both the input and output level.

func (*StreamStatus) IsRunning

func (s *StreamStatus) IsRunning() bool

IsRunning returns a boolean indicating whether the stream is currently running.

func (*StreamStatus) Metrics

func (s *StreamStatus) Metrics() *metrics.Local

Metrics returns a metrics aggregator of the stream.

func (*StreamStatus) Uptime

func (s *StreamStatus) Uptime() time.Duration

Uptime returns a time.Duration indicating the current uptime of the stream.

type Type

type Type struct {
	// contains filtered or unexported fields
}

Type manages a collection of streams, providing APIs for CRUD operations on the streams.

func New

func New(mgr bundle.NewManagement, opts ...func(*Type)) *Type

New creates a new stream manager.Type.

func (*Type) Create

func (m *Type) Create(id string, conf stream.Config) error

Create attempts to construct and run a new stream under a unique ID. If the ID already exists an error is returned.

func (*Type) Delete

func (m *Type) Delete(id string, timeout time.Duration) error

Delete attempts to stop and remove a stream by its ID. Returns an error if the stream was not found, or if clean shutdown fails in the specified period of time.

func (*Type) HandleResourceCRUD

func (m *Type) HandleResourceCRUD(w http.ResponseWriter, r *http.Request)

HandleResourceCRUD is an http.HandlerFunc for performing CRUD operations on resource components.

func (*Type) HandleStreamCRUD

func (m *Type) HandleStreamCRUD(w http.ResponseWriter, r *http.Request)

HandleStreamCRUD is an http.HandlerFunc for performing CRUD operations on individual streams.

func (*Type) HandleStreamReady

func (m *Type) HandleStreamReady(w http.ResponseWriter, r *http.Request)

HandleStreamReady is an http.HandlerFunc for providing a ready check across all streams.
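Because the Handle methods share the standard handler signature, they can be mounted on any mux and exercised with net/http/httptest. The sketch below uses a hypothetical readyChecker stand-in instead of the real manager. The "/ready" path and the 200/503 status codes are assumptions for illustration, not the documented behaviour of HandleStreamReady.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// readyChecker is a hypothetical stand-in exposing a method with the same
// shape as HandleStreamReady.
type readyChecker struct {
	ready map[string]bool // stream ID -> ready state
}

// HandleStreamReady replies 503 if any stream is not ready and 200
// otherwise. These status codes are an assumption of this sketch.
func (c *readyChecker) HandleStreamReady(w http.ResponseWriter, r *http.Request) {
	for id, ok := range c.ready {
		if !ok {
			http.Error(w, "stream not ready: "+id, http.StatusServiceUnavailable)
			return
		}
	}
	w.WriteHeader(http.StatusOK)
}

// newMux mounts the handler under a hypothetical path.
func newMux(c *readyChecker) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/ready", c.HandleStreamReady)
	return mux
}

// probe sends a GET request to path and returns the response status code.
func probe(mux *http.ServeMux, path string) int {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	c := &readyChecker{ready: map[string]bool{"foo": true, "bar": false}}
	mux := newMux(c)
	fmt.Println(probe(mux, "/ready"))
	c.ready["bar"] = true
	fmt.Println(probe(mux, "/ready"))
}
```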

func (*Type) HandleStreamStats

func (m *Type) HandleStreamStats(w http.ResponseWriter, r *http.Request)

HandleStreamStats is an http.HandlerFunc for obtaining metrics for a stream.

func (*Type) HandleStreamsCRUD

func (m *Type) HandleStreamsCRUD(w http.ResponseWriter, r *http.Request)

HandleStreamsCRUD is an http.HandlerFunc that either returns a map of active Benthos streams keyed by ID, with the status and uptime of each, or overwrites the entire set of streams.

func (*Type) Read

func (m *Type) Read(id string) (*StreamStatus, error)

Read attempts to obtain the status of a managed stream. Returns an error if the stream does not exist.

func (*Type) Stop

func (m *Type) Stop(timeout time.Duration) error

Stop attempts to gracefully shut down all active streams and close the stream manager.

func (*Type) Update

func (m *Type) Update(id string, conf stream.Config, timeout time.Duration) error

Update attempts to stop an existing stream and replace it with a new version of the same stream.
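The Create, Read, Update and Delete semantics described above can be illustrated with a small in-memory stand-in. This is a sketch under assumptions, not the manager's implementation: fakeManager is hypothetical, plain strings replace stream.Config, timeouts are omitted, and returning ErrStreamDoesNotExist from Update for an unknown ID is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local stand-ins mirroring the package's sentinel errors.
var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)

// fakeManager is a hypothetical in-memory stand-in for the stream manager.
type fakeManager struct {
	mu      sync.Mutex
	streams map[string]string // stream ID -> config (a string in this sketch)
}

func newFakeManager() *fakeManager {
	return &fakeManager{streams: map[string]string{}}
}

// Create registers a stream and fails if the ID is already taken.
func (m *fakeManager) Create(id, conf string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.streams[id]; ok {
		return ErrStreamExists
	}
	m.streams[id] = conf
	return nil
}

// Read returns the stored config and fails if the ID is unknown.
func (m *fakeManager) Read(id string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	conf, ok := m.streams[id]
	if !ok {
		return "", ErrStreamDoesNotExist
	}
	return conf, nil
}

// Update replaces the config of an existing stream.
func (m *fakeManager) Update(id, conf string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.streams[id]; !ok {
		return ErrStreamDoesNotExist // assumed behaviour
	}
	m.streams[id] = conf
	return nil
}

// Delete removes a stream and fails if the ID is unknown.
func (m *fakeManager) Delete(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.streams[id]; !ok {
		return ErrStreamDoesNotExist
	}
	delete(m.streams, id)
	return nil
}

func main() {
	m := newFakeManager()
	fmt.Println(m.Create("foo", "v1"))
	fmt.Println(m.Create("foo", "v2"))
	fmt.Println(m.Update("foo", "v2"))
	fmt.Println(m.Read("foo"))
	fmt.Println(m.Delete("foo"))
	fmt.Println(m.Read("foo"))
}
```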
