Documentation ¶
Overview ¶
Package manager creates and manages multiple streams, providing an API for performing CRUD operations.
Index ¶
- Variables
- func LoadStreamConfigsFromDirectory(replaceEnvVars bool, dir string) (map[string]stream.Config, error)
- func LoadStreamConfigsFromPath(target, testSuffix string, streamMap map[string]stream.Config) ([]string, error)
- func OptAddProcessors(procs ...StreamProcConstructorFunc) func(*Type)
- func OptSetAPITimeout(tout time.Duration) func(*Type)
- func OptSetLogger(log log.Modular) func(*Type)
- func OptSetManager(mgr types.Manager) func(*Type)
- func OptSetStats(stats metrics.Type) func(*Type)
- type ConfigSet
- type NamespacedManager
- func (n *NamespacedManager) GetCache(name string) (types.Cache, error)
- func (n *NamespacedManager) GetCondition(name string) (types.Condition, error)
- func (n *NamespacedManager) GetInput(name string) (types.Input, error)
- func (n *NamespacedManager) GetOutput(name string) (types.OutputWriter, error)
- func (n *NamespacedManager) GetPipe(name string) (<-chan types.Transaction, error)
- func (n *NamespacedManager) GetPlugin(name string) (interface{}, error)
- func (n *NamespacedManager) GetProcessor(name string) (types.Processor, error)
- func (n *NamespacedManager) GetRateLimit(name string) (types.RateLimit, error)
- func (n *NamespacedManager) GetUnderlying() types.Manager
- func (n *NamespacedManager) RegisterEndpoint(p, desc string, h http.HandlerFunc)
- func (n *NamespacedManager) SetPipe(name string, t <-chan types.Transaction)
- func (n *NamespacedManager) UnsetPipe(name string, t <-chan types.Transaction)
- type StreamProcConstructorFunc
- type StreamStatus
- type Type
- func (m *Type) Create(id string, conf stream.Config) error
- func (m *Type) Delete(id string, timeout time.Duration) error
- func (m *Type) HandleStreamCRUD(w http.ResponseWriter, r *http.Request)
- func (m *Type) HandleStreamReady(w http.ResponseWriter, r *http.Request)
- func (m *Type) HandleStreamStats(w http.ResponseWriter, r *http.Request)
- func (m *Type) HandleStreamsCRUD(w http.ResponseWriter, r *http.Request)
- func (m *Type) Read(id string) (*StreamStatus, error)
- func (m *Type) Stop(timeout time.Duration) error
- func (m *Type) Update(id string, conf stream.Config, timeout time.Duration) error
Variables ¶
var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)
Errors specifically returned by a stream manager.
Functions ¶
func LoadStreamConfigsFromDirectory ¶
func LoadStreamConfigsFromDirectory(replaceEnvVars bool, dir string) (map[string]stream.Config, error)
LoadStreamConfigsFromDirectory reads a map of stream ids to configurations by walking a directory of .json and .yaml files.
func LoadStreamConfigsFromPath ¶ added in v3.12.0
func LoadStreamConfigsFromPath(target, testSuffix string, streamMap map[string]stream.Config) ([]string, error)
LoadStreamConfigsFromPath reads a map of stream ids to configurations by either walking a directory of .json and .yaml files or by reading a file directly. Returns linting errors prefixed with their path.
func OptAddProcessors ¶
func OptAddProcessors(procs ...StreamProcConstructorFunc) func(*Type)
OptAddProcessors adds processor constructors that will be called for every new stream and attached to the processor pipelines. The constructor is given the name of the stream as an argument.
func OptSetAPITimeout ¶
func OptSetAPITimeout(tout time.Duration) func(*Type)
OptSetAPITimeout sets the default timeout for HTTP API requests.
func OptSetLogger ¶
func OptSetLogger(log log.Modular) func(*Type)
OptSetLogger sets the logging output to be used by the manager and all child streams.
func OptSetManager ¶
func OptSetManager(mgr types.Manager) func(*Type)
OptSetManager sets the service manager to be used by the stream manager and all child streams.
func OptSetStats ¶
func OptSetStats(stats metrics.Type) func(*Type)
OptSetStats sets the metrics aggregator to be used by the manager and all child streams.
Types ¶
type ConfigSet ¶
ConfigSet is a map of stream configurations keyed by ID that can be parsed from YAML without losing the default values inside the stream configs.
func (ConfigSet) UnmarshalYAML ¶
UnmarshalYAML ensures that default values are still applied when parsing configs that are held in a map or slice.
type NamespacedManager ¶
type NamespacedManager struct {
// contains filtered or unexported fields
}
NamespacedManager is a types.Manager implementation that wraps an underlying implementation with a namespace that prefixes registered endpoints, etc.
func (*NamespacedManager) GetCache ¶
func (n *NamespacedManager) GetCache(name string) (types.Cache, error)
GetCache attempts to find a service-wide cache by its name.
func (*NamespacedManager) GetCondition ¶
func (n *NamespacedManager) GetCondition(name string) (types.Condition, error)
GetCondition attempts to find a service-wide condition by its name.
func (*NamespacedManager) GetInput ¶ added in v3.16.0
func (n *NamespacedManager) GetInput(name string) (types.Input, error)
GetInput attempts to find a service-wide input by its name.
func (*NamespacedManager) GetOutput ¶ added in v3.16.0
func (n *NamespacedManager) GetOutput(name string) (types.OutputWriter, error)
GetOutput attempts to find a service-wide output by its name.
func (*NamespacedManager) GetPipe ¶
func (n *NamespacedManager) GetPipe(name string) (<-chan types.Transaction, error)
GetPipe returns a named pipe transaction channel.
func (*NamespacedManager) GetPlugin ¶
func (n *NamespacedManager) GetPlugin(name string) (interface{}, error)
GetPlugin attempts to find a service-wide resource plugin by its name.
func (*NamespacedManager) GetProcessor ¶ added in v3.6.0
func (n *NamespacedManager) GetProcessor(name string) (types.Processor, error)
GetProcessor attempts to find a service-wide processor by its name.
func (*NamespacedManager) GetRateLimit ¶
func (n *NamespacedManager) GetRateLimit(name string) (types.RateLimit, error)
GetRateLimit attempts to find a service-wide rate limit by its name.
func (*NamespacedManager) GetUnderlying ¶
func (n *NamespacedManager) GetUnderlying() types.Manager
GetUnderlying returns the underlying types.Manager implementation.
func (*NamespacedManager) RegisterEndpoint ¶
func (n *NamespacedManager) RegisterEndpoint(p, desc string, h http.HandlerFunc)
RegisterEndpoint registers a server-wide HTTP endpoint.
func (*NamespacedManager) SetPipe ¶
func (n *NamespacedManager) SetPipe(name string, t <-chan types.Transaction)
SetPipe sets a named pipe.
func (*NamespacedManager) UnsetPipe ¶
func (n *NamespacedManager) UnsetPipe(name string, t <-chan types.Transaction)
UnsetPipe unsets a named pipe.
type StreamProcConstructorFunc ¶
StreamProcConstructorFunc is a closure type that constructs a processor type for new streams, where the id of the stream is provided as an argument.
type StreamStatus ¶
type StreamStatus struct {
// contains filtered or unexported fields
}
StreamStatus tracks a stream along with information regarding its internals.
func NewStreamStatus ¶
func NewStreamStatus(
	conf stream.Config,
	strm *stream.Type,
	logger log.Modular,
	stats *metrics.Local,
) *StreamStatus
NewStreamStatus creates a new StreamStatus.
func (*StreamStatus) Config ¶
func (s *StreamStatus) Config() stream.Config
Config returns the configuration of the stream.
func (*StreamStatus) IsReady ¶ added in v3.24.0
func (s *StreamStatus) IsReady() bool
IsReady returns a boolean indicating whether the stream is connected at both the input and output level.
func (*StreamStatus) IsRunning ¶
func (s *StreamStatus) IsRunning() bool
IsRunning returns a boolean indicating whether the stream is currently running.
func (*StreamStatus) Logger ¶
func (s *StreamStatus) Logger() log.Modular
Logger returns the logger of the stream.
func (*StreamStatus) Metrics ¶
func (s *StreamStatus) Metrics() *metrics.Local
Metrics returns a metrics aggregator of the stream.
func (*StreamStatus) Uptime ¶
func (s *StreamStatus) Uptime() time.Duration
Uptime returns a time.Duration indicating the current uptime of the stream.
type Type ¶
type Type struct {
// contains filtered or unexported fields
}
Type manages a collection of streams, providing APIs for CRUD operations on the streams.
func (*Type) Create ¶
func (m *Type) Create(id string, conf stream.Config) error
Create attempts to construct and run a new stream under a unique ID. If the ID already exists an error is returned.
func (*Type) Delete ¶
func (m *Type) Delete(id string, timeout time.Duration) error
Delete attempts to stop and remove a stream by its ID. Returns an error if the stream was not found, or if clean shutdown fails in the specified period of time.
func (*Type) HandleStreamCRUD ¶
func (m *Type) HandleStreamCRUD(w http.ResponseWriter, r *http.Request)
HandleStreamCRUD is an http.HandlerFunc for performing CRUD operations on individual streams.
func (*Type) HandleStreamReady ¶ added in v3.24.0
func (m *Type) HandleStreamReady(w http.ResponseWriter, r *http.Request)
HandleStreamReady is an http.HandlerFunc for providing a ready check across all streams.
func (*Type) HandleStreamStats ¶
func (m *Type) HandleStreamStats(w http.ResponseWriter, r *http.Request)
HandleStreamStats is an http.HandlerFunc for obtaining metrics for a stream.
func (*Type) HandleStreamsCRUD ¶
func (m *Type) HandleStreamsCRUD(w http.ResponseWriter, r *http.Request)
HandleStreamsCRUD is an http.HandlerFunc that either returns a map of active Benthos streams by ID, with their status and uptime, or overwrites the entire set of streams.
func (*Type) Read ¶
func (m *Type) Read(id string) (*StreamStatus, error)
Read attempts to obtain the status of a managed stream. Returns an error if the stream does not exist.