Documentation ¶
Index ¶
- Constants
- Variables
- func AddMetadata(collector pipeline.Collector, time time.Time, node *MetaNode)
- func AtomicAddFloat64(dst *float64, n float64)
- func AtomicLoadFloat64(dst *float64) float64
- func AtomicStoreFloat64(dst *float64, n float64)
- func ContainerCenterInit()
- func ContainerProcessAlive(pid int) bool
- func CreateDockerClient(opt ...docker.Opt) (client *docker.Client, err error)
- func CreateLog(t time.Time, enableTimestampNano bool, configTag map[string]string, ...) (*protocol.Log, error)
- func CreateLogByArray(t time.Time, enableTimestampNano bool, configTag map[string]string, ...) (*protocol.Log, error)
- func CreateLogEvent(t time.Time, enableTimestampNano bool, fields map[string]string) (*protocol.LogEvent, error)
- func CreateLogEventByArray(t time.Time, enableTimestampNano bool, columns []string, values []string) (*protocol.LogEvent, error)
- func CreateLogEventByLegacyRawLog(log *protocol.Log) (*protocol.LogEvent, error)
- func CreatePipelineEventGroupByLegacyRawLog(logEvents []*protocol.LogEvent, configTag map[string]string, ...) (*protocol.PipelineEventGroup, error)
- func ExtractPodWorkload(name string) string
- func ExtractStatefulSetNum(pod string) int
- func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error)
- func GetContainerByAcceptedInfo(includeLabel map[string]string, excludeLabel map[string]string, ...) map[string]*DockerInfoDetail
- func GetContainerByAcceptedInfoV2(fullList map[string]bool, matchList map[string]*DockerInfoDetail, ...) (newCount, delCount int, matchAddedList, matchDeletedList []string)
- func GetContainerMap() map[string]*DockerInfoDetail
- func GetContainersLastUpdateTime() int64
- func GetDiffContainers(fullList map[string]struct{}) (fullAddedList, fullDeletedList []string)
- func GetFileListByPrefix(dirPath, prefix string, needDir bool, num int) ([]string, error)
- func GetFileOffsetTag(log *protocol.Log) *protocol.Log_Content
- func GetFreePort() (port int, err error)
- func GetMetricName(log *protocol.Log) string
- func GetMountedFilePath(logPath string) string
- func GetMountedFilePathWithBasePath(basePath, logPath string) string
- func GetPluginCommonLabels(context pipeline.Context, pluginMeta *pipeline.PluginMeta) []pipeline.LabelPair
- func GetShortID(fullID string) string
- func GetStringFromList(list []string) string
- func HasEnvTags(tagKey string, tagValue string) bool
- func InitContainer()
- func IsCRIRuntimeValid(criRuntimeEndpoint string) bool
- func IsCRIStatusValid(criRuntimeEndpoint string) bool
- func LoadEnvTags()
- func LogContentsToMap(contents []*protocol.Log_Content) map[string]string
- func Max[T constraints.IntUintFloat](x T, y T) T
- func Min[T constraints.IntUintFloat](x T, y T) T
- func NewAverageMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
- func NewAverageMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
- func NewCounterMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
- func NewCounterMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
- func NewCumulativeCounterMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
- func NewCumulativeCounterMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
- func NewGaugeMetric(n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric
- func NewGaugeMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric
- func NewGoKitLogWrapper(context pipeline.Context, alarmType string) log.Logger
- func NewGroupedPipelineConext() pipeline.PipelineContext
- func NewLatencyMetric(n string, lables ...*protocol.Log_Content) pipeline.LatencyMetric
- func NewLatencyMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.LatencyMetric
- func NewMaxMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric
- func NewMetricLog(name string, t int64, value float64, labels *MetricLabels) *protocol.Log
- func NewMetricLogStringVal(name string, t int64, value string, labels *MetricLabels) *protocol.Log
- func NewNoopPipelineConext() pipeline.PipelineContext
- func NewObservePipelineConext(queueSize int) pipeline.PipelineContext
- func NewStringMetric(n string, lables ...*protocol.Log_Content) pipeline.StringMetric
- func NewStringMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.StringMetric
- func NormalizeWindowsPath(path string) string
- func ProcessContainerAllInfo(processor func(*DockerInfoDetail))
- func ReadOpen(path string) (*os.File, error)
- func RecordContainerConfigResult()
- func RecordContainerConfigResultIncrement(message *ContainerConfigResult)
- func RecordContainerConfigResultMap(message *ContainerConfigResult)
- func RegisterDockerEventListener(c chan events.Message)
- func ReplaceInvalidChars(in *string)
- func ReverseBytesSlice(s [][]byte)
- func ReverseStringSlice(s []string)
- func ReviseFileOffset(log *protocol.Log, offset int64, enableMeta bool)
- func SerializeContainerConfigResultToPb(logGroup *protocol.LogGroup)
- func SerializeContainerToPb(logGroup *protocol.LogGroup, addedContainers []*ContainerDetail)
- func SerializeDeleteContainerToPb(logGroup *protocol.LogGroup, project string, containerIDsStr string)
- func SetEnvConfigPrefix(prefix string)
- func SetMetricVectorCacheFactory(factory func(pipeline.MetricSet) MetricVectorCache)
- func SplitRegexFromMap(input map[string]string) (staticResult map[string]string, regexResult map[string]*regexp.Regexp, ...)
- func StartService(name string, context pipeline.Context, f func())
- func TransferLogEventToPB(log *models.Log) (*protocol.LogEvent, error)
- func TransferMetricEventToPB(metric *models.Metric) (*protocol.MetricEvent, error)
- func TransferPipelineEventGroupToPB(groupInfo *models.GroupInfo, events []models.PipelineEvent) (*protocol.PipelineEventGroup, error)
- func TransferSpanEventToPB(span *models.Span) (*protocol.SpanEvent, error)
- func TryGetRealPath(path string) (string, fs.FileInfo)
- func UnRegisterDockerEventListener(c chan events.Message)
- type Attributes
- type AverageMetricVector
- type AvgMetric
- type CRIRuntimeWrapper
- type ContainerConfigResult
- type ContainerDetail
- type ContainerDiscoverManager
- func (c *ContainerDiscoverManager) Clean()
- func (c *ContainerDiscoverManager) FetchAll()
- func (c *ContainerDiscoverManager) FetchOne(containerID string) error
- func (c *ContainerDiscoverManager) Init() bool
- func (c *ContainerDiscoverManager) LogAlarm(err error, msg string)
- func (c *ContainerDiscoverManager) StartSyncContainers()
- func (c *ContainerDiscoverManager) TimerFetch()
- type ContainerMeta
- type ConvertConfig
- type CounterMetricVector
- type CumulativeCounterMetricVector
- func NewCumulativeCounterMetricVector(metricName string, constLabels map[string]string, labelNames []string) CumulativeCounterMetricVector (deprecated)
- func NewCumulativeCounterMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, ...) CumulativeCounterMetricVector
- type DefBucket
- type DockerCenter
- type DockerInfoDetail
- func (did *DockerInfoDetail) DiffMount(other *DockerInfoDetail) bool
- func (did *DockerInfoDetail) DiffName(other *DockerInfoDetail) bool
- func (did *DockerInfoDetail) FindAllEnvConfig(envConfigPrefix string, selfConfigFlag bool)
- func (did *DockerInfoDetail) FindBestMatchedPath(pth string) (sourcePath, containerPath string)
- func (did *DockerInfoDetail) FinishedAt() string
- func (did *DockerInfoDetail) GetCustomExternalTags(tags, envs, k8sLabels map[string]string)
- func (did *DockerInfoDetail) GetEnv(key string) string
- func (did *DockerInfoDetail) GetExternalTags(envs, k8sLabels map[string]string) map[string]string
- func (did *DockerInfoDetail) IDPrefix() string
- func (did *DockerInfoDetail) IsTimeout() bool
- func (did *DockerInfoDetail) MakeSureEnvConfigExist(configName string) *EnvConfigInfo
- func (did *DockerInfoDetail) PodName() string
- func (did *DockerInfoDetail) Status() string
- type DockerInfoDetailWithFilteredEnvAndLabel
- func CastContainerDetail(containerInfo *DockerInfoDetail, ...) *DockerInfoDetailWithFilteredEnvAndLabel
- func GetAllContainerIncludeEnvAndLabelToRecord(...) []*DockerInfoDetailWithFilteredEnvAndLabel
- func GetAllContainerToRecord(envSet, labelSet, k8sLabelSet map[string]struct{}, ...) []*DockerInfoDetailWithFilteredEnvAndLabel
- type DumpData
- type DumpDataReq
- type DumpDataResp
- type Dumper
- type EnvConfigInfo
- type GRPCServerSettings
- type GaugeMetricVector
- type GenericPool
- type GrpcClientConfig
- type HistogramData
- type K8SFilter
- type K8SInfo
- type Label
- type Labels
- type LatMetric
- type LatencyMetricVector
- type LocalCollector
- func (p *LocalCollector) AddData(tags map[string]string, fields map[string]string, t ...time.Time)
- func (p *LocalCollector) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time)
- func (p *LocalCollector) AddDataArrayWithContext(tags map[string]string, columns []string, values []string, ...)
- func (p *LocalCollector) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, ...)
- func (p *LocalCollector) AddRawLog(log *protocol.Log)
- func (p *LocalCollector) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
- type LocalContext
- func (p *LocalContext) AddPlugin(name string)
- func (p *LocalContext) ExportMetricRecords() []map[string]string
- func (p *LocalContext) GetCheckPoint(key string) (value []byte, exist bool)
- func (p *LocalContext) GetCheckPointObject(key string, obj interface{}) (exist bool)
- func (p *LocalContext) GetConfigName() string
- func (p *LocalContext) GetExtension(name string, cfg any) (pipeline.Extension, error)
- func (p *LocalContext) GetLogstore() string
- func (p *LocalContext) GetLogstoreConfigMetricRecord() *pipeline.MetricsRecord
- func (p *LocalContext) GetMetricRecord() *pipeline.MetricsRecord
- func (p *LocalContext) GetPipelineScopeConfig() *config.GlobalConfig
- func (p *LocalContext) GetProject() string
- func (p *LocalContext) GetRuntimeContext() context.Context
- func (p *LocalContext) InitContext(project, logstore, configName string)
- func (p *LocalContext) RegisterLogstoreConfigMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord
- func (p *LocalContext) RegisterMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord
- func (p *LocalContext) SaveCheckPoint(key string, value []byte) error
- func (p *LocalContext) SaveCheckPointObject(key string, obj interface{}) error
- type LogFileProcessor
- type LogFileReader
- func (r *LogFileReader) CheckFileChange() bool
- func (r *LogFileReader) CloseFile(reason string)
- func (r *LogFileReader) GetCheckpoint() (checkpoint LogFileReaderCheckPoint, updateFlag bool)
- func (r *LogFileReader) GetLastEndOfLine(n int) int
- func (r *LogFileReader) GetProcessor() LogFileProcessor
- func (r *LogFileReader) ProcessAfterRead(n int)
- func (r *LogFileReader) ReadAndProcess(once bool)
- func (r *LogFileReader) ReadOpen() error
- func (r *LogFileReader) Run()
- func (r *LogFileReader) SetForceRead()
- func (r *LogFileReader) Start()
- func (r *LogFileReader) Stop()
- func (r *LogFileReader) UpdateProcessResult(readN, processedN int)
- type LogFileReaderCheckPoint
- type LogFileReaderConfig
- type ManagerMeta
- type MapCache
- type MaxMetricVector
- type MetaNode
- func (v MetaNode) MarshalEasyJSON(w *jwriter.Writer)
- func (v MetaNode) MarshalJSON() ([]byte, error)
- func (v *MetaNode) UnmarshalEasyJSON(l *jlexer.Lexer)
- func (v *MetaNode) UnmarshalJSON(data []byte) error
- func (n *MetaNode) WithAttribute(k string, v interface{}) *MetaNode
- func (n *MetaNode) WithAttributes(attributes Attributes) *MetaNode
- func (n *MetaNode) WithLabel(k, v string) *MetaNode
- func (n *MetaNode) WithLabels(labels Labels) *MetaNode
- func (n *MetaNode) WithParent(key, parentID, parentName string) *MetaNode
- func (n *MetaNode) WithParents(parents Parents) *MetaNode
- type MetricLabel
- type MetricLabels
- func (kv *MetricLabels) Append(key, value string)
- func (kv *MetricLabels) AppendMap(mapVal map[string]string)
- func (kv *MetricLabels) Clone() *MetricLabels
- func (kv *MetricLabels) CloneInto(dst *MetricLabels) *MetricLabels
- func (kv *MetricLabels) Len() int
- func (kv *MetricLabels) Less(i int, j int) bool
- func (kv *MetricLabels) Replace(key, value string)
- func (kv *MetricLabels) String() string
- func (kv *MetricLabels) SubSlice(begin, end int)
- func (kv *MetricLabels) Swap(i int, j int)
- type MetricVectorAndCollector
- type MetricVectorCache
- type MetricVectorImpl
- func (v MetricVectorImpl) Collect() []pipeline.Metric
- func (v MetricVectorImpl) ConstLabels() []pipeline.Label
- func (v MetricVectorImpl) LabelKeys() []string
- func (v MetricVectorImpl) Name() string
- func (v MetricVectorImpl) Type() pipeline.SelfMetricType
- func (m *MetricVectorImpl[T]) WithLabels(labels ...pipeline.Label) T
- type NormalMetric
- type Parents
- type ReaderMetricTracker
- type RetryConfig
- type RetryInfo
- type Series
- type StateOS
- type StrMetric
- type StringMetricVector
Constants ¶
const ( ContainerStatusRunning = "running" ContainerStatusExited = "exited" )
const ( // StaleNaN is a signaling NaN, due to the MSB of the mantissa being 0. // This value is chosen with many leading 0s, so we have scope to store more // complicated values in the future. It is 2 rather than 1 to make // it easier to distinguish from the NormalNaN by a human when debugging. StaleNaN uint64 = 0x7ff0000000000002 StaleNan = "__STALE_NAN__" SlsMetricstoreInvalidReplaceCharacter = '_' )
const ( MetricAgentMemoryGo = "go_memory_used_mb" MetricAgentGoRoutinesTotal = "go_routines_total" )
metric keys
const ( MetricLabelKeyMetricCategory = "metric_category" MetricLabelKeyProject = "project" MetricLabelKeyLogstore = "logstore" MetricLabelKeyPipelineName = "pipeline_name" MetricLabelKeyPluginType = "plugin_type" MetricLabelKeyPluginID = "plugin_id" )
label keys
const ( MetricPluginInEventsTotal = "in_events_total" MetricPluginInEventGroupsTotal = "in_event_groups_total" MetricPluginInSizeBytes = "in_size_bytes" MetricPluginOutEventsTotal = "out_events_total" MetricPluginOutEventGroupsTotal = "out_event_groups_total" MetricPluginOutSizeBytes = "out_size_bytes" MetricPluginTotalDelayMs = "total_delay_ms" MetricPluginTotalProcessTimeMs = "total_process_time_ms" )
metric keys
const ( MetricPluginBinlogRotate = "binlog_rotate" MetricPluginBinlogSync = "binlog_sync" MetricPluginBinlogDdl = "binlog_ddl" MetricPluginBinlogRow = "binlog_row" MetricPluginBinlogXgid = "binlog_xgid" MetricPluginBinlogCheckpoint = "binlog_checkpoint" MetricPluginBinlogFilename = "binlog_filename" MetricPluginBinlogGtid = "binlog_gtid" )
********************************************************* * input_canal *********************************************************
const ( MetricPluginContainerTotal = "container_total" MetricPluginAddContainerTotal = "add_container_total" MetricPluginRemoveContainerTotal = "remove_container_total" MetricPluginUpdateContainerTotal = "update_container_total" )
********************************************************* * metric_container_info * service_docker_stdout_v2 *********************************************************
const ( MetricPluginCollectAvgCostTimeMs = "collect_avg_cost_time_ms" MetricPluginCollectTotal = "collect_total" )
********************************************************* * service_mysql * service_rdb *********************************************************
const ( MetricCollectEntityTotal = "collect_entity_total" MetricCollectLinkTotal = "collect_link_total" )
********************************************************* * service_k8s_meta *********************************************************
const ( MetricPluginDiscardedEventsTotal = "discarded_events_total" MetricPluginOutFailedEventsTotal = "out_failed_events_total" MetricPluginOutKeyNotFoundEventsTotal = "out_key_not_found_events_total" MetricPluginOutSuccessfulEventsTotal = "out_successful_events_total" )
********************************************************* * all processor(所有解析类的处理插件通用指标。Todo:目前统计还不全、不准确) *********************************************************
const ( MetricRunnerK8sMetaAddEventTotal = "add_event_total" MetricRunnerK8sMetaUpdateEventTotal = "update_event_total" MetricRunnerK8sMetaDeleteEventTotal = "delete_event_total" MetricRunnerK8sMetaCacheSize = "cache_size" MetricRunnerK8sMetaQueueSize = "queue_size" MetricRunnerK8sMetaHTTPRequestTotal = "http_request_total" MetricRunnerK8sMetaHTTPAvgDelayMs = "avg_delay_ms" MetricRunnerK8sMetaHTTPMaxDelayMs = "max_delay_ms" )
metric keys
const ContainerIDPrefixSize = 12
const DockerTimeFormat = "2006-01-02T15:04:05.999999999Z"
const (
MetricLabelKeyClusterID = "cluster_id"
)
label keys
const (
MetricLabelKeyRunnerName = "runner_name"
)
label keys
const (
MetricLabelValueMetricCategoryPlugin = "plugin"
)
label values
const (
MetricLabelValueMetricCategoryRunner = "runner"
)
label values
const (
MetricLabelValueRunnerNameK8sMeta = "k8s_meta"
)
label values
const (
PluginPairsPerLogTotal = "pairs_per_log_total"
)
********************************************************* * processor_anchor * processor_regex * processor_string_replace *********************************************************
Variables ¶
var AddedContainerConfigResult []*ContainerConfigResult
Newly added collection config results.
var AddedContainerConfigResultMap map[string]*ContainerConfigResult
In-memory store of collection config results.
var ContainerInfoDeletedTimeout = time.Second * time.Duration(120)
var (
DefaultCacheFactory = NewMapCache
)
var DefaultLogFileReaderConfig = LogFileReaderConfig{ ReadIntervalMs: 1000, MaxReadBlockSize: 512 * 1024, CloseFileSec: 60, Tracker: nil, }
var DefaultLogtailMountPath string
var DefaultSyncContainersPeriod = time.Second * 3 // should be same as docker_config_update_interval gflag in C
var DockerCenterTimeout = time.Second * time.Duration(30)
var EnvTags []string
EnvTags are the tags to be added to every LogGroup.
var EventListenerTimeout = time.Second * time.Duration(3600)
var FetchAllInterval = time.Second * time.Duration(300)
var FileOffsetKey = "__tag__:__file_offset__"
var LogEventPool = sync.Pool{ New: func() interface{} { return new(protocol.LogEvent) }, }
var MaxFetchOneTriggerPerSecond int32 = 200
var ReservedFileOffsetKey = "__file_offset__"
Functions ¶
func AddMetadata ¶
AddMetadata to the collector.
func AtomicAddFloat64 ¶
func AtomicLoadFloat64 ¶
func AtomicStoreFloat64 ¶
func ContainerCenterInit ¶
func ContainerCenterInit()
func ContainerProcessAlive ¶
func CreateDockerClient ¶
func CreateLogByArray ¶
func CreateLogEvent ¶
func CreateLogEventByArray ¶
func ExtractPodWorkload ¶
func ExtractStatefulSetNum ¶
func GetAddressAndDialer ¶
func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error)
GetAddressAndDialer returns the address parsed from the given endpoint and a dialer.
func GetContainerByAcceptedInfo ¶
func GetContainerByAcceptedInfo( includeLabel map[string]string, excludeLabel map[string]string, includeLabelRegex map[string]*regexp.Regexp, excludeLabelRegex map[string]*regexp.Regexp, includeEnv map[string]string, excludeEnv map[string]string, includeEnvRegex map[string]*regexp.Regexp, excludeEnvRegex map[string]*regexp.Regexp, k8sFilter *K8SFilter, ) map[string]*DockerInfoDetail
GetContainerByAcceptedInfo gathers all info of containers that match the input parameters. Two conditions (&&) must hold for a matched container: 1. it has a label in @includeLabel and does not have any label in @excludeLabel. 2. it has an env in @includeEnv and does not have any env in @excludeEnv. If the input parameters are empty, then all containers are matched. It returns a map containing Docker container info.
func GetContainerByAcceptedInfoV2 ¶
func GetContainerByAcceptedInfoV2( fullList map[string]bool, matchList map[string]*DockerInfoDetail, includeLabel map[string]string, excludeLabel map[string]string, includeLabelRegex map[string]*regexp.Regexp, excludeLabelRegex map[string]*regexp.Regexp, includeEnv map[string]string, excludeEnv map[string]string, includeEnvRegex map[string]*regexp.Regexp, excludeEnvRegex map[string]*regexp.Regexp, k8sFilter *K8SFilter, ) (newCount, delCount int, matchAddedList, matchDeletedList []string)
GetContainerByAcceptedInfoV2 works like GetContainerByAcceptedInfo, but uses less CPU. It reduces CPU cost by using the full list and the match list to find the containers that need to be checked.
deleted = fullList - containerMap newList = containerMap - fullList matchList -= deleted + filter(newList) matchAddedList: new container ID for current config matchDeletedList: deleted container ID for current config fullAddedList = newList fullDeletedList = deleted return len(deleted), len(filter(newList)), matchAddedList, matchDeletedList, fullAddedList, fullDeletedList
@param fullList [in,out]: all containers. @param matchList [in,out]: all matched containers.
It returns two integers and two lists. The two integers are the number of new matched containers and the number of deleted containers. The two lists are the new matched container ID list and the deleted matched container ID list.
func GetContainersLastUpdateTime ¶
func GetContainersLastUpdateTime() int64
func GetDiffContainers ¶
func GetFileListByPrefix ¶
func GetFileOffsetTag ¶
func GetFileOffsetTag(log *protocol.Log) *protocol.Log_Content
func GetFreePort ¶
func GetMetricName ¶
func GetMountedFilePath ¶
func GetPluginCommonLabels ¶
func GetShortID ¶
func GetStringFromList ¶
func HasEnvTags ¶
HasEnvTags checks whether the specified tag exists in EnvTags.
func InitContainer ¶
func InitContainer()
func IsCRIRuntimeValid ¶
func IsCRIStatusValid ¶
func LogContentsToMap ¶
func LogContentsToMap(contents []*protocol.Log_Content) map[string]string
func Max ¶
func Max[T constraints.IntUintFloat](x T, y T) T
func Min ¶
func Min[T constraints.IntUintFloat](x T, y T) T
func NewAverageMetric ¶
func NewAverageMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
NewAverageMetric creates a new AverageMetric.
func NewAverageMetricAndRegister ¶
func NewAverageMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
NewAverageMetricAndRegister creates a new AverageMetric and registers its metricVector to the MetricsRecord.
func NewCounterMetric ¶
func NewCounterMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
NewCounterMetric creates a new DeltaMetric.
func NewCounterMetricAndRegister ¶
func NewCounterMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
NewCounterMetricAndRegister creates a new DeltaMetric and registers its metricVector to the MetricsRecord.
func NewCumulativeCounterMetric ¶
func NewCumulativeCounterMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
NewCumulativeCounterMetric creates a new CounterMetric.
func NewCumulativeCounterMetricAndRegister ¶
func NewCumulativeCounterMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric
NewCumulativeCounterMetricAndRegister creates a new CounterMetric and registers its metricVector to the MetricsRecord.
func NewGaugeMetric ¶
func NewGaugeMetric(n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric
NewGaugeMetric creates a new GaugeMetric.
func NewGaugeMetricAndRegister ¶
func NewGaugeMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric
NewGaugeMetricAndRegister creates a new GaugeMetric and registers its metricVector to the MetricsRecord.
func NewGoKitLogWrapper ¶
NewGoKitLogWrapper returns a logger that logs with context.
func NewGroupedPipelineConext ¶
func NewGroupedPipelineConext() pipeline.PipelineContext
func NewLatencyMetric ¶
func NewLatencyMetric(n string, lables ...*protocol.Log_Content) pipeline.LatencyMetric
NewLatencyMetric creates a new LatencyMetric.
func NewLatencyMetricAndRegister ¶
func NewLatencyMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.LatencyMetric
NewLatencyMetricAndRegister creates a new LatencyMetric and registers its metricVector to the MetricsRecord.
func NewMaxMetricAndRegister ¶
func NewMaxMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric
NewMaxMetricAndRegister creates a new MaxMetric and registers its metricVector to the MetricsRecord.
func NewMetricLog ¶
NewMetricLog creates a metric log; the time supports Unix milliseconds and Unix nanoseconds. Note: the caller must pass safe strings.
func NewMetricLogStringVal ¶
NewMetricLogStringVal creates a metric log with a string value; the time supports Unix milliseconds and Unix nanoseconds. Note: the caller must pass safe strings.
func NewNoopPipelineConext ¶
func NewNoopPipelineConext() pipeline.PipelineContext
func NewObservePipelineConext ¶
func NewObservePipelineConext(queueSize int) pipeline.PipelineContext
func NewStringMetric ¶
func NewStringMetric(n string, lables ...*protocol.Log_Content) pipeline.StringMetric
NewStringMetric creates a new StringMetric.
func NewStringMetricAndRegister ¶
func NewStringMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.StringMetric
NewStringMetricAndRegister creates a new StringMetric and registers its metricVector to the MetricsRecord.
func NormalizeWindowsPath ¶
NormalizeWindowsPath returns the normalized path on heterogeneous platforms. It parses the root path with the Windows system drive.
func ProcessContainerAllInfo ¶
func ProcessContainerAllInfo(processor func(*DockerInfoDetail))
func RecordContainerConfigResultIncrement ¶
func RecordContainerConfigResultIncrement(message *ContainerConfigResult)
Incrementally records a collection config result.
func RecordContainerConfigResultMap ¶
func RecordContainerConfigResultMap(message *ContainerConfigResult)
Records the result of each collection config in memory, so that RecordContainerConfigResult can output all of them in full.
func ReplaceInvalidChars ¶
func ReplaceInvalidChars(in *string)
ReplaceInvalidChars is an analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]")
func ReverseBytesSlice ¶
func ReverseBytesSlice(s [][]byte)
func ReverseStringSlice ¶
func ReverseStringSlice(s []string)
func SerializeContainerToPb ¶
func SerializeContainerToPb(logGroup *protocol.LogGroup, addedContainers []*ContainerDetail)
func SetEnvConfigPrefix ¶
func SetEnvConfigPrefix(prefix string)
func SetMetricVectorCacheFactory ¶
func SetMetricVectorCacheFactory(factory func(pipeline.MetricSet) MetricVectorCache)
SetMetricVectorCacheFactory allows users to set the cache factory for the metric vector, like Prometheus SDK.
func SplitRegexFromMap ¶
func SplitRegexFromMap(input map[string]string) (staticResult map[string]string, regexResult map[string]*regexp.Regexp, err error)
SplitRegexFromMap extracts regexes from the user config. A regex must begin with ^ and end with $ (we only check ^).
func StartService ¶
StartService ..
func TransferMetricEventToPB ¶
func TransferMetricEventToPB(metric *models.Metric) (*protocol.MetricEvent, error)
func TransferPipelineEventGroupToPB ¶
func TransferPipelineEventGroupToPB(groupInfo *models.GroupInfo, events []models.PipelineEvent) (*protocol.PipelineEventGroup, error)
func TransferSpanEventToPB ¶
Types ¶
type Attributes ¶
type Attributes map[string]interface{}
Attributes used to store attributes in common conditions.
func (Attributes) MarshalEasyJSON ¶
func (v Attributes) MarshalEasyJSON(w *jwriter.Writer)
MarshalEasyJSON supports easyjson.Marshaler interface
func (Attributes) MarshalJSON ¶
func (v Attributes) MarshalJSON() ([]byte, error)
MarshalJSON supports json.Marshaler interface
func (*Attributes) UnmarshalEasyJSON ¶
func (v *Attributes) UnmarshalEasyJSON(l *jlexer.Lexer)
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*Attributes) UnmarshalJSON ¶
func (v *Attributes) UnmarshalJSON(data []byte) error
UnmarshalJSON supports json.Unmarshaler interface
type AverageMetricVector ¶
type AverageMetricVector = pipeline.MetricVector[pipeline.CounterMetric]
func NewAverageMetricVector ¶
func NewAverageMetricVector(metricName string, constLabels map[string]string, labelNames []string) AverageMetricVector
NewAverageMetricVector creates a new AverageMetricVector. Note that MetricVector doesn't expose Collect API by default. Plugin developers should be careful to collect metrics manually.
func NewAverageMetricVectorAndRegister ¶
func NewAverageMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) AverageMetricVector
NewAverageMetricVectorAndRegister creates a new AverageMetricVector and registers it to the MetricsRecord.
type CRIRuntimeWrapper ¶
type CRIRuntimeWrapper struct {
// contains filtered or unexported fields
}
func NewCRIRuntimeWrapper ¶
func NewCRIRuntimeWrapper(dockerCenter *DockerCenter) (*CRIRuntimeWrapper, error)
NewCRIRuntimeWrapper ...
type ContainerConfigResult ¶
type ContainerDetail ¶
type ContainerDetail struct { DataType string Project string ContainerID string ContainerIP string ContainerName string RawContainerName string LogPath string Driver string Namespace string ImageName string PodName string RootPath string Hostname string HostsPath string Env map[string]string ContainerLabels map[string]string K8sLabels map[string]string }
type ContainerDiscoverManager ¶
type ContainerDiscoverManager struct {
// contains filtered or unexported fields
}
func NewContainerDiscoverManager ¶
func NewContainerDiscoverManager() *ContainerDiscoverManager
func (*ContainerDiscoverManager) Clean ¶
func (c *ContainerDiscoverManager) Clean()
func (*ContainerDiscoverManager) FetchAll ¶
func (c *ContainerDiscoverManager) FetchAll()
FetchAll Currently, there are 3 ways to find containers, which are docker interface, cri interface and static container info file.
func (*ContainerDiscoverManager) FetchOne ¶
func (c *ContainerDiscoverManager) FetchOne(containerID string) error
func (*ContainerDiscoverManager) Init ¶
func (c *ContainerDiscoverManager) Init() bool
func (*ContainerDiscoverManager) LogAlarm ¶
func (c *ContainerDiscoverManager) LogAlarm(err error, msg string)
func (*ContainerDiscoverManager) StartSyncContainers ¶
func (c *ContainerDiscoverManager) StartSyncContainers()
func (*ContainerDiscoverManager) TimerFetch ¶
func (c *ContainerDiscoverManager) TimerFetch()
type ContainerMeta ¶
type ContainerMeta struct { PodName string K8sNamespace string ContainerName string Image string K8sLabels map[string]string ContainerLabels map[string]string Env map[string]string }
func GetContainerMeta ¶
func GetContainerMeta(containerID string) *ContainerMeta
GetContainerMeta gets a thread-safe container meta struct.
type ConvertConfig ¶
type ConvertConfig struct { TagFieldsRename map[string]string // Rename one or more fields from tags. ProtocolFieldsRename map[string]string // Rename one or more fields, The protocol field options can only be: contents, tags, time Separator string // Convert separator Protocol string // Convert protocol Encoding string // Convert encoding IgnoreUnExpectedData bool // IgnoreUnExpectedData will skip on unexpected data if set to true, or will return error and stop processing the whole batch data if set to false }
type CounterMetricVector ¶
type CounterMetricVector = pipeline.MetricVector[pipeline.CounterMetric]
func NewCounterMetricVector ¶
func NewCounterMetricVector(metricName string, constLabels map[string]string, labelNames []string) CounterMetricVector
NewCounterMetricVector creates a new DeltaMetricVector. Note that MetricVector doesn't expose Collect API by default. Plugin developers should be careful to collect metrics manually.
func NewCounterMetricVectorAndRegister ¶
func NewCounterMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) CounterMetricVector
NewCounterMetricVectorAndRegister creates a new CounterMetricVector and registers it to the MetricsRecord.
type CumulativeCounterMetricVector ¶
type CumulativeCounterMetricVector = pipeline.MetricVector[pipeline.CounterMetric]
func NewCumulativeCounterMetricVector
deprecated
func NewCumulativeCounterMetricVector(metricName string, constLabels map[string]string, labelNames []string) CumulativeCounterMetricVector
Deprecated: use NewCounterMetricVector instead. NewCumulativeCounterMetricVector creates a new CounterMetricVector. Note that MetricVector doesn't expose the Collect API by default. Plugin developers should be careful to collect metrics manually.
func NewCumulativeCounterMetricVectorAndRegister ¶
func NewCumulativeCounterMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) CumulativeCounterMetricVector
NewCumulativeCounterMetricVectorAndRegister creates a new CounterMetricVector and registers it to the MetricsRecord.
type DockerCenter ¶
type DockerCenter struct {
// contains filtered or unexported fields
}
func (*DockerCenter) CreateInfoDetail ¶
func (dc *DockerCenter) CreateInfoDetail(info types.ContainerJSON, envConfigPrefix string, selfConfigFlag bool) *DockerInfoDetail
CreateInfoDetail creates a DockerInfoDetail from docker.Container. Container properties used in this function: HostsPath, Config.Hostname, Name, Config.Image, Config.Env, Mounts, ContainerInfo.GraphDriver.Data["UpperDir"], Config.Labels.
type DockerInfoDetail ¶
type DockerInfoDetail struct { StdoutPath string ContainerInfo types.ContainerJSON ContainerNameTag map[string]string K8SInfo *K8SInfo EnvConfigInfoMap map[string]*EnvConfigInfo ContainerIP string DefaultRootPath string // contains filtered or unexported fields }
func CreateContainerInfoDetail ¶
func CreateContainerInfoDetail(info types.ContainerJSON, envConfigPrefix string, selfConfigFlag bool) *DockerInfoDetail
func GetContainerBySpecificInfo ¶
func GetContainerBySpecificInfo(filter func(*DockerInfoDetail) bool) (infoList []*DockerInfoDetail)
func (*DockerInfoDetail) DiffMount ¶
func (did *DockerInfoDetail) DiffMount(other *DockerInfoDetail) bool
func (*DockerInfoDetail) DiffName ¶
func (did *DockerInfoDetail) DiffName(other *DockerInfoDetail) bool
func (*DockerInfoDetail) FindAllEnvConfig ¶
func (did *DockerInfoDetail) FindAllEnvConfig(envConfigPrefix string, selfConfigFlag bool)
FindAllEnvConfig finds and pre-processes all env configs, and adds tags for the docker info.
func (*DockerInfoDetail) FindBestMatchedPath ¶
func (did *DockerInfoDetail) FindBestMatchedPath(pth string) (sourcePath, containerPath string)
func (*DockerInfoDetail) FinishedAt ¶
func (did *DockerInfoDetail) FinishedAt() string
func (*DockerInfoDetail) GetCustomExternalTags ¶
func (did *DockerInfoDetail) GetCustomExternalTags(tags, envs, k8sLabels map[string]string)
func (*DockerInfoDetail) GetEnv ¶
func (did *DockerInfoDetail) GetEnv(key string) string
func (*DockerInfoDetail) GetExternalTags ¶
func (did *DockerInfoDetail) GetExternalTags(envs, k8sLabels map[string]string) map[string]string
func (*DockerInfoDetail) IDPrefix ¶
func (did *DockerInfoDetail) IDPrefix() string
func (*DockerInfoDetail) IsTimeout ¶
func (did *DockerInfoDetail) IsTimeout() bool
func (*DockerInfoDetail) MakeSureEnvConfigExist ¶
func (did *DockerInfoDetail) MakeSureEnvConfigExist(configName string) *EnvConfigInfo
func (*DockerInfoDetail) PodName ¶
func (did *DockerInfoDetail) PodName() string
func (*DockerInfoDetail) Status ¶
func (did *DockerInfoDetail) Status() string
type DockerInfoDetailWithFilteredEnvAndLabel ¶
type DockerInfoDetailWithFilteredEnvAndLabel struct { Detail *DockerInfoDetail Env map[string]string ContainerLabels map[string]string K8sLabels map[string]string }
func CastContainerDetail ¶
func CastContainerDetail(containerInfo *DockerInfoDetail, envSet, labelSet, k8sLabelSet map[string]struct{}) *DockerInfoDetailWithFilteredEnvAndLabel
func GetAllContainerIncludeEnvAndLabelToRecord ¶
func GetAllContainerIncludeEnvAndLabelToRecord(envSet, labelSet, k8sLabelSet, diffEnvSet, diffLabelSet, diffK8sLabelSet map[string]struct{}) []*DockerInfoDetailWithFilteredEnvAndLabel
func GetAllContainerToRecord ¶
func GetAllContainerToRecord(envSet, labelSet, k8sLabelSet map[string]struct{}, containerIds map[string]struct{}) []*DockerInfoDetailWithFilteredEnvAndLabel
type DumpData ¶
type DumpData struct { Req DumpDataReq Resp DumpDataResp }
DumpData is currently only for the HTTP protocol.
type DumpDataResp ¶
type Dumper ¶
type Dumper struct {
// contains filtered or unexported fields
}
func (*Dumper) InputChannel ¶
type EnvConfigInfo ¶
type GRPCServerSettings ¶
type GRPCServerSettings struct { Endpoint string `json:"Endpoint"` MaxRecvMsgSizeMiB int `json:"MaxRecvMsgSizeMiB"` MaxConcurrentStreams int `json:"MaxConcurrentStreams"` ReadBufferSize int `json:"ReadBufferSize"` WriteBufferSize int `json:"WriteBufferSize"` Compression string `json:"Compression"` Decompression string `json:"Decompression"` TLSConfig tls_helper.ServerConfig `json:"TLSConfig"` }
func (*GRPCServerSettings) GetServerOption ¶
func (cfg *GRPCServerSettings) GetServerOption() ([]grpc.ServerOption, error)
type GaugeMetricVector ¶
type GaugeMetricVector = pipeline.MetricVector[pipeline.GaugeMetric]
func NewGaugeMetricVector ¶
func NewGaugeMetricVector(metricName string, constLabels map[string]string, labelNames []string) GaugeMetricVector
NewGaugeMetricVector creates a new GaugeMetricVector. Note that MetricVector doesn't expose the Collect API by default. Plugin developers should be careful to collect metrics manually.
func NewGaugeMetricVectorAndRegister ¶
func NewGaugeMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) GaugeMetricVector
NewGaugeMetricVectorAndRegister creates a new GaugeMetricVector and registers it to the MetricsRecord.
type GenericPool ¶
GenericPool is a pool for *[]T
func NewGenericPool ¶
func NewGenericPool[T any](fn func() []T) GenericPool[T]
NewGenericPool returns a GenericPool. It accepts a function that returns a new []T. e.g., func() []byte {return make([]byte, 0, 128)}
func (*GenericPool[T]) Get ¶
func (p *GenericPool[T]) Get() *[]T
func (*GenericPool[T]) Put ¶
func (p *GenericPool[T]) Put(t *[]T)
type GrpcClientConfig ¶
type GrpcClientConfig struct { Endpoint string `json:"Endpoint"` // The compression key for supported compression types within collector. Compression string `json:"Compression"` // The headers associated with gRPC requests. Headers map[string]string `json:"Headers"` // Sets the balancer in grpclb_policy to discover the servers. Default is pick_first. // https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md BalancerName string `json:"BalancerName"` // WaitForReady parameter configures client to wait for ready state before sending data. // (https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md) WaitForReady bool `json:"WaitForReady"` // ReadBufferSize for gRPC client. See grpchelper.WithReadBufferSize. // (https://godoc.org/google.golang.org/grpc#WithReadBufferSize). ReadBufferSize int `json:"ReadBufferSize"` // WriteBufferSize for gRPC gRPC. See grpchelper.WithWriteBufferSize. // (https://godoc.org/google.golang.org/grpc#WithWriteBufferSize). WriteBufferSize int `json:"WriteBufferSize"` // Send retry setting Retry RetryConfig `json:"Retry"` Timeout int `json:"Timeout"` }
func (*GrpcClientConfig) GetDialOptions ¶
func (cfg *GrpcClientConfig) GetDialOptions() ([]grpc.DialOption, error)
GetDialOptions maps GrpcClientConfig to a slice of dial options for gRPC.
func (*GrpcClientConfig) GetEndpoint ¶
func (cfg *GrpcClientConfig) GetEndpoint() string
func (*GrpcClientConfig) GetTimeout ¶
func (cfg *GrpcClientConfig) GetTimeout() time.Duration
type HistogramData ¶
HistogramData ...
func (*HistogramData) ToMetricLogs ¶
func (hd *HistogramData) ToMetricLogs(name string, timeMs int64, labels *MetricLabels) []*protocol.Log
ToMetricLogs ..
type K8SFilter ¶
type K8SFilter struct { NamespaceReg *regexp.Regexp PodReg *regexp.Regexp ContainerReg *regexp.Regexp IncludeLabels map[string]string ExcludeLabels map[string]string IncludeLabelRegs map[string]*regexp.Regexp ExcludeLabelRegs map[string]*regexp.Regexp // contains filtered or unexported fields }
K8SFilter is used to find specific containers.
type K8SInfo ¶
type K8SInfo struct { Namespace string Pod string ContainerName string Labels map[string]string PausedContainer bool // contains filtered or unexported fields }
"io.kubernetes.container.logpath": "/var/log/pods/222e88ff-8f08-11e8-851d-00163f008685/logtail_0.log", "io.kubernetes.container.name": "logtail", "io.kubernetes.docker.type": "container", "io.kubernetes.pod.name": "logtail-z2224", "io.kubernetes.pod.namespace": "kube-system", "io.kubernetes.pod.uid": "222e88ff-8f08-11e8-851d-00163f008685",
func (*K8SInfo) ExtractK8sLabels ¶
func (info *K8SInfo) ExtractK8sLabels(containerInfo types.ContainerJSON)
ExtractK8sLabels only works for original docker containers.
type Labels ¶
func (Labels) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (Labels) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*Labels) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*Labels) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
type LatencyMetricVector ¶
type LatencyMetricVector = pipeline.MetricVector[pipeline.LatencyMetric]
func NewLatencyMetricVector ¶
func NewLatencyMetricVector(metricName string, constLabels map[string]string, labelNames []string) LatencyMetricVector
NewLatencyMetricVector creates a new LatencyMetricVector. Note that MetricVector doesn't expose the Collect API by default. Plugin developers should be careful to collect metrics manually.
func NewLatencyMetricVectorAndRegister ¶
func NewLatencyMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) LatencyMetricVector
NewLatencyMetricVectorAndRegister creates a new LatencyMetricVector and registers it to the MetricsRecord.
type LocalCollector ¶
LocalCollector is for unit tests.
func (*LocalCollector) AddDataArray ¶
func (*LocalCollector) AddDataArrayWithContext ¶
func (*LocalCollector) AddDataWithContext ¶
func (*LocalCollector) AddRawLog ¶
func (p *LocalCollector) AddRawLog(log *protocol.Log)
func (*LocalCollector) AddRawLogWithContext ¶
func (p *LocalCollector) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
type LocalContext ¶
type LocalContext struct { MetricsRecords []*pipeline.MetricsRecord AllCheckPoint map[string][]byte // contains filtered or unexported fields }
func (*LocalContext) AddPlugin ¶
func (p *LocalContext) AddPlugin(name string)
func (*LocalContext) ExportMetricRecords ¶
func (p *LocalContext) ExportMetricRecords() []map[string]string
ExportMetricRecords is used for exporting metrics records. Each metric is a map[string]string
func (*LocalContext) GetCheckPoint ¶
func (p *LocalContext) GetCheckPoint(key string) (value []byte, exist bool)
func (*LocalContext) GetCheckPointObject ¶
func (p *LocalContext) GetCheckPointObject(key string, obj interface{}) (exist bool)
func (*LocalContext) GetConfigName ¶
func (p *LocalContext) GetConfigName() string
func (*LocalContext) GetExtension ¶
func (*LocalContext) GetLogstore ¶
func (p *LocalContext) GetLogstore() string
func (*LocalContext) GetLogstoreConfigMetricRecord ¶
func (p *LocalContext) GetLogstoreConfigMetricRecord() *pipeline.MetricsRecord
func (*LocalContext) GetMetricRecord ¶
func (p *LocalContext) GetMetricRecord() *pipeline.MetricsRecord
func (*LocalContext) GetPipelineScopeConfig ¶
func (p *LocalContext) GetPipelineScopeConfig() *config.GlobalConfig
func (*LocalContext) GetProject ¶
func (p *LocalContext) GetProject() string
func (*LocalContext) GetRuntimeContext ¶
func (p *LocalContext) GetRuntimeContext() context.Context
func (*LocalContext) InitContext ¶
func (p *LocalContext) InitContext(project, logstore, configName string)
func (*LocalContext) RegisterLogstoreConfigMetricRecord ¶
func (p *LocalContext) RegisterLogstoreConfigMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord
func (*LocalContext) RegisterMetricRecord ¶
func (p *LocalContext) RegisterMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord
func (*LocalContext) SaveCheckPoint ¶
func (p *LocalContext) SaveCheckPoint(key string, value []byte) error
func (*LocalContext) SaveCheckPointObject ¶
func (p *LocalContext) SaveCheckPointObject(key string, obj interface{}) error
type LogFileProcessor ¶
type LogFileProcessor interface { // Process the file block and return how many bytes are processed // LogFileReader will find last '\n' and call Process // @note fileBlock may be nil, in this situation, processor should check multi line timeout Process(fileBlock []byte, noChangeInterval time.Duration) int }
LogFileProcessor interface
type LogFileReader ¶
type LogFileReader struct { Config LogFileReaderConfig // contains filtered or unexported fields }
func NewLogFileReader ¶
func NewLogFileReader(context context.Context, checkpoint LogFileReaderCheckPoint, config LogFileReaderConfig, processor LogFileProcessor) (*LogFileReader, error)
func (*LogFileReader) CheckFileChange ¶
func (r *LogFileReader) CheckFileChange() bool
func (*LogFileReader) CloseFile ¶
func (r *LogFileReader) CloseFile(reason string)
func (*LogFileReader) GetCheckpoint ¶
func (r *LogFileReader) GetCheckpoint() (checkpoint LogFileReaderCheckPoint, updateFlag bool)
func (*LogFileReader) GetLastEndOfLine ¶
func (r *LogFileReader) GetLastEndOfLine(n int) int
GetLastEndOfLine returns the new read bytes that end with '\n'. Note: it will return n + r.lastBufferSize when n + r.lastBufferSize == len(r.nowBlock).
func (*LogFileReader) GetProcessor ¶
func (r *LogFileReader) GetProcessor() LogFileProcessor
func (*LogFileReader) ProcessAfterRead ¶
func (r *LogFileReader) ProcessAfterRead(n int)
func (*LogFileReader) ReadAndProcess ¶
func (r *LogFileReader) ReadAndProcess(once bool)
func (*LogFileReader) ReadOpen ¶
func (r *LogFileReader) ReadOpen() error
func (*LogFileReader) Run ¶
func (r *LogFileReader) Run()
func (*LogFileReader) SetForceRead ¶
func (r *LogFileReader) SetForceRead()
SetForceRead forces reading the file when the reader starts.
func (*LogFileReader) Start ¶
func (r *LogFileReader) Start()
func (*LogFileReader) Stop ¶
func (r *LogFileReader) Stop()
func (*LogFileReader) UpdateProcessResult ¶
func (r *LogFileReader) UpdateProcessResult(readN, processedN int)
type LogFileReaderCheckPoint ¶
func (*LogFileReaderCheckPoint) IsSame ¶
func (checkpoint *LogFileReaderCheckPoint) IsSame(other *LogFileReaderCheckPoint) bool
IsSame checks whether the checkpoints are the same.
type LogFileReaderConfig ¶
type LogFileReaderConfig struct { ReadIntervalMs int MaxReadBlockSize int CloseFileSec int Tracker *ReaderMetricTracker }
type ManagerMeta ¶
type ManagerMeta struct { Metas map[string]map[string]map[string]struct{} // contains filtered or unexported fields }
ManagerMeta is designed for a special input plugin to log self telemetry data, such as telegraf. This kind of plugin would connect with other agents, so most of them have a global manager to control or connect with other agents.
func NewmanagerMeta ¶
func NewmanagerMeta(configName string) *ManagerMeta
func (*ManagerMeta) Add ¶
func (b *ManagerMeta) Add(prj, logstore, cfg string)
func (*ManagerMeta) Delete ¶
func (b *ManagerMeta) Delete(prj, logstore, cfg string)
func (*ManagerMeta) GetAlarm ¶
func (b *ManagerMeta) GetAlarm() *util.Alarm
func (*ManagerMeta) GetContext ¶
func (b *ManagerMeta) GetContext() context.Context
func (*ManagerMeta) UpdateAlarm ¶
func (b *ManagerMeta) UpdateAlarm()
type MaxMetricVector ¶
type MaxMetricVector = pipeline.MetricVector[pipeline.GaugeMetric]
func NewMaxMetricVector ¶
func NewMaxMetricVector(metricName string, constLabels map[string]string, labelNames []string) MaxMetricVector
NewMaxMetricVector creates a new MaxMetricVector. Note that MetricVector doesn't expose the Collect API by default. Plugin developers should be careful to collect metrics manually.
type MetaNode ¶
type MetaNode struct { ID string Type string Attributes Attributes Labels Labels Parents Parents }
MetaNode describes a superset of the metadata that probes can collect about a given node in a given topology, along with the edges (aka adjacency) emanating from the node.
func NewMetaNode ¶
func (MetaNode) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (MetaNode) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*MetaNode) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*MetaNode) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
func (*MetaNode) WithAttribute ¶
func (*MetaNode) WithAttributes ¶
func (n *MetaNode) WithAttributes(attributes Attributes) *MetaNode
func (*MetaNode) WithLabels ¶
func (*MetaNode) WithParent ¶
func (*MetaNode) WithParents ¶
type MetricLabels ¶
type MetricLabels struct {
// contains filtered or unexported fields
}
Labels for metric labels
func (*MetricLabels) AppendMap ¶
func (kv *MetricLabels) AppendMap(mapVal map[string]string)
AppendMap ...
func (*MetricLabels) Clone ¶
func (kv *MetricLabels) Clone() *MetricLabels
func (*MetricLabels) CloneInto ¶
func (kv *MetricLabels) CloneInto(dst *MetricLabels) *MetricLabels
func (*MetricLabels) Len ¶
func (kv *MetricLabels) Len() int
func (*MetricLabels) Replace ¶
func (kv *MetricLabels) Replace(key, value string)
func (*MetricLabels) String ¶
func (kv *MetricLabels) String() string
func (*MetricLabels) SubSlice ¶
func (kv *MetricLabels) SubSlice(begin, end int)
func (*MetricLabels) Swap ¶
func (kv *MetricLabels) Swap(i int, j int)
type MetricVectorAndCollector ¶
type MetricVectorAndCollector[T pipeline.Metric] interface { pipeline.MetricVector[T] pipeline.MetricCollector }
func NewMetricVector ¶
func NewMetricVector[T pipeline.Metric](metricName string, metricType pipeline.SelfMetricType, constLabels map[string]string, labelNames []string) MetricVectorAndCollector[T]
NewMetricVector creates a new MetricVector. It returns a MetricVectorAndCollector, which is a MetricVector and a MetricCollector. For plugin developers, they should use MetricVector APIs to create metrics. For agent itself, it uses MetricCollector APIs to collect metrics.
type MetricVectorCache ¶
type MetricVectorCache interface { // return a metric with the given label values. // Note that the label values are sorted according to the label keys in MetricSet. WithLabelValues([]string) pipeline.Metric pipeline.MetricCollector }
MetricVectorCache is a cache for MetricVector.
func NewMapCache ¶
func NewMapCache(metricSet pipeline.MetricSet) MetricVectorCache
type MetricVectorImpl ¶
func (MetricVectorImpl) ConstLabels ¶
func (MetricVectorImpl) Type ¶
func (v MetricVectorImpl) Type() pipeline.SelfMetricType
func (*MetricVectorImpl[T]) WithLabels ¶
func (m *MetricVectorImpl[T]) WithLabels(labels ...pipeline.Label) T
type NormalMetric ¶
type NormalMetric struct {
// contains filtered or unexported fields
}
func (*NormalMetric) Add ¶
func (s *NormalMetric) Add(v int64)
func (*NormalMetric) Clear ¶
func (s *NormalMetric) Clear(v int64)
func (*NormalMetric) Collect ¶
func (s *NormalMetric) Collect() int64
func (*NormalMetric) Get ¶
func (s *NormalMetric) Get() int64
func (*NormalMetric) Name ¶
func (s *NormalMetric) Name() string
type Parents ¶
type Parents []string
func (Parents) MarshalEasyJSON ¶
MarshalEasyJSON supports easyjson.Marshaler interface
func (Parents) MarshalJSON ¶
MarshalJSON supports json.Marshaler interface
func (*Parents) UnmarshalEasyJSON ¶
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*Parents) UnmarshalJSON ¶
UnmarshalJSON supports json.Unmarshaler interface
type ReaderMetricTracker ¶
type ReaderMetricTracker struct { OpenCounter pipeline.CounterMetric CloseCounter pipeline.CounterMetric FileSizeCounter pipeline.CounterMetric FileRotatorCounter pipeline.CounterMetric ReadCounter pipeline.CounterMetric ReadSizeCounter pipeline.CounterMetric ProcessLatency pipeline.LatencyMetric }
func NewReaderMetricTracker ¶
func NewReaderMetricTracker(mr *pipeline.MetricsRecord) *ReaderMetricTracker
type RetryConfig ¶
type RetryInfo ¶
type RetryInfo struct {
// contains filtered or unexported fields
}
RetryInfo handles retry for gRPC. Refer to https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/otlpexporter/otlp.go#L121
func GetRetryInfo ¶
type StateOS ¶
func GetOSState ¶
GetOSState returns the FileStateOS for non-Windows systems.
func (StateOS) IsFileChange ¶
type StringMetricVector ¶
type StringMetricVector = pipeline.MetricVector[pipeline.StringMetric]
func NewStringMetricVector ¶
func NewStringMetricVector(metricName string, constLabels map[string]string, labelNames []string) StringMetricVector
NewStringMetricVector creates a new StringMetricVector. Note that MetricVector doesn't expose the Collect API by default. Plugin developers should be careful to collect metrics manually.
func NewStringMetricVectorAndRegister ¶
func NewStringMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) StringMetricVector
NewStringMetricVectorAndRegister creates a new StringMetricVector and registers it to the MetricsRecord.
Source Files ¶
- collector_imp.go
- container_config.go
- container_discover_controller.go
- container_export.go
- converter_helper.go
- cri_helper.go
- docker_center.go
- docker_center_file_discover.go
- docker_cri_adapter.go
- dumper.go
- env_tags.go
- file_helper_linux.go
- go_kit_log_wrapper.go
- grpc_helper.go
- input_manager_helper.go
- k8s.go
- local_collector.go
- local_context.go
- log_file_processor.go
- log_file_reader.go
- log_helper.go
- math_helper.go
- meta_helper.go
- meta_helper_easyjson.go
- mount_others.go
- net.go
- path_helper.go
- pipeline_event_helper.go
- pool_helper.go
- probe_http.go
- process_helper_linux.go
- self_metric_imp.go
- self_metrics_agent_constants.go
- self_metrics_plugin_constants.go
- self_metrics_runner_constants.go
- self_metrics_v2_imp.go
- self_metrics_vector_imp.go
- service_helper.go
- slice_helper.go
- split_log_helper.go