scheduler

package
v5.0.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2023 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CalculateAndPersistFacts

func CalculateAndPersistFacts(t time.Time, factIDs []int64) (map[string]history.HistoryRecordV4, error)

func CalculateAndPersistSituations

func CalculateAndPersistSituations(localRuleEngine *ruleeng.RuleEngine, situationsToUpdate map[string]history.HistoryRecordV4) ([]tasker.TaskBatch, error)

func GetEnabledSituations

func GetEnabledSituations(fact engine.Fact, t time.Time) ([]history.HistoryRecordV4, error)

GetEnabledSituations returns all enabled situations linked to a fact.

func GetLinkedSituations

func GetLinkedSituations(fact engine.Fact) ([]history.HistoryRecordV4, error)

GetLinkedSituations returns all situations linked to a fact.

func ReceiveAndPersistFacts

func ReceiveAndPersistFacts(aggregates []ExternalAggregate) (map[string]history.HistoryRecordV4, error)

ReceiveAndPersistFacts processes a slice of ExternalAggregate values and triggers the standard fact-situation-rule process.

func ReplaceGlobalRepository

func ReplaceGlobalRepository(repository Repository) func()

ReplaceGlobalRepository assigns a new repository to the global repository singleton.

func ReplaceGlobals

func ReplaceGlobals(scheduler *InternalScheduler) func()

ReplaceGlobals assigns a new scheduler to the global fact scheduler singleton.

Types

type ExternalAggregate

type ExternalAggregate struct {
	FactID              int64       `json:"factId"`
	SituationID         int64       `json:"situationId"`
	SituationInstanceID int64       `json:"situationInstanceId"`
	Time                time.Time   `json:"time"`
	Value               reader.Item `json:"value"`
}

ExternalAggregate contains all the information needed to store a new aggregate in PostgreSQL.

type FactCalculationJob

type FactCalculationJob struct {
	FactIds        []int64 `json:"factIds"`
	From           string  `json:"from,omitempty"`
	To             string  `json:"to,omitempty"`
	LastDailyValue bool    `json:"lastDailyValue,omitempty"`
	Debug          bool    `json:"debug"`
	ScheduleID     int64   `json:"-"`
}

FactCalculationJob represents a scheduler job instance which processes a group of facts and persists the result in PostgreSQL. It also generates situations, persists them, and notifies the rule engine to evaluate them.

func (FactCalculationJob) IsValid

func (job FactCalculationJob) IsValid() (bool, error)

IsValid checks if an internal schedule job definition is valid and has no missing mandatory fields

func (*FactCalculationJob) ResolveFromAndTo

func (job *FactCalculationJob) ResolveFromAndTo(t time.Time) (time.Time, time.Time, error)

ResolveFromAndTo resolves the expressions in parameters From and To

func (FactCalculationJob) Run

func (job FactCalculationJob) Run()

Run contains all the business logic of the job

func (*FactCalculationJob) UnmarshalJSON

func (job *FactCalculationJob) UnmarshalJSON(data []byte) error

UnmarshalJSON unmarshals a quoted json string to a valid FactCalculationJob struct

type FactRecalculationJob added in v5.0.13

type FactRecalculationJob struct {
	FactIds        []int64 `json:"factIds"`
	From           string  `json:"from,omitempty"`
	To             string  `json:"to,omitempty"`
	LastDailyValue bool    `json:"lastDailyValue,omitempty"`
	Debug          bool    `json:"debug"`
	ScheduleID     int64   `json:"-"`
}

FactRecalculationJob represents a scheduler job instance which processes a group of facts and persists the result in PostgreSQL. It also generates situations, persists them, and notifies the rule engine to evaluate them.

func (FactRecalculationJob) FetchRecalculationData added in v5.0.13

func (job FactRecalculationJob) FetchRecalculationData(historySituations []history.HistorySituationsV4) ([]history.HistoryFactsV4, map[int64][]int64, map[int64]int64, error)

func (FactRecalculationJob) RecalculateAndUpdateFacts added in v5.0.13

func (job FactRecalculationJob) RecalculateAndUpdateFacts(factIDs []int64, facts map[int64]engine.Fact,
	mapFactSituation map[int64]int64, mapSituations map[int64]history.HistorySituationsV4, historyFacts []history.HistoryFactsV4) (map[int64]history.HistoryFactsV4, error)

func (FactRecalculationJob) RecalculateAndUpdateSituations added in v5.0.13

func (job FactRecalculationJob) RecalculateAndUpdateSituations(localRuleEngine *ruleeng.RuleEngine, s situation.Situation, mapSituationFact map[int64][]int64,
	historySituations []history.HistorySituationsV4, newFactHistory map[int64]history.HistoryFactsV4) error

func (*FactRecalculationJob) ResolveFromAndTo added in v5.0.13

func (job *FactRecalculationJob) ResolveFromAndTo(t time.Time) (time.Time, time.Time, error)

ResolveFromAndTo resolves the expressions in parameters From and To

func (FactRecalculationJob) Run added in v5.0.13

func (job FactRecalculationJob) Run()

type InternalJob

type InternalJob interface {
	cron.Job
	IsValid() (bool, error)
}

InternalJob embeds the external "standard" cron job and adds an IsValid validation method.

func UnmarshalInternalJob

func UnmarshalInternalJob(t string, b json.RawMessage, scheduleID int64) (InternalJob, error)

UnmarshalInternalJob unmarshals an InternalJob from a raw JSON message.

type InternalSchedule

type InternalSchedule struct {
	ID       int64       `json:"id"`
	Name     string      `json:"name"`
	CronExpr string      `json:"cronexpr" example:"0 */15 * * *"`
	JobType  string      `json:"jobtype" enums:"fact,baseline"`
	Job      InternalJob `json:"job"`
}

InternalSchedule wraps a schedule.

func (*InternalSchedule) IsValid

func (schedule *InternalSchedule) IsValid() (bool, error)

IsValid checks if an internal schedule definition is valid and has no missing mandatory fields

func (*InternalSchedule) UnmarshalJSON

func (schedule *InternalSchedule) UnmarshalJSON(data []byte) error

UnmarshalJSON unmarshals a JSON object as an InternalSchedule.

type InternalScheduler

type InternalScheduler struct {
	C    *cron.Cron
	Jobs map[int64]cron.EntryID

	RuleEngine chan string
	// contains filtered or unexported fields
}

InternalScheduler represents an instance of a scheduler used for fact processing

func NewScheduler

func NewScheduler() *InternalScheduler

NewScheduler returns a pointer to a new instance of InternalScheduler

func S

func S() *InternalScheduler

S is used to access the global fact scheduler singleton

func (*InternalScheduler) AddJobSchedule

func (s *InternalScheduler) AddJobSchedule(schedule InternalSchedule) error

AddJobSchedule adds a new schedule to the current scheduler.

func (*InternalScheduler) AddRunningJob

func (s *InternalScheduler) AddRunningJob(scheduleID int64)

AddRunningJob adds a job ID to the running job list.

func (*InternalScheduler) ExistingRunningJob

func (s *InternalScheduler) ExistingRunningJob(scheduleID int64) bool

ExistingRunningJob checks whether a job is already running.

func (*InternalScheduler) Init

func (s *InternalScheduler) Init() error

Init loads the job schedules from the database.

func (*InternalScheduler) RemoveJobSchedule

func (s *InternalScheduler) RemoveJobSchedule(scheduleID int64)

RemoveJobSchedule removes a job schedule from the current scheduler.

func (*InternalScheduler) RemoveRunningJob

func (s *InternalScheduler) RemoveRunningJob(scheduleID int64)

RemoveRunningJob removes a job ID from the running job list.

type PostgresRepository

type PostgresRepository struct {
	// contains filtered or unexported fields
}

PostgresRepository is a repository containing the job schedules, backed by a PostgreSQL database and implementing the Repository interface.

func (*PostgresRepository) Create

func (r *PostgresRepository) Create(schedule InternalSchedule) (int64, error)

Create creates a new schedule in the repository

func (*PostgresRepository) Delete

func (r *PostgresRepository) Delete(id int64) error

Delete deletes an entry from the repository by its ID.

func (*PostgresRepository) Get

Get searches for and returns a job schedule from the repository by its ID.

func (*PostgresRepository) GetAll

func (r *PostgresRepository) GetAll() (map[int64]InternalSchedule, error)

GetAll returns all job schedules in the repository

func (*PostgresRepository) Update

func (r *PostgresRepository) Update(schedule InternalSchedule) error

Update updates a schedule in the repository by its name

type Repository

type Repository interface {
	Create(schedule InternalSchedule) (int64, error)
	Get(id int64) (InternalSchedule, bool, error)
	Update(schedule InternalSchedule) error
	Delete(id int64) error
	GetAll() (map[int64]InternalSchedule, error)
}

Repository is a storage interface which can be implemented by multiple backends (in-memory map, SQL database, in-memory cache, file system, ...). It allows standard CRUD operations on job schedules.

func NewPostgresRepository

func NewPostgresRepository(dbClient *sqlx.DB) Repository

NewPostgresRepository returns a new instance of PostgresRepository.

func R

func R() Repository

R is used to access the global repository singleton

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL