Documentation ¶
Index ¶
- Variables
- type AggregationElements
- type AggregationFlowRecord
- type AggregationInput
- type AggregationProcess
- func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error
- func (a *AggregationProcess) AreCorrelatedFieldsFilled(record AggregationFlowRecord) bool
- func (a *AggregationProcess) AreExternalFieldsFilled(record AggregationFlowRecord) bool
- func (a *AggregationProcess) ForAllExpiredFlowRecordsDo(callback FlowKeyRecordMapCallBack) error
- func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error
- func (a *AggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration
- func (a *AggregationProcess) IsAggregatedRecordIPv4(record AggregationFlowRecord) bool
- func (a *AggregationProcess) ResetStatElementsInRecord(record entities.Record) error
- func (a *AggregationProcess) SetCorrelatedFieldsFilled(record *AggregationFlowRecord, isFilled bool)
- func (a *AggregationProcess) SetExternalFieldsFilled(record *AggregationFlowRecord, isFilled bool)
- func (a *AggregationProcess) Start()
- func (a *AggregationProcess) Stop()
- type FlowKey
- type FlowKeyRecordMapCallBack
- type ItemToExpire
- type TimeToExpirePriorityQueue
- func (pq TimeToExpirePriorityQueue) Len() int
- func (pq TimeToExpirePriorityQueue) Less(i, j int) bool
- func (pq TimeToExpirePriorityQueue) Peek() *ItemToExpire
- func (pq *TimeToExpirePriorityQueue) Pop() interface{}
- func (pq *TimeToExpirePriorityQueue) Push(x interface{})
- func (pq TimeToExpirePriorityQueue) Swap(i, j int)
- func (pq *TimeToExpirePriorityQueue) Update(item *ItemToExpire, flowKey *FlowKey, flowRecord *AggregationFlowRecord, ...)
Constants ¶
This section is empty.
Variables ¶
var ( MaxRetries = 2 MinExpiryTime = 100 * time.Millisecond )
Functions ¶
This section is empty.
Types ¶
type AggregationElements ¶ added in v0.4.0
type AggregationFlowRecord ¶ added in v0.4.0
type AggregationFlowRecord struct { Record entities.Record // Flow record contains mapping to its reference in priority queue. PriorityQueueItem *ItemToExpire // ReadyToSend is an indicator that we received all required records for the // given flow, i.e., records from source and destination nodes for the case // inter-node flow and record from the node for the case of intra-node flow. ReadyToSend bool // contains filtered or unexported fields }
type AggregationInput ¶ added in v0.4.0
type AggregationProcess ¶ added in v0.3.1
type AggregationProcess struct {
// contains filtered or unexported fields
}
func InitAggregationProcess ¶
func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error)
InitAggregationProcess takes in a message channel (e.g. from a collector) as the input channel, workerNum (the number of workers that process messages), and correlateFields (the fields to be correlated and filled).
func (*AggregationProcess) AggregateMsgByFlowKey ¶ added in v0.3.1
func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error
AggregateMsgByFlowKey gets the flow key from the records in the message and stores them in the cache.
func (*AggregationProcess) AreCorrelatedFieldsFilled ¶ added in v0.5.3
func (a *AggregationProcess) AreCorrelatedFieldsFilled(record AggregationFlowRecord) bool
func (*AggregationProcess) AreExternalFieldsFilled ¶ added in v0.5.3
func (a *AggregationProcess) AreExternalFieldsFilled(record AggregationFlowRecord) bool
func (*AggregationProcess) ForAllExpiredFlowRecordsDo ¶ added in v0.5.1
func (a *AggregationProcess) ForAllExpiredFlowRecordsDo(callback FlowKeyRecordMapCallBack) error
func (*AggregationProcess) ForAllRecordsDo ¶ added in v0.3.1
func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error
ForAllRecordsDo takes in a callback function and uses it to process each flowkey->record pair in the map.
func (*AggregationProcess) GetExpiryFromExpirePriorityQueue ¶ added in v0.5.1
func (a *AggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration
GetExpiryFromExpirePriorityQueue returns, as a time.Duration, the earliest expiry (active expiry or inactive expiry) from the expire priority queue.
func (*AggregationProcess) IsAggregatedRecordIPv4 ¶ added in v0.5.3
func (a *AggregationProcess) IsAggregatedRecordIPv4(record AggregationFlowRecord) bool
func (*AggregationProcess) ResetStatElementsInRecord ¶ added in v0.5.1
func (a *AggregationProcess) ResetStatElementsInRecord(record entities.Record) error
ResetStatElementsInRecord is called by the user once the aggregation record has been sent following its expiry, whether by the active or the inactive expiry interval. It should be called by the user only after acquiring the mutex in the aggregation process.
func (*AggregationProcess) SetCorrelatedFieldsFilled ¶ added in v0.5.3
func (a *AggregationProcess) SetCorrelatedFieldsFilled(record *AggregationFlowRecord, isFilled bool)
func (*AggregationProcess) SetExternalFieldsFilled ¶ added in v0.5.3
func (a *AggregationProcess) SetExternalFieldsFilled(record *AggregationFlowRecord, isFilled bool)
func (*AggregationProcess) Start ¶ added in v0.3.1
func (a *AggregationProcess) Start()
func (*AggregationProcess) Stop ¶ added in v0.3.1
func (a *AggregationProcess) Stop()
type FlowKeyRecordMapCallBack ¶ added in v0.3.1
type FlowKeyRecordMapCallBack func(key FlowKey, record *AggregationFlowRecord) error
type ItemToExpire ¶ added in v0.5.1
type ItemToExpire struct {
// contains filtered or unexported fields
}
type TimeToExpirePriorityQueue ¶ added in v0.5.1
type TimeToExpirePriorityQueue []*ItemToExpire
func (TimeToExpirePriorityQueue) Len ¶ added in v0.5.1
func (pq TimeToExpirePriorityQueue) Len() int
func (TimeToExpirePriorityQueue) Less ¶ added in v0.5.1
func (pq TimeToExpirePriorityQueue) Less(i, j int) bool
func (TimeToExpirePriorityQueue) Peek ¶ added in v0.5.1
func (pq TimeToExpirePriorityQueue) Peek() *ItemToExpire
Peek returns the item at the beginning of the queue, without removing the item or otherwise mutating the queue. It is safe to call directly.
func (*TimeToExpirePriorityQueue) Pop ¶ added in v0.5.1
func (pq *TimeToExpirePriorityQueue) Pop() interface{}
func (*TimeToExpirePriorityQueue) Push ¶ added in v0.5.1
func (pq *TimeToExpirePriorityQueue) Push(x interface{})
func (TimeToExpirePriorityQueue) Swap ¶ added in v0.5.1
func (pq TimeToExpirePriorityQueue) Swap(i, j int)
func (*TimeToExpirePriorityQueue) Update ¶ added in v0.5.1
func (pq *TimeToExpirePriorityQueue) Update(item *ItemToExpire, flowKey *FlowKey, flowRecord *AggregationFlowRecord, activeExpireTime time.Time, inactiveExpireTime time.Time)
Update modifies the priority and flow record of an item in the queue.