Documentation ¶
Index ¶
- Constants
- Variables
- type Consumer
- func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailData)
- func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err error)
- func (c *Consumer) LoadState(state *consumerStateObj) error
- func (c *Consumer) SaveState() (*consumerStateObj, error)
- func (c *Consumer) SetOutput(output output.Output)
- func (c *Consumer) SetStorage(s *storage.Storage)
- func (c *Consumer) Start()
- func (c *Consumer) Stop()
- func (c *Consumer) Update(o *Consumer)
- type ConsumerStat
- type DataAccumulator
- type DataNode
- type DataNodeImpl
- type DelayCalculator
- type DryRunExecutor
- type DryRunRequest
- type DryRunResponse
- type EmitContext
- type GroupResult
- type Input
- type Input_Plain
- type Input_Read
- type LogConsumer
- type LogConsumerManager
- type LogContext
- type LogGroup
- type LogParser
- type LogPathDetector
- type LogPipeline
- func (p *LogPipeline) Key() string
- func (p *LogPipeline) LoadState(store transfer.StateStore) error
- func (p *LogPipeline) SetupConsumer(st *api.SubTask) error
- func (p *LogPipeline) Start() error
- func (p *LogPipeline) Stop()
- func (p *LogPipeline) StopAndSaveState(store transfer.StateStore) error
- func (p *LogPipeline) Update(f func(api.Pipeline))
- func (p *LogPipeline) View(f func(api.Pipeline))
- type LogSource
- type LogTaskPipeline
- type ParsedConf
- type ParsedPatternConf
- type PeriodStatus
- type PullLogSource
- type RunInLock
- type SubConsumer
- type TimeParser
- type XElect
- type XGroupBy
- type XSelect
- type XTransformFilter
- type XWhere
- type XWindow
Constants ¶
const (
    TypeAuto           = "auto"
    TypeProcessTime    = "processTime"
    TypeElect          = "elect"
    FormatUnix         = "unix"
    FormatUnixMilli    = "unixMilli"
    FormatGolangLayout = "golangLayout"
    FormatAuto         = "auto"
    TimeParseError int64 = -2
)
const (
// ReportTaskInfoInterval is the interval at which task info is reported.
ReportTaskInfoInterval = 10
)
Variables ¶
var ErrIndexOutOfBound = errors.New("index out of bound")
var (
LogParseNotMatched = errors.New("LogParseNotMatched")
)
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
    Select           XSelect
    Where            XWhere
    GroupBy          XGroupBy
    Window           *XWindow
    LogParser        LogParser
    TimeParser       TimeParser
    BeforeParseWhere XWhere
    // contains filtered or unexported fields
}
Consumer is designed to be lock-free: it relies entirely on the upper layer (the Pipeline) for scheduling, and the Pipeline is responsible for ensuring that calls into the Consumer are safe.
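The sketch below shows the call order a scheduling layer might use, assuming a single goroutine owns the Consumer. The struct construction, the channel of responses, and the nil inputWrapper argument are illustrative assumptions; only the method calls come from this page, and imports of the agent's output, storage and logstream packages are omitted.

// Sketch: one goroutine (the pipeline) drives the Consumer, so the Consumer
// itself needs no locking. The nil second argument to Consume stands in for
// the unexported inputWrapper that the real pipeline supplies.
func driveConsumer(c *Consumer, out output.Output, store *storage.Storage,
    responses <-chan *logstream.ReadResponse) {
    c.SetOutput(out)
    c.SetStorage(store)
    c.Start()
    defer c.Stop()

    for resp := range responses {
        // Any read error would be passed as the third argument.
        c.Consume(resp, nil, nil)
    }
}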
func (*Consumer) AddBatchDetailDatus ¶
func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailData)
func (*Consumer) Consume ¶
func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err error)
func (*Consumer) SetStorage ¶
type ConsumerStat ¶
type ConsumerStat struct {
    IoTotal int32
    IoError int32
    // IO read empty count
    IoEmpty int32
    // IO read bytes
    Bytes int64
    // Read log lines
    Lines int32
    // Read log groups
    Groups int32
    Broken bool
    // File is missing
    Miss                   bool
    Processed              int32
    FilterBeforeParseWhere int32
    FilterLogParseError    int32
    FilterTimeParseError   int32
    FilterWhere            int32
    FilterGroup            int32
    FilterGroupMaxKeys     int32
    FilterIgnore           int32
    FilterMultiline        int32
    FilterDelay            int32
    Emit                   int32
    EmitSuccess            int32
    EmitError              int32
    // error count when agg
    AggWhereError int32
    // error count when select
    SelectError int32
    ZeroBytes   int
}
ConsumerStat holds consumer running stats. All fields are public in order to be encoded by gob.
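Because every field is exported, a ConsumerStat can be round-tripped with encoding/gob. The helper below is a minimal sketch of that property (imports of bytes and encoding/gob assumed), not the agent's actual state-saving code.

// roundTripStat gob-encodes a ConsumerStat and decodes it back, which only
// works because all of its fields are exported.
func roundTripStat(stat ConsumerStat) (ConsumerStat, error) {
    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(stat); err != nil {
        return ConsumerStat{}, err
    }
    var decoded ConsumerStat
    err := gob.NewDecoder(&buf).Decode(&decoded)
    return decoded, err
}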
type DataAccumulator ¶
type DataAccumulator interface {
AddBatchDetailDatus([]*model.DetailData)
}
type DataNodeImpl ¶
func (*DataNodeImpl) GetCount ¶
func (d *DataNodeImpl) GetCount() int32
func (*DataNodeImpl) GetNumber ¶
func (d *DataNodeImpl) GetNumber() float64
func (*DataNodeImpl) GetString ¶
func (d *DataNodeImpl) GetString() string
type DelayCalculator ¶
type DelayCalculator struct{}
func NewDelayCalculator ¶
func NewDelayCalculator() *DelayCalculator
type DryRunExecutor ¶
func NewDryRunExecutor ¶
func NewDryRunExecutor(request *DryRunRequest) (*DryRunExecutor, error)
func (*DryRunExecutor) Run ¶
func (e *DryRunExecutor) Run() *DryRunResponse
type DryRunRequest ¶
type DryRunRequest struct {
    Task  *collecttask.CollectTask
    Input *Input `json:"input,omitempty"`
}
TODO: rework the request body, keeping in mind that it is sent from prod to reg.
type DryRunResponse ¶
type DryRunResponse struct {
    Event       *event.Event   `json:"event,omitempty"`
    GroupResult []*GroupResult `json:"groupResult,omitempty"`
}
type EmitContext ¶
type EmitContext struct{}
type GroupResult ¶
type GroupResult struct {
    // A group is a set of related log lines; for example, an error stack trace
    // forms one group. For non-multiline logs, a group contains exactly one line.
    GroupLines []string `json:"groupLines,omitempty"`
    // Whether the (black/white list) filter conditions were satisfied; if false,
    // selectedValues and groupBy will both be empty.
    Paas bool `json:"paas,omitempty"`
    // Extracted values
    SelectedValues map[string]interface{} `json:"selectedValues,omitempty"`
    // Extracted dimensions
    GroupBy map[string]interface{} `json:"groupBy,omitempty"`
}
type Input ¶
type Input struct {
    // plain: the caller supplies the raw log text directly
    // read: the raw log text is actually read from the pod
    Type  string       `json:"type,omitempty"`
    Plain *Input_Plain `json:"plain,omitempty"`
    Read  *Input_Read  `json:"read,omitempty"`
}
type Input_Plain ¶
type Input_Read ¶
type Input_Read struct {
    // Maximum number of lines to read from the end
    MaxLines int `json:"maxLines,omitempty"`
}
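A hedged sketch of issuing a dry run against caller-supplied log text. The NewDryRunExecutor and Run calls and the field names on DryRunRequest, Input and GroupResult are the documented ones; the "plain" type value is assumed from the comments above, and Input_Plain is left empty because its fields are not listed on this page.

// dryRunPlain builds a DryRunRequest with a "plain" input and prints the
// group results (import of fmt and collecttask assumed).
func dryRunPlain(task *collecttask.CollectTask) error {
    req := &DryRunRequest{
        Task: task,
        Input: &Input{
            Type:  "plain",        // assumed value: raw text supplied by the caller
            Plain: &Input_Plain{}, // fields intentionally omitted (undocumented)
        },
    }

    e, err := NewDryRunExecutor(req)
    if err != nil {
        return err
    }

    resp := e.Run()
    for _, g := range resp.GroupResult {
        // When Paas is false the group was filtered out and the other maps are empty.
        fmt.Println(g.GroupLines, g.Paas, g.SelectedValues, g.GroupBy)
    }
    return nil
}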
type LogConsumer ¶
type LogConsumer interface{}
type LogConsumerManager ¶
type LogConsumerManager struct {
// contains filtered or unexported fields
}
type LogContext ¶
type LogContext struct {
// contains filtered or unexported fields
}
LogContext is the context of the log line being processed.
func (*LogContext) GetColumnByIndex ¶
func (c *LogContext) GetColumnByIndex(index int) (string, error)
func (*LogContext) GetColumnByName ¶
func (c *LogContext) GetColumnByName(name string) (interface{}, error)
func (*LogContext) GetLine ¶
func (c *LogContext) GetLine() string
type LogParser ¶
type LogParser interface {
Parse(ctx *LogContext) error
}
type LogPathDetector ¶
type LogPathDetector struct {
// contains filtered or unexported fields
}
LogPathDetector detects which paths match. TODO: format needs special handling; unlike the other fields, which are detected lazily, format must be detected in real time.
func NewLogDetector ¶
func NewLogDetector(key string, from *collectconfig.From, target *collecttask.CollectTarget) *LogPathDetector
TODO: this can change... If the pod is unavailable at the time, this method fails and its paths are ignored. If the pod later becomes available, the method has already run and is never retried...
func (*LogPathDetector) Detect ¶
func (ld *LogPathDetector) Detect() []filematch.FatPath
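Because detection is lazy and a failed detection is not retried internally (see the TODO above), a caller might poll Detect periodically. The sketch below works under that assumption, with an arbitrary interval, using only the constructor and method documented here (imports of time, collectconfig and collecttask assumed).

// watchPaths re-runs detection on a fixed interval so that pods which were
// unavailable at construction time can still be picked up later.
func watchPaths(key string, from *collectconfig.From, target *collecttask.CollectTarget, stop <-chan struct{}) {
    ld := NewLogDetector(key, from, target)

    ticker := time.NewTicker(time.Minute) // interval chosen arbitrarily for the sketch
    defer ticker.Stop()

    for {
        select {
        case <-stop:
            return
        case <-ticker.C:
            for _, fp := range ld.Detect() {
                _ = fp // each filematch.FatPath describes one matched log path
            }
        }
    }
}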
type LogPipeline ¶
type LogPipeline struct {
// contains filtered or unexported fields
}
LogPipeline is responsible for detecting log inputs (see inputsManager), scheduling log-pulling tasks, and passing logs to the consumer.
func NewPipeline ¶
func (*LogPipeline) Key ¶
func (p *LogPipeline) Key() string
func (*LogPipeline) LoadState ¶
func (p *LogPipeline) LoadState(store transfer.StateStore) error
func (*LogPipeline) SetupConsumer ¶
func (p *LogPipeline) SetupConsumer(st *api.SubTask) error
func (*LogPipeline) Start ¶
func (p *LogPipeline) Start() error
func (*LogPipeline) Stop ¶
func (p *LogPipeline) Stop()
func (*LogPipeline) StopAndSaveState ¶
func (p *LogPipeline) StopAndSaveState(store transfer.StateStore) error
func (*LogPipeline) Update ¶
func (p *LogPipeline) Update(f func(api.Pipeline))
func (*LogPipeline) View ¶
func (p *LogPipeline) View(f func(api.Pipeline))
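A sketch of one plausible lifecycle for a LogPipeline across an agent restart, assuming state is loaded before Start and saved on shutdown; the ordering is an assumption, only the methods themselves are documented here.

// runPipeline restores any previously saved state, runs the pipeline until
// stop is closed, then stops it and saves its state for a successor.
func runPipeline(p *LogPipeline, store transfer.StateStore, stop <-chan struct{}) error {
    if err := p.LoadState(store); err != nil {
        return err
    }
    if err := p.Start(); err != nil {
        return err
    }

    <-stop

    return p.StopAndSaveState(store)
}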
type LogTaskPipeline ¶
type LogTaskPipeline struct {
    Source          LogSource
    ConsumerManager LogConsumerManager
}
type ParsedConf ¶
type ParsedConf struct {
    *collectconfig.LogAnalysisConf
    ParsedPatterns []*ParsedPatternConf
}
type ParsedPatternConf ¶
type ParsedPatternConf struct {
    *collectconfig.LogAnalysisPatternConf
    // contains filtered or unexported fields
}
type PeriodStatus ¶
type PeriodStatus struct {
    Stat        ConsumerStat
    EmitSuccess bool
    EmitError   int
    Watermark   int64
}
PeriodStatus holds the stats for a single time period.
type RunInLock ¶
type RunInLock func(func())
RunInLock runs the given func while holding the pipeline's write lock.
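RunInLock is only a function type; the real pipeline's lock is unexported, but a minimal sketch of providing such a callback from an ordinary sync.RWMutex looks like this.

// newRunInLock adapts a RWMutex into a RunInLock: the supplied func is
// executed while the write lock is held.
func newRunInLock(mu *sync.RWMutex) RunInLock {
    return func(f func()) {
        mu.Lock()
        defer mu.Unlock()
        f()
    }
}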
type SubConsumer ¶
type SubConsumer interface {
    Update(f func())
    ProcessGroup(iw *inputWrapper, ctx *LogContext, maxTs *int64)
    // Emit emits data with a timestamp equal to expectedTs.
    // It returns true if the data is not empty.
    Emit(expectedTs int64) bool
    MaybeFlush()
    // contains filtered or unexported methods
}
SubConsumer is a sub-consumer that performs one concrete kind of log processing.
type TimeParser ¶
type TimeParser interface {
Parse(*LogContext) (int64, error)
}
type XElect ¶
type XElect interface {
    Init()
    Elect(ctx *LogContext) (interface{}, error)
    ElectString(ctx *LogContext) (string, error)
    ElectNumber(ctx *LogContext) (float64, error)
}
type XGroupBy ¶
type XGroupBy interface {
    // TODO: where are the groupBy results stored?
    GroupNames() []string
    Execute(ctx *LogContext) ([]string, error)
    MaxKeySize() int
}
type XSelect ¶
type XSelect interface {
Select(ctx *LogContext) ([]DataNode, error)
}
XSelect executes the select.
type XTransformFilter ¶
type XTransformFilter interface {
    // Init initializes this filter instance and returns an error if initialization fails.
    Init() error
    // Filter transforms the value.
    Filter(ctx *LogContext) (interface{}, error)
}
An XTransformFilter is used to transform a value.
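A hedged example of a custom XTransformFilter that upper-cases the current line; only the interface methods and LogContext.GetLine are taken from this page, and whether real filters operate on the raw line or on an elected value is an assumption (import of strings assumed).

// upperCaseFilter is an illustrative XTransformFilter.
type upperCaseFilter struct{}

// Init has nothing to prepare for this filter.
func (f *upperCaseFilter) Init() error { return nil }

// Filter returns the current log line upper-cased.
func (f *upperCaseFilter) Filter(ctx *LogContext) (interface{}, error) {
    return strings.ToUpper(ctx.GetLine()), nil
}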
type XWhere ¶
type XWhere interface {
Test(ctx *LogContext) (bool, error)
}
Source Files ¶
- consumer.go
- consumer_log_multiline.go
- consumer_log_sub.go
- consumer_log_sub_analysis.go
- consumer_log_sub_detail.go
- consumer_log_sub_stat.go
- consumer_parser.go
- delay.go
- elect.go
- elect_context.go
- elect_leftright.go
- elect_line.go
- elect_pathvar.go
- elect_refindex.go
- elect_refmeta.go
- elect_refname.go
- elect_refvar.go
- elect_regexp.go
- elect_transform.go
- executor_dryrun.go
- executor_log.go
- from.go
- from_log.go
- group.go
- inputsmanager.go
- log_detector.go
- logparser.go
- logparser_grok.go
- logparser_json.go
- logparser_regexp.go
- multiline.go
- pipeline.go
- pool.go
- select.go
- time.go
- time_elect_auto.go
- time_elect_golanglayout.go
- time_elect_unix.go
- time_elect_unix_milli.go
- time_processtime.go
- transform.go
- transform_append_v1.go
- transform_cleanurl_v1.go
- transform_const.go
- transform_discard.go
- transform_mapping_v1.go
- transform_regexp_v1.go
- transform_substring_v1.go
- transform_switchcase_v1.go
- v1.go
- v2.go
- vars.go
- where.go
- where_alwaystrue.go
- where_boolean.go
- where_contains.go
- where_in.go
- where_numberop.go
- where_regexp.go
- window.go