Documentation ¶
Index ¶
- type AggregationElements
- type AggregationFlowRecord
- type AggregationInput
- type AggregationProcess
- func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error
- func (a *AggregationProcess) DeleteFlowKeyFromMapWithLock(flowKey FlowKey)
- func (a *AggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey FlowKey)
- func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error
- func (a *AggregationProcess) Start()
- func (a *AggregationProcess) Stop()
- type FlowKey
- type FlowKeyRecordMapCallBack
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AggregationElements ¶ added in v0.4.0
type AggregationFlowRecord ¶ added in v0.4.0
type AggregationFlowRecord struct { Record entities.Record // ReadyToSend is an indicator that we received all required records for the // given flow, i.e., records from source and destination nodes for the case // inter-node flow and record from the node for the case of intra-node flow. ReadyToSend bool // IsActive is a flag that indicates whether the flow is active or not. If // aggregation process stop receiving flows from collector process, we deem // the flow as inactive. IsActive bool }
type AggregationInput ¶ added in v0.4.0
type AggregationInput struct { MessageChan chan *entities.Message WorkerNum int CorrelateFields []string AggregateElements *AggregationElements }
type AggregationProcess ¶ added in v0.3.1
type AggregationProcess struct {
// contains filtered or unexported fields
}
func InitAggregationProcess ¶
func InitAggregationProcess(input AggregationInput) (*AggregationProcess, error)
InitAggregationProcess takes in message channel (e.g. from collector) as input channel, workerNum(number of workers to process message), and correlateFields(fields to be correlated and filled).
func (*AggregationProcess) AggregateMsgByFlowKey ¶ added in v0.3.1
func (a *AggregationProcess) AggregateMsgByFlowKey(message *entities.Message) error
AggregateMsgByFlowKey gets flow key from records in message and stores in cache
func (*AggregationProcess) DeleteFlowKeyFromMapWithLock ¶ added in v0.3.2
func (a *AggregationProcess) DeleteFlowKeyFromMapWithLock(flowKey FlowKey)
func (*AggregationProcess) DeleteFlowKeyFromMapWithoutLock ¶ added in v0.3.2
func (a *AggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey FlowKey)
DeleteFlowKeyFromMapWithoutLock need to be used only when the caller has already acquired the lock. For example, this can be used in a callback of ForAllRecordsDo function. TODO:Remove this when there is notion of invalid flows supported in aggregation process.
func (*AggregationProcess) ForAllRecordsDo ¶ added in v0.3.1
func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack) error
ForAllRecordsDo takes in callback function to process the operations to flowkey->records pairs in the map
func (*AggregationProcess) Start ¶ added in v0.3.1
func (a *AggregationProcess) Start()
func (*AggregationProcess) Stop ¶ added in v0.3.1
func (a *AggregationProcess) Stop()
type FlowKeyRecordMapCallBack ¶ added in v0.3.1
type FlowKeyRecordMapCallBack func(key FlowKey, record AggregationFlowRecord) error