pipeline

package
v0.0.0-...-7b0021b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: Apache-2.0 Imports: 7 Imported by: 1

Documentation

Index

Constants

View Source
const (
	MetricInputType  = iota
	ServiceInputType = iota
	FilterType       = iota
	ProcessorType    = iota
	AggregatorType   = iota
)

logtail plugin type define

View Source
const MetricCounterPrefix = "counters"
View Source
const MetricGaugePrefix = "gauges"
View Source
const MetricLabelPrefix = "labels"
View Source
const SelfMetricNameKey = "__name__"

Variables

View Source
var Aggregators = map[string]AggregatorCreator{}
View Source
var Extensions = map[string]ExtensionCreator{}
View Source
var Flushers = map[string]FlusherCreator{}
View Source
var MetricInputs = map[string]MetricCreator{}
View Source
var Processors = map[string]ProcessorCreator{}
View Source
var ServiceInputs = map[string]ServiceCreator{}

Functions

func AddAggregatorCreator

func AddAggregatorCreator(name string, creator AggregatorCreator)

func AddExtensionCreator

func AddExtensionCreator(name string, creator ExtensionCreator)

func AddFlusherCreator

func AddFlusherCreator(name string, creator FlusherCreator)

func AddMetricCreator

func AddMetricCreator(name string, creator MetricCreator)

func AddProcessorCreator

func AddProcessorCreator(name string, creator ProcessorCreator)

func AddServiceCreator

func AddServiceCreator(name string, creator ServiceCreator)

Types

type Aggregator

type Aggregator interface {
	// Init called for init some system resources, like socket, mutex...
	// return flush interval(ms) and error flag, if interval is 0, use default interval
	Init(Context, LogGroupQueue) (int, error)

	// Description returns a one-sentence description on the Input.
	Description() string

	// Reset resets the aggregators caches and aggregates.
	Reset()
}

Aggregator is an interface for implementing an Aggregator plugin. the RunningAggregator wraps this interface and guarantees that

type AggregatorCreator

type AggregatorCreator func() Aggregator

type AggregatorV1

type AggregatorV1 interface {
	Aggregator
	// Add the metric to the aggregator.
	Add(log *protocol.Log, ctx map[string]interface{}) error

	// Flush pushes the current aggregates to the accumulator.
	Flush() []*protocol.LogGroup
}

AggregatorV1 Add, Flush, and Reset can not be called concurrently, so locking is not required when implementing an Aggregator plugin.

type AggregatorV2

type AggregatorV2 interface {
	Aggregator
	// Add the metric to the aggregator.
	Record(*models.PipelineGroupEvents, PipelineContext) error
	// GetResult the current aggregates to the accumulator.
	GetResult(PipelineContext) error
}

AggregatorV2 Apply, Push, and Reset can not be called concurrently, so locking is not required when implementing an Aggregator plugin.

type AsyncControl

type AsyncControl struct {
	// contains filtered or unexported fields
}

AsyncControl is an asynchronous execution control that can be canceled.

func NewAsyncControl

func NewAsyncControl() *AsyncControl

func (*AsyncControl) CancelToken

func (p *AsyncControl) CancelToken() <-chan struct{}

CancelToken returns a readonly channel that can be subscribed to as a cancel token

func (*AsyncControl) Notify

func (p *AsyncControl) Notify()

func (*AsyncControl) Reset

func (p *AsyncControl) Reset()

Reset cancel channal

func (*AsyncControl) Run

func (p *AsyncControl) Run(task func(*AsyncControl))

Run function as a Task

func (*AsyncControl) WaitCancel

func (p *AsyncControl) WaitCancel()

Waiting for executing task to be canceled

type Collector

type Collector interface {
	AddData(tags map[string]string,
		fields map[string]string,
		t ...time.Time)

	AddDataArray(tags map[string]string,
		columns []string,
		values []string,
		t ...time.Time)

	AddRawLog(log *protocol.Log)

	AddDataWithContext(tags map[string]string,
		fields map[string]string,
		ctx map[string]interface{},
		t ...time.Time)

	AddDataArrayWithContext(tags map[string]string,
		columns []string,
		values []string,
		ctx map[string]interface{},
		t ...time.Time)

	AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
}

Collector ...

type CommonContext

type CommonContext struct {
	Project    string
	Logstore   string
	ConfigName string
}

type Context

type Context interface {
	InitContext(project, logstore, configName string)

	GetConfigName() string
	GetProject() string
	GetLogstore() string
	GetRuntimeContext() context.Context
	GetPipelineScopeConfig() *config.GlobalConfig
	GetExtension(name string, cfg any) (Extension, error)

	SaveCheckPoint(key string, value []byte) error
	GetCheckPoint(key string) (value []byte, exist bool)
	SaveCheckPointObject(key string, obj interface{}) error
	GetCheckPointObject(key string, obj interface{}) (exist bool)

	// APIs for self monitor
	RegisterMetricRecord(labels []LabelPair) *MetricsRecord
	GetMetricRecord() *MetricsRecord
	ExportMetricRecords() []map[string]string
	RegisterLogstoreConfigMetricRecord(labels []LabelPair) *MetricsRecord
	GetLogstoreConfigMetricRecord() *MetricsRecord
}

Context for plugin

type CounterMetric

type CounterMetric interface {
	Metric
	Add(int64)
	Collect() MetricValue[float64]
}

CounterMetric has three implementations: cumulativeCounter: a cumulative counter metric that represents a single monotonically increasing counter whose value can only increase or be reset to zero on restart. counter: the increased value in the last window. average: the cumulative average value.

type Extension

type Extension interface {
	// Description returns a one-sentence description on the Extension
	Description() string

	// Init called for init some system resources, like socket, mutex...
	Init(Context) error

	// Stop stops the services and release resources
	Stop() error
}

Extension ...

type ExtensionCreator

type ExtensionCreator func() Extension

type Flusher

type Flusher interface {
	// Init called for init some system resources, like socket, mutex...
	Init(Context) error

	// Description returns a one-sentence description on the Input.
	Description() string

	// IsReady checks if flusher is ready to accept more data.
	// @projectName, @logstoreName, @logstoreKey: meta of the corresponding data.
	// Note: If SetUrgent is called, please make some adjustment so that IsReady
	//   can return true to accept more data in time and config instance can be
	//   stopped gracefully.
	IsReady(projectName string, logstoreName string, logstoreKey int64) bool

	// SetUrgent indicates the flusher that it will be destroyed soon.
	// @flag indicates if main program (Logtail mostly) will exit after calling this.
	//
	// Note: there might be more data to flush after SetUrgent is called, and if flag
	//   is true, these data will be passed to flusher through IsReady/Export before
	//   program exits.
	//
	// Recommendation: set some state flags in this method to guide the behavior
	//   of other methods.
	SetUrgent(flag bool)

	// Stop stops flusher and release resources.
	// It is time for flusher to do cleaning jobs, includes:
	// 1. Export cached but not flushed data. For flushers that contain some kinds of
	//   aggregation or buffering, it is important to flush cached out now, otherwise
	//   data will lost.
	// 2. Release opened resources: goroutines, file handles, connections, etc.
	// 3. Maybe more, it depends.
	// In a word, flusher should only have things that can be recycled by GC after this.
	Stop() error
}

Flusher ... Sample flusher implementation: see plugin_manager/flusher_sls.gox.

type FlusherCreator

type FlusherCreator func() Flusher

type FlusherV1

type FlusherV1 interface {
	Flusher
	// Flush flushes data to destination, such as SLS, console, file, etc.
	// It is expected to return no error at most time because IsReady will be called
	// before it to make sure there is space for next data.
	Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error
}

type FlusherV2

type FlusherV2 interface {
	Flusher
	// Export data to destination, such as gRPC, console, file, etc.
	// It is expected to return no error at most time because IsReady will be called
	// before it to make sure there is space for next data.
	Export([]*models.PipelineGroupEvents, PipelineContext) error
}

type GaugeMetric

type GaugeMetric interface {
	Metric
	Set(float64)
	Collect() MetricValue[float64]
}

type HistogramMetric

type HistogramMetric interface {
	Metric
	Observe(float64)
	Get() []MetricValue[float64]
}

HistogramMetric is used to compute distribution of a batch of data.

type Label

type Label struct {
	Key   string
	Value string
}

type LabelPair

type LabelPair = Label

type LatencyMetric

type LatencyMetric interface {
	Metric
	Observe(float64)
	Collect() MetricValue[float64]
}

type LogGroupQueue

type LogGroupQueue interface {
	// no blocking
	Add(loggroup *protocol.LogGroup) error
	AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error
}

LogGroupQueue for aggregator, Non blocked if aggregator's buffer is full, aggregator can add LogGroup to this queue return error if LogGroupQueue is full

type LogGroupWithContext

type LogGroupWithContext struct {
	LogGroup *protocol.LogGroup
	Context  map[string]interface{}
}

type LogWithContext

type LogWithContext struct {
	Log     *protocol.Log
	Context map[string]interface{}
}

type Metric

type Metric interface {
	// Export as a map[string]string
	Export() map[string]string
	Type() SelfMetricType
}

type MetricCollector

type MetricCollector interface {
	Collect() []Metric
}

MetricCollector is the interface implemented by anything that can be used by iLogtail to collect metrics.

type MetricCreator

type MetricCreator func() MetricInput

type MetricInput

type MetricInput interface {
	// Init called for init some system resources, like socket, mutex...
	// return call interval(ms) and error flag, if interval is 0, use default interval
	Init(Context) (int, error)

	// Description returns a one-sentence description on the Input
	Description() string
}

MetricInput ...

type MetricInputV1

type MetricInputV1 interface {
	MetricInput
	// Collect takes in an accumulator and adds the metrics that the Input
	// gathers. This is called every "interval"
	Collect(Collector) error
}

type MetricInputV2

type MetricInputV2 interface {
	MetricInput
	// Collect takes in an accumulator and adds the metrics that the Input
	// gathers. This is called every "interval"
	Read(PipelineContext) error
}

type MetricSet

type MetricSet interface {
	Name() string
	Type() SelfMetricType
	ConstLabels() []Label
	LabelKeys() []string
}

MetricSet is a Collector to bundle metrics of the same name that differ in their label values. A MetricSet has three properties: 1. Metric Name. 2. Constant Labels: Labels has constant value, e.g., "hostname=localhost", "namespace=default" 3. Label Keys is label Keys that may have different values, e.g., "status_code=200", "status_code=404".

type MetricValue

type MetricValue[T string | float64] struct {
	Name  string // Value Name, e.g. "cpu_usage"
	Value T
}

type MetricVector

type MetricVector[T Metric] interface {
	WithLabels(labels ...Label) T
}

MetricVector is a Collector to bundle metrics of the same name that differ in their label values. WithLabels will return a unique Metric that is bound to a set of label values. If the labels contain label names that are not in the MetricSet, the Metric will be invalid.

type MetricsRecord

type MetricsRecord struct {
	Context Context
	Labels  []LabelPair

	sync.RWMutex
	MetricCollectors []MetricCollector
}

func (*MetricsRecord) ExportMetricRecords

func (m *MetricsRecord) ExportMetricRecords() map[string]string

ExportMetricRecords is used for exporting metrics records. It will replace Serialize in the future.

func (*MetricsRecord) RegisterMetricCollector

func (m *MetricsRecord) RegisterMetricCollector(collector MetricCollector)

type PipelineCollector

type PipelineCollector interface {

	// Collect single group and events data belonging to this group
	Collect(groupInfo *models.GroupInfo, eventList ...models.PipelineEvent)

	// CollectList collect GroupEvents list that have been grouped
	CollectList(groupEventsList ...*models.PipelineGroupEvents)

	// ToArray returns an array containing all of the PipelineGroupEvents in this collector.
	ToArray() []*models.PipelineGroupEvents

	// Observe returns a chan that can consume PipelineGroupEvents from this collector.
	Observe() chan *models.PipelineGroupEvents

	Close()
}

PipelineCollector collect data in the plugin and send the data to the next operator

type PipelineContext

type PipelineContext interface {
	Collector() PipelineCollector
}

PipelineContext which may include collector interface、checkpoint interface、config read and many more..

type PluginContext

type PluginContext struct {
	ID           string
	Priority     int
	MetricRecord *MetricsRecord
}

type PluginMeta

type PluginMeta struct {
	PluginID         string
	PluginType       string
	PluginTypeWithID string
}

type Processor

type Processor interface {
	// Init called for init some system resources, like socket, mutex...
	Init(pipelineContext Context) error

	// Description returns a one-sentence description on the Input
	Description() string
}

Processor also can be a filter

type ProcessorCreator

type ProcessorCreator func() Processor

type ProcessorV1

type ProcessorV1 interface {
	Processor
	// ProcessLogs the filter to the given metric
	ProcessLogs(logArray []*protocol.Log) []*protocol.Log
}

type ProcessorV2

type ProcessorV2 interface {
	Processor
	Process(in *models.PipelineGroupEvents, context PipelineContext)
}

type SelfMetricType

type SelfMetricType int
const (
	CounterType           SelfMetricType // counter in the last window.
	CumulativeCounterType                // cumulative counter.
	AverageType                          // average value in the last window.
	MaxType                              // max value in the last window.
	LatencyType                          // average latency in the last window.
	StringType                           // string value.
	GaugeType                            // gauge value in the last window.

	/*
		Following Type are not used and not implemented yet.
	*/
	RateType // qps in the last window.
	SummaryType
	HistogramType
)

type ServiceCreator

type ServiceCreator func() ServiceInput

type ServiceInput

type ServiceInput interface {
	// Init called for init some system resources, like socket, mutex...
	// return interval(ms) and error flag, if interval is 0, use default interval
	Init(Context) (int, error)

	// Description returns a one-sentence description on the Input
	Description() string

	// Stop stops the services and closes any necessary channels and connections
	Stop() error
}

ServiceInput ...

type ServiceInputV1

type ServiceInputV1 interface {
	ServiceInput
	// Start starts the ServiceInput's service, whatever that may be
	Start(Collector) error
}

type ServiceInputV2

type ServiceInputV2 interface {
	ServiceInput
	// StartService starts the ServiceInput's service, whatever that may be
	StartService(PipelineContext) error
}

type StringMetric

type StringMetric interface {
	Metric
	Set(v string)
	Collect() MetricValue[string]
}

type SummaryMetric

type SummaryMetric interface {
	Metric
	Observe(float64)
	Get() []MetricValue[float64]
}

SummaryMetric is used to compute pctXX for a batch of data

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL