leakybucket

package
v1.5.0-freebsd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2023 License: MIT Imports: 34 Imported by: 1

README

Leakybuckets

Bucket concepts

Leakybucket is used for decision making. Under certain conditions enriched events are poured in these buckets. When these buckets are full, we raise a new event. After this event is raised the bucket is destroyed. There are many types of buckets, and we welcome any new useful design of buckets.

Usually the bucket configuration generates the creation of many buckets. They are differenciated by a field called stackkey. When two events arrives with the same stackkey they go in the same matching bucket.

The very purpose of these buckets is to detect clients that exceed a certain rate of attemps to do something (ssh connection, http authentication failure, etc...). Thus, the most use stackkey field is often the source_ip.

Standard leaky buckets

Default buckets have two main configuration options:

  • capacity: number of events the bucket can hold. When the capacity is reached and a new event is poured, a new event is raised. We call this type of event overflow. This is an int.
  • leakspeed: duration needed for an event to leak. When an event leaks, it disappear from the bucket.

Trigger

It's a special type of bucket with a zero capacity. Thus, when an event is poured in a trigger, it always raises an overflow.

Uniq

It's a bucket working as the standard leaky bucket except for one thing: a filter returns a property for each event and only one occurrence of this property is allowed in the bucket, thus the bucket is called uniq.

Counter

It's a special type of bucket with an infinite capacity and an infinite leakspeed (it never overflows, neither leaks). Nevertheless, the event is raised after a fixed duration. The option is called duration.

Available configuration options for buckets

Fields for standard buckets
  • type: mandatory field. Must be one of "leaky", "trigger", "uniq" or "counter"
  • name: mandatory field, but the value is totally open. Nevertheless this value will tag the events raised by the bucket.
  • filter: mandatory field. It's a filter that is run when the decision to make an event match the bucket or not. The filter have to return a boolean. As a filter implementation we use https://github.com/antonmedv/expr
  • capacity: [mandatory for now, shouldn't be mandatory in the final version] it's the size of the bucket. When pouring in a bucket already with size events, it overflows.
  • leakspeed: leakspeed is a time duration (has to be parseable by https://golang.org/pkg/time/#ParseDuration). After each interval an event is leaked from the bucket.
  • stackkey: mandatory field. This field is used to differentiate on which bucket ongoing events will be poured. When an unknown stackkey is seen in an event a new bucket is created.
  • on_overflow: optional field, that tells the what to do when the bucket is returning the overflow event. As of today, the possibility are these: "ban,1h", "Reprocess", "Delete". Reprocess is used to send the raised event back in the event pool to be matched agains buckets
Fields for special buckets
Uniq

Uniq has an extra field uniq_filter which is too use the filter implementation from https://github.com/antonmedv/expr. The filter must return a string. All strins returned by this filter in the same buckets have to be different. Thus, if a string is seen twice it is dismissed.

Trigger

Capacity and leakspeed are not relevant for this kind of bucket.

Counter

It's a special kind of bucket that raise an event and is destroyed after a fixed duration. The configuration field used is duration and must be parseable by https://golang.org/pkg/time/#ParseDuration. Nevertheless, this kind of bucket is often used with an infinite leakspeed and an infinite capacity [capacity set to -1 for now].

Add examples here

# ssh bruteforce
- type: leaky
  name: ssh_bruteforce
  filter: "Meta.log_type == 'ssh_failed-auth'"
  leakspeed: "10s"
  capacity: 5
  stackkey: "source_ip"
  on_overflow: ban,1h

# reporting of src_ip,dest_port seen
- type: counter
  name: counter
  filter: "Meta.service == 'tcp' && Event.new_connection == 'true'"
  distinct: "Meta.source_ip + ':' + Meta.dest_port"
  duration: 5m
  capacity: -1

- type: trigger
  name: "New connection"
  filter: "Meta.service == 'tcp' && Event.new_connection == 'true'"
  on_overflow: Reprocess

Note on leakybuckets implementation

[This is not dry enough to have many details here, but:]

The bucket code is triggered by InfiniBucketify in main.go. There's one struct called buckets which is for now a map[string]interface{} that holds all buckets. The key of this map is derivated from the filter configured for the bucket and its stackkey. This looks like complicated, but in fact it allows us to use only one structs. This is done in buckets.go.

On top of that the implementation define only the standard leaky bucket. A goroutine is launched for every buckets (bucket.go). This goroutine manages the life of the bucket.

For special buckets, hooks are defined at initialization time in manager.go. Hooks are called when relevant by the bucket gorourine when events are poured and/or when bucket overflows.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BucketPourCache map[string][]types.Event
View Source
var BucketPourTrack bool
View Source
var BucketsCanceled = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_canceled_total",
		Help: "Total buckets canceled.",
	},
	[]string{"name"},
)
View Source
var BucketsCurrentCount = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "cs_buckets",
		Help: "Number of buckets that currently exist.",
	},
	[]string{"name"},
)
View Source
var BucketsInstantiation = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_created_total",
		Help: "Total buckets were instantiated.",
	},
	[]string{"name"},
)
View Source
var BucketsOverflow = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_overflowed_total",
		Help: "Total buckets overflowed.",
	},
	[]string{"name"},
)
View Source
var BucketsPour = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_poured_total",
		Help: "Total events were poured in bucket.",
	},
	[]string{"source", "type", "name"},
)
View Source
var BucketsUnderflow = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_underflowed_total",
		Help: "Total buckets underflowed.",
	},
	[]string{"name"},
)
View Source
var LeakyRoutineCount int64

Functions

func DumpBucketsStateAt

func DumpBucketsStateAt(deadline time.Time, outputdir string, buckets *Buckets) (string, error)

func EventsFromQueue added in v1.0.0

func EventsFromQueue(queue *Queue) []*models.Event

EventsFromQueue iterates the queue to collect & prepare meta-datas from alert

func GarbageCollectBuckets

func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error

The leaky routines lifecycle are based on "real" time. But when we are running in time-machine mode, the reference time is in logs and not "real" time. Thus we need to garbage collect them to avoid a skyrocketing memory usage.

func GetKey

func GetKey(bucketCfg BucketFactory, stackkey string) string

func LeakRoutine

func LeakRoutine(leaky *Leaky) error
for now mimic a leak routine

LeakRoutine us the life of a bucket. It dies when the bucket underflows or overflows

func LoadBucket

func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error

Init recursively process yaml files from a directory and loads them as BucketFactory

func LoadBucketsState

func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFactory) error

func NewAlert added in v1.0.0

func NewAlert(leaky *Leaky, queue *Queue) (types.RuntimeAlert, error)

NewAlert will generate a RuntimeAlert and its APIAlert(s) from a bucket that overflowed

func Pour

func Pour(leaky *Leaky, msg types.Event)

func PourItemToBucket added in v1.2.3

func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *types.Event) (bool, error)

func PourItemToHolders

func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error)

func ShutdownAllBuckets added in v0.2.0

func ShutdownAllBuckets(buckets *Buckets) error

func SourceFromEvent added in v1.0.0

func SourceFromEvent(evt types.Event, leaky *Leaky) (map[string]models.Source, error)

SourceFromEvent extracts and formats a valid models.Source object from an Event

func TimeMachinePour

func TimeMachinePour(l *Leaky, msg types.Event)

func ValidateFactory

func ValidateFactory(bucketFactory *BucketFactory) error

Types

type Blackhole

type Blackhole struct {
	DumbProcessor
	// contains filtered or unexported fields
}

func NewBlackhole

func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error)

func (*Blackhole) OnBucketOverflow

func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue)

type BucketFactory

type BucketFactory struct {
	FormatVersion string            `yaml:"format"`
	Author        string            `yaml:"author"`
	Description   string            `yaml:"description"`
	References    []string          `yaml:"references"`
	Type          string            `yaml:"type"`                //Type can be : leaky, counter, trigger. It determines the main bucket characteristics
	Name          string            `yaml:"name"`                //Name of the bucket, used later in log and user-messages. Should be unique
	Capacity      int               `yaml:"capacity"`            //Capacity is applicable to leaky buckets and determines the "burst" capacity
	LeakSpeed     string            `yaml:"leakspeed"`           //Leakspeed is a float representing how many events per second leak out of the bucket
	Duration      string            `yaml:"duration"`            //Duration allows 'counter' buckets to have a fixed life-time
	Filter        string            `yaml:"filter"`              //Filter is an expr that determines if an event is elligible for said bucket. Filter is evaluated against the Event struct
	GroupBy       string            `yaml:"groupby,omitempty"`   //groupy is an expr that allows to determine the partitions of the bucket. A common example is the source_ip
	Distinct      string            `yaml:"distinct"`            //Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result)
	Debug         bool              `yaml:"debug"`               //Debug, when set to true, will enable debugging for _this_ scenario specifically
	Labels        map[string]string `yaml:"labels"`              //Labels is K:V list aiming at providing context the overflow
	Blackhole     string            `yaml:"blackhole,omitempty"` //Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration

	Reprocess           bool                      `yaml:"reprocess"`       //Reprocess, if true, will for the bucket to be re-injected into processing chain
	CacheSize           int                       `yaml:"cache_size"`      //CacheSize, if > 0, limits the size of in-memory cache of the bucket
	Profiling           bool                      `yaml:"profiling"`       //Profiling, if true, will make the bucket record pours/overflows/etc.
	OverflowFilter      string                    `yaml:"overflow_filter"` //OverflowFilter if present, is a filter that must return true for the overflow to go through
	ConditionalOverflow string                    `yaml:"condition"`       //condition if present, is an expression that must return true for the bucket to overflow
	ScopeType           types.ScopeType           `yaml:"scope,omitempty"` //to enforce a different remediation than blocking an IP. Will default this to IP
	BucketName          string                    `yaml:"-"`
	Filename            string                    `yaml:"-"`
	RunTimeFilter       *vm.Program               `json:"-"`
	ExprDebugger        *exprhelpers.ExprDebugger `yaml:"-" json:"-"` // used to debug expression by printing the content of each variable of the expression
	RunTimeGroupBy      *vm.Program               `json:"-"`
	Data                []*types.DataSource       `yaml:"data,omitempty"`
	DataDir             string                    `yaml:"-"`
	CancelOnFilter      string                    `yaml:"cancel_on,omitempty"` //a filter that, if matched, kills the bucket

	ScenarioVersion string `yaml:"version,omitempty"`

	Simulated bool `yaml:"simulated"` //Set to true if the scenario instantiating the bucket was in the exclusion list
	// contains filtered or unexported fields
}

BucketFactory struct holds all fields for any bucket configuration. This is to have a generic struct for buckets. This can be seen as a bucket factory.

func LoadBuckets

func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, files []string, tomb *tomb.Tomb, buckets *Buckets) ([]BucketFactory, chan types.Event, error)

type Buckets

type Buckets struct {
	Bucket_map *sync.Map
	// contains filtered or unexported fields
}

Buckets is the struct used to hold buckets in the context of main.go the idea is to have one struct to rule them all

func NewBuckets

func NewBuckets() *Buckets

NewBuckets create the Buckets struct

type CancelOnFilter added in v1.2.2

type CancelOnFilter struct {
	CancelOnFilter      *vm.Program
	CancelOnFilterDebug *exprhelpers.ExprDebugger
}

func (*CancelOnFilter) AfterBucketPour added in v1.5.0

func (u *CancelOnFilter) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*CancelOnFilter) OnBucketInit added in v1.2.2

func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error

func (*CancelOnFilter) OnBucketOverflow added in v1.2.2

func (u *CancelOnFilter) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue)

func (*CancelOnFilter) OnBucketPour added in v1.2.2

func (u *CancelOnFilter) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

type ConditionalOverflow added in v1.5.0

type ConditionalOverflow struct {
	ConditionalFilter        string
	ConditionalFilterRuntime *vm.Program
	DumbProcessor
}

func (*ConditionalOverflow) AfterBucketPour added in v1.5.0

func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*ConditionalOverflow) OnBucketInit added in v1.5.0

func (c *ConditionalOverflow) OnBucketInit(g *BucketFactory) error

type DumbProcessor

type DumbProcessor struct {
}

func (*DumbProcessor) AfterBucketPour added in v1.5.0

func (d *DumbProcessor) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*DumbProcessor) OnBucketInit

func (d *DumbProcessor) OnBucketInit(bucketFactory *BucketFactory) error

func (*DumbProcessor) OnBucketOverflow

func (d *DumbProcessor) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue)

func (*DumbProcessor) OnBucketPour

func (d *DumbProcessor) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

type HiddenKey

type HiddenKey struct {
	// contains filtered or unexported fields
}

type Leaky

type Leaky struct {
	Name string
	Mode int //LIVE or TIMEMACHINE
	//the limiter is what holds the proper "leaky aspect", it determines when/if we can pour objects
	Limiter         rate.RateLimiter `json:"-"`
	SerializedState rate.Lstate
	//Queue is used to held the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer.
	Queue *Queue
	//Leaky buckets are receiving message through a chan
	In chan *types.Event `json:"-"`
	//Leaky buckets are pushing their overflows through a chan
	Out chan *Queue `json:"-"`
	// shared for all buckets (the idea is to kill this afterwards)
	AllOut chan types.Event `json:"-"`
	//max capacity (for burst)
	Capacity int
	//CacheRatio is the number of elements that should be kept in memory (compared to capacity)
	CacheSize int
	//the unique identifier of the bucket (a hash)
	Mapkey string
	// chan for signaling
	Signal       chan bool `json:"-"`
	Suicide      chan bool `json:"-"`
	Reprocess    bool
	Simulated    bool
	Uuid         string
	First_ts     time.Time
	Last_ts      time.Time
	Ovflw_ts     time.Time
	Total_count  int
	Leakspeed    time.Duration
	BucketConfig *BucketFactory
	Duration     time.Duration
	Pour         func(*Leaky, types.Event) `json:"-"`
	//Profiling when set to true enables profiling of bucket
	Profiling bool
	// contains filtered or unexported fields
}

Leaky represents one instance of a bucket

func FromFactory

func FromFactory(bucketFactory BucketFactory) *Leaky

func LoadOrStoreBucketFromHolder added in v1.2.3

func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder BucketFactory, expectMode int) (*Leaky, error)

func NewLeaky

func NewLeaky(bucketFactory BucketFactory) *Leaky

Newleaky creates a new leaky bucket from a BucketFactory Events created by the bucket (overflow, bucket empty) are sent to a chan defined by BucketFactory The leaky bucket implementation is based on rate limiter (see https://godoc.org/golang.org/x/time/rate) There's a trick to have an event said when the bucket gets empty to allow its destruction

func NewTimeMachine

func NewTimeMachine(g BucketFactory) *Leaky

type OverflowFilter

type OverflowFilter struct {
	Filter        string
	FilterRuntime *vm.Program
	DumbProcessor
}

func NewOverflowFilter

func NewOverflowFilter(g *BucketFactory) (*OverflowFilter, error)

func (*OverflowFilter) OnBucketOverflow

func (u *OverflowFilter) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue)

type Processor

type Processor interface {
	OnBucketInit(Bucket *BucketFactory) error
	OnBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event
	OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue)

	AfterBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event
}

type Queue

type Queue struct {
	Queue []types.Event
	L     int //capacity
}

Queue holds a limited size queue

func NewQueue

func NewQueue(l int) *Queue

NewQueue create a new queue with a size of l

func (*Queue) Add

func (q *Queue) Add(m types.Event)

Add an event in the queue. If it has already l elements, the first element is dropped before adding the new m element

func (*Queue) GetQueue

func (q *Queue) GetQueue() []types.Event

GetQueue returns the entire queue

type Trigger

type Trigger struct {
	DumbProcessor
}

func (*Trigger) OnBucketPour

func (t *Trigger) OnBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event

type Uniq

type Uniq struct {
	DistinctCompiled *vm.Program
	KeyCache         map[string]bool
	CacheMutex       sync.Mutex
}

func (*Uniq) AfterBucketPour added in v1.5.0

func (u *Uniq) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*Uniq) OnBucketInit

func (u *Uniq) OnBucketInit(bucketFactory *BucketFactory) error

func (*Uniq) OnBucketOverflow

func (u *Uniq) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *Queue) (types.RuntimeAlert, *Queue)

func (*Uniq) OnBucketPour

func (u *Uniq) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL