Documentation
Index ¶
- Constants
- Variables
- func PrintVersion()
- func PrintVersionExit()
- func StartEngine(ctx context.Context, engine Engine, waitChan *chan os.Signal) (int, error)
- type DefaultEngine
- func NewDefaultEngine(sleep time.Duration, runworker, runperiodical bool, sub libamqp.Channel, logger zerolog.Logger, runInfoManager ...engine.RunInfoManager) DefaultEngine
- func (de *DefaultEngine) AckMessage(msg amqp.Delivery)
- func (de *DefaultEngine) AcknowledgeConfirmStop()
- func (de *DefaultEngine) AskStop(state int)
- func (de *DefaultEngine) ConfirmPeriodicalStop()
- func (de *DefaultEngine) ConfirmWorkerStop()
- func (de *DefaultEngine) Continue() bool
- func (de *DefaultEngine) GetRunInfo() engine.RunInfo
- func (de *DefaultEngine) Initialize(ctx context.Context) error
- func (de *DefaultEngine) Logger() *zerolog.Logger
- func (de *DefaultEngine) NackMessage(msg amqp.Delivery)
- func (de *DefaultEngine) PeriodicalEnd()
- func (de *DefaultEngine) PeriodicalWaitTime() time.Duration
- func (de *DefaultEngine) ProcessWorkerError(err error, msg amqp.Delivery)
- func (de DefaultEngine) RunPeriodicalProcess() bool
- func (de DefaultEngine) RunWorkerProcess() bool
- func (de *DefaultEngine) SetWaitStopChan(sigchan chan os.Signal)
- func (de *DefaultEngine) Started(ctx context.Context, runInfo engine.RunInfo)
- func (de *DefaultEngine) Stop() int
- func (de *DefaultEngine) WorkerEnd()
- type Engine
Constants ¶
const (
	ExitOK = iota
	ExitPanic
	ExitEngine
)
Returned values on exit
const (
	EnvCpsAMQPTestExchange = "canopsis.tests"

	ActionEngineName = "engine-action"
	ActionQueueName = "Engine_action"
	ActionAxeRPCClientQueueName = "Engine_action_axe_rpc_client"
	ActionWebhookRPCClientQueueName = "Engine_action_webhook_rpc_client"
	ActionConsumerName = "action"
	ActionRPCConsumerName = "action_rpc"

	AxeExchangeName = "engine-axe"
	AxeQueueName = "Engine_axe"
	AxeServiceRPCClientQueueName = "Engine_axe_service_rpc_client"
	AxePbehaviorRPCClientQueueName = "Engine_axe_pbehavior_rpc_client"
	AxeRPCQueueServerName = "Engine_axe_rpc_server"
	AxeConsumerName = "axe"
	AxeRPCConsumerName = "axe_rpc"

	CheExchangeName = "canopsis.events"
	CheEngineName = "engine-che"
	CheQueueName = "Engine_che"
	CheConsumerName = "che"

	DbName = mongo.DB
	DefaultBulkSize = 1000
	DefaultEventAuthor = "system"
	DoneAutosolveDelay = 15 * 60

	DynamicInfosQueueName = "Engine_dynamic_infos"
	DynamicInfosConsumerName = "dynamic-infos"

	HeartBeatExchangeName = "canopsis.events"
	HeartBeatQueueName = "Engine_heartbeat"
	HeartBeatConsumerName = "heartbeat"

	MaxPythonTimestamp = 253402297199 // compat: max timestamp in python

	StatsExchangeName = "canopsis.events"
	StatsQueueName = "Engine_stat"
	StatsConsumerName = "stat"
	StatsDatabase = "canopsis"
	StatsMeasurement = "event_state_history"
	StatsngExchangeName = "amq.direct"
	StatsngQueueName = "Engine_statsng"

	PBehaviorEngineName = "engine-pbehavior"
	PBehaviorQueueName = "Engine_pbehavior"
	PBehaviorRPCQueueServerName = "Engine_pbehavior_rpc_server"
	PBehaviorConsumerName = "pbehavior"
	PBehaviorRPCConsumerName = "pbehavior_rpc"

	PluginExtension = ".so"

	ServiceEngineName = "engine-service"
	ServiceQueueName = "Engine_service"
	ServiceRPCQueueServerName = "Engine_service_rpc_server"
	ServiceConsumerName = "service"
	ServiceRPCConsumerName = "service_rpc"

	WebhookRPCQueueServerName = "Engine_webhook_rpc_server"
	WebhookRPCConsumerName = "webhook_rpc"

	FIFOExchangeName = ""
	FIFOQueueName = "Engine_fifo"
	FIFOAckExchangeName = ""
	FIFOAckQueueName = "FIFO_ack"
	FIFOConsumerName = "fifo"

	CorrelationQueueName = "Engine_correlation"
	CorrelationConsumerName = "correlation"

	PeriodicalWaitTime = time.Minute
	JsonContentType = "application/json"
	CanopsisEventsExchange = "canopsis.events"
)
Globals
Variables ¶
var BuildDate string
BuildDate ...
var BuildGitBranch string
BuildGitBranch ...
var BuildGitCommit string
BuildGitCommit is the short hash of the git commit.
Functions ¶
func StartEngine ¶
func StartEngine(ctx context.Context, engine Engine, waitChan *chan os.Signal) (int, error)
StartEngine handles starting the WorkerProcess and PeriodicalProcess of your Engine. It first calls Engine.Initialize() and returns its error, if any; otherwise it proceeds to bind signals.
Engine.PeriodicalProcess and Engine.WorkerProcess are launched into separate goroutines.
Only SIGTERM or SIGINT will trigger the Engine.Stop() method.
waitChan is optional: if nil, a chan will be instantiated and managed for you. This is mainly for tests.
Types ¶
type DefaultEngine ¶
type DefaultEngine struct {
	ConfirmChan    chan bool
	Sub            libamqp.Channel
	Sleep          time.Duration
	RunWorker      bool
	RunPeriodical  bool
	Debug          bool
	RunInfoManager engine.RunInfoManager
	// contains filtered or unexported fields
}
DefaultEngine provides basic functions and will behave as an Engine with PeriodicalProcess and WorkerProcess.
Stop() will automatically close the Sub channel and wait for confirmations to arrive on ConfirmChan.
func NewDefaultEngine ¶
func NewDefaultEngine(
	sleep time.Duration,
	runworker, runperiodical bool,
	sub libamqp.Channel,
	logger zerolog.Logger,
	runInfoManager ...engine.RunInfoManager,
) DefaultEngine
NewDefaultEngine returns a default engine implementation. See engine_test.go in this package for a usage example.
func (*DefaultEngine) AckMessage ¶
func (de *DefaultEngine) AckMessage(msg amqp.Delivery)
func (*DefaultEngine) AcknowledgeConfirmStop ¶
func (de *DefaultEngine) AcknowledgeConfirmStop()
AcknowledgeConfirmStop sends true to ConfirmChan once for each call. Do NOT override this method when using DefaultEngine in your struct.
func (*DefaultEngine) AskStop ¶
func (de *DefaultEngine) AskStop(state int)
AskStop sends SIGINT into the channel where StartEngine waits for sigint/term, leading to a proper engine stop.
func (*DefaultEngine) ConfirmPeriodicalStop ¶
func (de *DefaultEngine) ConfirmPeriodicalStop()
ConfirmPeriodicalStop sends one value to ConfirmChan.
func (*DefaultEngine) ConfirmWorkerStop ¶
func (de *DefaultEngine) ConfirmWorkerStop()
ConfirmWorkerStop sends one value to ConfirmChan.
func (*DefaultEngine) Continue ¶
func (de *DefaultEngine) Continue() bool
Continue returns the DefaultEngine.cont flag.
func (*DefaultEngine) GetRunInfo ¶
func (de *DefaultEngine) GetRunInfo() engine.RunInfo
func (*DefaultEngine) Initialize ¶
func (de *DefaultEngine) Initialize(ctx context.Context) error
Initialize does nothing here besides returning nil.
func (*DefaultEngine) Logger ¶
func (de *DefaultEngine) Logger() *zerolog.Logger
func (*DefaultEngine) NackMessage ¶
func (de *DefaultEngine) NackMessage(msg amqp.Delivery)
func (*DefaultEngine) PeriodicalEnd ¶
func (de *DefaultEngine) PeriodicalEnd()
func (*DefaultEngine) PeriodicalWaitTime ¶
func (de *DefaultEngine) PeriodicalWaitTime() time.Duration
PeriodicalWaitTime returns the Sleep attribute.
func (*DefaultEngine) ProcessWorkerError ¶
func (de *DefaultEngine) ProcessWorkerError(err error, msg amqp.Delivery)
ProcessWorkerError nacks message if external services are not reachable and stops engine. It acks messages on other errors and continues engine working.
func (DefaultEngine) RunPeriodicalProcess ¶
func (de DefaultEngine) RunPeriodicalProcess() bool
RunPeriodicalProcess returns DefaultEngine.RunPeriodical
func (DefaultEngine) RunWorkerProcess ¶
func (de DefaultEngine) RunWorkerProcess() bool
RunWorkerProcess returns DefaultEngine.RunWorker
func (*DefaultEngine) SetWaitStopChan ¶
func (de *DefaultEngine) SetWaitStopChan(sigchan chan os.Signal)
SetWaitStopChan keeps in memory our sigint/term chan.
func (*DefaultEngine) Started ¶
func (de *DefaultEngine) Started(ctx context.Context, runInfo engine.RunInfo)
Started is called before StartEngine waits for stop signal.
func (*DefaultEngine) Stop ¶
func (de *DefaultEngine) Stop() int
Stop sets the cont flag to false, closes the Sub channel, then waits for two values from ConfirmChan. It returns the engine's exit status, which should be passed to os.Exit.
func (*DefaultEngine) WorkerEnd ¶
func (de *DefaultEngine) WorkerEnd()
type Engine ¶
type Engine interface {
	// PeriodicalWaitTime returns the duration to wait between
	// two run of PeriodicalProcess()
	PeriodicalWaitTime() time.Duration

	// Continue returns the state of the engine.
	// true to continue running
	// false to stop as soon as possible, meaning when functions finish
	// their job.
	Continue() bool

	// ConsumerChan must return a new channel from amqp.Channel.Consume()
	// The worker process will stop looping when the channel is closed,
	// so you must close yourself all channels.
	ConsumerChan() (<-chan amqp.Delivery, error)

	// Initialize anything you want. If err != nil then the Engine will not start.
	Initialize(ctx context.Context) error

	Started(ctx context.Context, runInfo engine.RunInfo)

	// WorkerProcess must implement the actual consumer processing.
	WorkerProcess(context.Context, amqp.Delivery)

	// PeriodicalProcess must implement the actual "beat processing".
	// Prefer using channels that you will close for long
	// PeriodicalWaitTime() values.
	PeriodicalProcess(ctx context.Context)

	SetWaitStopChan(chan os.Signal)
	AskStop(int)

	// Stop handles stopping the engine, and also waiting
	// for goroutines to finish.
	// It returns the engine's exit status, that should be used in os.Exit.
	Stop() int

	// ConfirmPeriodicalStop is useful to mutate a variable
	// or an internal chan to confirm that the Periodical process
	// is now stopped.
	ConfirmPeriodicalStop()
	ConfirmWorkerStop()

	RunPeriodicalProcess() bool
	RunWorkerProcess() bool

	// Do NOT override when using DefaultEngine.
	// Use this method to send a ConfirmStop without doing anything else.
	AcknowledgeConfirmStop()

	// WorkerEnd is called at the end of the worker routine, when one of the
	// following things happened:
	//  - the AMQP consumer channel closed
	//  - the engine has been stopped
	//  - WorkerProcess panicked
	// It should recover from panics, and notify the engine that the worker
	// routine has stopped (with the method ConfirmWorkerStop).
	WorkerEnd()

	// PeriodicalEnd is called at the end of the periodical routine, when one
	// of the following things happened:
	//  - the engine has been stopped
	//  - PeriodicalProcess panicked
	// It should recover from panics, and notify the engine that the periodical
	// routine has stopped (with the method ConfirmWorkerStop).
	PeriodicalEnd()

	Logger() *zerolog.Logger
	GetRunInfo() engine.RunInfo
}
Engine makes engines more predictable in the way they are built. Used with StartEngine(), goroutine spawning is handled for you and you only have to implement your working logic.
You can use the DefaultEngine struct to avoid implementing Initialize(), Stop(), Continue(), Confirm*Stop() and Run*Process().
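Reusing a default implementation in Go is usually done through struct embedding. The sketch below shows the pattern with stand-in types: `runner`, `baseEngine` and `prefixEngine` are hypothetical and much smaller than the real Engine and DefaultEngine, so treat it as an illustration of the idea and not of the actual API.

```go
package main

import "fmt"

// runner is a hypothetical, reduced stand-in for the Engine interface.
type runner interface {
	Continue() bool
	Initialize() error
	Process(msg string) string
}

// baseEngine plays the role of DefaultEngine: it supplies the
// housekeeping methods so concrete engines do not have to.
type baseEngine struct{ cont bool }

func (b *baseEngine) Continue() bool    { return b.cont }
func (b *baseEngine) Initialize() error { return nil }

// prefixEngine embeds baseEngine and implements only its working logic.
type prefixEngine struct {
	baseEngine
	prefix string
}

func (p *prefixEngine) Process(msg string) string { return p.prefix + msg }

// Compile-time check that the embedded methods complete the interface.
var _ runner = (*prefixEngine)(nil)

func main() {
	var r runner = &prefixEngine{baseEngine: baseEngine{cont: true}, prefix: "evt:"}
	if err := r.Initialize(); err != nil {
		fmt.Println("init failed:", err)
		return
	}
	fmt.Println(r.Continue(), r.Process("alarm")) // true evt:alarm
}
```

With the real types, the methods left to implement would be the ones DefaultEngine does not list in the index, such as ConsumerChan, WorkerProcess and PeriodicalProcess.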
Directories
| Path | Synopsis |
|---|---|
| | engine contain implementation of canopsis engine. |
| | Package idlealarm implements alarm modification on idle alarm. |
| | Package idlerule contains idle rule model and adapter. |
| | Package operation implements alarm modification operations. |
| executor | Package executor contains operation executors. |