Documentation ¶
Index ¶
- Constants
- Variables
- func AddAggregatorCreator(name string, creator AggregatorCreator)
- func AddExtensionCreator(name string, creator ExtensionCreator)
- func AddFlusherCreator(name string, creator FlusherCreator)
- func AddMetricCreator(name string, creator MetricCreator)
- func AddProcessorCreator(name string, creator ProcessorCreator)
- func AddServiceCreator(name string, creator ServiceCreator)
- type Aggregator
- type AggregatorCreator
- type AggregatorV1
- type AggregatorV2
- type AsyncControl
- type Collector
- type CommonContext
- type Context
- type CounterMetric
- type Extension
- type ExtensionCreator
- type Flusher
- type FlusherCreator
- type FlusherV1
- type FlusherV2
- type GaugeMetric
- type HistogramMetric
- type Label
- type LabelPair
- type LatencyMetric
- type LogGroupQueue
- type LogGroupWithContext
- type LogWithContext
- type Metric
- type MetricCollector
- type MetricCreator
- type MetricInput
- type MetricInputV1
- type MetricInputV2
- type MetricSet
- type MetricValue
- type MetricVector
- type MetricsRecord
- type PipelineCollector
- type PipelineContext
- type PluginContext
- type PluginMeta
- type Processor
- type ProcessorCreator
- type ProcessorV1
- type ProcessorV2
- type SelfMetricType
- type ServiceCreator
- type ServiceInput
- type ServiceInputV1
- type ServiceInputV2
- type StringMetric
- type SummaryMetric
Constants ¶
const ( MetricInputType = iota ServiceInputType = iota FilterType = iota ProcessorType = iota AggregatorType = iota )
logtail plugin type define
const MetricCounterPrefix = "counters"
const MetricGaugePrefix = "gauges"
const MetricLabelPrefix = "labels"
const SelfMetricNameKey = "__name__"
Variables ¶
var Aggregators = map[string]AggregatorCreator{}
var Extensions = map[string]ExtensionCreator{}
var Flushers = map[string]FlusherCreator{}
var MetricInputs = map[string]MetricCreator{}
var Processors = map[string]ProcessorCreator{}
var ServiceInputs = map[string]ServiceCreator{}
Functions ¶
func AddAggregatorCreator ¶
func AddAggregatorCreator(name string, creator AggregatorCreator)
func AddExtensionCreator ¶
func AddExtensionCreator(name string, creator ExtensionCreator)
func AddFlusherCreator ¶
func AddFlusherCreator(name string, creator FlusherCreator)
func AddMetricCreator ¶
func AddMetricCreator(name string, creator MetricCreator)
func AddProcessorCreator ¶
func AddProcessorCreator(name string, creator ProcessorCreator)
func AddServiceCreator ¶
func AddServiceCreator(name string, creator ServiceCreator)
Types ¶
type Aggregator ¶
type Aggregator interface { // Init called for init some system resources, like socket, mutex... // return flush interval(ms) and error flag, if interval is 0, use default interval Init(Context, LogGroupQueue) (int, error) // Description returns a one-sentence description on the Input. Description() string // Reset resets the aggregators caches and aggregates. Reset() }
Aggregator is an interface for implementing an Aggregator plugin. the RunningAggregator wraps this interface and guarantees that
type AggregatorCreator ¶
type AggregatorCreator func() Aggregator
type AggregatorV1 ¶
type AggregatorV1 interface { Aggregator // Add the metric to the aggregator. Add(log *protocol.Log, ctx map[string]interface{}) error // Flush pushes the current aggregates to the accumulator. Flush() []*protocol.LogGroup }
AggregatorV1 Add, Flush, and Reset can not be called concurrently, so locking is not required when implementing an Aggregator plugin.
type AggregatorV2 ¶
type AggregatorV2 interface { Aggregator // Add the metric to the aggregator. Record(*models.PipelineGroupEvents, PipelineContext) error // GetResult the current aggregates to the accumulator. GetResult(PipelineContext) error }
AggregatorV2 Apply, Push, and Reset can not be called concurrently, so locking is not required when implementing an Aggregator plugin.
type AsyncControl ¶
type AsyncControl struct {
// contains filtered or unexported fields
}
AsyncControl is an asynchronous execution control that can be canceled.
func NewAsyncControl ¶
func NewAsyncControl() *AsyncControl
func (*AsyncControl) CancelToken ¶
func (p *AsyncControl) CancelToken() <-chan struct{}
CancelToken returns a readonly channel that can be subscribed to as a cancel token
func (*AsyncControl) Notify ¶
func (p *AsyncControl) Notify()
func (*AsyncControl) Run ¶
func (p *AsyncControl) Run(task func(*AsyncControl))
Run function as a Task
func (*AsyncControl) WaitCancel ¶
func (p *AsyncControl) WaitCancel()
Waiting for executing task to be canceled
type Collector ¶
type Collector interface { AddData(tags map[string]string, fields map[string]string, t ...time.Time) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time) AddRawLog(log *protocol.Log) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, t ...time.Time) AddDataArrayWithContext(tags map[string]string, columns []string, values []string, ctx map[string]interface{}, t ...time.Time) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{}) }
Collector ...
type CommonContext ¶
type Context ¶
type Context interface { InitContext(project, logstore, configName string) GetConfigName() string GetProject() string GetLogstore() string GetRuntimeContext() context.Context GetPipelineScopeConfig() *config.GlobalConfig GetExtension(name string, cfg any) (Extension, error) SaveCheckPoint(key string, value []byte) error GetCheckPoint(key string) (value []byte, exist bool) SaveCheckPointObject(key string, obj interface{}) error GetCheckPointObject(key string, obj interface{}) (exist bool) // APIs for self monitor RegisterMetricRecord(labels []LabelPair) *MetricsRecord GetMetricRecord() *MetricsRecord ExportMetricRecords() []map[string]string RegisterLogstoreConfigMetricRecord(labels []LabelPair) *MetricsRecord GetLogstoreConfigMetricRecord() *MetricsRecord }
Context for plugin
type CounterMetric ¶
type CounterMetric interface { Metric Add(int64) Collect() MetricValue[float64] }
CounterMetric has three implementations: cumulativeCounter: a cumulative counter metric that represents a single monotonically increasing counter whose value can only increase or be reset to zero on restart. counter: the increased value in the last window. average: the cumulative average value.
type Extension ¶
type Extension interface { // Description returns a one-sentence description on the Extension Description() string // Init called for init some system resources, like socket, mutex... Init(Context) error // Stop stops the services and release resources Stop() error }
Extension ...
type ExtensionCreator ¶
type ExtensionCreator func() Extension
type Flusher ¶
type Flusher interface { // Init called for init some system resources, like socket, mutex... Init(Context) error // Description returns a one-sentence description on the Input. Description() string // IsReady checks if flusher is ready to accept more data. // @projectName, @logstoreName, @logstoreKey: meta of the corresponding data. // Note: If SetUrgent is called, please make some adjustment so that IsReady // can return true to accept more data in time and config instance can be // stopped gracefully. IsReady(projectName string, logstoreName string, logstoreKey int64) bool // SetUrgent indicates the flusher that it will be destroyed soon. // @flag indicates if main program (Logtail mostly) will exit after calling this. // // Note: there might be more data to flush after SetUrgent is called, and if flag // is true, these data will be passed to flusher through IsReady/Export before // program exits. // // Recommendation: set some state flags in this method to guide the behavior // of other methods. SetUrgent(flag bool) // Stop stops flusher and release resources. // It is time for flusher to do cleaning jobs, includes: // 1. Export cached but not flushed data. For flushers that contain some kinds of // aggregation or buffering, it is important to flush cached out now, otherwise // data will lost. // 2. Release opened resources: goroutines, file handles, connections, etc. // 3. Maybe more, it depends. // In a word, flusher should only have things that can be recycled by GC after this. Stop() error }
Flusher ... Sample flusher implementation: see plugin_manager/flusher_sls.gox.
type FlusherCreator ¶
type FlusherCreator func() Flusher
type FlusherV1 ¶
type FlusherV1 interface { Flusher // Flush flushes data to destination, such as SLS, console, file, etc. // It is expected to return no error at most time because IsReady will be called // before it to make sure there is space for next data. Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error }
type FlusherV2 ¶
type FlusherV2 interface { Flusher // Export data to destination, such as gRPC, console, file, etc. // It is expected to return no error at most time because IsReady will be called // before it to make sure there is space for next data. Export([]*models.PipelineGroupEvents, PipelineContext) error }
type GaugeMetric ¶
type GaugeMetric interface { Metric Set(float64) Collect() MetricValue[float64] }
type HistogramMetric ¶
type HistogramMetric interface { Metric Observe(float64) Get() []MetricValue[float64] }
HistogramMetric is used to compute distribution of a batch of data.
type LatencyMetric ¶
type LatencyMetric interface { Metric Observe(float64) Collect() MetricValue[float64] }
type LogGroupQueue ¶
type LogGroupQueue interface { // no blocking Add(loggroup *protocol.LogGroup) error AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error }
LogGroupQueue for aggregator, Non blocked if aggregator's buffer is full, aggregator can add LogGroup to this queue return error if LogGroupQueue is full
type LogGroupWithContext ¶
type LogWithContext ¶
type Metric ¶
type Metric interface { // Export as a map[string]string Export() map[string]string Type() SelfMetricType }
type MetricCollector ¶
type MetricCollector interface {
Collect() []Metric
}
MetricCollector is the interface implemented by anything that can be used by iLogtail to collect metrics.
type MetricCreator ¶
type MetricCreator func() MetricInput
type MetricInput ¶
type MetricInput interface { // Init called for init some system resources, like socket, mutex... // return call interval(ms) and error flag, if interval is 0, use default interval Init(Context) (int, error) // Description returns a one-sentence description on the Input Description() string }
MetricInput ...
type MetricInputV1 ¶
type MetricInputV1 interface { MetricInput // Collect takes in an accumulator and adds the metrics that the Input // gathers. This is called every "interval" Collect(Collector) error }
type MetricInputV2 ¶
type MetricInputV2 interface { MetricInput // Collect takes in an accumulator and adds the metrics that the Input // gathers. This is called every "interval" Read(PipelineContext) error }
type MetricSet ¶
type MetricSet interface { Name() string Type() SelfMetricType ConstLabels() []Label LabelKeys() []string }
MetricSet is a Collector to bundle metrics of the same name that differ in their label values. A MetricSet has three properties: 1. Metric Name. 2. Constant Labels: Labels has constant value, e.g., "hostname=localhost", "namespace=default" 3. Label Keys is label Keys that may have different values, e.g., "status_code=200", "status_code=404".
type MetricValue ¶
type MetricVector ¶
MetricVector is a Collector to bundle metrics of the same name that differ in their label values. WithLabels will return a unique Metric that is bound to a set of label values. If the labels contain label names that are not in the MetricSet, the Metric will be invalid.
type MetricsRecord ¶
type MetricsRecord struct { Context Context Labels []LabelPair sync.RWMutex MetricCollectors []MetricCollector }
func (*MetricsRecord) ExportMetricRecords ¶
func (m *MetricsRecord) ExportMetricRecords() map[string]string
ExportMetricRecords is used for exporting metrics records. It will replace Serialize in the future.
func (*MetricsRecord) RegisterMetricCollector ¶
func (m *MetricsRecord) RegisterMetricCollector(collector MetricCollector)
type PipelineCollector ¶
type PipelineCollector interface { // Collect single group and events data belonging to this group Collect(groupInfo *models.GroupInfo, eventList ...models.PipelineEvent) // CollectList collect GroupEvents list that have been grouped CollectList(groupEventsList ...*models.PipelineGroupEvents) // ToArray returns an array containing all of the PipelineGroupEvents in this collector. ToArray() []*models.PipelineGroupEvents // Observe returns a chan that can consume PipelineGroupEvents from this collector. Observe() chan *models.PipelineGroupEvents Close() }
PipelineCollector collect data in the plugin and send the data to the next operator
type PipelineContext ¶
type PipelineContext interface {
Collector() PipelineCollector
}
PipelineContext which may include collector interface、checkpoint interface、config read and many more..
type PluginContext ¶
type PluginContext struct { ID string Priority int MetricRecord *MetricsRecord }
type PluginMeta ¶
type Processor ¶
type Processor interface { // Init called for init some system resources, like socket, mutex... Init(pipelineContext Context) error // Description returns a one-sentence description on the Input Description() string }
Processor also can be a filter
type ProcessorCreator ¶
type ProcessorCreator func() Processor
type ProcessorV1 ¶
type ProcessorV2 ¶
type ProcessorV2 interface { Processor Process(in *models.PipelineGroupEvents, context PipelineContext) }
type SelfMetricType ¶
type SelfMetricType int
const ( CounterType SelfMetricType // counter in the last window. CumulativeCounterType // cumulative counter. AverageType // average value in the last window. MaxType // max value in the last window. LatencyType // average latency in the last window. StringType // string value. GaugeType // gauge value in the last window. /* Following Type are not used and not implemented yet. */ RateType // qps in the last window. SummaryType HistogramType )
type ServiceCreator ¶
type ServiceCreator func() ServiceInput
type ServiceInput ¶
type ServiceInput interface { // Init called for init some system resources, like socket, mutex... // return interval(ms) and error flag, if interval is 0, use default interval Init(Context) (int, error) // Description returns a one-sentence description on the Input Description() string // Stop stops the services and closes any necessary channels and connections Stop() error }
ServiceInput ...
type ServiceInputV1 ¶
type ServiceInputV1 interface { ServiceInput // Start starts the ServiceInput's service, whatever that may be Start(Collector) error }
type ServiceInputV2 ¶
type ServiceInputV2 interface { ServiceInput // StartService starts the ServiceInput's service, whatever that may be StartService(PipelineContext) error }
type StringMetric ¶
type StringMetric interface { Metric Set(v string) Collect() MetricValue[string] }
type SummaryMetric ¶
type SummaryMetric interface { Metric Observe(float64) Get() []MetricValue[float64] }
SummaryMetric is used to compute pctXX for a batch of data