Documentation ¶
Index ¶
- Variables
- type AggregationElements
- type AggregationFlowRecord
- type AggregationInput
- type AggregationProcess
- func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error
- func (a *AggregationProcess) AreCorrelatedFieldsFilled(record AggregationFlowRecord) bool
- func (a *AggregationProcess) AreExternalFieldsFilled(record AggregationFlowRecord) bool
- func (a *AggregationProcess) ForAllExpiredFlowRecordsDo(callback FlowKeyRecordMapCallBack) error
- func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error
- func (a *AggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration
- func (a *AggregationProcess) GetNumFlows() int64
- func (a *AggregationProcess) GetRecords(flowKey *FlowKey) []map[string]interface{}
- func (a *AggregationProcess) IsAggregatedRecordIPv4(record AggregationFlowRecord) bool
- func (a *AggregationProcess) ResetStatAndThroughputElementsInRecord(record entities.Record) error
- func (a *AggregationProcess) SetCorrelatedFieldsFilled(record *AggregationFlowRecord, isFilled bool)
- func (a *AggregationProcess) SetExternalFieldsFilled(record *AggregationFlowRecord, isFilled bool)
- func (a *AggregationProcess) Start()
- func (a *AggregationProcess) Stop()
- type FlowKey
- type FlowKeyRecordMapCallBack
- type ItemToExpire
- type TimeToExpirePriorityQueue
- func (pq TimeToExpirePriorityQueue) Len() int
- func (pq TimeToExpirePriorityQueue) Less(i, j int) bool
- func (pq TimeToExpirePriorityQueue) Peek() *ItemToExpire
- func (pq *TimeToExpirePriorityQueue) Pop() interface{}
- func (pq *TimeToExpirePriorityQueue) Push(x interface{})
- func (pq TimeToExpirePriorityQueue) Swap(i, j int)
- func (pq *TimeToExpirePriorityQueue) Update(item *ItemToExpire, flowKey *FlowKey, flowRecord *AggregationFlowRecord, ...)
Constants ¶
This section is empty.
Variables ¶
var ( MaxRetries = 2 MinExpiryTime = 100 * time.Millisecond )
Functions ¶
This section is empty.
Types ¶
type AggregationElements ¶
type AggregationElements struct { NonStatsElements []string StatsElements []string AggregatedSourceStatsElements []string AggregatedDestinationStatsElements []string AntreaFlowEndSecondsElements []string ThroughputElements []string SourceThroughputElements []string DestinationThroughputElements []string }
type AggregationFlowRecord ¶
type AggregationFlowRecord struct { Record entities.Record // Flow record contains mapping to its reference in priority queue. PriorityQueueItem *ItemToExpire // ReadyToSend is an indicator that we received all required records for the // given flow, i.e., records from source and destination nodes for the case // of inter-node flow and record from the node for the case of intra-node flow. ReadyToSend bool // contains filtered or unexported fields }
type AggregationInput ¶
type AggregationProcess ¶
type AggregationProcess struct {
// contains filtered or unexported fields
}
func InitAggregationProcess ¶
func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error)
InitAggregationProcess takes in a message channel (e.g. from the collector) as the input channel, workerNum (the number of workers to process messages), and correlateFields (the fields to be correlated and filled).
func (*AggregationProcess) AggregateMsgByFlowKey ¶
func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error
AggregateMsgByFlowKey gets the flow key from the records in the message and stores the records in the cache.
func (*AggregationProcess) AreCorrelatedFieldsFilled ¶
func (a *AggregationProcess) AreCorrelatedFieldsFilled(record AggregationFlowRecord) bool
func (*AggregationProcess) AreExternalFieldsFilled ¶
func (a *AggregationProcess) AreExternalFieldsFilled(record AggregationFlowRecord) bool
func (*AggregationProcess) ForAllExpiredFlowRecordsDo ¶
func (a *AggregationProcess) ForAllExpiredFlowRecordsDo(callback FlowKeyRecordMapCallBack) error
func (*AggregationProcess) ForAllRecordsDo ¶
func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error
ForAllRecordsDo takes in a callback function and applies it to each flow key -> record pair in the map.
func (*AggregationProcess) GetExpiryFromExpirePriorityQueue ¶
func (a *AggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration
GetExpiryFromExpirePriorityQueue returns the earliest timestamp (active expiry or inactive expiry) from the expire priority queue.
func (*AggregationProcess) GetNumFlows ¶
func (a *AggregationProcess) GetNumFlows() int64
GetNumFlows returns the total number of connections/flows stored in the map.
func (*AggregationProcess) GetRecords ¶
func (a *AggregationProcess) GetRecords(flowKey *FlowKey) []map[string]interface{}
GetRecords returns map format flow records given a flow key. The key of the map is the element name and the value is the IE object. Returns partially matched flow records if the flow key is not complete. Returns all the flow records if the flow key is not provided.
func (*AggregationProcess) IsAggregatedRecordIPv4 ¶
func (a *AggregationProcess) IsAggregatedRecordIPv4(record AggregationFlowRecord) bool
func (*AggregationProcess) ResetStatAndThroughputElementsInRecord ¶
func (a *AggregationProcess) ResetStatAndThroughputElementsInRecord(record entities.Record) error
ResetStatAndThroughputElementsInRecord is called by the user after the aggregation record has been sent following its expiry, whether by the active or the inactive expiry interval. The user should call it only after acquiring the mutex in the aggregation process.
func (*AggregationProcess) SetCorrelatedFieldsFilled ¶
func (a *AggregationProcess) SetCorrelatedFieldsFilled(record *AggregationFlowRecord, isFilled bool)
func (*AggregationProcess) SetExternalFieldsFilled ¶
func (a *AggregationProcess) SetExternalFieldsFilled(record *AggregationFlowRecord, isFilled bool)
func (*AggregationProcess) Start ¶
func (a *AggregationProcess) Start()
func (*AggregationProcess) Stop ¶
func (a *AggregationProcess) Stop()
type FlowKeyRecordMapCallBack ¶
type FlowKeyRecordMapCallBack func(key FlowKey, record *AggregationFlowRecord) error
type ItemToExpire ¶
type ItemToExpire struct {
// contains filtered or unexported fields
}
type TimeToExpirePriorityQueue ¶
type TimeToExpirePriorityQueue []*ItemToExpire
func (TimeToExpirePriorityQueue) Len ¶
func (pq TimeToExpirePriorityQueue) Len() int
func (TimeToExpirePriorityQueue) Less ¶
func (pq TimeToExpirePriorityQueue) Less(i, j int) bool
func (TimeToExpirePriorityQueue) Peek ¶
func (pq TimeToExpirePriorityQueue) Peek() *ItemToExpire
Peek returns the item at the beginning of the queue, without removing the item or otherwise mutating the queue. It is safe to call directly.
func (*TimeToExpirePriorityQueue) Pop ¶
func (pq *TimeToExpirePriorityQueue) Pop() interface{}
func (*TimeToExpirePriorityQueue) Push ¶
func (pq *TimeToExpirePriorityQueue) Push(x interface{})
func (TimeToExpirePriorityQueue) Swap ¶
func (pq TimeToExpirePriorityQueue) Swap(i, j int)
func (*TimeToExpirePriorityQueue) Update ¶
func (pq *TimeToExpirePriorityQueue) Update(item *ItemToExpire, flowKey *FlowKey, flowRecord *AggregationFlowRecord, activeExpireTime time.Time, inactiveExpireTime time.Time)
Update modifies the priority and flow record of an item in the queue.