Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var AggregateScanAndGetQueue = func(outputName string, currentContent map[string]string, counts int, ignoreLength bool) []map[string]string { aggregatedScans, err := dbservice.AggregateScans(outputName, currentContent, counts, ignoreLength) if err != nil { log.Printf("AggregateScans Error: %v", err) return aggregatedScans } if len(currentContent) != 0 && len(aggregatedScans) == 0 { log.Printf("New scan was added to the queue of %q without sending.", outputName) return nil } return aggregatedScans }
View Source
var RunScheduler = func( route *routes.InputRoute, fnSend func(plg outputs.Output, cnt map[string]string), fnAggregate func(outputName string, currentContent map[string]string, counts int, ignoreLength bool) []map[string]string, inpteval data.Inpteval, name *string, output outputs.Output, ) { log.Printf("Scheduler is activated for route %q. Period: %d sec", route.Name, route.Plugins.AggregateTimeoutSeconds) ticker := getTicker(route.Plugins.AggregateTimeoutSeconds) route.StartScheduler() go func(done chan struct{}, currentTicker *time.Ticker) { for { select { case <-done: currentTicker.Stop() log.Printf("Scheduler for %q was stopped", route.Name) return case <-currentTicker.C: log.Printf("Scheduler triggered for %q", route.Name) queue := fnAggregate(route.Name, nil, 0, false) if len(queue) > 0 { aggregated, err := inpteval.BuildAggregatedContent(queue) if err != nil { log.Printf("Unable to build aggregated contents %v\n", err) } fnSend(output, aggregated) } } } }(route.Scheduling, ticker) }
Functions ¶
func GetMessageUniqueId ¶
Types ¶
type MsgService ¶
// MsgService is a field-less type that serves as the receiver for message
// handling methods.
type MsgService struct{}
func (*MsgService) MsgHandling ¶
func (scan *MsgService) MsgHandling(input []byte, output outputs.Output, route *routes.InputRoute, inpteval data.Inpteval, AquaServer *string)
Click to show internal directories.
Click to hide internal directories.