helper

package
v0.0.0-...-4030fd3
Published: Nov 20, 2024 License: Apache-2.0 Imports: 55 Imported by: 3

Documentation

Index

Constants

View Source
const (
	ContainerStatusRunning = "running"
	ContainerStatusExited  = "exited"
)
View Source
const (
	// StaleNaN is a signaling NaN, due to the MSB of the mantissa being 0.
	// This value is chosen with many leading 0s, so we have scope to store more
	// complicated values in the future. It is 2 rather than 1 to make
	// it easier to distinguish from the NormalNaN by a human when debugging.
	StaleNaN                              uint64 = 0x7ff0000000000002
	StaleNan                                     = "__STALE_NAN__"
	SlsMetricstoreInvalidReplaceCharacter        = '_'
)
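
A minimal sketch of how a caller might test a sample value for the stale marker, using only the standard math package; comparing raw bits via math.Float64bits is needed because an ordinary math.IsNaN check would also match normal NaNs (the isStaleNaN helper name is illustrative, not part of this package):

// isStaleNaN reports whether v carries the exact StaleNaN bit pattern.
func isStaleNaN(v float64) bool {
	return math.Float64bits(v) == helper.StaleNaN
}

// isStaleNaN(math.Float64frombits(helper.StaleNaN)) == true
// isStaleNaN(math.NaN())                            == false
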
View Source
const (
	MetricAgentMemoryGo        = "go_memory_used_mb"
	MetricAgentGoRoutinesTotal = "go_routines_total"
)

metric keys

View Source
const (
	MetricLabelKeyMetricCategory = "metric_category"
	MetricLabelKeyProject        = "project"
	MetricLabelKeyLogstore       = "logstore"
	MetricLabelKeyPipelineName   = "pipeline_name"
	MetricLabelKeyPluginType     = "plugin_type"
	MetricLabelKeyPluginID       = "plugin_id"
)

label keys

View Source
const (
	MetricPluginInEventsTotal       = "in_events_total"
	MetricPluginInEventGroupsTotal  = "in_event_groups_total"
	MetricPluginInSizeBytes         = "in_size_bytes"
	MetricPluginOutEventsTotal      = "out_events_total"
	MetricPluginOutEventGroupsTotal = "out_event_groups_total"
	MetricPluginOutSizeBytes        = "out_size_bytes"
	MetricPluginTotalDelayMs        = "total_delay_ms"
	MetricPluginTotalProcessTimeMs  = "total_process_time_ms"
)

metric keys

View Source
const (
	MetricPluginBinlogRotate     = "binlog_rotate"
	MetricPluginBinlogSync       = "binlog_sync"
	MetricPluginBinlogDdl        = "binlog_ddl"
	MetricPluginBinlogRow        = "binlog_row"
	MetricPluginBinlogXgid       = "binlog_xgid"
	MetricPluginBinlogCheckpoint = "binlog_checkpoint"
	MetricPluginBinlogFilename   = "binlog_filename"
	MetricPluginBinlogGtid       = "binlog_gtid"
)

input_canal

View Source
const (
	MetricPluginContainerTotal       = "container_total"
	MetricPluginAddContainerTotal    = "add_container_total"
	MetricPluginRemoveContainerTotal = "remove_container_total"
	MetricPluginUpdateContainerTotal = "update_container_total"
)

metric_container_info, service_docker_stdout_v2

View Source
const (
	MetricPluginCollectAvgCostTimeMs = "collect_avg_cost_time_ms"
	MetricPluginCollectTotal         = "collect_total"
)

service_mysql, service_rdb

View Source
const (
	MetricCollectEntityTotal = "collect_entity_total"
	MetricCollectLinkTotal   = "collect_link_total"
)

service_k8s_meta

View Source
const (
	MetricPluginDiscardedEventsTotal      = "discarded_events_total"
	MetricPluginOutFailedEventsTotal      = "out_failed_events_total"
	MetricPluginOutKeyNotFoundEventsTotal = "out_key_not_found_events_total"
	MetricPluginOutSuccessfulEventsTotal  = "out_successful_events_total"
)

all processors (common metrics shared by all parsing-type processor plugins; TODO: the statistics are currently incomplete and inaccurate)

View Source
const (
	MetricRunnerK8sMetaAddEventTotal    = "add_event_total"
	MetricRunnerK8sMetaUpdateEventTotal = "update_event_total"
	MetricRunnerK8sMetaDeleteEventTotal = "delete_event_total"
	MetricRunnerK8sMetaCacheSize        = "cache_size"
	MetricRunnerK8sMetaQueueSize        = "queue_size"
	MetricRunnerK8sMetaHTTPRequestTotal = "http_request_total"
	MetricRunnerK8sMetaHTTPAvgDelayMs   = "avg_delay_ms"
	MetricRunnerK8sMetaHTTPMaxDelayMs   = "max_delay_ms"
)

metric keys

View Source
const ContainerIDPrefixSize = 12
View Source
const DockerTimeFormat = "2006-01-02T15:04:05.999999999Z"
View Source
const (
	MetricLabelKeyClusterID = "cluster_id"
)

label keys

View Source
const (
	MetricLabelKeyRunnerName = "runner_name"
)

label keys

View Source
const (
	MetricLabelValueMetricCategoryPlugin = "plugin"
)

label values

View Source
const (
	MetricLabelValueMetricCategoryRunner = "runner"
)

label values

View Source
const (
	MetricLabelValueRunnerNameK8sMeta = "k8s_meta"
)

label values

View Source
const (
	PluginPairsPerLogTotal = "pairs_per_log_total"
)

processor_anchor, processor_regex, processor_string_replace

Variables

View Source
var AddedContainerConfigResult []*ContainerConfigResult

Newly added collection configuration results.

View Source
var AddedContainerConfigResultMap map[string]*ContainerConfigResult

In-memory store of collection configuration results.

View Source
var ContainerInfoDeletedTimeout = time.Second * time.Duration(120)
View Source
var (
	DefaultCacheFactory = NewMapCache
)
View Source
var DefaultLogFileReaderConfig = LogFileReaderConfig{
	ReadIntervalMs:   1000,
	MaxReadBlockSize: 512 * 1024,
	CloseFileSec:     60,
	Tracker:          nil,
}
View Source
var DefaultLogtailMountPath string
View Source
var DefaultSyncContainersPeriod = time.Second * 3 // should be same as docker_config_update_interval gflag in C
View Source
var DockerCenterTimeout = time.Second * time.Duration(30)
View Source
var EnvTags []string

EnvTags are tags to be added to every log group.

View Source
var EventListenerTimeout = time.Second * time.Duration(3600)
View Source
var FetchAllInterval = time.Second * time.Duration(300)
View Source
var FileOffsetKey = "__tag__:__file_offset__"
View Source
var LogEventPool = sync.Pool{
	New: func() interface{} {
		return new(protocol.LogEvent)
	},
}
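
A short sketch of the intended Get/Put cycle, assuming protocol.LogEvent has the Reset method that protobuf-generated types normally provide:

// Borrow a LogEvent, use it, then reset and return it so the allocation is reused.
ev := helper.LogEventPool.Get().(*protocol.LogEvent)
// ... populate ev and hand it to the pipeline ...
ev.Reset()
helper.LogEventPool.Put(ev)
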
View Source
var MaxFetchOneTriggerPerSecond int32 = 200
View Source
var ReservedFileOffsetKey = "__file_offset__"

Functions

func AddMetadata

func AddMetadata(collector pipeline.Collector, time time.Time, node *MetaNode)

AddMetadata to the collector.

func AtomicAddFloat64

func AtomicAddFloat64(dst *float64, n float64)

func AtomicLoadFloat64

func AtomicLoadFloat64(dst *float64) float64

func AtomicStoreFloat64

func AtomicStoreFloat64(dst *float64, n float64)

func ContainerCenterInit

func ContainerCenterInit()

func ContainerProcessAlive

func ContainerProcessAlive(pid int) bool

func CreateDockerClient

func CreateDockerClient(opt ...docker.Opt) (client *docker.Client, err error)

func CreateLog

func CreateLog(t time.Time, enableTimestampNano bool, configTag map[string]string, logTags map[string]string, fields map[string]string) (*protocol.Log, error)

func CreateLogByArray

func CreateLogByArray(t time.Time, enableTimestampNano bool, configTag map[string]string, logTags map[string]string, columns []string, values []string) (*protocol.Log, error)

func CreateLogEvent

func CreateLogEvent(t time.Time, enableTimestampNano bool, fields map[string]string) (*protocol.LogEvent, error)

func CreateLogEventByArray

func CreateLogEventByArray(t time.Time, enableTimestampNano bool, columns []string, values []string) (*protocol.LogEvent, error)

func CreateLogEventByLegacyRawLog

func CreateLogEventByLegacyRawLog(log *protocol.Log) (*protocol.LogEvent, error)

func CreatePipelineEventGroupByLegacyRawLog

func CreatePipelineEventGroupByLegacyRawLog(logEvents []*protocol.LogEvent, configTag map[string]string, logTags map[string]string, ctx map[string]interface{}) (*protocol.PipelineEventGroup, error)

func ExtractPodWorkload

func ExtractPodWorkload(name string) string

func ExtractStatefulSetNum

func ExtractStatefulSetNum(pod string) int

func GetAddressAndDialer

func GetAddressAndDialer(endpoint string) (string, func(addr string, timeout time.Duration) (net.Conn, error), error)

GetAddressAndDialer returns the address parsed from the given endpoint and a dialer.

func GetContainerByAcceptedInfo

func GetContainerByAcceptedInfo(
	includeLabel map[string]string,
	excludeLabel map[string]string,
	includeLabelRegex map[string]*regexp.Regexp,
	excludeLabelRegex map[string]*regexp.Regexp,
	includeEnv map[string]string,
	excludeEnv map[string]string,
	includeEnvRegex map[string]*regexp.Regexp,
	excludeEnvRegex map[string]*regexp.Regexp,
	k8sFilter *K8SFilter,
) map[string]*DockerInfoDetail

GetContainerByAcceptedInfo gathers the info of all containers that match the input parameters. A container matches when both conditions (&&) hold: 1. it has a label in @includeLabel and does not have any label in @excludeLabel; 2. it has an env in @includeEnv and does not have any env in @excludeEnv. If the input parameters are empty, all containers match. It returns a map containing docker container info.
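
A hedged call-site sketch for these filters; the env keys, regex, and namespace values are made up for illustration, and helper is the import name of this package:

includeEnv := map[string]string{"COLLECT_LOG": "true"}
includeEnvRegex := map[string]*regexp.Regexp{
	"APP_NAME": regexp.MustCompile("^(web|api)$"),
}
k8sFilter, err := helper.CreateK8SFilter("^default$", "", "", nil, nil)
if err != nil {
	return err
}
matched := helper.GetContainerByAcceptedInfo(
	nil, nil, nil, nil, // no label filters
	includeEnv, nil, includeEnvRegex, nil,
	k8sFilter,
)
for _, detail := range matched {
	_ = detail // e.g. detail.ContainerIP, detail.K8SInfo
}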

func GetContainerByAcceptedInfoV2

func GetContainerByAcceptedInfoV2(
	fullList map[string]bool,
	matchList map[string]*DockerInfoDetail,
	includeLabel map[string]string,
	excludeLabel map[string]string,
	includeLabelRegex map[string]*regexp.Regexp,
	excludeLabelRegex map[string]*regexp.Regexp,
	includeEnv map[string]string,
	excludeEnv map[string]string,
	includeEnvRegex map[string]*regexp.Regexp,
	excludeEnvRegex map[string]*regexp.Regexp,
	k8sFilter *K8SFilter,
) (newCount, delCount int, matchAddedList, matchDeletedList []string)

GetContainerByAcceptedInfoV2 works like GetContainerByAcceptedInfo, but uses less CPU. It reduces CPU cost by using the full list and the match list to find containers that need to be checked.

  deleted = fullList - containerMap
  newList = containerMap - fullList
  matchList -= deleted + filter(newList)
  matchAddedList: new container ID for current config
  matchDeletedList: deleted container ID for current config
  fullAddedList = newList
  fullDeletedList = deleted
	 return len(deleted), len(filter(newList)), matchAddedList, matchDeletedList, fullAddedList, fullDeletedList

@param fullList [in,out]: all containers. @param matchList [in,out]: all matched containers.

It returns two integers and four lists. The two integers are the number of newly matched containers and the number of deleted containers. The four lists are the newly matched containers, the deleted matched containers, the added containers, and the deleted containers.

func GetContainerMap

func GetContainerMap() map[string]*DockerInfoDetail

for test

func GetContainersLastUpdateTime

func GetContainersLastUpdateTime() int64

func GetDiffContainers

func GetDiffContainers(fullList map[string]struct{}) (fullAddedList, fullDeletedList []string)

func GetFileListByPrefix

func GetFileListByPrefix(dirPath, prefix string, needDir bool, num int) ([]string, error)

func GetFileOffsetTag

func GetFileOffsetTag(log *protocol.Log) *protocol.Log_Content

func GetFreePort

func GetFreePort() (port int, err error)

func GetMetricName

func GetMetricName(log *protocol.Log) string

func GetMountedFilePath

func GetMountedFilePath(logPath string) string

func GetMountedFilePathWithBasePath

func GetMountedFilePathWithBasePath(basePath, logPath string) string

func GetPluginCommonLabels

func GetPluginCommonLabels(context pipeline.Context, pluginMeta *pipeline.PluginMeta) []pipeline.LabelPair

func GetShortID

func GetShortID(fullID string) string

func GetStringFromList

func GetStringFromList(list []string) string

func HasEnvTags

func HasEnvTags(tagKey string, tagValue string) bool

HasEnvTags checks whether the specified tag key and value exist in EnvTags.

func InitContainer

func InitContainer()

func IsCRIRuntimeValid

func IsCRIRuntimeValid(criRuntimeEndpoint string) bool

func IsCRIStatusValid

func IsCRIStatusValid(criRuntimeEndpoint string) bool

func LoadEnvTags

func LoadEnvTags()

LoadEnvTags loads tags from environment variables.

func LogContentsToMap

func LogContentsToMap(contents []*protocol.Log_Content) map[string]string

func Max

func Max[T constraints.IntUintFloat](x T, y T) T

func Min

func Min[T constraints.IntUintFloat](x T, y T) T

func NewAverageMetric

func NewAverageMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric

NewAverageMetric creates a new AverageMetric.

func NewAverageMetricAndRegister

func NewAverageMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric

NewAverageMetricAndRegister creates a new AverageMetric and registers its metric vector to the MetricsRecord.

func NewCounterMetric

func NewCounterMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric

NewCounterMetric creates a new DeltaMetric.

func NewCounterMetricAndRegister

func NewCounterMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric

NewCounterMetricAndRegister creates a new DeltaMetric and registers its metric vector to the MetricsRecord.
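
A minimal sketch of registering and updating such a counter in a plugin's Init path, assuming the plugin's pipeline.Context exposes GetMetricRecord (as LocalContext does below) and that the returned CounterMetric has an Add method; the metric name and the plugin field are illustrative:

// Inside func (p *MyPlugin) Init(context pipeline.Context) error:
metricsRecord := context.GetMetricRecord()
p.procInRecordsTotal = helper.NewCounterMetricAndRegister(metricsRecord, "proc_in_records_total")

// Later, on the processing path:
p.procInRecordsTotal.Add(1)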

func NewCumulativeCounterMetric

func NewCumulativeCounterMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric

NewCumulativeCounterMetric creates a new CounterMetric.

func NewCumulativeCounterMetricAndRegister

func NewCumulativeCounterMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric

NewCumulativeCounterMetricAndRegister creates a new CounterMetric and registers its metric vector to the MetricsRecord.

func NewGaugeMetric

func NewGaugeMetric(n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric

NewGaugeMetric creates a new GaugeMetric.

func NewGaugeMetricAndRegister

func NewGaugeMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric

NewGaugeMetricAndRegister creates a new GaugeMetric and registers its metric vector to the MetricsRecord.

func NewGoKitLogWrapper

func NewGoKitLogWrapper(context pipeline.Context, alarmType string) log.Logger

NewGoKitLogWrapper returns a logger that logs with context.

func NewGroupedPipelineConext

func NewGroupedPipelineConext() pipeline.PipelineContext

func NewLatencyMetric

func NewLatencyMetric(n string, lables ...*protocol.Log_Content) pipeline.LatencyMetric

NewLatencyMetric creates a new LatencyMetric.

func NewLatencyMetricAndRegister

func NewLatencyMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.LatencyMetric

NewLatencyMetricAndRegister creates a new LatencyMetric and registers its metric vector to the MetricsRecord.

func NewMaxMetricAndRegister

func NewMaxMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.GaugeMetric

NewMaxMetricAndRegister creates a new MaxMetric and registers its metric vector to the MetricsRecord.

func NewMetricLog

func NewMetricLog(name string, t int64, value float64, labels *MetricLabels) *protocol.Log

NewMetricLog creates a metric log; the time argument supports Unix milliseconds and Unix nanoseconds. Note: must pass safe strings.
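
An illustrative use together with MetricLabels (documented under Types); the label values and the nanosecond timestamp are arbitrary:

labels := &helper.MetricLabels{}
labels.Append("hostname", "node-1")
labels.Append("cluster", "prod")

// Value 42.5 at a nanosecond timestamp; millisecond timestamps are also accepted.
metricLog := helper.NewMetricLog("cpu_usage", time.Now().UnixNano(), 42.5, labels)
_ = metricLog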

func NewMetricLogStringVal

func NewMetricLogStringVal(name string, t int64, value string, labels *MetricLabels) *protocol.Log

NewMetricLogStringVal creates a metric log with a string value; the time argument supports Unix milliseconds and Unix nanoseconds. Note: must pass safe strings.

func NewNoopPipelineConext

func NewNoopPipelineConext() pipeline.PipelineContext

func NewObservePipelineConext

func NewObservePipelineConext(queueSize int) pipeline.PipelineContext

func NewStringMetric

func NewStringMetric(n string, lables ...*protocol.Log_Content) pipeline.StringMetric

NewStringMetric creates a new StringMetric.

func NewStringMetricAndRegister

func NewStringMetricAndRegister(c *pipeline.MetricsRecord, n string, lables ...*protocol.Log_Content) pipeline.StringMetric

NewStringMetricAndRegister creates a new StringMetric and registers its metric vector to the MetricsRecord.

func NormalizeWindowsPath

func NormalizeWindowsPath(path string) string

NormalizeWindowsPath returns the normalized path on heterogeneous platforms; it parses the root path with the Windows system drive.

func ProcessContainerAllInfo

func ProcessContainerAllInfo(processor func(*DockerInfoDetail))

func ReadOpen

func ReadOpen(path string) (*os.File, error)

ReadOpen opens a file for reading only

func RecordContainerConfigResult

func RecordContainerConfigResult()

Converts the data in the in-memory map into a list for output.

func RecordContainerConfigResultIncrement

func RecordContainerConfigResultIncrement(message *ContainerConfigResult)

Incrementally records a collection configuration result.

func RecordContainerConfigResultMap

func RecordContainerConfigResultMap(message *ContainerConfigResult)

Records each collection configuration's result in memory, so that RecordContainerConfigResult can output the full set.

func RegisterDockerEventListener

func RegisterDockerEventListener(c chan events.Message)

func ReplaceInvalidChars

func ReplaceInvalidChars(in *string)

ReplaceInvalidChars replaces characters outside [a-zA-Z0-9_], analogous to invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]").

func ReverseBytesSlice

func ReverseBytesSlice(s [][]byte)

func ReverseStringSlice

func ReverseStringSlice(s []string)

func ReviseFileOffset

func ReviseFileOffset(log *protocol.Log, offset int64, enableMeta bool)

func SerializeContainerConfigResultToPb

func SerializeContainerConfigResultToPb(logGroup *protocol.LogGroup)

func SerializeContainerToPb

func SerializeContainerToPb(logGroup *protocol.LogGroup, addedContainers []*ContainerDetail)

func SerializeDeleteContainerToPb

func SerializeDeleteContainerToPb(logGroup *protocol.LogGroup, project string, containerIDsStr string)

func SetEnvConfigPrefix

func SetEnvConfigPrefix(prefix string)

func SetMetricVectorCacheFactory

func SetMetricVectorCacheFactory(factory func(pipeline.MetricSet) MetricVectorCache)

SetMetricVectorCacheFactory allows users to set the cache factory for the metric vector, like Prometheus SDK.

func SplitRegexFromMap

func SplitRegexFromMap(input map[string]string) (staticResult map[string]string, regexResult map[string]*regexp.Regexp, err error)

SplitRegexFromMap extracts regexes from the user config. A regex value must begin with ^ and end with $ (we only check for ^).
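
A small sketch of the resulting split, assuming (per the note above) that values beginning with ^ are compiled as regexes and all other values stay static; the keys and values are arbitrary:

static, regex, err := helper.SplitRegexFromMap(map[string]string{
	"app":  "nginx",         // plain value -> staticResult
	"tier": "^(web|cache)$", // ^...$ value -> compiled into regexResult
})
if err != nil {
	// a ^-prefixed value that fails to compile would surface here
	return err
}
_ = static // {"app": "nginx"}
_ = regex  // {"tier": <compiled regexp>}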

func StartService

func StartService(name string, context pipeline.Context, f func())

StartService ..

func TransferLogEventToPB

func TransferLogEventToPB(log *models.Log) (*protocol.LogEvent, error)

func TransferMetricEventToPB

func TransferMetricEventToPB(metric *models.Metric) (*protocol.MetricEvent, error)

func TransferPipelineEventGroupToPB

func TransferPipelineEventGroupToPB(groupInfo *models.GroupInfo, events []models.PipelineEvent) (*protocol.PipelineEventGroup, error)

func TransferSpanEventToPB

func TransferSpanEventToPB(span *models.Span) (*protocol.SpanEvent, error)

func TryGetRealPath

func TryGetRealPath(path string) (string, fs.FileInfo)

func UnRegisterDockerEventListener

func UnRegisterDockerEventListener(c chan events.Message)

Types

type Attributes

type Attributes map[string]interface{}

Attributes is used to store attributes in common conditions.

func (Attributes) MarshalEasyJSON

func (v Attributes) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (Attributes) MarshalJSON

func (v Attributes) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*Attributes) UnmarshalEasyJSON

func (v *Attributes) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*Attributes) UnmarshalJSON

func (v *Attributes) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type AverageMetricVector

type AverageMetricVector = pipeline.MetricVector[pipeline.CounterMetric]

func NewAverageMetricVector

func NewAverageMetricVector(metricName string, constLabels map[string]string, labelNames []string) AverageMetricVector

NewAverageMetricVector creates a new AverageMetricVector. Note that MetricVector doesn't expose a Collect API by default. Plugin developers should be careful to collect metrics manually.

func NewAverageMetricVectorAndRegister

func NewAverageMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) AverageMetricVector

NewAverageMetricVectorAndRegister creates a new AverageMetricVector and registers it to the MetricsRecord.

type AvgMetric

type AvgMetric struct {
	// contains filtered or unexported fields
}

func (*AvgMetric) Add

func (s *AvgMetric) Add(v int64)

func (*AvgMetric) Clear

func (s *AvgMetric) Clear(v int64)

func (*AvgMetric) Collect

func (s *AvgMetric) Collect() int64

func (*AvgMetric) Get

func (s *AvgMetric) Get() int64

func (*AvgMetric) GetAvg

func (s *AvgMetric) GetAvg() float64

func (*AvgMetric) Name

func (s *AvgMetric) Name() string

type CRIRuntimeWrapper

type CRIRuntimeWrapper struct {
	// contains filtered or unexported fields
}

func NewCRIRuntimeWrapper

func NewCRIRuntimeWrapper(dockerCenter *DockerCenter) (*CRIRuntimeWrapper, error)

NewCRIRuntimeWrapper ...

type ContainerConfigResult

type ContainerConfigResult struct {
	DataType                      string
	Project                       string
	Logstore                      string
	ConfigName                    string
	PathNotExistInputContainerIDs string
	PathExistInputContainerIDs    string
	SourceAddress                 string
	InputType                     string
	InputIsContainerFile          string
	FlusherType                   string
	FlusherTargetAddress          string
}

type ContainerDetail

type ContainerDetail struct {
	DataType         string
	Project          string
	ContainerID      string
	ContainerIP      string
	ContainerName    string
	RawContainerName string
	LogPath          string
	Driver           string
	Namespace        string
	ImageName        string
	PodName          string
	RootPath         string
	Hostname         string
	HostsPath        string
	Env              map[string]string
	ContainerLabels  map[string]string
	K8sLabels        map[string]string
}

type ContainerDiscoverManager

type ContainerDiscoverManager struct {
	// contains filtered or unexported fields
}

func NewContainerDiscoverManager

func NewContainerDiscoverManager() *ContainerDiscoverManager

func (*ContainerDiscoverManager) Clean

func (c *ContainerDiscoverManager) Clean()

func (*ContainerDiscoverManager) FetchAll

func (c *ContainerDiscoverManager) FetchAll()

FetchAll: currently there are three ways to find containers, namely the Docker interface, the CRI interface, and the static container info file.

func (*ContainerDiscoverManager) FetchOne

func (c *ContainerDiscoverManager) FetchOne(containerID string) error

func (*ContainerDiscoverManager) Init

func (c *ContainerDiscoverManager) Init() bool

func (*ContainerDiscoverManager) LogAlarm

func (c *ContainerDiscoverManager) LogAlarm(err error, msg string)

func (*ContainerDiscoverManager) StartSyncContainers

func (c *ContainerDiscoverManager) StartSyncContainers()

func (*ContainerDiscoverManager) TimerFetch

func (c *ContainerDiscoverManager) TimerFetch()

type ContainerMeta

type ContainerMeta struct {
	PodName         string
	K8sNamespace    string
	ContainerName   string
	Image           string
	K8sLabels       map[string]string
	ContainerLabels map[string]string
	Env             map[string]string
}

func GetContainerMeta

func GetContainerMeta(containerID string) *ContainerMeta

GetContainerMeta gets a thread-safe container meta struct.

type ConvertConfig

type ConvertConfig struct {
	TagFieldsRename      map[string]string // Rename one or more fields from tags.
	ProtocolFieldsRename map[string]string // Rename one or more fields. The protocol field options can only be: contents, tags, time
	Separator            string            // Convert separator
	Protocol             string            // Convert protocol
	Encoding             string            // Convert encoding
	IgnoreUnExpectedData bool              // IgnoreUnExpectedData will skip on unexpected data if set to true, or will return error and stop processing the whole batch data if set to false
}
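
An illustrative ConvertConfig literal; the Protocol and Encoding strings here are placeholders and must match whatever the converter actually supports:

convertCfg := helper.ConvertConfig{
	Protocol:             "custom_single", // placeholder protocol name
	Encoding:             "json",          // placeholder encoding name
	Separator:            ",",
	TagFieldsRename:      map[string]string{"host.name": "hostname"},
	IgnoreUnExpectedData: true,
}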

type CounterMetricVector

type CounterMetricVector = pipeline.MetricVector[pipeline.CounterMetric]

func NewCounterMetricVector

func NewCounterMetricVector(metricName string, constLabels map[string]string, labelNames []string) CounterMetricVector

NewCounterMetricVector creates a new DeltaMetricVector. Note that MetricVector doesn't expose a Collect API by default. Plugin developers should be careful to collect metrics manually.

func NewCounterMetricVectorAndRegister

func NewCounterMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) CounterMetricVector

NewCounterMetricVectorAndRegister creates a new DeltaMetricVector and registers it to the MetricsRecord.

type CumulativeCounterMetricVector

type CumulativeCounterMetricVector = pipeline.MetricVector[pipeline.CounterMetric]

func NewCumulativeCounterMetricVector deprecated

func NewCumulativeCounterMetricVector(metricName string, constLabels map[string]string, labelNames []string) CumulativeCounterMetricVector

Deprecated: use NewCounterMetricVector instead. NewCumulativeCounterMetricVector creates a new CounterMetricVector. Note that MetricVector doesn't expose a Collect API by default. Plugin developers should be careful to collect metrics manually.

func NewCumulativeCounterMetricVectorAndRegister

func NewCumulativeCounterMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) CumulativeCounterMetricVector

NewCumulativeCounterMetricVectorAndRegister creates a new CounterMetricVector and registers it to the MetricsRecord.

type DefBucket

type DefBucket struct {
	Le    float64
	Count int64
}

DefBucket ...

type DockerCenter

type DockerCenter struct {
	// contains filtered or unexported fields
}

func (*DockerCenter) CreateInfoDetail

func (dc *DockerCenter) CreateInfoDetail(info types.ContainerJSON, envConfigPrefix string, selfConfigFlag bool) *DockerInfoDetail

CreateInfoDetail creates a DockerInfoDetail from docker.Container. Container properties used in this function: HostsPath, Config.Hostname, Name, Config.Image, Config.Env, Mounts, ContainerInfo.GraphDriver.Data["UpperDir"], Config.Labels.

type DockerInfoDetail

type DockerInfoDetail struct {
	StdoutPath       string
	ContainerInfo    types.ContainerJSON
	ContainerNameTag map[string]string
	K8SInfo          *K8SInfo
	EnvConfigInfoMap map[string]*EnvConfigInfo
	ContainerIP      string
	DefaultRootPath  string
	// contains filtered or unexported fields
}

func CreateContainerInfoDetail

func CreateContainerInfoDetail(info types.ContainerJSON, envConfigPrefix string, selfConfigFlag bool) *DockerInfoDetail

func GetContainerBySpecificInfo

func GetContainerBySpecificInfo(filter func(*DockerInfoDetail) bool) (infoList []*DockerInfoDetail)

func (*DockerInfoDetail) DiffMount

func (did *DockerInfoDetail) DiffMount(other *DockerInfoDetail) bool

func (*DockerInfoDetail) DiffName

func (did *DockerInfoDetail) DiffName(other *DockerInfoDetail) bool

func (*DockerInfoDetail) FindAllEnvConfig

func (did *DockerInfoDetail) FindAllEnvConfig(envConfigPrefix string, selfConfigFlag bool)

FindAllEnvConfig finds and preprocesses all env configs, and adds tags to the docker info.

func (*DockerInfoDetail) FindBestMatchedPath

func (did *DockerInfoDetail) FindBestMatchedPath(pth string) (sourcePath, containerPath string)

func (*DockerInfoDetail) FinishedAt

func (did *DockerInfoDetail) FinishedAt() string

func (*DockerInfoDetail) GetCustomExternalTags

func (did *DockerInfoDetail) GetCustomExternalTags(tags, envs, k8sLabels map[string]string)

func (*DockerInfoDetail) GetEnv

func (did *DockerInfoDetail) GetEnv(key string) string

func (*DockerInfoDetail) GetExternalTags

func (did *DockerInfoDetail) GetExternalTags(envs, k8sLabels map[string]string) map[string]string

func (*DockerInfoDetail) IDPrefix

func (did *DockerInfoDetail) IDPrefix() string

func (*DockerInfoDetail) IsTimeout

func (did *DockerInfoDetail) IsTimeout() bool

func (*DockerInfoDetail) MakeSureEnvConfigExist

func (did *DockerInfoDetail) MakeSureEnvConfigExist(configName string) *EnvConfigInfo

func (*DockerInfoDetail) PodName

func (did *DockerInfoDetail) PodName() string

func (*DockerInfoDetail) Status

func (did *DockerInfoDetail) Status() string

type DockerInfoDetailWithFilteredEnvAndLabel

type DockerInfoDetailWithFilteredEnvAndLabel struct {
	Detail          *DockerInfoDetail
	Env             map[string]string
	ContainerLabels map[string]string
	K8sLabels       map[string]string
}

func CastContainerDetail

func CastContainerDetail(containerInfo *DockerInfoDetail, envSet, labelSet, k8sLabelSet map[string]struct{}) *DockerInfoDetailWithFilteredEnvAndLabel

func GetAllContainerIncludeEnvAndLabelToRecord

func GetAllContainerIncludeEnvAndLabelToRecord(envSet, labelSet, k8sLabelSet, diffEnvSet, diffLabelSet, diffK8sLabelSet map[string]struct{}) []*DockerInfoDetailWithFilteredEnvAndLabel

func GetAllContainerToRecord

func GetAllContainerToRecord(envSet, labelSet, k8sLabelSet map[string]struct{}, containerIds map[string]struct{}) []*DockerInfoDetailWithFilteredEnvAndLabel

type DumpData

type DumpData struct {
	Req  DumpDataReq
	Resp DumpDataResp
}

DumpData is currently only for the HTTP protocol.

type DumpDataReq

type DumpDataReq struct {
	Body   []byte
	URL    string
	Header map[string][]string
}

type DumpDataResp

type DumpDataResp struct {
	Body   []byte
	Header map[string]string
}

type Dumper

type Dumper struct {
	// contains filtered or unexported fields
}

func NewDumper

func NewDumper(prefix string, maxFiles int) *Dumper

func (*Dumper) AddDelta

func (d *Dumper) AddDelta(num int64)

func (*Dumper) Begin

func (d *Dumper) Begin(callback func()) bool

func (*Dumper) Close

func (d *Dumper) Close()

func (*Dumper) End

func (d *Dumper) End(timeout time.Duration, expectNum int64) error

func (*Dumper) Init

func (d *Dumper) Init()

func (*Dumper) InputChannel

func (d *Dumper) InputChannel() chan *DumpData

func (*Dumper) Start

func (d *Dumper) Start()

type EnvConfigInfo

type EnvConfigInfo struct {
	ConfigName    string
	ConfigItemMap map[string]string
}

type GRPCServerSettings

type GRPCServerSettings struct {
	Endpoint string `json:"Endpoint"`

	MaxRecvMsgSizeMiB int `json:"MaxRecvMsgSizeMiB"`

	MaxConcurrentStreams int `json:"MaxConcurrentStreams"`

	ReadBufferSize int `json:"ReadBufferSize"`

	WriteBufferSize int `json:"WriteBufferSize"`

	Compression string `json:"Compression"`

	Decompression string `json:"Decompression"`

	TLSConfig tls_helper.ServerConfig `json:"TLSConfig"`
}

func (*GRPCServerSettings) GetServerOption

func (cfg *GRPCServerSettings) GetServerOption() ([]grpc.ServerOption, error)

type GaugeMetricVector

type GaugeMetricVector = pipeline.MetricVector[pipeline.GaugeMetric]

func NewGaugeMetricVector

func NewGaugeMetricVector(metricName string, constLabels map[string]string, labelNames []string) GaugeMetricVector

NewGaugeMetricVector creates a new GaugeMetricVector. Note that MetricVector doesn't expose a Collect API by default. Plugin developers should be careful to collect metrics manually.

func NewGaugeMetricVectorAndRegister

func NewGaugeMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) GaugeMetricVector

NewGaugeMetricVectorAndRegister creates a new GaugeMetricVector and registers it to the MetricsRecord.

type GenericPool

type GenericPool[T any] struct {
	sync.Pool
}

GenericPool is a pool for *[]T

func NewGenericPool

func NewGenericPool[T any](fn func() []T) GenericPool[T]

NewGenericPool returns a GenericPool. It accepts a function that returns a new []T. e.g., func() []byte {return make([]byte, 0, 128)}
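
A short usage sketch following the constructor signature above; whether Put resets the slice is not documented here, so the caller truncates it explicitly:

pool := helper.NewGenericPool(func() []byte { return make([]byte, 0, 128) })

buf := pool.Get() // *[]byte backed by the pool
*buf = append(*buf, "payload"...)
// ... use *buf ...
*buf = (*buf)[:0] // drop the contents before returning the slice
pool.Put(buf)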

func (*GenericPool[T]) Get

func (p *GenericPool[T]) Get() *[]T

func (*GenericPool[T]) Put

func (p *GenericPool[T]) Put(t *[]T)

type GrpcClientConfig

type GrpcClientConfig struct {
	Endpoint string `json:"Endpoint"`

	// The compression key for supported compression types within collector.
	Compression string `json:"Compression"`

	// The headers associated with gRPC requests.
	Headers map[string]string `json:"Headers"`

	// Sets the balancer in grpclb_policy to discover the servers. Default is pick_first.
	// https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md
	BalancerName string `json:"BalancerName"`

	// WaitForReady parameter configures client to wait for ready state before sending data.
	// (https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md)
	WaitForReady bool `json:"WaitForReady"`

	// ReadBufferSize for gRPC client. See grpchelper.WithReadBufferSize.
	// (https://godoc.org/google.golang.org/grpc#WithReadBufferSize).
	ReadBufferSize int `json:"ReadBufferSize"`

	// WriteBufferSize for gRPC client. See grpchelper.WithWriteBufferSize.
	// (https://godoc.org/google.golang.org/grpc#WithWriteBufferSize).
	WriteBufferSize int `json:"WriteBufferSize"`

	// Send retry setting
	Retry RetryConfig `json:"Retry"`

	Timeout int `json:"Timeout"`
}

func (*GrpcClientConfig) GetDialOptions

func (cfg *GrpcClientConfig) GetDialOptions() ([]grpc.DialOption, error)

GetDialOptions maps GrpcClientConfig to a slice of dial options for gRPC.
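
A hedged sketch of wiring the config into a connection with the standard google.golang.org/grpc package; the endpoint is a placeholder and error handling is abbreviated:

clientCfg := &helper.GrpcClientConfig{Endpoint: "localhost:4317", WaitForReady: true}
opts, err := clientCfg.GetDialOptions()
if err != nil {
	return err
}
conn, err := grpc.Dial(clientCfg.GetEndpoint(), opts...)
if err != nil {
	return err
}
defer conn.Close()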

func (*GrpcClientConfig) GetEndpoint

func (cfg *GrpcClientConfig) GetEndpoint() string

func (*GrpcClientConfig) GetTimeout

func (cfg *GrpcClientConfig) GetTimeout() time.Duration

type HistogramData

type HistogramData struct {
	Buckets []DefBucket
	Count   int64
	Sum     float64
}

HistogramData ...

func (*HistogramData) ToMetricLogs

func (hd *HistogramData) ToMetricLogs(name string, timeMs int64, labels *MetricLabels) []*protocol.Log

ToMetricLogs ..
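
An illustrative histogram export; the bucket boundaries, counts, and metric name are made up:

hd := &helper.HistogramData{
	Buckets: []helper.DefBucket{
		{Le: 0.1, Count: 3},
		{Le: 1, Count: 5},
		{Le: math.Inf(1), Count: 6},
	},
	Count: 6,
	Sum:   2.7,
}
hdLabels := &helper.MetricLabels{}
hdLabels.Append("handler", "/api")
logs := hd.ToMetricLogs("request_duration_seconds", time.Now().UnixMilli(), hdLabels)
_ = logs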

type K8SFilter

type K8SFilter struct {
	NamespaceReg     *regexp.Regexp
	PodReg           *regexp.Regexp
	ContainerReg     *regexp.Regexp
	IncludeLabels    map[string]string
	ExcludeLabels    map[string]string
	IncludeLabelRegs map[string]*regexp.Regexp
	ExcludeLabelRegs map[string]*regexp.Regexp
	// contains filtered or unexported fields
}

K8SFilter is used to find specific containers.

func CreateK8SFilter

func CreateK8SFilter(ns, pod, container string, includeK8sLabels, excludeK8sLabels map[string]string) (*K8SFilter, error)

CreateK8SFilter ...

type K8SInfo

type K8SInfo struct {
	Namespace       string
	Pod             string
	ContainerName   string
	Labels          map[string]string
	PausedContainer bool
	// contains filtered or unexported fields
}

"io.kubernetes.container.logpath": "/var/log/pods/222e88ff-8f08-11e8-851d-00163f008685/logtail_0.log", "io.kubernetes.container.name": "logtail", "io.kubernetes.docker.type": "container", "io.kubernetes.pod.name": "logtail-z2224", "io.kubernetes.pod.namespace": "kube-system", "io.kubernetes.pod.uid": "222e88ff-8f08-11e8-851d-00163f008685",

func (*K8SInfo) ExtractK8sLabels

func (info *K8SInfo) ExtractK8sLabels(containerInfo types.ContainerJSON)

ExtractK8sLabels only works for original Docker containers.

func (*K8SInfo) GetLabel

func (info *K8SInfo) GetLabel(key string) string

func (*K8SInfo) IsMatch

func (info *K8SInfo) IsMatch(filter *K8SFilter) bool

IsMatch ...

func (*K8SInfo) IsSamePod

func (info *K8SInfo) IsSamePod(o *K8SInfo) bool

func (*K8SInfo) Merge

func (info *K8SInfo) Merge(o *K8SInfo)

type Label

type Label = pipeline.Label

type Labels

type Labels map[string]string

func (Labels) MarshalEasyJSON

func (v Labels) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (Labels) MarshalJSON

func (v Labels) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*Labels) UnmarshalEasyJSON

func (v *Labels) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*Labels) UnmarshalJSON

func (v *Labels) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type LatMetric

type LatMetric struct {
	// contains filtered or unexported fields
}

func (*LatMetric) Begin

func (s *LatMetric) Begin()

func (*LatMetric) Clear

func (s *LatMetric) Clear()

func (*LatMetric) Collect

func (s *LatMetric) Collect() int64

func (*LatMetric) End

func (s *LatMetric) End()

func (*LatMetric) Get

func (s *LatMetric) Get() int64

func (*LatMetric) Name

func (s *LatMetric) Name() string

type LatencyMetricVector

type LatencyMetricVector = pipeline.MetricVector[pipeline.LatencyMetric]

func NewLatencyMetricVector

func NewLatencyMetricVector(metricName string, constLabels map[string]string, labelNames []string) LatencyMetricVector

NewLatencyMetricVector creates a new LatencyMetricVector. Note that MetricVector doesn't expose a Collect API by default. Plugin developers should be careful to collect metrics manually.

func NewLatencyMetricVectorAndRegister

func NewLatencyMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) LatencyMetricVector

NewLatencyMetricVectorAndRegister creates a new LatencyMetricVector and registers it to the MetricsRecord.

type LocalCollector

type LocalCollector struct {
	Logs []*protocol.Log
}

LocalCollector for unit test

func (*LocalCollector) AddData

func (p *LocalCollector) AddData(tags map[string]string, fields map[string]string, t ...time.Time)
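
An illustrative unit-test snippet, assuming AddData appends the constructed log to the exported Logs slice (which is what this test collector exists for) and that t is the test's *testing.T; the tag and field values are arbitrary:

collector := &helper.LocalCollector{}
collector.AddData(
	map[string]string{"source": "test"},         // tags
	map[string]string{"content": "hello world"}, // fields
)
if len(collector.Logs) != 1 {
	t.Fatalf("expected 1 log, got %d", len(collector.Logs))
}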

func (*LocalCollector) AddDataArray

func (p *LocalCollector) AddDataArray(tags map[string]string,
	columns []string,
	values []string,
	t ...time.Time)

func (*LocalCollector) AddDataArrayWithContext

func (p *LocalCollector) AddDataArrayWithContext(tags map[string]string,
	columns []string,
	values []string,
	ctx map[string]interface{},
	t ...time.Time)

func (*LocalCollector) AddDataWithContext

func (p *LocalCollector) AddDataWithContext(tags map[string]string, fields map[string]string, ctx map[string]interface{}, t ...time.Time)

func (*LocalCollector) AddRawLog

func (p *LocalCollector) AddRawLog(log *protocol.Log)

func (*LocalCollector) AddRawLogWithContext

func (p *LocalCollector) AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})

type LocalContext

type LocalContext struct {
	MetricsRecords []*pipeline.MetricsRecord

	AllCheckPoint map[string][]byte
	// contains filtered or unexported fields
}

func (*LocalContext) AddPlugin

func (p *LocalContext) AddPlugin(name string)

func (*LocalContext) ExportMetricRecords

func (p *LocalContext) ExportMetricRecords() []map[string]string

ExportMetricRecords is used for exporting metric records. Each metric is a map[string]string.

func (*LocalContext) GetCheckPoint

func (p *LocalContext) GetCheckPoint(key string) (value []byte, exist bool)

func (*LocalContext) GetCheckPointObject

func (p *LocalContext) GetCheckPointObject(key string, obj interface{}) (exist bool)

func (*LocalContext) GetConfigName

func (p *LocalContext) GetConfigName() string

func (*LocalContext) GetExtension

func (p *LocalContext) GetExtension(name string, cfg any) (pipeline.Extension, error)

func (*LocalContext) GetLogstore

func (p *LocalContext) GetLogstore() string

func (*LocalContext) GetLogstoreConfigMetricRecord

func (p *LocalContext) GetLogstoreConfigMetricRecord() *pipeline.MetricsRecord

func (*LocalContext) GetMetricRecord

func (p *LocalContext) GetMetricRecord() *pipeline.MetricsRecord

func (*LocalContext) GetPipelineScopeConfig

func (p *LocalContext) GetPipelineScopeConfig() *config.GlobalConfig

func (*LocalContext) GetProject

func (p *LocalContext) GetProject() string

func (*LocalContext) GetRuntimeContext

func (p *LocalContext) GetRuntimeContext() context.Context

func (*LocalContext) InitContext

func (p *LocalContext) InitContext(project, logstore, configName string)

func (*LocalContext) RegisterLogstoreConfigMetricRecord

func (p *LocalContext) RegisterLogstoreConfigMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord

func (*LocalContext) RegisterMetricRecord

func (p *LocalContext) RegisterMetricRecord(labels []pipeline.LabelPair) *pipeline.MetricsRecord

func (*LocalContext) SaveCheckPoint

func (p *LocalContext) SaveCheckPoint(key string, value []byte) error

func (*LocalContext) SaveCheckPointObject

func (p *LocalContext) SaveCheckPointObject(key string, obj interface{}) error

type LogFileProcessor

type LogFileProcessor interface {
	// Process the file block and return how many bytes are processed
	// LogFileReader will find last '\n' and call Process
	// @note fileBlock may be nil, in this situation, processor should check multi line timeout
	Process(fileBlock []byte, noChangeInterval time.Duration) int
}

LogFileProcessor interface
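
A minimal sketch of an implementation and how it might be wired into a LogFileReader with DefaultLogFileReaderConfig; the lineCounter type and the checkpoint path are illustrative, and error handling is abbreviated:

type lineCounter struct{ lines int }

// Process consumes whole lines and reports how many bytes were handled.
// A nil fileBlock only signals a multi-line timeout check, so nothing is consumed.
func (p *lineCounter) Process(fileBlock []byte, noChangeInterval time.Duration) int {
	if len(fileBlock) == 0 {
		return 0
	}
	end := bytes.LastIndexByte(fileBlock, '\n') + 1
	p.lines += bytes.Count(fileBlock[:end], []byte{'\n'})
	return end
}

// Wiring it up (inside some setup function):
checkpoint := helper.LogFileReaderCheckPoint{Path: "/var/log/app.log"}
reader, err := helper.NewLogFileReader(context.Background(), checkpoint, helper.DefaultLogFileReaderConfig, &lineCounter{})
if err != nil {
	return err
}
reader.Start()
defer reader.Stop()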

type LogFileReader

type LogFileReader struct {
	Config LogFileReaderConfig
	// contains filtered or unexported fields
}

func NewLogFileReader

func NewLogFileReader(context context.Context, checkpoint LogFileReaderCheckPoint, config LogFileReaderConfig, processor LogFileProcessor) (*LogFileReader, error)

func (*LogFileReader) CheckFileChange

func (r *LogFileReader) CheckFileChange() bool

func (*LogFileReader) CloseFile

func (r *LogFileReader) CloseFile(reason string)

func (*LogFileReader) GetCheckpoint

func (r *LogFileReader) GetCheckpoint() (checkpoint LogFileReaderCheckPoint, updateFlag bool)

func (*LogFileReader) GetLastEndOfLine

func (r *LogFileReader) GetLastEndOfLine(n int) int

GetLastEndOfLine returns the end offset of the newly read bytes, up to the last '\n'. Note: it returns n + r.lastBufferSize when n + r.lastBufferSize == len(r.nowBlock).

func (*LogFileReader) GetProcessor

func (r *LogFileReader) GetProcessor() LogFileProcessor

func (*LogFileReader) ProcessAfterRead

func (r *LogFileReader) ProcessAfterRead(n int)

func (*LogFileReader) ReadAndProcess

func (r *LogFileReader) ReadAndProcess(once bool)

func (*LogFileReader) ReadOpen

func (r *LogFileReader) ReadOpen() error

func (*LogFileReader) Run

func (r *LogFileReader) Run()

func (*LogFileReader) SetForceRead

func (r *LogFileReader) SetForceRead()

SetForceRead forces the file to be read when the reader starts.

func (*LogFileReader) Start

func (r *LogFileReader) Start()

func (*LogFileReader) Stop

func (r *LogFileReader) Stop()

func (*LogFileReader) UpdateProcessResult

func (r *LogFileReader) UpdateProcessResult(readN, processedN int)

type LogFileReaderCheckPoint

type LogFileReaderCheckPoint struct {
	Path   string
	Offset int64
	State  StateOS
}

func (*LogFileReaderCheckPoint) IsSame

func (checkpoint *LogFileReaderCheckPoint) IsSame(other *LogFileReaderCheckPoint) bool

IsSame checks whether the checkpoints are the same.

type LogFileReaderConfig

type LogFileReaderConfig struct {
	ReadIntervalMs   int
	MaxReadBlockSize int
	CloseFileSec     int
	Tracker          *ReaderMetricTracker
}

type ManagerMeta

type ManagerMeta struct {
	Metas map[string]map[string]map[string]struct{}
	// contains filtered or unexported fields
}

ManagerMeta is designed for special input plugins that log self-telemetry data, such as telegraf. Plugins of this kind connect with other agents, so most of them have a global manager to control or connect with those agents.

func NewmanagerMeta

func NewmanagerMeta(configName string) *ManagerMeta

func (*ManagerMeta) Add

func (b *ManagerMeta) Add(prj, logstore, cfg string)

func (*ManagerMeta) Delete

func (b *ManagerMeta) Delete(prj, logstore, cfg string)

func (*ManagerMeta) GetAlarm

func (b *ManagerMeta) GetAlarm() *util.Alarm

func (*ManagerMeta) GetContext

func (b *ManagerMeta) GetContext() context.Context

func (*ManagerMeta) UpdateAlarm

func (b *ManagerMeta) UpdateAlarm()

type MapCache

type MapCache struct {
	pipeline.MetricSet

	sync.Map
	// contains filtered or unexported fields
}

func (*MapCache) Collect

func (v *MapCache) Collect() []pipeline.Metric

func (*MapCache) WithLabelValues

func (v *MapCache) WithLabelValues(labelValues []string) pipeline.Metric

type MaxMetricVector

type MaxMetricVector = pipeline.MetricVector[pipeline.GaugeMetric]

func NewMaxMetricVector

func NewMaxMetricVector(metricName string, constLabels map[string]string, labelNames []string) MaxMetricVector

NewMaxMetricVector creates a new MaxMetricVector. Note that MetricVector doesn't expose a Collect API by default. Plugin developers should be careful to collect metrics manually.

type MetaNode

type MetaNode struct {
	ID         string
	Type       string
	Attributes Attributes
	Labels     Labels
	Parents    Parents
}

MetaNode describes a superset of the metadata that probes can collect about a given node in a given topology, along with the edges (aka adjacency) emanating from the node.

func NewMetaNode

func NewMetaNode(id, nodeType string) *MetaNode

func (MetaNode) MarshalEasyJSON

func (v MetaNode) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (MetaNode) MarshalJSON

func (v MetaNode) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*MetaNode) UnmarshalEasyJSON

func (v *MetaNode) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*MetaNode) UnmarshalJSON

func (v *MetaNode) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

func (*MetaNode) WithAttribute

func (n *MetaNode) WithAttribute(k string, v interface{}) *MetaNode

func (*MetaNode) WithAttributes

func (n *MetaNode) WithAttributes(attributes Attributes) *MetaNode

func (*MetaNode) WithLabel

func (n *MetaNode) WithLabel(k, v string) *MetaNode

func (*MetaNode) WithLabels

func (n *MetaNode) WithLabels(labels Labels) *MetaNode

func (*MetaNode) WithParent

func (n *MetaNode) WithParent(key, parentID, parentName string) *MetaNode

func (*MetaNode) WithParents

func (n *MetaNode) WithParents(parents Parents) *MetaNode
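
An illustrative builder chain using the With* helpers above, handed to AddMetadata; the IDs and label values are made up, and collector stands for any pipeline.Collector already in scope:

node := helper.NewMetaNode("pod-12345", "Pod").
	WithLabel("namespace", "default").
	WithAttribute("phase", "Running").
	WithParent("Deployment", "deploy-6789", "web")

helper.AddMetadata(collector, time.Now(), node)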

type MetricLabel

type MetricLabel struct {
	Name  string
	Value string
}

MetricLabel is a single metric label (a name/value pair).

type MetricLabels

type MetricLabels struct {
	// contains filtered or unexported fields
}

MetricLabels is a collection of metric labels.

func (*MetricLabels) Append

func (kv *MetricLabels) Append(key, value string)

Append ...

func (*MetricLabels) AppendMap

func (kv *MetricLabels) AppendMap(mapVal map[string]string)

AppendMap ...

func (*MetricLabels) Clone

func (kv *MetricLabels) Clone() *MetricLabels

func (*MetricLabels) CloneInto

func (kv *MetricLabels) CloneInto(dst *MetricLabels) *MetricLabels

func (*MetricLabels) Len

func (kv *MetricLabels) Len() int

func (*MetricLabels) Less

func (kv *MetricLabels) Less(i int, j int) bool

func (*MetricLabels) Replace

func (kv *MetricLabels) Replace(key, value string)

func (*MetricLabels) String

func (kv *MetricLabels) String() string

func (*MetricLabels) SubSlice

func (kv *MetricLabels) SubSlice(begin, end int)

func (*MetricLabels) Swap

func (kv *MetricLabels) Swap(i int, j int)

type MetricVectorAndCollector

type MetricVectorAndCollector[T pipeline.Metric] interface {
	pipeline.MetricVector[T]
	pipeline.MetricCollector
}

func NewMetricVector

func NewMetricVector[T pipeline.Metric](metricName string, metricType pipeline.SelfMetricType, constLabels map[string]string, labelNames []string) MetricVectorAndCollector[T]

NewMetricVector creates a new MetricVector. It returns a MetricVectorAndCollector, which is both a MetricVector and a MetricCollector. Plugin developers should use the MetricVector APIs to create metrics; the agent itself uses the MetricCollector APIs to collect them.

type MetricVectorCache

type MetricVectorCache interface {
	// return a metric with the given label values.
	// Note that the label values are sorted according to the label keys in MetricSet.
	WithLabelValues([]string) pipeline.Metric

	pipeline.MetricCollector
}

MetricVectorCache is a cache for MetricVector.

func NewMapCache

func NewMapCache(metricSet pipeline.MetricSet) MetricVectorCache

type MetricVectorImpl

type MetricVectorImpl[T pipeline.Metric] struct {
	// contains filtered or unexported fields
}

func (MetricVectorImpl) Collect

func (v MetricVectorImpl) Collect() []pipeline.Metric

func (MetricVectorImpl) ConstLabels

func (v MetricVectorImpl) ConstLabels() []pipeline.Label

func (MetricVectorImpl) LabelKeys

func (v MetricVectorImpl) LabelKeys() []string

func (MetricVectorImpl) Name

func (v MetricVectorImpl) Name() string

func (MetricVectorImpl) Type

func (v MetricVectorImpl) Type() pipeline.SelfMetricType

func (*MetricVectorImpl[T]) WithLabels

func (m *MetricVectorImpl[T]) WithLabels(labels ...pipeline.Label) T

type NormalMetric

type NormalMetric struct {
	// contains filtered or unexported fields
}

func (*NormalMetric) Add

func (s *NormalMetric) Add(v int64)

func (*NormalMetric) Clear

func (s *NormalMetric) Clear(v int64)

func (*NormalMetric) Collect

func (s *NormalMetric) Collect() int64

func (*NormalMetric) Get

func (s *NormalMetric) Get() int64

func (*NormalMetric) Name

func (s *NormalMetric) Name() string

type Parents

type Parents []string

func (Parents) MarshalEasyJSON

func (v Parents) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (Parents) MarshalJSON

func (v Parents) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*Parents) UnmarshalEasyJSON

func (v *Parents) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*Parents) UnmarshalJSON

func (v *Parents) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type ReaderMetricTracker

type ReaderMetricTracker struct {
	OpenCounter        pipeline.CounterMetric
	CloseCounter       pipeline.CounterMetric
	FileSizeCounter    pipeline.CounterMetric
	FileRotatorCounter pipeline.CounterMetric
	ReadCounter        pipeline.CounterMetric
	ReadSizeCounter    pipeline.CounterMetric
	ProcessLatency     pipeline.LatencyMetric
}

func NewReaderMetricTracker

func NewReaderMetricTracker(mr *pipeline.MetricsRecord) *ReaderMetricTracker

type RetryConfig

type RetryConfig struct {
	Enable       bool
	MaxCount     int           `json:"MaxCount"`
	DefaultDelay time.Duration `json:"DefaultDelay"`
}

type RetryInfo

type RetryInfo struct {
	// contains filtered or unexported fields
}

RetryInfo handles retries for gRPC. Refer to https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/otlpexporter/otlp.go#L121

func GetRetryInfo

func GetRetryInfo(err error) *RetryInfo

func (*RetryInfo) Error

func (r *RetryInfo) Error() error

func (*RetryInfo) ShouldDelay

func (r *RetryInfo) ShouldDelay(delay time.Duration) time.Duration

type Series

type Series struct {
	pipeline.MetricSet
	// contains filtered or unexported fields
}

func (Series) Export

func (s Series) Export(metricName, metricValue string) map[string]string

func (Series) SerializeWithStr

func (s Series) SerializeWithStr(log *protocol.Log, metricName, metricValueStr string)

type StateOS

type StateOS struct {
	Inode      uint64
	Device     uint64
	Size       int64
	ModifyTime uint64
}

func GetOSState

func GetOSState(info os.FileInfo) StateOS

GetOSState returns the FileStateOS for non-Windows systems.

func (StateOS) IsChange

func (fs StateOS) IsChange(state StateOS) bool

IsChange checks whether the file state has changed.

func (StateOS) IsEmpty

func (fs StateOS) IsEmpty() bool

func (StateOS) IsFileChange

func (fs StateOS) IsFileChange(state StateOS) bool

func (StateOS) IsSame

func (fs StateOS) IsSame(state StateOS) bool

IsSame checks whether the files are identical.

func (StateOS) String

func (fs StateOS) String() string

type StrMetric

type StrMetric struct {
	// contains filtered or unexported fields
}

func (*StrMetric) Collect

func (s *StrMetric) Collect() string

func (*StrMetric) Get

func (s *StrMetric) Get() string

func (*StrMetric) Name

func (s *StrMetric) Name() string

func (*StrMetric) Set

func (s *StrMetric) Set(v string)

type StringMetricVector

type StringMetricVector = pipeline.MetricVector[pipeline.StringMetric]

func NewStringMetricVector

func NewStringMetricVector(metricName string, constLabels map[string]string, labelNames []string) StringMetricVector

NewStringMetricVector creates a new StringMetricVector. Note that MetricVector doesn't expose a Collect API by default. Plugin developers should be careful to collect metrics manually.

func NewStringMetricVectorAndRegister

func NewStringMetricVectorAndRegister(mr *pipeline.MetricsRecord, metricName string, constLabels map[string]string, labelNames []string) StringMetricVector

NewStringMetricVectorAndRegister creates a new StringMetricVector and registers it to the MetricsRecord.
