manager

package
v0.32.0
Published: Sep 18, 2018 | License: MIT | Imports: 25 | Imported by: 2

Documentation

Overview

Package manager creates and manages multiple streams, providing an API for performing CRUD operations.

Constants

This section is empty.

Variables

var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)

Errors specifically returned by a stream manager.
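Callers would typically tell these two errors apart by comparing against the exported values. The following is a minimal sketch of that pattern, not code from this package: the registry type and the describe helper are hypothetical, and the two error variables are local stand-ins that mirror the ones declared above.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the sentinel errors documented above. Real code
// would reference manager.ErrStreamExists and manager.ErrStreamDoesNotExist.
var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)

// registry is a hypothetical in-memory stand-in for the stream manager.
type registry struct {
	streams map[string]struct{}
}

// create registers id, or returns ErrStreamExists if it is already taken.
func (r *registry) create(id string) error {
	if _, ok := r.streams[id]; ok {
		return ErrStreamExists
	}
	r.streams[id] = struct{}{}
	return nil
}

// remove deletes id, or returns ErrStreamDoesNotExist if it is unknown.
func (r *registry) remove(id string) error {
	if _, ok := r.streams[id]; !ok {
		return ErrStreamDoesNotExist
	}
	delete(r.streams, id)
	return nil
}

// describe maps a registry error onto a caller-facing outcome by comparing
// it against the sentinel values.
func describe(err error) string {
	switch err {
	case nil:
		return "ok"
	case ErrStreamExists:
		return "conflict"
	case ErrStreamDoesNotExist:
		return "not found"
	default:
		return "internal error"
	}
}

func main() {
	r := &registry{streams: map[string]struct{}{}}
	fmt.Println(describe(r.create("foo")))
	fmt.Println(describe(r.create("foo")))
	fmt.Println(describe(r.remove("bar")))
}
```

Direct comparison is used here because the errors are package-level values created with errors.New; whether the manager ever wraps them is not stated in this documentation.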

Functions

func LoadStreamConfigsFromDirectory added in v0.13.0

func LoadStreamConfigsFromDirectory(replaceEnvVars bool, dir string) (map[string]stream.Config, error)

LoadStreamConfigsFromDirectory walks a directory of .json and .yaml files and returns a map of stream IDs to configurations.

func OptAddInputPipelines added in v0.13.0

func OptAddInputPipelines(pipes ...StreamPipeConstructorFunc) func(*Type)

OptAddInputPipelines adds pipeline constructors that will be called for every new stream and attached to the input component. The constructor is given the name of the stream as an argument.

func OptAddOutputPipelines added in v0.13.0

func OptAddOutputPipelines(pipes ...StreamPipeConstructorFunc) func(*Type)

OptAddOutputPipelines adds pipeline constructors that will be called for every new stream and attached to the output component. The constructor is given the name of the stream as an argument.

func OptAddProcessors added in v0.13.0

func OptAddProcessors(procs ...StreamProcConstructorFunc) func(*Type)

OptAddProcessors adds processor constructors that will be called for every new stream and attached to the processor pipelines. The constructor is given the name of the stream as an argument.

func OptSetAPITimeout added in v0.12.0

func OptSetAPITimeout(tout time.Duration) func(*Type)

OptSetAPITimeout sets the default timeout for HTTP API requests.

func OptSetLogger

func OptSetLogger(log log.Modular) func(*Type)

OptSetLogger sets the logging output to be used by the manager and all child streams.

func OptSetManager

func OptSetManager(mgr types.Manager) func(*Type)

OptSetManager sets the service manager to be used by the stream manager and all child streams.

func OptSetStats

func OptSetStats(stats metrics.Type) func(*Type)

OptSetStats sets the metrics aggregator to be used by the manager and all child streams.

Types

type ConfigSet added in v0.13.0

type ConfigSet map[string]stream.Config

ConfigSet is a map of stream configurations mapped by ID, which can be YAML parsed without losing default values inside the stream configs.

func (ConfigSet) UnmarshalYAML added in v0.13.0

func (c ConfigSet) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML ensures that default values are still applied when parsing configs held in a map or slice.

type StreamPipeConstructorFunc added in v0.13.0

type StreamPipeConstructorFunc func(streamID string) (types.Pipeline, error)

StreamPipeConstructorFunc is a closure type that constructs a pipeline for each new stream; the stream's ID is passed as the argument.

type StreamProcConstructorFunc added in v0.13.0

type StreamProcConstructorFunc func(streamID string) (types.Processor, error)

StreamProcConstructorFunc is a closure type that constructs a processor for each new stream; the stream's ID is passed as the argument.

type StreamStatus

type StreamStatus struct {
	// contains filtered or unexported fields
}

StreamStatus tracks a stream along with information regarding its internals.

func NewStreamStatus added in v0.15.3

func NewStreamStatus(
	conf stream.Config,
	strm *stream.Type,
	logger log.Modular,
	stats *metrics.Local,
) *StreamStatus

NewStreamStatus creates a new StreamStatus.

func (*StreamStatus) Config

func (s *StreamStatus) Config() stream.Config

Config returns the configuration of the stream.

func (*StreamStatus) IsRunning added in v0.15.3

func (s *StreamStatus) IsRunning() bool

IsRunning returns a boolean indicating whether the stream is currently running.

func (*StreamStatus) Logger added in v0.15.3

func (s *StreamStatus) Logger() log.Modular

Logger returns the logger of the stream.

func (*StreamStatus) Metrics added in v0.15.3

func (s *StreamStatus) Metrics() *metrics.Local

Metrics returns a metrics aggregator of the stream.

func (*StreamStatus) Uptime

func (s *StreamStatus) Uptime() time.Duration

Uptime returns a time.Duration indicating the current uptime of the stream.

type Type

type Type struct {
	// contains filtered or unexported fields
}

Type manages a collection of streams, providing APIs for CRUD operations on the streams.

func New

func New(opts ...func(*Type)) *Type

New creates a new stream manager.Type.

func (*Type) Create

func (m *Type) Create(id string, conf stream.Config) error

Create attempts to construct and run a new stream under a unique ID. If the ID already exists an error is returned.

func (*Type) Delete

func (m *Type) Delete(id string, timeout time.Duration) error

Delete attempts to stop and remove a stream by its ID. It returns an error if the stream is not found, or if a clean shutdown does not complete within the specified timeout.

func (*Type) HandleStreamCRUD added in v0.13.0

func (m *Type) HandleStreamCRUD(w http.ResponseWriter, r *http.Request)

HandleStreamCRUD is an http.HandlerFunc for performing CRUD operations on individual streams.

func (*Type) HandleStreamStats added in v0.15.3

func (m *Type) HandleStreamStats(w http.ResponseWriter, r *http.Request)

HandleStreamStats is an http.HandlerFunc for obtaining metrics for a stream.

func (*Type) HandleStreamsCRUD added in v0.13.0

func (m *Type) HandleStreamsCRUD(w http.ResponseWriter, r *http.Request)

HandleStreamsCRUD is an http.HandlerFunc that either returns a map of active Benthos streams keyed by ID, with each stream's status and uptime, or overwrites the entire set of streams.

func (*Type) Read

func (m *Type) Read(id string) (*StreamStatus, error)

Read attempts to obtain the status of a managed stream. Returns an error if the stream does not exist.

func (*Type) Stop added in v0.12.0

func (m *Type) Stop(timeout time.Duration) error

Stop attempts to gracefully shut down all active streams and close the stream manager.

func (*Type) Update

func (m *Type) Update(id string, conf stream.Config, timeout time.Duration) error

Update attempts to stop an existing stream and replace it with a new version of the same stream.
