msgservice

package
v2.0.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2022 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// AggregateScanAndGetQueue forwards its arguments to dbservice.AggregateScans
// and decides what to hand back to the caller:
//
//   - if AggregateScans reports an error, the error is logged and whatever
//     slice AggregateScans returned alongside it is passed through unchanged;
//   - if currentContent is non-empty but the returned slice is empty, a
//     message is logged stating that the scan was queued without sending,
//     and nil is returned;
//   - otherwise the slice returned by AggregateScans is returned as is.
//
// It is declared as a package-level variable, so it can be reassigned.
var AggregateScanAndGetQueue = func(outputName string, currentContent map[string]string, counts int, ignoreLength bool) []map[string]string {
	queue, aggErr := dbservice.AggregateScans(outputName, currentContent, counts, ignoreLength)

	switch {
	case aggErr != nil:
		log.Printf("AggregateScans Error: %v", aggErr)
		return queue
	case len(queue) == 0 && len(currentContent) != 0:
		// Something was submitted, yet nothing came back to send.
		log.Printf("New scan was added to the queue of %q without sending.", outputName)
		return nil
	default:
		return queue
	}
}
View Source
// RunScheduler starts a background goroutine that periodically flushes the
// aggregation queue of a route.
//
// On every tick of the ticker obtained from
// getTicker(route.Plugins.AggregateTimeoutSeconds) the goroutine calls
// fnAggregate(route.Name, nil, 0, false). When the returned queue is
// non-empty it builds a single aggregated message with
// inpteval.BuildAggregatedContent and passes it, together with output, to
// fnSend. If BuildAggregatedContent fails, the error is logged and nothing is
// sent for that tick.
//
// The goroutine stops the ticker and exits as soon as a receive on
// route.Scheduling succeeds. RunScheduler itself returns right after the
// goroutine has been launched; it does not wait for it.
//
// The name parameter is not used by this function; it is kept so that
// existing callers keep compiling.
var RunScheduler = func(
	route *routes.InputRoute,
	fnSend func(plg outputs.Output, cnt map[string]string),
	fnAggregate func(outputName string, currentContent map[string]string, counts int, ignoreLength bool) []map[string]string,
	inpteval data.Inpteval,
	name *string,
	output outputs.Output,
) {
	log.Printf("Scheduler is activated for route %q. Period: %d sec", route.Name, route.Plugins.AggregateTimeoutSeconds)

	ticker := getTicker(route.Plugins.AggregateTimeoutSeconds)
	// route.Scheduling is read only after StartScheduler has run.
	// NOTE(review): presumably StartScheduler initializes that channel — confirm
	// before reordering these statements.
	route.StartScheduler()

	go func(done chan struct{}, currentTicker *time.Ticker) {
		for {
			select {
			case <-done:
				currentTicker.Stop()
				log.Printf("Scheduler for %q was stopped", route.Name)
				return
			case <-currentTicker.C:
				log.Printf("Scheduler triggered for %q", route.Name)
				queue := fnAggregate(route.Name, nil, 0, false)
				if len(queue) == 0 {
					// Nothing accumulated since the previous tick.
					continue
				}
				aggregated, err := inpteval.BuildAggregatedContent(queue)
				if err != nil {
					log.Printf("Unable to build aggregated contents %v\n", err)
					// The result is meaningless when err is non-nil, so do
					// not forward it to the output.
					continue
				}
				fnSend(output, aggregated)
			}
		}
	}(route.Scheduling, ticker)
}

Functions

func GetMessageUniqueId

func GetMessageUniqueId(in map[string]interface{}, props []string) string

Types

type MsgService

// MsgService declares no fields; it serves as the receiver type for the
// MsgHandling method.
type MsgService struct{}

func (*MsgService) MsgHandling

func (scan *MsgService) MsgHandling(input []byte, output outputs.Output, route *routes.InputRoute, inpteval data.Inpteval, AquaServer *string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL