executor

package
v0.0.0-...-5c7ffcf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2024 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
// String identifiers grouped by prefix, plus the TimeParseError sentinel.
// NOTE(review): the Type*/Format* names suggest time-parser types and
// timestamp formats — confirm against the users of these constants.
const (
	// Type* identifiers.
	TypeAuto        = "auto"
	TypeProcessTime = "processTime"
	TypeElect       = "elect"

	// Format* identifiers. FormatAuto has the same value as TypeAuto.
	FormatUnix         = "unix"
	FormatUnixMilli    = "unixMilli"
	FormatGolangLayout = "golangLayout"
	FormatAuto         = "auto"

	// TimeParseError is a typed int64 sentinel with value -2.
	// NOTE(review): presumably returned in place of a timestamp when time
	// parsing fails — confirm.
	TimeParseError int64 = -2
)
View Source
// ReportTaskInfoInterval indicates the time interval of reporting task info.
// It is an untyped constant; NOTE(review): the unit is not visible here — confirm.
const ReportTaskInfoInterval = 10

Variables

View Source
// ErrIndexOutOfBound is the sentinel error for an index that is out of bound.
// Compare against it with errors.Is rather than by message text.
// NOTE(review): presumably returned by LogContext.GetColumnByIndex — confirm.
var ErrIndexOutOfBound = errors.New("index out of bound")
View Source
// LogParseNotMatched is a sentinel error whose message is "LogParseNotMatched".
// NOTE(review): presumably reported when a log line does not match the
// configured parser — confirm against the LogParser implementations.
var LogParseNotMatched = errors.New("LogParseNotMatched")

Functions

This section is empty.

Types

type Consumer

// Consumer is designed to work without locks: it relies entirely on the upper
// layer (Pipeline) for scheduling, and the Pipeline is responsible for making
// sure that calls into a Consumer are safe.
type Consumer struct {
	Select     XSelect
	Where      XWhere
	GroupBy    XGroupBy
	Window     *XWindow
	LogParser  LogParser
	TimeParser TimeParser

	// BeforeParseWhere is a second XWhere condition.
	// NOTE(review): by its name it is presumably tested before log parsing — confirm.
	BeforeParseWhere XWhere
	// contains filtered or unexported fields
}

Consumer 的设计是不用锁的, 它完全靠上层(Pipeline)来调度, 由 Pipeline 负责保证 Consumer 的调用是安全的

func (*Consumer) AddBatchDetailDatus

func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailData)

func (*Consumer) Consume

func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err error)

func (*Consumer) LoadState

func (c *Consumer) LoadState(state *consumerStateObj) error

func (*Consumer) SaveState

func (c *Consumer) SaveState() (*consumerStateObj, error)

func (*Consumer) SetOutput

func (c *Consumer) SetOutput(output output.Output)

func (*Consumer) SetStorage

func (c *Consumer) SetStorage(s *storage.Storage)

func (*Consumer) Start

func (c *Consumer) Start()

func (*Consumer) Stop

func (c *Consumer) Stop()

func (*Consumer) Update

func (c *Consumer) Update(o *Consumer)

type ConsumerStat

// ConsumerStat holds consumer running stats.
// All fields are public in order to be encoded by gob.
// Most counters are int32; Bytes is int64 and ZeroBytes is a plain int.
type ConsumerStat struct {
	IoTotal int32
	IoError int32
	// IO read empty count
	IoEmpty int32
	// IO read bytes
	Bytes int64
	// Read log lines
	Lines int32
	// Read log groups
	Groups int32
	Broken bool
	// File is missing
	Miss                   bool
	Processed              int32
	FilterBeforeParseWhere int32
	FilterLogParseError    int32
	FilterTimeParseError   int32
	FilterWhere            int32
	FilterGroup            int32
	FilterGroupMaxKeys     int32
	FilterIgnore           int32
	FilterMultiline        int32
	FilterDelay            int32
	Emit                   int32
	EmitSuccess            int32
	EmitError              int32
	// error count when agg
	AggWhereError int32
	// error count when select
	SelectError int32
	ZeroBytes   int
}

ConsumerStat holds consumer running stats. All fields are public in order to be encoded by gob.

type DataAccumulator

// DataAccumulator accepts batches of detail data.
// NOTE(review): Consumer.AddBatchDetailDatus takes an additional expectedTs
// argument, so *Consumer does not satisfy this interface as declared — confirm
// which implementations are intended.
type DataAccumulator interface {
	// AddBatchDetailDatus hands a batch of detail data to the accumulator.
	AddBatchDetailDatus([]*model.DetailData)
}

type DataNode

// DataNode exposes a string, a count and a numeric value through getters.
// XSelect.Select returns a slice of DataNode.
type DataNode interface {
	// GetString returns the node's string value.
	GetString() string
	// GetCount returns the node's count.
	GetCount() int32
	// GetNumber returns the node's numeric value.
	GetNumber() float64
}

type DataNodeImpl

// DataNodeImpl is a plain struct carrying a string, a float64 value and a
// count. *DataNodeImpl declares GetString, GetNumber and GetCount, which is
// the method set of DataNode.
// NOTE(review): the getters presumably return the matching fields — bodies are
// not visible here.
type DataNodeImpl struct {
	String string
	Value  float64
	Count  int32
}

func (*DataNodeImpl) GetCount

func (d *DataNodeImpl) GetCount() int32

func (*DataNodeImpl) GetNumber

func (d *DataNodeImpl) GetNumber() float64

func (*DataNodeImpl) GetString

func (d *DataNodeImpl) GetString() string

type DelayCalculator

// DelayCalculator is an empty struct; NewDelayCalculator returns a pointer to one.
// NOTE(review): no methods are exported — its purpose cannot be determined from here.
type DelayCalculator struct{}

func NewDelayCalculator

func NewDelayCalculator() *DelayCalculator

type DryRunExecutor

// DryRunExecutor is built from a DryRunRequest by NewDryRunExecutor, and its
// Run method produces a DryRunResponse.
type DryRunExecutor struct {
	// RootEvent is the exported root event of this executor.
	// NOTE(review): presumably the same event reported in DryRunResponse.Event — confirm.
	RootEvent *event.Event
	// contains filtered or unexported fields
}

func NewDryRunExecutor

func NewDryRunExecutor(request *DryRunRequest) (*DryRunExecutor, error)

func (*DryRunExecutor) Run

func (e *DryRunExecutor) Run() *DryRunResponse

type DryRunRequest

// DryRunRequest is the input of NewDryRunExecutor: a collect task plus an
// optional Input describing where the log lines come from.
// TODO: rework the request body; take into account that it is sent from prod to reg.
type DryRunRequest struct {
	Task  *collecttask.CollectTask
	Input *Input `json:"input,omitempty"`
}

TODO 请求体改造, 考虑这个是prod发给reg的

type DryRunResponse

// DryRunResponse is the result returned by DryRunExecutor.Run: an event and a
// list of per-group results. Both fields are omitted from JSON when empty.
type DryRunResponse struct {
	Event       *event.Event   `json:"event,omitempty"`
	GroupResult []*GroupResult `json:"groupResult,omitempty"`
}

type EmitContext

// EmitContext is an empty struct with no exported fields or methods.
// NOTE(review): presumably a placeholder for emit-time context — confirm.
type EmitContext struct{}

type GroupResult

// GroupResult is the dry-run result for a single log group.
type GroupResult struct {
	// A group is a set of related log lines; for example, one error stack
	// trace forms one group.
	// For non-multiline logs a group contains exactly one line.
	GroupLines []string `json:"groupLines,omitempty"`
	// Whether the group satisfies the (blacklist/whitelist) filter conditions.
	// If false, selectedValues and groupBy are both empty.
	// NOTE(review): "Paas" looks like a misspelling of "Pass"; renaming it
	// would change the exported field and the JSON key.
	Paas bool `json:"paas,omitempty"`
	// Extracted numeric values.
	SelectedValues map[string]interface{} `json:"selectedValues,omitempty"`
	// Extracted dimension values.
	GroupBy map[string]interface{} `json:"groupBy,omitempty"`
}

type Input

// Input describes where the log lines of a dry run come from.
type Input struct {
	// read: the caller provides the raw log text directly
	// read: the raw log text has to be actually read from the pod
	// NOTE(review): both entries are labelled "read" in the original comment;
	// the first one is presumably meant to be "plain" (see the Plain field) — confirm.
	Type  string       `json:"type,omitempty"`
	Plain *Input_Plain `json:"plain,omitempty"`
	Read  *Input_Read  `json:"read,omitempty"`
}

type Input_Plain

// Input_Plain carries log lines supplied directly in the request.
type Input_Plain struct {
	// Raw log lines.
	Lines []string `json:"lines,omitempty"`
	// Timezone to use when parsing timestamp of log lines.
	// Defaults to local timezone of agent.
	Timezone string `json:"timezone,omitempty"`
}

type Input_Read

// Input_Read configures reading log lines instead of supplying them inline.
type Input_Read struct {
	// Maximum number of lines to read, counted from the end.
	MaxLines int `json:"maxLines,omitempty"`
}

type LogConsumer

// LogConsumer is an empty interface: it declares no methods, so every type
// satisfies it.
type LogConsumer interface{}

type LogConsumerManager

// LogConsumerManager has only unexported fields and no exported methods.
// It is used by value as LogTaskPipeline.ConsumerManager.
type LogConsumerManager struct {
	// contains filtered or unexported fields
}

type LogContext

// LogContext is the log context passed to parsers, selectors and filters.
// It gives access to the current line (GetLine) and to columns by index
// (GetColumnByIndex) or by name (GetColumnByName).
type LogContext struct {
	// contains filtered or unexported fields
}

日志上下文

func (*LogContext) GetColumnByIndex

func (c *LogContext) GetColumnByIndex(index int) (string, error)

func (*LogContext) GetColumnByName

func (c *LogContext) GetColumnByName(name string) (interface{}, error)

func (*LogContext) GetLine

func (c *LogContext) GetLine() string

type LogGroup

// LogGroup is a group of log lines; in the single-line case one log line
// forms one group.
type LogGroup struct {
	// First line.
	Line string
	// Multi-line case; 99% of cases have only one line.
	Lines []string
}

一个日志组, 单行情况下一行日志为一个组

func (*LogGroup) Add

func (l *LogGroup) Add(line string)

func (*LogGroup) FirstLine

func (l *LogGroup) FirstLine() string

func (*LogGroup) SetOneLine

func (l *LogGroup) SetOneLine(line string)

type LogParser

// LogParser parses the log held by a LogContext.
type LogParser interface {
	// Parse processes ctx and returns an error on failure.
	// NOTE(review): presumably stores the parsed columns back into ctx — confirm.
	Parse(ctx *LogContext) error
}

type LogPathDetector

// LogPathDetector detects which paths match.
// TODO: format needs special handling. It differs from the other kinds: the
// others are detected lazily, while format has to be detected in real time.
type LogPathDetector struct {
	// contains filtered or unexported fields
}

检测匹配哪些路径 TODO format 要特殊处理下, 它和其他的不太一样 其他的都是lazy感应的, 而format需要实时感应

func NewLogDetector

func NewLogDetector(key string, from *collectconfig.From, target *collecttask.CollectTarget) *LogPathDetector

TODO 这东西会变化... 如果当时pod不可用 那么这个方法会失败从而忽略它的路径 如果后来pod变得可用了, 由于该方法已经执行过, 因此不会再重试...

func (*LogPathDetector) Detect

func (ld *LogPathDetector) Detect() []filematch.FatPath

type LogPipeline

// LogPipeline is responsible for detecting log inputs (see inputsManager),
// scheduling log-pulling tasks, and putting logs to the consumer.
type LogPipeline struct {
	// contains filtered or unexported fields
}

LogPipeline is responsible for detecting log inputs (see inputsManager), scheduling log-pulling tasks, and putting logs to the consumer.

func NewPipeline

func NewPipeline(st *api.SubTask, s *storage.Storage, lsm *logstream.Manager) (*LogPipeline, error)

func (*LogPipeline) Key

func (p *LogPipeline) Key() string

func (*LogPipeline) LoadState

func (p *LogPipeline) LoadState(store transfer.StateStore) error

func (*LogPipeline) SetupConsumer

func (p *LogPipeline) SetupConsumer(st *api.SubTask) error

func (*LogPipeline) Start

func (p *LogPipeline) Start() error

func (*LogPipeline) Stop

func (p *LogPipeline) Stop()

func (*LogPipeline) StopAndSaveState

func (p *LogPipeline) StopAndSaveState(store transfer.StateStore) error

func (*LogPipeline) Update

func (p *LogPipeline) Update(f func(api.Pipeline))

func (*LogPipeline) View

func (p *LogPipeline) View(f func(api.Pipeline))

type LogSource

// LogSource is a source of logs with a start/stop lifecycle.
type LogSource interface {
	// Start starts the source.
	Start()
	// Stop stops the source.
	Stop()
}

type LogTaskPipeline

// LogTaskPipeline pairs a LogSource with a LogConsumerManager.
// Note that ConsumerManager is held by value, not by pointer.
type LogTaskPipeline struct {
	Source          LogSource
	ConsumerManager LogConsumerManager
}

type ParsedConf

// ParsedConf embeds a *collectconfig.LogAnalysisConf and adds a list of
// ParsedPatternConf entries.
// NOTE(review): ParsedPatterns presumably holds the pre-processed form of the
// patterns in the embedded conf — confirm.
type ParsedConf struct {
	*collectconfig.LogAnalysisConf
	ParsedPatterns []*ParsedPatternConf
}

type ParsedPatternConf

// ParsedPatternConf embeds a *collectconfig.LogAnalysisPatternConf together
// with unexported state.
type ParsedPatternConf struct {
	*collectconfig.LogAnalysisPatternConf
	// contains filtered or unexported fields
}

type PeriodStatus

// PeriodStatus holds stats for that time period.
// Note that EmitSuccess (bool) and EmitError (int) here have different types
// from the int32 counters of the same names inside ConsumerStat.
type PeriodStatus struct {
	Stat        ConsumerStat
	EmitSuccess bool
	EmitError   int
	Watermark   int64
}

PeriodStatus holds stats for that time period

type PullLogSource

// PullLogSource is a pull-mode LogSource. It currently adds no methods beyond
// the embedded LogSource.
type PullLogSource interface {
	LogSource
}

拉模式的logsource

type RunInLock

// RunInLock runs the given func while holding the write lock of the pipeline.
type RunInLock func(func())

RunInLock runs a func while holding the write lock of the pipeline.

type SubConsumer

// SubConsumer is a sub-consumer: one concrete kind of log processing.
// It also has unexported methods, so it can only be implemented inside this
// package.
type SubConsumer interface {
	// Update invokes f to apply an update.
	// NOTE(review): presumably f runs under the sub-consumer's own synchronization — confirm.
	Update(f func())
	// ProcessGroup processes one log group described by ctx, read from input iw.
	// NOTE(review): maxTs is presumably updated with the largest timestamp seen — confirm.
	ProcessGroup(iw *inputWrapper, ctx *LogContext, maxTs *int64)
	// Emit emits data with timestamp equals to expectedTs
	// Returns true if data is not empty
	Emit(expectedTs int64) bool

	// MaybeFlush gives the sub-consumer a chance to flush.
	// NOTE(review): the flush condition is not visible here.
	MaybeFlush()
	// contains filtered or unexported methods
}

子消费者, 是一种实际的日志处理

type TimeParser

// TimeParser extracts a timestamp from a LogContext.
type TimeParser interface {
	// Parse returns the parsed timestamp as an int64, or an error.
	// NOTE(review): the unit (seconds vs. milliseconds) is not visible here — confirm.
	Parse(*LogContext) (int64, error)
}

type XElect

// XElect extracts ("elects") a value from a LogContext, either untyped or as
// a string or a number.
type XElect interface {
	// Init initializes the instance; it reports no error.
	Init()
	// Elect returns the extracted value as an interface{}.
	Elect(ctx *LogContext) (interface{}, error)
	// ElectString returns the extracted value as a string.
	ElectString(ctx *LogContext) (string, error)
	// ElectNumber returns the extracted value as a float64.
	ElectNumber(ctx *LogContext) (float64, error)
}

type XGroupBy

// XGroupBy computes group-by values for a log.
type XGroupBy interface {
	// TODO: decide where the groupBy result should be stored.
	// GroupNames returns the names of the groups.
	GroupNames() []string
	// Execute evaluates the group-by against ctx and returns the group values.
	// NOTE(review): presumably one value per name from GroupNames, in the same order — confirm.
	Execute(ctx *LogContext) ([]string, error)
	// MaxKeySize returns the configured maximum key size.
	// NOTE(review): presumably related to ConsumerStat.FilterGroupMaxKeys — confirm.
	MaxKeySize() int
}

type XSelect

// XSelect executes the select step.
type XSelect interface {
	// Select evaluates the select against ctx and returns the resulting data nodes.
	Select(ctx *LogContext) ([]DataNode, error)
}

Execute select

type XTransformFilter

// An XTransformFilter is used to transform a value.
type XTransformFilter interface {
	// Init initializes this filter instance, and returns error during initialization.
	Init() error
	// Filter transforms value
	Filter(ctx *LogContext) (interface{}, error)
}

An XTransformFilter is used to transform a value.

type XWhere

// XWhere is a boolean condition evaluated against a LogContext.
// MustParseWhere builds one from a *collectconfig.Where.
type XWhere interface {
	// Test reports whether ctx satisfies the condition, or an error if the
	// condition could not be evaluated.
	Test(ctx *LogContext) (bool, error)
}

func MustParseWhere

func MustParseWhere(w *collectconfig.Where) XWhere

TODO temp

type XWindow

// XWindow holds the window configuration referenced by Consumer.Window.
type XWindow struct {
	// Interval is the window length.
	// NOTE(review): presumably the aggregation/emit period — confirm.
	Interval time.Duration
}

Directories

Path Synopsis
dryrun

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL