Documentation ¶
Index ¶
- Constants
- type AggregatorBase
- func (p *AggregatorBase) Add(log *protocol.Log, ctx map[string]interface{}) error
- func (*AggregatorBase) Description() string
- func (p *AggregatorBase) Flush() []*protocol.LogGroup
- func (p *AggregatorBase) GetResult(ctx pipeline.PipelineContext) error
- func (p *AggregatorBase) Init(context pipeline.Context, que pipeline.LogGroupQueue) (int, error)
- func (p *AggregatorBase) InitInner(packFlag bool, packString string, lock *sync.Mutex, logstore string, ...)
- func (p *AggregatorBase) Record(event *models.PipelineGroupEvents, ctx pipeline.PipelineContext) error
- func (p *AggregatorBase) Reset()
Constants ¶
const ( MaxLogCount = 1024 MaxLogGroupSize = 3 * 1024 * 1024 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AggregatorBase ¶
type AggregatorBase struct { MaxLogGroupCount int // the maximum log group count to trigger flush operation MaxLogCount int // the maximum log in a log group PackFlag bool // whether to add config name as a tag Topic string // the output topic Lock *sync.Mutex // contains filtered or unexported fields }
Other aggregators can use AggregatorBase as base aggregator.
For inner usage, note about following information. There is a quick flush design in AggregatorBase, which is implemented in Add method (search p.queue.Add in current file). Therefore, not all LogGroups are returned through Flush method. If you want to do some operations (such as adding tags) on LogGroups returned by AggregatorBase in your own aggregator, you should do some extra works, just see the sample code in doc.go.
func NewAggregatorBase ¶
func NewAggregatorBase() *AggregatorBase
NewAggregatorBase create a default aggregator with default value.
func (*AggregatorBase) Add ¶
func (p *AggregatorBase) Add(log *protocol.Log, ctx map[string]interface{}) error
Add adds @log to aggregator. It uses defaultLogGroup to store log groups which contain logs as following: defaultLogGroup => [LG1: log1->log2->log3] -> [LG2: log1->log2->log3] -> .. The last log group is set as nowLogGroup, @log will be appended to nowLogGroup if the size and log count of the log group don't exceed limits (MaxLogCount and MAX_LOG_GROUP_SIZE). When nowLogGroup exceeds limits, Add creates a new log group and switch nowLogGroup to it, then append @log to it. When the count of log group reaches MaxLogGroupCount, the first log group will be popped from defaultLogGroup list and add to queue (after adding pack_id tag). Add returns any error encountered, nil means success.
@return error. **For inner usage, must handle this error!!!!**
func (*AggregatorBase) Description ¶
func (*AggregatorBase) Description() string
func (*AggregatorBase) GetResult ¶ added in v1.4.0
func (p *AggregatorBase) GetResult(ctx pipeline.PipelineContext) error
GetResult the current aggregates to the accumulator.
func (*AggregatorBase) Init ¶
func (p *AggregatorBase) Init(context pipeline.Context, que pipeline.LogGroupQueue) (int, error)
Init method would be trigger before working. 1. context store the metadata of this Logstore config 2. que is a transfer channel for flushing LogGroup when reaches the maximum in the cache.
func (*AggregatorBase) InitInner ¶
func (p *AggregatorBase) InitInner(packFlag bool, packString string, lock *sync.Mutex, logstore string, topic string, maxLogCount int, maxLoggroupCount int)
InitInner initializes instance for other aggregators.
func (*AggregatorBase) Record ¶ added in v1.4.0
func (p *AggregatorBase) Record(event *models.PipelineGroupEvents, ctx pipeline.PipelineContext) error