pipeliner

package
v1.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2020 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	METRICS_KEY_PIPELINE_UPTIME          = "_pipeline_uptime"
	METRICS_KEY_PIPELINE_STATE           = "_pipeline_state"
	METRICS_KEY_PIPELINE_RUN_TIMES       = "_pipeline_run_times"
	METRICS_KEY_PIPELINE_START_TIME      = "_pipeline_start_time"
	METRICS_KEY_PIPELINE_EXIT_TIME       = "_pipeline_exit_time"
	METRICS_KEY_PIPELINE_NEXT_RUN_TIME   = "_pipeline_next_run_time"
	METRICS_KEY_PIPELINE_LAST_START_TIME = "_pipeline_last_start_time"
	METRICS_KEY_PIPELINE_LAST_END_TIME   = "_pipeline_last_end_time"

	METRICS_KEY_STREAM_BUFFER_SIZE     = "_stream_buffer_size"
	METRICS_KEY_STREAM_REPLICA         = "_stream_replica"
	METRICS_KEY_STREAM_RUN_TIMES       = "_stream_run_times"
	METRICS_KEY_STREAM_RUNNING_REPLICA = "_stream_running_replica"
	METRICS_KEY_STREAM_START_TIME      = "_stream_start_time"
	METRICS_KEY_STREAM_EXIT_TIME       = "_stream_exit_time"
	METRICS_KEY_STREAM_LAST_START_TIME = "_stream_last_start_time"
	METRICS_KEY_STREAM_LAST_END_TIME   = "_stream_last_end_time"
	METRICS_KEY_STREAM_SUCCESS_COUNT   = "_stream_success_count"
	METRICS_KEY_STREAM_ERROR_COUNT     = "_stream_error_count"
	METRICS_KEY_STREAM_ELAPSED         = "_stream_elapsed"
	METRICS_KEY_STREAM_BREAKER_OPEN    = "_stream_breaker_open"
	METRICS_KEY_STREAM_ERROR           = "_stream_error"
)

Variables

This section is empty.

Functions

func AddVisualizer

func AddVisualizer(name string, v Visualizer) error

func DotGrgphVisualizer

func DotGrgphVisualizer(w io.Writer, p *pipeliner) error

func ListVisualizer

func ListVisualizer() map[string]Visualizer

func NewPipelineByConfig

func NewPipelineByConfig(conf Config) *pipeliner

Types

type Config

type Config struct {
	Name                  string              `yaml:"name"`
	Schedule              string              `yaml:"schedule"`                // 调度计划,为空时死循环调度,可以传入cron表达式调度
	CircuitBreakerSamples int64               `yaml:"circuit_breaker_samples"` // 熔断器采样数, 防止stream出现异常耗尽cpu资源
	CircuitBreakerRate    float64             `yaml:"circuit_breaker_rate"`    // 熔断器采样率
	Bootstrap             bool                `yaml:"bootstrap"`               // 随进程启动而启动
	Components            []map[string]string `yaml:"components"`              // key: name, value: rawConfig
	Processors            []map[string]string `yaml:"processors"`              // key: name, value: rawConfig
	Stream                StreamConfig        `yaml:"stream"`                  // key: name, value: StreamConfig
}

func (Config) NewComponents

func (c Config) NewComponents() ([]executor.Component, error)

func (Config) NewProcessors

func (c Config) NewProcessors() ([]executor.Processor, error)

type ConstantDelaySchedule

type ConstantDelaySchedule struct {
}

func (ConstantDelaySchedule) Next

func (schedule ConstantDelaySchedule) Next(t time.Time) time.Time

type ErrorGroup

type ErrorGroup []error

func (ErrorGroup) Error

func (eg ErrorGroup) Error() error

type MissingDependencyError

type MissingDependencyError struct {
	Field       string
	ReflectType string
	InjectName  string
}

func (MissingDependencyError) Error

func (e MissingDependencyError) Error() string

type PipelinerFactory

type PipelinerFactory struct {
}

func (*PipelinerFactory) Description

func (f *PipelinerFactory) Description() string

func (*PipelinerFactory) New

func (f *PipelinerFactory) New(config string) (executor.Executor, error)

func (*PipelinerFactory) SampleConfig

func (f *PipelinerFactory) SampleConfig() string

type Schedule

type Schedule interface {
	// Next returns the next activation time, later than the given time.
	// Next is invoked initially, and then each time the job is run.
	Next(time.Time) time.Time
}

type ScheduleParser

type ScheduleParser func(spec string) (Schedule, error)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(conf StreamConfig, processors map[string]executor.Processor) (*Stream, error)

func (*Stream) Append

func (f *Stream) Append(s *Stream) *Stream

func (*Stream) AppendByParentName

func (f *Stream) AppendByParentName(parentName string, s *Stream) error

func (*Stream) Delete

func (f *Stream) Delete(name string) error

func (*Stream) Get

func (f *Stream) Get(name string) (*Stream, bool)

func (*Stream) InsertAfter

func (f *Stream) InsertAfter(broName string, s *Stream) error

func (*Stream) InsertBefore

func (f *Stream) InsertBefore(broName string, s *Stream) error

func (*Stream) Invoke

func (f *Stream) Invoke(inj inject.Injector) (outVal reflect.Value, err error)

func (*Stream) Name

func (f *Stream) Name() string

func (*Stream) Recover

func (s *Stream) Recover(f func())

type StreamConfig

type StreamConfig struct {
	Name       string         `yaml:"name"`
	Childs     []StreamConfig `yaml:"childs,omitempty"`
	Replica    int            `yaml:"replica,omitempty"`
	BufferSize int            `yaml:"buffer_size,omitempty"`
}

type Visualizer

type Visualizer func(w io.Writer, pipeline *pipeliner) error

func DotVisualizer

func DotVisualizer(format string) Visualizer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL