Documentation ¶
Index ¶
- Constants
- Variables
- func DumpBucketsStateAt(deadline time.Time, buckets *Buckets) (string, error)
- func FormatOverflow(l *Leaky, queue *Queue) types.SignalOccurence
- func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error
- func GetKey(bucketCfg BucketFactory, stackkey string) string
- func LeakRoutine(l *Leaky)
- func LoadBucket(g *BucketFactory, dataFolder string) error
- func LoadBucketsState(file string, buckets *Buckets, holders []BucketFactory) error
- func Pour(l *Leaky, msg types.Event)
- func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error)
- func ShutdownAllBuckets(buckets *Buckets) error
- func TimeMachinePour(l *Leaky, msg types.Event)
- func ValidateFactory(b *BucketFactory) error
- type Blackhole
- type BucketFactory
- type Buckets
- type DumbProcessor
- type HiddenKey
- type Leaky
- type OverflowFilter
- type Processor
- type Queue
- type Trigger
- type Uniq
Constants ¶
const ( LIVE = iota TIMEMACHINE )
Variables ¶
var BucketsCurrentCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cs_buckets", Help: "Number of buckets that currently exist.", }, []string{"name"}, )
var BucketsInstanciation = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_created_total", Help: "Total buckets were instanciated.", }, []string{"name"}, )
var BucketsOverflow = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_overflowed_total", Help: "Total buckets overflowed.", }, []string{"name"}, )
var BucketsPour = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_poured_total", Help: "Total events were poured in bucket.", }, []string{"source", "name"}, )
var BucketsUnderflow = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "cs_bucket_underflowed_total", Help: "Total buckets underflowed.", }, []string{"name"}, )
var LeakyRoutineCount int64
Functions ¶
func DumpBucketsStateAt ¶
func FormatOverflow ¶
func FormatOverflow(l *Leaky, queue *Queue) types.SignalOccurence
func GarbageCollectBuckets ¶
The leaky routines lifecycle are based on "real" time. But when we are running in time-machine mode, the reference time is in logs and not "real" time. Thus we need to garbage collect them to avoid a skyrocketing memory usage.
func GetKey ¶
func GetKey(bucketCfg BucketFactory, stackkey string) string
func LeakRoutine ¶
func LeakRoutine(l *Leaky)
for now mimic a leak routine
LeakRoutine us the life of a bucket. It dies when the bucket underflows or overflows
func LoadBucket ¶
func LoadBucket(g *BucketFactory, dataFolder string) error
Init recursively process yaml files from a directory and loads them as BucketFactory
func LoadBucketsState ¶
func LoadBucketsState(file string, buckets *Buckets, holders []BucketFactory) error
func PourItemToHolders ¶
func ShutdownAllBuckets ¶ added in v0.2.0
func TimeMachinePour ¶
func ValidateFactory ¶
func ValidateFactory(b *BucketFactory) error
Types ¶
type Blackhole ¶
type Blackhole struct { DumbProcessor // contains filtered or unexported fields }
func NewBlackhole ¶
func NewBlackhole(g *BucketFactory) (*Blackhole, error)
func (*Blackhole) OnBucketOverflow ¶
func (bl *Blackhole) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)
type BucketFactory ¶
type BucketFactory struct { FormatVersion string `yaml:"format"` Author string `yaml:"author"` Description string `yaml:"description"` References []string `yaml:"references"` Type string `yaml:"type"` //Type can be : leaky, counter, trigger. It determines the main bucket characteristics Name string `yaml:"name"` //Name of the bucket, used later in log and user-messages. Should be unique Capacity int `yaml:"capacity"` //Capacity is applicable to leaky buckets and determines the "burst" capacity LeakSpeed string `yaml:"leakspeed"` //Leakspeed is a float representing how many events per second leak out of the bucket Duration string `yaml:"duration"` //Duration allows 'counter' buckets to have a fixed life-time Filter string `yaml:"filter"` //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct GroupBy string `yaml:"groupby,omitempty"` //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip Distinct string `yaml:"distinct"` //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on uniq_filter expr result) Debug bool `yaml:"debug"` //Debug, when set to true, will enable debugging for _this_ scenario specifically Labels map[string]string `yaml:"labels"` //Labels is K:V list aiming at providing context the overflow Blackhole string `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration Reprocess bool `yaml:"reprocess"` //Reprocess, if true, will for the bucket to be re-injected into processing chain CacheSize int `yaml:"cache_size"` //CacheSize, if > 0, limits the size of in-memory cache of the bucket Profiling bool `yaml:"profiling"` //Profiling, if true, will make the bucket record pours/overflows/etc. OverflowFilter string `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through BucketName string `yaml:"-"` Filename string `yaml:"-"` RunTimeFilter *vm.Program `json:"-"` ExprDebugger *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression RunTimeGroupBy *vm.Program `json:"-"` Data []*types.DataSource `yaml:"data,omitempty"` // contains filtered or unexported fields }
BucketFactory struct holds all fields for any bucket configuration. This is to have a generic struct for buckets. This can be seen as a bucket factory.
func LoadBucketDir ¶
func LoadBuckets ¶
type Buckets ¶
Buckets is the struct used to hold buckets in the context of main.go the idea is to have one struct to rule them all
type DumbProcessor ¶
type DumbProcessor struct { }
func (*DumbProcessor) OnBucketInit ¶
func (d *DumbProcessor) OnBucketInit(b *BucketFactory) error
func (*DumbProcessor) OnBucketOverflow ¶
func (d *DumbProcessor) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)
func (*DumbProcessor) OnBucketPour ¶
func (d *DumbProcessor) OnBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event
type Leaky ¶
type Leaky struct { Name string Mode int //LIVE or TIMEMACHINE //the limiter is what holds the proper "leaky aspect", it determines when/if we can pour objects Limiter rate.RateLimiter `json:"-"` SerializedState rate.Lstate //Queue is used to held the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer. Queue *Queue //Leaky buckets are receiving message through a chan In chan types.Event `json:"-"` //Leaky buckets are pushing their overflows through a chan Out chan *Queue `json:"-"` // shared for all buckets (the idea is to kill this afterwards) AllOut chan types.Event `json:"-"` KillSwitch chan bool `json:"-"` //max capacity (for burst) Capacity int //CacheRatio is the number of elements that should be kept in memory (compared to capacity) CacheSize int //the unique identifier of the bucket (a hash) Mapkey string // chan for signaling Signal chan bool `json:"-"` Reprocess bool Uuid string First_ts time.Time Last_ts time.Time Ovflw_ts time.Time Total_count int Leakspeed time.Duration BucketConfig *BucketFactory Duration time.Duration Pour func(*Leaky, types.Event) `json:"-"` //Profiling when set to true enables profiling of bucket Profiling bool // contains filtered or unexported fields }
Leaky represents one instance of a bucket
func FromFactory ¶
func FromFactory(g BucketFactory) *Leaky
func NewLeaky ¶
func NewLeaky(g BucketFactory) *Leaky
Newleaky creates a new leaky bucket from a BucketFactory Events created by the bucket (overflow, bucket empty) are sent to a chan defined by BucketFactory The leaky bucket implementation is based on rate limiter (see https://godoc.org/golang.org/x/time/rate) There's a trick to have an event said when the bucket gets empty to allow its destruction
func NewTimeMachine ¶
func NewTimeMachine(g BucketFactory) *Leaky
type OverflowFilter ¶
type OverflowFilter struct { Filter string FilterRuntime *vm.Program DumbProcessor }
func NewOverflowFilter ¶
func NewOverflowFilter(g *BucketFactory) (*OverflowFilter, error)
func (*OverflowFilter) OnBucketOverflow ¶
func (u *OverflowFilter) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)
type Processor ¶
type Processor interface { OnBucketInit(Bucket *BucketFactory) error OnBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue) }
type Queue ¶
Queue holds a limited size queue
type Uniq ¶
func (*Uniq) OnBucketInit ¶
func (u *Uniq) OnBucketInit(Bucket *BucketFactory) error
func (*Uniq) OnBucketOverflow ¶
func (u *Uniq) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.SignalOccurence, *Queue) (types.SignalOccurence, *Queue)