Documentation ¶
Index ¶
- Variables
- func DumpBucketsStateAt(deadline time.Time, outputdir string, buckets *Buckets) (string, error)
- func EventsFromQueue(queue *types.Queue) []*models.Event
- func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error
- func GetKey(bucketCfg BucketFactory, stackkey string) string
- func LeakRoutine(leaky *Leaky) error
- func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error
- func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFactory) error
- func NewAlert(leaky *Leaky, queue *types.Queue) (types.RuntimeAlert, error)
- func Pour(leaky *Leaky, msg types.Event)
- func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *types.Event) (bool, error)
- func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error)
- func ShutdownAllBuckets(buckets *Buckets) error
- func SourceFromEvent(evt types.Event, leaky *Leaky) (map[string]models.Source, error)
- func TimeMachinePour(l *Leaky, msg types.Event)
- func ValidateFactory(bucketFactory *BucketFactory) error
- type BayesianBucket
- type BayesianEvent
- type Blackhole
- type BucketFactory
- type Buckets
- type CancelOnFilter
- func (u *CancelOnFilter) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
- func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error
- func (u *CancelOnFilter) OnBucketOverflow(bucketFactory *BucketFactory) ...
- func (u *CancelOnFilter) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
- type ConditionalOverflow
- type DumbProcessor
- func (d *DumbProcessor) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
- func (d *DumbProcessor) OnBucketInit(bucketFactory *BucketFactory) error
- func (d *DumbProcessor) OnBucketOverflow(b *BucketFactory) ...
- func (d *DumbProcessor) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
- type HiddenKey
- type Leaky
- type OverflowFilter
- type Processor
- type RawBayesianCondition
- type Trigger
- type Uniq
- func (u *Uniq) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
- func (u *Uniq) OnBucketInit(bucketFactory *BucketFactory) error
- func (u *Uniq) OnBucketOverflow(bucketFactory *BucketFactory) ...
- func (u *Uniq) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
Constants ¶
This section is empty.
Variables ¶
var BucketPourCache map[string][]types.Event
var BucketPourTrack bool
var BucketsCanceled = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_canceled_total", Help: "Total buckets canceled.", }, []string{"name"}, )
var BucketsCurrentCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cs_buckets", Help: "Number of buckets that currently exist.", }, []string{"name"}, )
var BucketsInstantiation = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_created_total", Help: "Total buckets were instantiated.", }, []string{"name"}, )
var BucketsOverflow = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_overflowed_total", Help: "Total buckets overflowed.", }, []string{"name"}, )
var BucketsPour = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_poured_total", Help: "Total events were poured in bucket.", }, []string{"source", "type", "name"}, )
var BucketsUnderflow = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_underflowed_total", Help: "Total buckets underflowed.", }, []string{"name"}, )
var LeakyRoutineCount int64
Functions ¶
func DumpBucketsStateAt ¶
func EventsFromQueue ¶ added in v1.0.0
EventsFromQueue iterates the queue to collect & prepare meta-datas from alert
func GarbageCollectBuckets ¶
The leaky routines lifecycle are based on "real" time. But when we are running in time-machine mode, the reference time is in logs and not "real" time. Thus we need to garbage collect them to avoid a skyrocketing memory usage.
func GetKey ¶
func GetKey(bucketCfg BucketFactory, stackkey string) string
func LeakRoutine ¶
for now mimic a leak routine
LeakRoutine us the life of a bucket. It dies when the bucket underflows or overflows
func LoadBucket ¶
func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error
Init recursively process yaml files from a directory and loads them as BucketFactory
func LoadBucketsState ¶
func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFactory) error
func NewAlert ¶ added in v1.0.0
NewAlert will generate a RuntimeAlert and its APIAlert(s) from a bucket that overflowed
func PourItemToBucket ¶ added in v1.2.3
func PourItemToHolders ¶
func ShutdownAllBuckets ¶ added in v0.2.0
func SourceFromEvent ¶ added in v1.0.0
SourceFromEvent extracts and formats a valid models.Source object from an Event
func TimeMachinePour ¶
func ValidateFactory ¶
func ValidateFactory(bucketFactory *BucketFactory) error
Types ¶
type BayesianBucket ¶ added in v1.5.3
type BayesianBucket struct { DumbProcessor // contains filtered or unexported fields }
func (*BayesianBucket) AfterBucketPour ¶ added in v1.5.3
func (c *BayesianBucket) AfterBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event
func (*BayesianBucket) OnBucketInit ¶ added in v1.5.3
func (c *BayesianBucket) OnBucketInit(g *BucketFactory) error
type BayesianEvent ¶ added in v1.5.3
type BayesianEvent struct {
// contains filtered or unexported fields
}
type Blackhole ¶
type Blackhole struct { DumbProcessor // contains filtered or unexported fields }
func NewBlackhole ¶
func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error)
func (*Blackhole) OnBucketOverflow ¶
func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)
type BucketFactory ¶
type BucketFactory struct { FormatVersion string `yaml:"format"` Author string `yaml:"author"` Description string `yaml:"description"` References []string `yaml:"references"` Type string `yaml:"type"` // Type can be : leaky, counter, trigger. It determines the main bucket characteristics Name string `yaml:"name"` // Name of the bucket, used later in log and user-messages. Should be unique Capacity int `yaml:"capacity"` // Capacity is applicable to leaky buckets and determines the "burst" capacity LeakSpeed string `yaml:"leakspeed"` // Leakspeed is a float representing how many events per second leak out of the bucket Duration string `yaml:"duration"` // Duration allows 'counter' buckets to have a fixed life-time Filter string `yaml:"filter"` // Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct GroupBy string `yaml:"groupby,omitempty"` // groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip Distinct string `yaml:"distinct"` // Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result) Debug bool `yaml:"debug"` // Debug, when set to true, will enable debugging for _this_ scenario specifically Labels map[string]interface{} `yaml:"labels"` // Labels is K:V list aiming at providing context the overflow Blackhole string `yaml:"blackhole,omitempty"` // Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration Reprocess bool `yaml:"reprocess"` // Reprocess, if true, will for the bucket to be re-injected into processing chain CacheSize int `yaml:"cache_size"` // CacheSize, if > 0, limits the size of in-memory cache of the bucket Profiling bool `yaml:"profiling"` // Profiling, if true, will make the bucket record pours/overflows/etc. OverflowFilter string `yaml:"overflow_filter"` // OverflowFilter if present, is a filter that must return true for the overflow to go through ConditionalOverflow string `yaml:"condition"` // condition if present, is an expression that must return true for the bucket to overflow BayesianPrior float32 `yaml:"bayesian_prior"` BayesianThreshold float32 `yaml:"bayesian_threshold"` BayesianConditions []RawBayesianCondition `yaml:"bayesian_conditions"` // conditions for the bayesian bucket ScopeType types.ScopeType `yaml:"scope,omitempty"` // to enforce a different remediation than blocking an IP. Will default this to IP BucketName string `yaml:"-"` Filename string `yaml:"-"` RunTimeFilter *vm.Program `json:"-"` RunTimeGroupBy *vm.Program `json:"-"` Data []*types.DataSource `yaml:"data,omitempty"` DataDir string `yaml:"-"` CancelOnFilter string `yaml:"cancel_on,omitempty"` // a filter that, if matched, kills the bucket ScenarioVersion string `yaml:"version,omitempty"` Simulated bool `yaml:"simulated"` // Set to true if the scenario instantiating the bucket was in the exclusion list // contains filtered or unexported fields }
BucketFactory struct holds all fields for any bucket configuration. This is to have a generic struct for buckets. This can be seen as a bucket factory.
func LoadBuckets ¶
type Buckets ¶
Buckets is the struct used to hold buckets in the context of main.go the idea is to have one struct to rule them all
type CancelOnFilter ¶ added in v1.2.2
func (*CancelOnFilter) AfterBucketPour ¶ added in v1.5.0
func (u *CancelOnFilter) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
func (*CancelOnFilter) OnBucketInit ¶ added in v1.2.2
func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error
func (*CancelOnFilter) OnBucketOverflow ¶ added in v1.2.2
func (u *CancelOnFilter) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)
func (*CancelOnFilter) OnBucketPour ¶ added in v1.2.2
func (u *CancelOnFilter) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
type ConditionalOverflow ¶ added in v1.5.0
type ConditionalOverflow struct { ConditionalFilter string ConditionalFilterRuntime *vm.Program DumbProcessor }
func (*ConditionalOverflow) AfterBucketPour ¶ added in v1.5.0
func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event
func (*ConditionalOverflow) OnBucketInit ¶ added in v1.5.0
func (c *ConditionalOverflow) OnBucketInit(g *BucketFactory) error
type DumbProcessor ¶
type DumbProcessor struct { }
func (*DumbProcessor) AfterBucketPour ¶ added in v1.5.0
func (d *DumbProcessor) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
func (*DumbProcessor) OnBucketInit ¶
func (d *DumbProcessor) OnBucketInit(bucketFactory *BucketFactory) error
func (*DumbProcessor) OnBucketOverflow ¶
func (d *DumbProcessor) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)
func (*DumbProcessor) OnBucketPour ¶
func (d *DumbProcessor) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
type Leaky ¶
type Leaky struct { Name string Mode int //LIVE or TIMEMACHINE //the limiter is what holds the proper "leaky aspect", it determines when/if we can pour objects Limiter rate.RateLimiter `json:"-"` SerializedState rate.Lstate //Queue is used to hold the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer. Queue *types.Queue //Leaky buckets are receiving message through a chan In chan *types.Event `json:"-"` //Leaky buckets are pushing their overflows through a chan Out chan *types.Queue `json:"-"` // shared for all buckets (the idea is to kill this afterward) AllOut chan types.Event `json:"-"` //max capacity (for burst) Capacity int //CacheRatio is the number of elements that should be kept in memory (compared to capacity) CacheSize int //the unique identifier of the bucket (a hash) Mapkey string // chan for signaling Signal chan bool `json:"-"` Suicide chan bool `json:"-"` Reprocess bool Simulated bool Uuid string First_ts time.Time Last_ts time.Time Ovflw_ts time.Time Total_count int Leakspeed time.Duration BucketConfig *BucketFactory Duration time.Duration Pour func(*Leaky, types.Event) `json:"-"` //Profiling when set to true enables profiling of bucket Profiling bool // contains filtered or unexported fields }
Leaky represents one instance of a bucket
func FromFactory ¶
func FromFactory(bucketFactory BucketFactory) *Leaky
func LoadOrStoreBucketFromHolder ¶ added in v1.2.3
func NewLeaky ¶
func NewLeaky(bucketFactory BucketFactory) *Leaky
Newleaky creates a new leaky bucket from a BucketFactory Events created by the bucket (overflow, bucket empty) are sent to a chan defined by BucketFactory The leaky bucket implementation is based on rate limiter (see https://godoc.org/golang.org/x/time/rate) There's a trick to have an event said when the bucket gets empty to allow its destruction
func NewTimeMachine ¶
func NewTimeMachine(g BucketFactory) *Leaky
type OverflowFilter ¶
type OverflowFilter struct { Filter string FilterRuntime *vm.Program DumbProcessor }
func NewOverflowFilter ¶
func NewOverflowFilter(g *BucketFactory) (*OverflowFilter, error)
func (*OverflowFilter) OnBucketOverflow ¶
func (u *OverflowFilter) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)
type Processor ¶
type Processor interface { OnBucketInit(Bucket *BucketFactory) error OnBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue) AfterBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event }
type RawBayesianCondition ¶ added in v1.5.3
type Uniq ¶
func (*Uniq) AfterBucketPour ¶ added in v1.5.0
func (*Uniq) OnBucketInit ¶
func (u *Uniq) OnBucketInit(bucketFactory *BucketFactory) error
func (*Uniq) OnBucketOverflow ¶
func (u *Uniq) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)