package intermediate
v0.5.8
Published: Sep 17, 2021 License: Apache-2.0 Imports: 9 Imported by: 6

Documentation

Constants

This section is empty.

Variables

var (
	MaxRetries    = 2
	MinExpiryTime = 100 * time.Millisecond
)

Functions

This section is empty.

Types

type AggregationElements added in v0.4.0

type AggregationElements struct {
	NonStatsElements                   []string
	StatsElements                      []string
	AggregatedSourceStatsElements      []string
	AggregatedDestinationStatsElements []string
}

type AggregationFlowRecord added in v0.4.0

type AggregationFlowRecord struct {
	Record entities.Record
	// PriorityQueueItem points to this flow record's entry in the priority queue.
	PriorityQueueItem *ItemToExpire
	// ReadyToSend indicates that all required records for the given flow have
	// been received: records from both the source and destination nodes for an
	// inter-node flow, or the record from the single node for an intra-node flow.
	ReadyToSend bool
	// contains filtered or unexported fields
}

type AggregationInput added in v0.4.0

type AggregationInput struct {
	MessageChan           chan *entities.Message
	WorkerNum             int
	CorrelateFields       []string
	AggregateElements     *AggregationElements
	ActiveExpiryTimeout   time.Duration
	InactiveExpiryTimeout time.Duration
}

type AggregationProcess added in v0.3.1

type AggregationProcess struct {
	// contains filtered or unexported fields
}

func InitAggregationProcess

func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error)

InitAggregationProcess creates an AggregationProcess from the given AggregationInput. MessageChan is the input message channel (e.g. from a collector), WorkerNum is the number of workers that process messages, and CorrelateFields lists the fields to be correlated and filled.

func (*AggregationProcess) AggregateMsgByFlowKey added in v0.3.1

func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error

AggregateMsgByFlowKey gets the flow key from the records in the message and stores the records in the cache.

func (*AggregationProcess) AreCorrelatedFieldsFilled added in v0.5.3

func (a *AggregationProcess) AreCorrelatedFieldsFilled(record AggregationFlowRecord) bool

func (*AggregationProcess) AreExternalFieldsFilled added in v0.5.3

func (a *AggregationProcess) AreExternalFieldsFilled(record AggregationFlowRecord) bool

func (*AggregationProcess) ForAllExpiredFlowRecordsDo added in v0.5.1

func (a *AggregationProcess) ForAllExpiredFlowRecordsDo(callback FlowKeyRecordMapCallBack) error

func (*AggregationProcess) ForAllRecordsDo added in v0.3.1

func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error

ForAllRecordsDo takes a callback function and applies it to each flow key -> record pair in the map.
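The callback pattern can be sketched with stand-in types. Everything below (flowKey, flowRecord, process, forAllRecordsDo) is a hypothetical local model, not this package's code. Holding a mutex during the iteration and stopping at the first callback error are assumptions made for the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// flowKey and flowRecord are simplified stand-ins for FlowKey and
// AggregationFlowRecord.
type flowKey struct{ src, dst string }

type flowRecord struct{ readyToSend bool }

// callback has the same shape as FlowKeyRecordMapCallBack.
type callback func(key flowKey, record *flowRecord) error

// process is a hypothetical model of a flow cache guarded by a mutex.
type process struct {
	mu    sync.Mutex
	flows map[flowKey]*flowRecord
}

// forAllRecordsDo calls fn for every entry while holding the lock and
// returns the first error reported by fn (an assumption of this sketch).
func (p *process) forAllRecordsDo(fn callback) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	for k, r := range p.flows {
		if err := fn(k, r); err != nil {
			return err
		}
	}
	return nil
}

// countReady counts the records that are marked ready to send.
func countReady(p *process) (int, error) {
	n := 0
	err := p.forAllRecordsDo(func(_ flowKey, r *flowRecord) error {
		if r == nil {
			return errors.New("nil record")
		}
		if r.readyToSend {
			n++
		}
		return nil
	})
	return n, err
}

func main() {
	p := &process{flows: map[flowKey]*flowRecord{
		{"10.0.0.1", "10.0.0.2"}: {readyToSend: true},
		{"10.0.0.3", "10.0.0.4"}: {readyToSend: false},
	}}
	n, err := countReady(p)
	fmt.Println(n, err)
}
```

Because the callback in this sketch runs under the lock, it should not call back into methods that take the same lock.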

func (*AggregationProcess) GetExpiryFromExpirePriorityQueue added in v0.5.1

func (a *AggregationProcess) GetExpiryFromExpirePriorityQueue() time.Duration

GetExpiryFromExpirePriorityQueue returns the earliest expiry time (active or inactive expiry) from the expiry priority queue, expressed as a time.Duration.

func (*AggregationProcess) GetNumberOfFlows added in v0.5.8

func (a *AggregationProcess) GetNumberOfFlows() uint64

GetNumberOfFlows returns the total number of connections/flows stored in the map.

func (*AggregationProcess) IsAggregatedRecordIPv4 added in v0.5.3

func (a *AggregationProcess) IsAggregatedRecordIPv4(record AggregationFlowRecord) bool

func (*AggregationProcess) ResetStatElementsInRecord added in v0.5.1

func (a *AggregationProcess) ResetStatElementsInRecord(record entities.Record) error

ResetStatElementsInRecord is meant to be called by the user after an aggregation record has been sent on expiry, whether triggered by the active or the inactive expiry interval. It should be called only after acquiring the mutex in the aggregation process.

func (*AggregationProcess) SetCorrelatedFieldsFilled added in v0.5.3

func (a *AggregationProcess) SetCorrelatedFieldsFilled(record *AggregationFlowRecord, isFilled bool)

func (*AggregationProcess) SetExternalFieldsFilled added in v0.5.3

func (a *AggregationProcess) SetExternalFieldsFilled(record *AggregationFlowRecord, isFilled bool)

func (*AggregationProcess) Start added in v0.3.1

func (a *AggregationProcess) Start()

func (*AggregationProcess) Stop added in v0.3.1

func (a *AggregationProcess) Stop()

type FlowKey added in v0.3.1

type FlowKey struct {
	SourceAddress      string
	DestinationAddress string
	Protocol           uint8
	SourcePort         uint16
	DestinationPort    uint16
}
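Every field of FlowKey is a comparable type, so a FlowKey value can serve directly as a Go map key. The sketch below uses a local copy of the struct so it compiles on its own. The countByFlow helper and the addresses are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// FlowKey is a local copy of the documented struct.
type FlowKey struct {
	SourceAddress      string
	DestinationAddress string
	Protocol           uint8
	SourcePort         uint16
	DestinationPort    uint16
}

// countByFlow tallies how many times each 5-tuple appears in keys.
func countByFlow(keys []FlowKey) map[FlowKey]int {
	counts := make(map[FlowKey]int)
	for _, k := range keys {
		counts[k]++
	}
	return counts
}

func main() {
	forward := FlowKey{
		SourceAddress:      "10.0.0.1",
		DestinationAddress: "10.0.0.2",
		Protocol:           6,
		SourcePort:         12345,
		DestinationPort:    80,
	}
	// Swapping the endpoints yields a different key: equality is field by field.
	reverse := FlowKey{
		SourceAddress:      "10.0.0.2",
		DestinationAddress: "10.0.0.1",
		Protocol:           6,
		SourcePort:         80,
		DestinationPort:    12345,
	}
	counts := countByFlow([]FlowKey{forward, reverse, forward})
	fmt.Println(counts[forward], counts[reverse])
}
```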

type FlowKeyRecordMapCallBack added in v0.3.1

type FlowKeyRecordMapCallBack func(key FlowKey, record *AggregationFlowRecord) error

type ItemToExpire added in v0.5.1

type ItemToExpire struct {
	// contains filtered or unexported fields
}

type TimeToExpirePriorityQueue added in v0.5.1

type TimeToExpirePriorityQueue []*ItemToExpire

func (TimeToExpirePriorityQueue) Len added in v0.5.1

func (pq TimeToExpirePriorityQueue) Len() int

func (TimeToExpirePriorityQueue) Less added in v0.5.1

func (pq TimeToExpirePriorityQueue) Less(i, j int) bool

func (TimeToExpirePriorityQueue) Peek added in v0.5.1

Peek returns the item at the beginning of the queue, without removing the item or otherwise mutating the queue. It is safe to call directly.

func (*TimeToExpirePriorityQueue) Pop added in v0.5.1

func (pq *TimeToExpirePriorityQueue) Pop() interface{}

func (*TimeToExpirePriorityQueue) Push added in v0.5.1

func (pq *TimeToExpirePriorityQueue) Push(x interface{})

func (TimeToExpirePriorityQueue) Swap added in v0.5.1

func (pq TimeToExpirePriorityQueue) Swap(i, j int)

func (*TimeToExpirePriorityQueue) Update added in v0.5.1

func (pq *TimeToExpirePriorityQueue) Update(item *ItemToExpire, flowKey *FlowKey, flowRecord *AggregationFlowRecord, activeExpireTime time.Time, inactiveExpireTime time.Time)

Update modifies the priority and flow record of an item in the queue.
