Documentation ¶
Index ¶
- Variables
- func CollectConfigResult(logGroup *protocol.LogGroup)
- func CollectContainers(logGroup *protocol.LogGroup)
- func FindPort(res http.ResponseWriter, req *http.Request)
- func GetConfigFlushers(runner PluginRunner) []pipeline.Flusher
- func GetFlushCancelToken(runner PluginRunner) <-chan struct{}
- func GetFlushStoreLen(runner PluginRunner) int
- func GetMetrics() []map[string]string
- func GetPluginPriority(pluginName string) int
- func HoldOn(exitFlag bool) error
- func Init() (err error)
- func LoadLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, ...) error
- func Resume() error
- type AggregatorWrapperV1
- func (p *AggregatorWrapperV1) Add(loggroup *protocol.LogGroup) error
- func (p *AggregatorWrapperV1) AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error
- func (p *AggregatorWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error
- func (p *AggregatorWrapperV1) Run(control *pipeline.AsyncControl)
- type AggregatorWrapperV2
- func (p *AggregatorWrapperV2) Add(loggroup *protocol.LogGroup) error
- func (p *AggregatorWrapperV2) AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error
- func (p *AggregatorWrapperV2) GetResult(context pipeline.PipelineContext) error
- func (p *AggregatorWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error
- func (p *AggregatorWrapperV2) Record(events *models.PipelineGroupEvents, context pipeline.PipelineContext) error
- type AlwaysOnlineManager
- func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)
- func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)
- func (aom *AlwaysOnlineManager) GetDeletedConfigs(existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig
- type CommonWrapper
- type ConfigVersion
- type ContextImp
- func (p *ContextImp) AddPlugin(name string)
- func (p *ContextImp) ExportMetricRecords() (results []map[string]string)
- func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)
- func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)
- func (p *ContextImp) GetConfigName() string
- func (p *ContextImp) GetExtension(name string, cfg any) (pipeline.Extension, error)
- func (p *ContextImp) GetLogstore() string
- func (p *ContextImp) GetLogstoreConfigMetricRecord() *pipeline.MetricsRecord
- func (p *ContextImp) GetMetricRecord() *pipeline.MetricsRecord
- func (p *ContextImp) GetProject() string
- func (p *ContextImp) GetRuntimeContext() context.Context
- func (p *ContextImp) InitContext(project, logstore, configName string)
- func (p *ContextImp) RegisterCounterMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.CounterMetric)
- func (p *ContextImp) RegisterLatencyMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.LatencyMetric)
- func (p *ContextImp) RegisterLogstoreConfigMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord
- func (p *ContextImp) RegisterMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord
- func (p *ContextImp) RegisterStringMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.StringMetric)
- func (p *ContextImp) SaveCheckPoint(key string, value []byte) error
- func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error
- type FlushData
- type FlushOutStore
- type FlusherWrapper
- type FlusherWrapperV1
- type FlusherWrapperV2
- type InputAlarm
- type InputContainer
- type InputStatistics
- type LogstoreConfig
- func (lc *LogstoreConfig) ProcessLog(logByte []byte, packID string, topic string, tags []byte) int
- func (lc *LogstoreConfig) ProcessLogGroup(logByte []byte, packID string) int
- func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int
- func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int
- func (lc *LogstoreConfig) Start()
- func (lc *LogstoreConfig) Stop(exitFlag bool) error
- type LogstoreStatistics
- type MetricWrapperV1
- func (p *MetricWrapperV1) AddData(tags map[string]string, fields map[string]string, t ...time.Time)
- func (p *MetricWrapperV1) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time)
- func (p *MetricWrapperV1) AddDataArrayWithContext(tags map[string]string, columns []string, values []string, ...)
- func (p *MetricWrapperV1) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, ...)
- func (p *MetricWrapperV1) AddRawLog(log *protocol.Log)
- func (p *MetricWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
- func (p *MetricWrapperV1) Init(pluginMeta *pipeline.PluginMeta, inputInterval int) error
- func (p *MetricWrapperV1) Run(control *pipeline.AsyncControl)
- type MetricWrapperV2
- type PluginRunner
- type ProcessorWrapper
- type ProcessorWrapperV1
- type ProcessorWrapperV2
- type ServiceWrapperV1
- func (p *ServiceWrapperV1) AddData(tags map[string]string, fields map[string]string, t ...time.Time)
- func (p *ServiceWrapperV1) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time)
- func (p *ServiceWrapperV1) AddDataArrayWithContext(tags map[string]string, columns []string, values []string, ...)
- func (p *ServiceWrapperV1) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, ...)
- func (p *ServiceWrapperV1) AddRawLog(log *protocol.Log)
- func (p *ServiceWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
- func (p *ServiceWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error
- func (p *ServiceWrapperV1) Run(cc *pipeline.AsyncControl)
- func (p *ServiceWrapperV1) Stop() error
- type ServiceWrapperV2
Constants ¶
This section is empty.
Variables ¶
var CheckPointCleanInterval = flag.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second")
var CheckPointFile = flag.String("CheckPointFile", "checkpoint", "checkpoint file name, base dir(binary dir)")
var CheckPointManager checkPointManager
var DisabledLogtailConfig = make(map[string]*LogstoreConfig)
var DisabledLogtailConfigLock sync.Mutex
Configs that were disabled because they were slow or hung.
var ErrCheckPointNotInit = errors.New("checkpoint db not init")
var FetchAllInterval = time.Second * time.Duration(12*60*60)
12h
var LastLogtailConfig map[string]*LogstoreConfig
var LogtailConfig map[string]*LogstoreConfig
The following variables are exported so that tests of the main package can reference them.
var MaxCleanItemPerInterval = flag.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval")
Functions ¶
func CollectConfigResult ¶ added in v1.4.0
func CollectContainers ¶ added in v1.4.0
func GetConfigFlushers ¶ added in v1.8.0
func GetConfigFlushers(runner PluginRunner) []pipeline.Flusher
func GetFlushCancelToken ¶ added in v1.4.0
func GetFlushCancelToken(runner PluginRunner) <-chan struct{}
func GetFlushStoreLen ¶ added in v1.4.0
func GetFlushStoreLen(runner PluginRunner) int
func GetMetrics ¶ added in v1.8.8
func GetPluginPriority ¶
func HoldOn ¶
HoldOn stops all config instances and the checkpoint manager so that it is ready to load new configs or quit. For user-defined configs, timeoutStop is used to avoid hanging.
func LoadLogstoreConfig ¶
Types ¶
type AggregatorWrapperV1 ¶ added in v1.8.8
type AggregatorWrapperV1 struct { pipeline.PluginContext Aggregator pipeline.AggregatorV1 Config *LogstoreConfig LogGroupsChan chan *protocol.LogGroup Interval time.Duration // contains filtered or unexported fields }
AggregatorWrapperV1 wraps Aggregator. It implements the LogGroupQueue interface and is passed to the associated Aggregator. The Aggregator uses the Add function to pass log groups to the wrapper, and the wrapper then passes the log groups to the associated LogstoreConfig through the channel LogGroupsChan. In fact, LogGroupsChan == (associated) LogstoreConfig.LogGroupsChan.
func (*AggregatorWrapperV1) Add ¶ added in v1.8.8
func (p *AggregatorWrapperV1) Add(loggroup *protocol.LogGroup) error
Add inserts @loggroup to LogGroupsChan if @loggroup is not empty. It is called by associated Aggregator. It returns errAggAdd when queue is full.
func (*AggregatorWrapperV1) AddWithWait ¶ added in v1.8.8
func (p *AggregatorWrapperV1) AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error
AddWithWait inserts @loggroup into LogGroupsChan, waiting at most @duration. It works like Add but adds a timeout policy for when the log group queue is full. It returns errAggAdd when the queue is full and the wait times out. NOTE: nobody calls it now.
func (*AggregatorWrapperV1) Init ¶ added in v1.8.8
func (p *AggregatorWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error
func (*AggregatorWrapperV1) Run ¶ added in v1.8.8
func (p *AggregatorWrapperV1) Run(control *pipeline.AsyncControl)
Run periodically calls Aggregator.Flush to get log groups from the associated aggregator and passes them to LogstoreConfig through LogGroupsChan.
type AggregatorWrapperV2 ¶ added in v1.8.8
type AggregatorWrapperV2 struct { pipeline.PluginContext Aggregator pipeline.AggregatorV2 Config *LogstoreConfig LogGroupsChan chan *protocol.LogGroup Interval time.Duration // contains filtered or unexported fields }
AggregatorWrapperV2 wraps Aggregator. It implements the LogGroupQueue interface and is passed to the associated Aggregator. The Aggregator uses the Add function to pass log groups to the wrapper, and the wrapper then passes the log groups to the associated LogstoreConfig through the channel LogGroupsChan. In fact, LogGroupsChan == (associated) LogstoreConfig.LogGroupsChan.
func (*AggregatorWrapperV2) Add ¶ added in v1.8.8
func (p *AggregatorWrapperV2) Add(loggroup *protocol.LogGroup) error
func (*AggregatorWrapperV2) AddWithWait ¶ added in v1.8.8
func (*AggregatorWrapperV2) GetResult ¶ added in v1.8.8
func (p *AggregatorWrapperV2) GetResult(context pipeline.PipelineContext) error
func (*AggregatorWrapperV2) Init ¶ added in v1.8.8
func (p *AggregatorWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error
func (*AggregatorWrapperV2) Record ¶ added in v1.8.8
func (p *AggregatorWrapperV2) Record(events *models.PipelineGroupEvents, context pipeline.PipelineContext) error
type AlwaysOnlineManager ¶
type AlwaysOnlineManager struct {
// contains filtered or unexported fields
}
AlwaysOnlineManager is used to manage the plugins that do not want to stop when the config is reloading.
func GetAlwaysOnlineManager ¶
func GetAlwaysOnlineManager() *AlwaysOnlineManager
GetAlwaysOnlineManager gets an AlwaysOnlineManager instance.
func (*AlwaysOnlineManager) AddCachedConfig ¶
func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)
AddCachedConfig adds a cached config to the manager; the manager will stop and delete this config when the timeout expires.
func (*AlwaysOnlineManager) GetCachedConfig ¶
func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)
GetCachedConfig gets a cached config from the manager and deletes the item, so the manager will not close this config.
func (*AlwaysOnlineManager) GetDeletedConfigs ¶
func (aom *AlwaysOnlineManager) GetDeletedConfigs( existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig
GetDeletedConfigs returns cached configs not in @existConfigs.
type CommonWrapper ¶ added in v1.8.8
type CommonWrapper struct { pipeline.PluginContext Config *LogstoreConfig LogGroupsChan chan *protocol.LogGroup Interval time.Duration // contains filtered or unexported fields }
type ConfigVersion ¶ added in v1.4.0
type ConfigVersion string
type ContextImp ¶
type ContextImp struct { MetricsRecords []*pipeline.MetricsRecord // contains filtered or unexported fields }
func (*ContextImp) AddPlugin ¶
func (p *ContextImp) AddPlugin(name string)
func (*ContextImp) ExportMetricRecords ¶ added in v1.8.8
func (p *ContextImp) ExportMetricRecords() (results []map[string]string)
func (*ContextImp) GetCheckPoint ¶
func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)
func (*ContextImp) GetCheckPointObject ¶
func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)
func (*ContextImp) GetConfigName ¶
func (p *ContextImp) GetConfigName() string
func (*ContextImp) GetExtension ¶ added in v1.5.0
func (*ContextImp) GetLogstore ¶
func (p *ContextImp) GetLogstore() string
func (*ContextImp) GetLogstoreConfigMetricRecord ¶ added in v1.8.8
func (p *ContextImp) GetLogstoreConfigMetricRecord() *pipeline.MetricsRecord
func (*ContextImp) GetMetricRecord ¶ added in v1.8.8
func (p *ContextImp) GetMetricRecord() *pipeline.MetricsRecord
func (*ContextImp) GetProject ¶
func (p *ContextImp) GetProject() string
func (*ContextImp) GetRuntimeContext ¶
func (p *ContextImp) GetRuntimeContext() context.Context
func (*ContextImp) InitContext ¶
func (p *ContextImp) InitContext(project, logstore, configName string)
func (*ContextImp) RegisterCounterMetric ¶
func (p *ContextImp) RegisterCounterMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.CounterMetric)
func (*ContextImp) RegisterLatencyMetric ¶
func (p *ContextImp) RegisterLatencyMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.LatencyMetric)
func (*ContextImp) RegisterLogstoreConfigMetricRecord ¶ added in v1.8.8
func (p *ContextImp) RegisterLogstoreConfigMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord
func (*ContextImp) RegisterMetricRecord ¶ added in v1.8.8
func (p *ContextImp) RegisterMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord
func (*ContextImp) RegisterStringMetric ¶
func (p *ContextImp) RegisterStringMetric(metricsRecord *pipeline.MetricsRecord, metric pipeline.StringMetric)
func (*ContextImp) SaveCheckPoint ¶
func (p *ContextImp) SaveCheckPoint(key string, value []byte) error
func (*ContextImp) SaveCheckPointObject ¶
func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error
type FlushData ¶ added in v1.4.0
type FlushData interface { protocol.LogGroup | models.PipelineGroupEvents }
type FlushOutStore ¶ added in v1.4.0
type FlushOutStore[T FlushData] struct { // contains filtered or unexported fields }
func NewFlushOutStore ¶ added in v1.4.0
func NewFlushOutStore[T FlushData]() *FlushOutStore[T]
func (*FlushOutStore[T]) Add ¶ added in v1.4.0
func (s *FlushOutStore[T]) Add(data ...*T)
func (*FlushOutStore[T]) Get ¶ added in v1.4.0
func (s *FlushOutStore[T]) Get() []*T
func (*FlushOutStore[T]) Len ¶ added in v1.4.0
func (s *FlushOutStore[T]) Len() int
func (*FlushOutStore[T]) Merge ¶ added in v1.4.0
func (s *FlushOutStore[T]) Merge(in *FlushOutStore[T])
func (*FlushOutStore[T]) Reset ¶ added in v1.4.0
func (s *FlushOutStore[T]) Reset()
func (*FlushOutStore[T]) Write ¶ added in v1.4.0
func (s *FlushOutStore[T]) Write(ch chan *T)
type FlusherWrapper ¶
type FlusherWrapperV1 ¶ added in v1.8.8
type FlusherWrapperV1 struct { CommonWrapper Flusher pipeline.FlusherV1 }
func (*FlusherWrapperV1) Init ¶ added in v1.8.8
func (wrapper *FlusherWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error
type FlusherWrapperV2 ¶ added in v1.8.8
type FlusherWrapperV2 struct { CommonWrapper Flusher pipeline.FlusherV2 }
func (*FlusherWrapperV2) Export ¶ added in v1.8.8
func (wrapper *FlusherWrapperV2) Export(pipelineGroupEvents []*models.PipelineGroupEvents, pipelineContext pipeline.PipelineContext) error
func (*FlusherWrapperV2) Init ¶ added in v1.8.8
func (wrapper *FlusherWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error
type InputAlarm ¶
type InputAlarm struct {
// contains filtered or unexported fields
}
func (*InputAlarm) Description ¶
func (r *InputAlarm) Description() string
type InputContainer ¶ added in v1.4.0
type InputContainer struct {
// contains filtered or unexported fields
}
func (*InputContainer) Collect ¶ added in v1.4.0
func (r *InputContainer) Collect(collector pipeline.Collector) error
func (*InputContainer) Description ¶ added in v1.4.0
func (r *InputContainer) Description() string
type InputStatistics ¶
type InputStatistics struct {
// contains filtered or unexported fields
}
func (*InputStatistics) Collect ¶
func (r *InputStatistics) Collect(collector pipeline.Collector) error
func (*InputStatistics) Description ¶
func (r *InputStatistics) Description() string
type LogstoreConfig ¶
type LogstoreConfig struct { // common fields ProjectName string LogstoreName string ConfigName string LogstoreKey int64 FlushOutFlag bool // Each LogstoreConfig can have its independent GlobalConfig if the "global" field // is offered in configuration, see build-in StatisticsConfig and AlarmConfig. GlobalConfig *config.GlobalConfig Version ConfigVersion Context pipeline.Context Statistics LogstoreStatistics PluginRunner PluginRunner K8sLabelSet map[string]struct{} ContainerLabelSet map[string]struct{} EnvSet map[string]struct{} CollectContainersFlag bool // contains filtered or unexported fields }
var AlarmConfig *LogstoreConfig
var ContainerConfig *LogstoreConfig
var StatisticsConfig *LogstoreConfig
Built-in logtail configs to report statistics and alarm (from system and other logtail configs).
func (*LogstoreConfig) ProcessLog ¶ added in v1.1.1
func (*LogstoreConfig) ProcessLogGroup ¶ added in v1.8.0
func (lc *LogstoreConfig) ProcessLogGroup(logByte []byte, packID string) int
func (*LogstoreConfig) ProcessRawLog ¶
func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int
func (*LogstoreConfig) ProcessRawLogV2 ¶
func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int
ProcessRawLogV2 ... V1 -> V2: enable topic field, and use tags field to pass more tags. Unsafe parameters: rawLog, packID and tags. Safe parameter: topic.
func (*LogstoreConfig) Start ¶
func (lc *LogstoreConfig) Start()
Start initializes plugin instances in config and starts them. Procedures:
- Start flusher goroutine and push FlushOutLogGroups inherited from last config instance to LogGroupsChan, so that they can be flushed to flushers.
- Start aggregators, allocate new goroutine for each one.
- Start processor goroutine to process logs from LogsChan.
- Start inputs (including metrics and services), just like aggregator, each input has its own goroutine.
func (*LogstoreConfig) Stop ¶
func (lc *LogstoreConfig) Stop(exitFlag bool) error
Stop stops plugin instances and corresponding goroutines of config. @exitFlag, passed from Logtail, indicates whether Logtail will quit after this. Procedures: 1. Call SetUrgent on all flushers to inform them of the current state. 2. Stop all input plugins to stop generating logs. 3. Stop the processor goroutine, passing all existing logs to the aggregator. 4. Stop all aggregator plugins, converting all logs to LogGroups. 5. Set the stopping flag and stop the flusher goroutine. 6. If Logtail is exiting and there is remaining data, try to flush once. 7. Stop flusher plugins.
type LogstoreStatistics ¶
type LogstoreStatistics struct { CollecLatencytMetric pipeline.LatencyMetric RawLogMetric pipeline.CounterMetric SplitLogMetric pipeline.CounterMetric FlushLogMetric pipeline.CounterMetric FlushLogGroupMetric pipeline.CounterMetric FlushReadyMetric pipeline.CounterMetric FlushLatencyMetric pipeline.LatencyMetric }
func (*LogstoreStatistics) Init ¶
func (p *LogstoreStatistics) Init(context pipeline.Context)
type MetricWrapperV1 ¶ added in v1.8.8
type MetricWrapperV1 struct { pipeline.PluginContext Input pipeline.MetricInputV1 Config *LogstoreConfig Tags map[string]string Interval time.Duration LogsChan chan *pipeline.LogWithContext LatencyMetric pipeline.LatencyMetric // contains filtered or unexported fields }
func (*MetricWrapperV1) AddDataArray ¶ added in v1.8.8
func (*MetricWrapperV1) AddDataArrayWithContext ¶ added in v1.8.8
func (*MetricWrapperV1) AddDataWithContext ¶ added in v1.8.8
func (*MetricWrapperV1) AddRawLog ¶ added in v1.8.8
func (p *MetricWrapperV1) AddRawLog(log *protocol.Log)
func (*MetricWrapperV1) AddRawLogWithContext ¶ added in v1.8.8
func (p *MetricWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
func (*MetricWrapperV1) Init ¶ added in v1.8.8
func (p *MetricWrapperV1) Init(pluginMeta *pipeline.PluginMeta, inputInterval int) error
func (*MetricWrapperV1) Run ¶ added in v1.8.8
func (p *MetricWrapperV1) Run(control *pipeline.AsyncControl)
type MetricWrapperV2 ¶ added in v1.8.8
type MetricWrapperV2 struct { pipeline.PluginContext Input pipeline.MetricInputV2 Config *LogstoreConfig Tags map[string]string Interval time.Duration LogsChan chan *pipeline.LogWithContext LatencyMetric pipeline.LatencyMetric }
func (*MetricWrapperV2) Init ¶ added in v1.8.8
func (p *MetricWrapperV2) Init(pluginMeta *pipeline.PluginMeta, inputInterval int) error
func (*MetricWrapperV2) Read ¶ added in v1.8.8
func (p *MetricWrapperV2) Read(pipelineContext pipeline.PipelineContext) error
type PluginRunner ¶ added in v1.4.0
type PluginRunner interface { Init(inputQueueSize int, aggrQueueSize int) error AddDefaultAggregatorIfEmpty() error AddDefaultFlusherIfEmpty() error ReceiveRawLog(log *pipeline.LogWithContext) AddPlugin(pluginMeta *pipeline.PluginMeta, category pluginCategory, plugin interface{}, config map[string]interface{}) error GetExtension(name string) (pipeline.Extension, bool) Run() RunPlugins(category pluginCategory, control *pipeline.AsyncControl) Merge(p PluginRunner) Stop(exit bool) error }
type ProcessorWrapper ¶
type ProcessorWrapper struct { pipeline.PluginContext Config *LogstoreConfig LogsChan chan *pipeline.LogWithContext // contains filtered or unexported fields }
type ProcessorWrapperV1 ¶ added in v1.8.8
type ProcessorWrapperV1 struct { ProcessorWrapper Processor pipeline.ProcessorV1 }
func (*ProcessorWrapperV1) Init ¶ added in v1.8.8
func (wrapper *ProcessorWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error
type ProcessorWrapperV2 ¶ added in v1.8.8
type ProcessorWrapperV2 struct { ProcessorWrapper Processor pipeline.ProcessorV2 }
func (*ProcessorWrapperV2) Init ¶ added in v1.8.8
func (wrapper *ProcessorWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error
func (*ProcessorWrapperV2) Process ¶ added in v1.8.8
func (wrapper *ProcessorWrapperV2) Process(in *models.PipelineGroupEvents, context pipeline.PipelineContext)
type ServiceWrapperV1 ¶ added in v1.8.8
type ServiceWrapperV1 struct { pipeline.PluginContext Input pipeline.ServiceInputV1 Config *LogstoreConfig Tags map[string]string Interval time.Duration LogsChan chan *pipeline.LogWithContext // contains filtered or unexported fields }
func (*ServiceWrapperV1) AddDataArray ¶ added in v1.8.8
func (*ServiceWrapperV1) AddDataArrayWithContext ¶ added in v1.8.8
func (*ServiceWrapperV1) AddDataWithContext ¶ added in v1.8.8
func (*ServiceWrapperV1) AddRawLog ¶ added in v1.8.8
func (p *ServiceWrapperV1) AddRawLog(log *protocol.Log)
func (*ServiceWrapperV1) AddRawLogWithContext ¶ added in v1.8.8
func (p *ServiceWrapperV1) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
func (*ServiceWrapperV1) Init ¶ added in v1.8.8
func (p *ServiceWrapperV1) Init(pluginMeta *pipeline.PluginMeta) error
func (*ServiceWrapperV1) Run ¶ added in v1.8.8
func (p *ServiceWrapperV1) Run(cc *pipeline.AsyncControl)
func (*ServiceWrapperV1) Stop ¶ added in v1.8.8
func (p *ServiceWrapperV1) Stop() error
type ServiceWrapperV2 ¶ added in v1.8.8
type ServiceWrapperV2 struct { pipeline.PluginContext Input pipeline.ServiceInputV2 Config *LogstoreConfig Tags map[string]string Interval time.Duration LogsChan chan *pipeline.LogWithContext }
func (*ServiceWrapperV2) Init ¶ added in v1.8.8
func (p *ServiceWrapperV2) Init(pluginMeta *pipeline.PluginMeta) error
func (*ServiceWrapperV2) StartService ¶ added in v1.8.8
func (p *ServiceWrapperV2) StartService(pipelineContext pipeline.PipelineContext) error
Source Files ¶
- aggregator_wrapper_v1.go
- aggregator_wrapper_v2.go
- always_online_manager.go
- checkpoint_manager.go
- container_config_manager.go
- context_imp.go
- flusher_out_store.go
- flusher_wrapper_v1.go
- flusher_wrapper_v2.go
- logstore_config.go
- logtail_port_manager.go
- metric_export.go
- metric_wrapper_v1.go
- metric_wrapper_v2.go
- plugin_manager.go
- plugin_runner.go
- plugin_runner_helper.go
- plugin_runner_v1.go
- plugin_runner_v2.go
- processor_wrapper_v1.go
- processor_wrapper_v2.go
- self_telemetry_alarm.go
- self_telemetry_container_config.go
- self_telemetry_statistics.go
- service_wrapper_v1.go
- service_wrapper_v2.go