Documentation ¶
Overview ¶
Package manager creates and manages multiple streams, providing an API for performing CRUD operations.
Index ¶
- Variables
- func LoadStreamConfigsFromDirectory(replaceEnvVars bool, dir string) (map[string]stream.Config, error) (deprecated)
- func LoadStreamConfigsFromPath(target, testSuffix string, streamMap map[string]stream.Config) ([]string, error) (deprecated)
- func OptAPIEnabled(b bool) func(*Type)
- type ConfigSet
- type StreamProcConstructorFunc
- type StreamStatus
- type Type
- func (m *Type) Create(id string, conf stream.Config) error
- func (m *Type) Delete(id string, timeout time.Duration) error
- func (m *Type) HandleResourceCRUD(w http.ResponseWriter, r *http.Request)
- func (m *Type) HandleStreamCRUD(w http.ResponseWriter, r *http.Request)
- func (m *Type) HandleStreamReady(w http.ResponseWriter, r *http.Request)
- func (m *Type) HandleStreamStats(w http.ResponseWriter, r *http.Request)
- func (m *Type) HandleStreamsCRUD(w http.ResponseWriter, r *http.Request)
- func (m *Type) Read(id string) (*StreamStatus, error)
- func (m *Type) Stop(timeout time.Duration) error
- func (m *Type) Update(id string, conf stream.Config, timeout time.Duration) error
Constants ¶
This section is empty.
Variables ¶
var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)
Errors specifically returned by a stream manager.
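Callers would typically compare against these sentinel values rather than match on the message text. The following is a minimal sketch, not taken from the package itself: the two variables are local stand-ins that mirror the exported errors so the example compiles on its own, and `describe` is a hypothetical helper. Whether the manager wraps these errors is not stated here, so the sketch uses `errors.Is`, which matches in either case.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the package's exported sentinel errors so the
// sketch compiles without importing the manager package.
var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
)

// describe is a hypothetical helper that maps an error from a manager call to
// a short label. errors.Is also matches sentinels wrapped with %w.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrStreamExists):
		return "conflict"
	case errors.Is(err, ErrStreamDoesNotExist):
		return "not found"
	default:
		return "unexpected"
	}
}

func main() {
	// Simulate a caller that added context around the sentinel.
	wrapped := fmt.Errorf("create %q: %w", "foo", ErrStreamExists)
	fmt.Println(describe(wrapped))
	fmt.Println(describe(ErrStreamDoesNotExist))
}
```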
Functions ¶
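func LoadStreamConfigsFromDirectory (deprecated)
func LoadStreamConfigsFromDirectory(replaceEnvVars bool, dir string) (map[string]stream.Config, error)
func LoadStreamConfigsFromPath (deprecated)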
func LoadStreamConfigsFromPath(target, testSuffix string, streamMap map[string]stream.Config) ([]string, error)
LoadStreamConfigsFromPath reads a map of stream IDs to configurations, either by walking a directory of .json and .yaml files or by reading a single file directly. Returns linting errors prefixed with their path.
Deprecated: The streams builder now uses ./internal/config.
func OptAPIEnabled ¶
func OptAPIEnabled(b bool) func(*Type)
OptAPIEnabled sets whether the stream manager registers API endpoints for CRUD operations on streams. This is enabled by default.
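The signature follows the functional options pattern: the returned closure is passed to the constructor, which applies it to the manager being built. The sketch below illustrates that pattern under stated assumptions. `Type` here is a local stand-in, the `apiEnabled` field name is hypothetical (the real fields are unexported), and `New` is simplified to take only options.

```go
package main

import "fmt"

// Type is a local stand-in for the manager type. The apiEnabled field is a
// hypothetical name; the real struct's fields are unexported and not shown.
type Type struct {
	apiEnabled bool
}

// OptAPIEnabled mirrors the documented signature: it returns a closure that
// mutates the manager during construction.
func OptAPIEnabled(b bool) func(*Type) {
	return func(t *Type) {
		t.apiEnabled = b
	}
}

// New is a simplified constructor for this sketch. It sets the documented
// default (API enabled) and then applies each option in order.
func New(opts ...func(*Type)) *Type {
	t := &Type{apiEnabled: true}
	for _, opt := range opts {
		opt(t)
	}
	return t
}

func main() {
	fmt.Println(New().apiEnabled)                     // default
	fmt.Println(New(OptAPIEnabled(false)).apiEnabled) // explicitly disabled
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same setting.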
Types ¶
type ConfigSet ¶
ConfigSet is a map of stream configurations keyed by ID that can be parsed from YAML without losing the default values inside the stream configs.
func (ConfigSet) UnmarshalYAML ¶
UnmarshalYAML ensures that default values are still applied when parsing configs that are held in a map or slice.
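The underlying idea is to decode each entry on top of a pre-populated default value instead of a zero value. The sketch below shows that idea with `encoding/json` from the standard library rather than YAML, so it is an analogue and not the package's implementation. `Config`, its fields, `NewConfig`, and `parse` are all hypothetical stand-ins, and the sketch uses a pointer receiver for simplicity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config is a hypothetical stand-in for stream.Config.
type Config struct {
	Threads int    `json:"threads"`
	Label   string `json:"label"`
}

// NewConfig returns a Config populated with made-up default values.
func NewConfig() Config {
	return Config{Threads: 1}
}

// ConfigSet maps stream IDs to configs.
type ConfigSet map[string]Config

// UnmarshalJSON decodes each entry on top of a defaulted Config so that
// fields omitted from the input keep their defaults.
func (c *ConfigSet) UnmarshalJSON(data []byte) error {
	raw := map[string]json.RawMessage{}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	out := make(ConfigSet, len(raw))
	for id, msg := range raw {
		conf := NewConfig() // start from defaults, not the zero value
		if err := json.Unmarshal(msg, &conf); err != nil {
			return fmt.Errorf("stream %q: %w", id, err)
		}
		out[id] = conf
	}
	*c = out
	return nil
}

// parse is a small helper that decodes a JSON document into a ConfigSet.
func parse(input string) (ConfigSet, error) {
	var set ConfigSet
	err := json.Unmarshal([]byte(input), &set)
	return set, err
}

func main() {
	set, err := parse(`{"a": {"label": "first"}, "b": {"threads": 4}}`)
	if err != nil {
		panic(err)
	}
	// Stream "a" omits threads and keeps the default; "b" overrides it.
	fmt.Println(set["a"].Threads, set["b"].Threads)
}
```

Without the custom method, a plain map decode would build each entry from a zero-valued `Config`, and `set["a"].Threads` would be 0 instead of the default.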
type StreamProcConstructorFunc ¶
StreamProcConstructorFunc is a closure type that constructs a processor type for new streams, where the id of the stream is provided as an argument.
type StreamStatus ¶
type StreamStatus struct {
// contains filtered or unexported fields
}
StreamStatus tracks a stream along with information regarding its internals.
func NewStreamStatus ¶
func NewStreamStatus(conf stream.Config, strm *stream.Type, logger log.Modular, stats *metrics.Local) *StreamStatus
NewStreamStatus creates a new StreamStatus.
func (*StreamStatus) Config ¶
func (s *StreamStatus) Config() stream.Config
Config returns the configuration of the stream.
func (*StreamStatus) IsReady ¶
func (s *StreamStatus) IsReady() bool
IsReady returns a boolean indicating whether the stream is connected at both the input and output level.
func (*StreamStatus) IsRunning ¶
func (s *StreamStatus) IsRunning() bool
IsRunning returns a boolean indicating whether the stream is currently running.
func (*StreamStatus) Metrics ¶
func (s *StreamStatus) Metrics() *metrics.Local
Metrics returns the metrics aggregator for the stream.
func (*StreamStatus) Uptime ¶
func (s *StreamStatus) Uptime() time.Duration
Uptime returns a time.Duration indicating the current uptime of the stream.
type Type ¶
type Type struct {
// contains filtered or unexported fields
}
Type manages a collection of streams, providing APIs for CRUD operations on the streams.
func New ¶
func New(mgr bundle.NewManagement, opts ...func(*Type)) *Type
New creates a new stream manager.Type.
func (*Type) Create ¶
func (m *Type) Create(id string, conf stream.Config) error
Create attempts to construct and run a new stream under a unique ID. If the ID already exists, an error is returned.
func (*Type) Delete ¶
func (m *Type) Delete(id string, timeout time.Duration) error
Delete attempts to stop and remove a stream by its ID. Returns an error if the stream was not found, or if a clean shutdown does not complete within the specified timeout.
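The Create and Delete semantics described above can be modelled with a small in-memory registry. This is a rough sketch under stated assumptions, not the package's implementation: `manager`, `newManager`, `outcome`, and `errShutdownTimeout` are hypothetical names, a stream is reduced to the time its shutdown takes, and the choice to leave a stream registered after a timed-out shutdown is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Stand-ins for the package's sentinel errors, plus a hypothetical timeout error.
var (
	ErrStreamExists       = errors.New("stream already exists")
	ErrStreamDoesNotExist = errors.New("stream does not exist")
	errShutdownTimeout    = errors.New("shutdown timed out")
)

// Durations used by the demo; the values are arbitrary.
const (
	fast = 0 * time.Millisecond
	slow = 500 * time.Millisecond
	wait = 50 * time.Millisecond
)

// manager is a hypothetical model: each stream is represented only by how
// long its simulated clean shutdown takes.
type manager struct {
	mu      sync.Mutex
	streams map[string]time.Duration
}

func newManager() *manager {
	return &manager{streams: map[string]time.Duration{}}
}

// Create registers a stream under a unique ID.
func (m *manager) Create(id string, shutdown time.Duration) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.streams[id]; ok {
		return ErrStreamExists
	}
	m.streams[id] = shutdown
	return nil
}

// Delete waits up to timeout for the simulated shutdown, then removes the
// stream. On timeout the stream stays registered (an assumption of this sketch).
func (m *manager) Delete(id string, timeout time.Duration) error {
	m.mu.Lock()
	shutdown, ok := m.streams[id]
	m.mu.Unlock()
	if !ok {
		return ErrStreamDoesNotExist
	}
	done := make(chan struct{})
	go func() {
		time.Sleep(shutdown) // simulated clean shutdown
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(timeout):
		return errShutdownTimeout
	}
	m.mu.Lock()
	delete(m.streams, id)
	m.mu.Unlock()
	return nil
}

// outcome maps an error to a short label for printing.
func outcome(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrStreamExists):
		return "exists"
	case errors.Is(err, ErrStreamDoesNotExist):
		return "missing"
	case errors.Is(err, errShutdownTimeout):
		return "timeout"
	}
	return "other"
}

func main() {
	m := newManager()
	fmt.Println(outcome(m.Create("a", fast))) // first create succeeds
	fmt.Println(outcome(m.Create("a", fast))) // duplicate ID is rejected
	fmt.Println(outcome(m.Delete("a", wait))) // shutdown finishes in time
	fmt.Println(outcome(m.Delete("a", wait))) // already removed
	fmt.Println(outcome(m.Create("b", slow)))
	fmt.Println(outcome(m.Delete("b", wait))) // shutdown exceeds the timeout
}
```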
func (*Type) HandleResourceCRUD ¶
func (m *Type) HandleResourceCRUD(w http.ResponseWriter, r *http.Request)
HandleResourceCRUD is an http.HandlerFunc for performing CRUD operations on resource components.
func (*Type) HandleStreamCRUD ¶
func (m *Type) HandleStreamCRUD(w http.ResponseWriter, r *http.Request)
HandleStreamCRUD is an http.HandlerFunc for performing CRUD operations on individual streams.
func (*Type) HandleStreamReady ¶
func (m *Type) HandleStreamReady(w http.ResponseWriter, r *http.Request)
HandleStreamReady is an http.HandlerFunc for providing a ready check across all streams.
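Because these handler methods take an `http.ResponseWriter` and an `*http.Request`, a method value can be registered directly on a mux. The sketch below shows that wiring with a stand-in type and exercises it in memory with `net/http/httptest`. The `readyChecker` type, the `/ready` path, the response bodies, and the 200/503 status codes are all assumptions of the sketch; the actual responses of the real handler are not documented here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// readyChecker is a hypothetical stand-in for the manager: it only records
// per-stream readiness.
type readyChecker struct {
	ready map[string]bool
}

// HandleStreamReady has the same shape as the documented method. The status
// codes and bodies are assumptions made for this sketch.
func (c *readyChecker) HandleStreamReady(w http.ResponseWriter, r *http.Request) {
	for id, ok := range c.ready {
		if !ok {
			http.Error(w, "stream "+id+" not ready", http.StatusServiceUnavailable)
			return
		}
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintln(w, "OK")
}

// probe mounts the method value on a mux and issues one in-memory request.
// The "/ready" path is hypothetical.
func probe(c *readyChecker) int {
	mux := http.NewServeMux()
	mux.HandleFunc("/ready", c.HandleStreamReady)
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/ready", nil))
	return rec.Code
}

func main() {
	fmt.Println(probe(&readyChecker{ready: map[string]bool{"a": true, "b": true}}))
	fmt.Println(probe(&readyChecker{ready: map[string]bool{"a": true, "b": false}}))
}
```

Under these assumptions the first probe reports 200 and the second reports 503, since one stream is not ready.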
func (*Type) HandleStreamStats ¶
func (m *Type) HandleStreamStats(w http.ResponseWriter, r *http.Request)
HandleStreamStats is an http.HandlerFunc for obtaining metrics for a stream.
func (*Type) HandleStreamsCRUD ¶
func (m *Type) HandleStreamsCRUD(w http.ResponseWriter, r *http.Request)
HandleStreamsCRUD is an http.HandlerFunc that either returns a map of active Benthos streams by ID, with their status and uptime, or overwrites the entire set of streams.
func (*Type) Read ¶
func (m *Type) Read(id string) (*StreamStatus, error)
Read attempts to obtain the status of a managed stream. Returns an error if the stream does not exist.