Documentation ¶
Index ¶
- Constants
- func AddVisualizer(name string, v Visualizer) error
- func DotGrgphVisualizer(w io.Writer, p Pipeliner) error
- func ListVisualizer() map[string]Visualizer
- func NewLoggerInjector(where string, injector inject.Injector) inject.Injector
- type Component
- type Config
- type ConstantDelaySchedule
- type ErrorGroup
- type MissingDependencyError
- type Pipeliner
- type Processor
- type Schedule
- type ScheduleParser
- type State
- type Stream
- func (f *Stream) Append(s *Stream) *Stream
- func (f *Stream) AppendByParentName(parentName string, s *Stream) error
- func (f *Stream) Delete(name string) error
- func (f *Stream) Get(name string) (*Stream, bool)
- func (f *Stream) InsertAfter(broName string, s *Stream) error
- func (f *Stream) InsertBefore(broName string, s *Stream) error
- func (f *Stream) Invoke(inj inject.Injector) (outVal reflect.Value, err error)
- func (f *Stream) Name() string
- func (s *Stream) Recover(f func())
- type StreamConfig
- type Visualizer
Constants ¶
View Source
const (
	METRICS_KEY_PIPELINE_UPTIME = "_pipeline_uptime"
	METRICS_KEY_PIPELINE_STATE = "_pipeline_state"
	METRICS_KEY_PIPELINE_RUN_TIMES = "_pipeline_run_times"
	METRICS_KEY_PIPELINE_START_TIME = "_pipeline_start_time"
	METRICS_KEY_PIPELINE_EXIT_TIME = "_pipeline_exit_time"
	METRICS_KEY_PIPELINE_NEXT_RUN_TIME = "_pipeline_next_run_time"
	METRICS_KEY_PIPELINE_LAST_START_TIME = "_pipeline_last_start_time"
	METRICS_KEY_PIPELINE_LAST_END_TIME = "_pipeline_last_end_time"
	METRICS_KEY_STREAM_BUFFER_SIZE = "_stream_buffer_size"
	METRICS_KEY_STREAM_REPLICA = "_stream_replica"
	METRICS_KEY_STREAM_RUN_TIMES = "_stream_run_times"
	METRICS_KEY_STREAM_RUNNING_REPLICA = "_stream_running_replica"
	METRICS_KEY_STREAM_START_TIME = "_stream_start_time"
	METRICS_KEY_STREAM_EXIT_TIME = "_stream_exit_time"
	METRICS_KEY_STREAM_LAST_START_TIME = "_stream_last_start_time"
	METRICS_KEY_STREAM_LAST_END_TIME = "_stream_last_end_time"
	METRICS_KEY_STREAM_SUCCESS_COUNT = "_stream_success_count"
	METRICS_KEY_STREAM_ERROR_COUNT = "_stream_error_count"
	METRICS_KEY_STREAM_ELAPSED = "_stream_elapsed"
	METRICS_KEY_STREAM_BREAKER_OPEN = "_stream_breaker_open"
	METRICS_KEY_STREAM_ERROR = "_stream_error"
)
Variables ¶
This section is empty.
Functions ¶
func AddVisualizer ¶
func AddVisualizer(name string, v Visualizer) error
func ListVisualizer ¶
func ListVisualizer() map[string]Visualizer
Types ¶
type Config ¶
type Config struct {
	Name string `yaml:"name"`
	Schedule string `yaml:"schedule"` // Schedule plan; when empty the pipeline is scheduled in an endless loop; a cron expression may be supplied.
	CircuitBreakerSamples int64 `yaml:"circuit_breaker_samples"` // Circuit breaker sample count; keeps a misbehaving stream from exhausting CPU resources.
	CircuitBreakerRate float64 `yaml:"circuit_breaker_rate"` // Circuit breaker sampling rate.
	Bootstrap bool `yaml:"bootstrap"` // Start together with the process.
	Components []map[string]string `yaml:"components"` // key: name, value: rawConfig
	Processors []map[string]string `yaml:"processors"` // key: name, value: rawConfig
	Stream StreamConfig `yaml:"stream"` // key: name, value: StreamConfig
}
func (Config) NewComponents ¶
func (Config) NewProcessors ¶
type ConstantDelaySchedule ¶
type ConstantDelaySchedule struct { }
type ErrorGroup ¶ added in v1.0.4
type ErrorGroup []error
func (ErrorGroup) Error ¶ added in v1.0.4
func (eg ErrorGroup) Error() error
type MissingDependencyError ¶
func (MissingDependencyError) Error ¶
func (e MissingDependencyError) Error() string
type Pipeliner ¶
type Pipeliner interface {
	Name() string
	Start() error
	Stop()
	State() State
	ListComponents() []Component
	ListProcessors() []Processor
	Monitor() monitor.Monitor
	GetConfig() Config
	SetConfig(config Config) error
	Visualize(w io.Writer, format string) error
	CheckDependence() []error
	Error() error
}
func NewPipelineByConfig ¶
type ScheduleParser ¶
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
func NewStream ¶
func NewStream(conf StreamConfig, processors map[string]Processor) (*Stream, error)
func (*Stream) AppendByParentName ¶
type StreamConfig ¶
type StreamConfig struct {
	Name string `yaml:"name"`
	Childs []StreamConfig `yaml:"childs,omitempty"`
	Replica int `yaml:"replica,omitempty"`
	BufferSize int `yaml:"buffer_size,omitempty"`
}
type Visualizer ¶
func DotVisualizer ¶
func DotVisualizer(format string) Visualizer
Click to show internal directories.
Click to hide internal directories.