leakybucket

package
v1.6.3
Published: Sep 9, 2024 License: MIT Imports: 34 Imported by: 1

README

Leakybuckets

Bucket concepts

The Leakybucket is used for decision making. Under certain conditions, enriched events are poured into these buckets. When these buckets are full, we raise a new event. After this event is raised the bucket is destroyed. There are many types of buckets, and we welcome any new useful design of buckets.

Usually, a single bucket configuration leads to the creation of many buckets. They are differentiated by a field called stackkey. When two events arrive with the same stackkey, they go into the same bucket.

The very purpose of these buckets is to detect clients that exceed a certain rate of attempts to do something (ssh connection, http authentication failure, etc.). Thus, the most used stackkey field is often the source_ip.
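The routing by stackkey can be pictured as a map lookup. The following is a minimal sketch, not the package's code: the `event` and `bucket` types and the `pourByStackkey` function are hypothetical names introduced only for illustration, and the stackkey is assumed to be the source IP.

```go
package main

import "fmt"

// event is a hypothetical, simplified stand-in for an enriched event.
type event struct {
	SourceIP string
	LogType  string
}

// bucket is a hypothetical stand-in that only counts what was poured into it.
type bucket struct {
	Poured int
}

// pourByStackkey routes ev to the bucket matching its stackkey (assumed here
// to be the source IP) and creates the bucket the first time a key is seen.
func pourByStackkey(buckets map[string]*bucket, ev event) *bucket {
	key := ev.SourceIP
	b, ok := buckets[key]
	if !ok {
		b = &bucket{}
		buckets[key] = b
	}
	b.Poured++
	return b
}

func main() {
	buckets := map[string]*bucket{}
	for _, ip := range []string{"192.0.2.1", "192.0.2.2", "192.0.2.1"} {
		pourByStackkey(buckets, event{SourceIP: ip, LogType: "ssh_failed-auth"})
	}
	// Two distinct source IPs were seen, so two buckets exist.
	fmt.Println(len(buckets), buckets["192.0.2.1"].Poured, buckets["192.0.2.2"].Poured)
}
```

Two events sharing a source IP land in the same bucket, while a new IP gets a bucket of its own.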

Standard leaky buckets

Default buckets have two main configuration options:

  • capacity: number of events the bucket can hold. When the capacity is reached and a new event is poured, a new event is raised. We call this type of event overflow. This is an int.

  • leakspeed: duration needed for an event to leak. When an event leaks, it disappears from the bucket.
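The interplay of capacity and leakspeed can be sketched as follows. This is a simplified single-goroutine model written for illustration only; the package itself builds on a rate limiter and one goroutine per bucket. The names `leakyBucket`, `pour` and `simulate` are hypothetical, timestamps are assumed to be non-decreasing, and leakspeed is assumed to be positive.

```go
package main

import (
	"fmt"
	"time"
)

// leakyBucket is a hypothetical model of a standard bucket.
type leakyBucket struct {
	capacity  int           // number of events the bucket can hold
	leakspeed time.Duration // time needed for one event to leak (must be > 0)
	level     int           // events currently held
	lastLeak  time.Time     // instant up to which leaks are accounted for
}

// pour adds one event at time now and reports whether the bucket overflows.
func (b *leakyBucket) pour(now time.Time) bool {
	if b.level > 0 {
		// Remove the events that leaked since the last accounting.
		leaked := int(now.Sub(b.lastLeak) / b.leakspeed)
		if leaked >= b.level {
			b.level = 0
		} else if leaked > 0 {
			b.level -= leaked
			b.lastLeak = b.lastLeak.Add(time.Duration(leaked) * b.leakspeed)
		}
	}
	if b.level == 0 {
		b.lastLeak = now // an empty bucket starts its leak clock at this pour
	}
	if b.level >= b.capacity {
		return true // capacity already reached: this pour overflows
	}
	b.level++
	return false
}

// simulate pours one event at each offset (in seconds) and returns the
// overflow decision for every pour.
func simulate(capacity, leakspeedSec int, offsetsSec []int) []bool {
	start := time.Unix(0, 0)
	b := &leakyBucket{capacity: capacity, leakspeed: time.Duration(leakspeedSec) * time.Second}
	out := make([]bool, 0, len(offsetsSec))
	for _, s := range offsetsSec {
		out = append(out, b.pour(start.Add(time.Duration(s)*time.Second)))
	}
	return out
}

func main() {
	// Six events within five seconds against capacity 5 and leakspeed 10s.
	fmt.Println(simulate(5, 10, []int{0, 1, 2, 3, 4, 5}))
}
```

In this model the sixth event overflows because nothing has leaked yet, whereas events spaced further apart than the leakspeed never accumulate. With a capacity of zero, the very first pour overflows.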

Trigger

A Trigger is a special type of bucket with a capacity of zero. Thus, when an event is poured into a trigger, it always raises an overflow.

Uniq

A Uniq is a bucket working like the standard leaky bucket except for one thing: a filter returns a property for each event and only one occurrence of this property is allowed in the bucket, thus the bucket is called uniq.
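A minimal sketch of the uniq behaviour, assuming the property returned by the filter is a plain string. The `uniqBucket` type and its methods are hypothetical, and leaking is left out to keep the example short.

```go
package main

import "fmt"

// uniqBucket is a hypothetical model: it accepts at most one event per
// distinct property value and otherwise fills up like a standard bucket.
type uniqBucket struct {
	capacity int
	seen     map[string]bool
}

func newUniqBucket(capacity int) *uniqBucket {
	return &uniqBucket{capacity: capacity, seen: map[string]bool{}}
}

// pour reports whether the event was poured and whether it caused an overflow.
func (b *uniqBucket) pour(property string) (poured bool, overflow bool) {
	if b.seen[property] {
		return false, false // property already present: the event is dismissed
	}
	if len(b.seen) >= b.capacity {
		return true, true // a new property arrives while the bucket is full
	}
	b.seen[property] = true
	return true, false
}

func main() {
	b := newUniqBucket(2)
	for _, user := range []string{"root", "root", "admin", "guest"} {
		poured, overflow := b.pour(user)
		fmt.Println(user, poured, overflow)
	}
}
```

Repeating the same property never fills the bucket; only distinct values count toward the capacity.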

Counter

A Counter is a special type of bucket with an infinite capacity and an infinite leakspeed (it never overflows, nor leaks). Nevertheless, the event is raised after a fixed duration. The option is called duration.

Bayesian

A Bayesian is a special bucket that runs Bayesian inference instead of counting events. Each event must have its likelihoods specified in the YAML file under prob_given_benign and prob_given_evil. The bucket keeps evaluating events until the posterior goes above the threshold (triggering the overflow) or the duration (specified by leakspeed) expires.

Available configuration options for buckets

Fields for standard buckets
  • type: mandatory field. Must be one of "leaky", "trigger", "uniq" or "counter"

  • name: mandatory field, but the value is totally open. Nevertheless, this value will tag the events raised by the bucket.

  • filter: mandatory field. It's a filter that is run to decide whether an event matches the bucket or not. The filter has to return a boolean. As a filter implementation we use https://github.com/antonmedv/expr

  • capacity: [mandatory for now, shouldn't be mandatory in the final version] the size of the bucket. When an event is poured into a bucket that already holds capacity events, the bucket overflows.

  • leakspeed: a time duration (it has to be parsable by https://golang.org/pkg/time/#ParseDuration). After each interval, an event is leaked from the bucket.

  • stackkey: mandatory field. This field is used to differentiate on which instance of the bucket the matching events will be poured. When an unknown stackkey is seen in an event, a new bucket is created.

  • on_overflow: optional field, which tells what to do when the bucket returns the overflow event. As of today, the possibilities are "ban,1h", "Reprocess" or "Delete". Reprocess is used to send the raised event back to the event pool to be matched against buckets.
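Since leakspeed has to be parsable by time.ParseDuration, a value can be checked ahead of time with the standard library. The helper `parseLeakspeed` below is hypothetical, and rejecting non-positive durations is an assumption of this sketch rather than a documented rule of the package.

```go
package main

import (
	"fmt"
	"time"
)

// parseLeakspeed is a hypothetical helper that validates a leakspeed string.
// Rejecting zero or negative durations is an assumption made for this sketch.
func parseLeakspeed(s string) (time.Duration, error) {
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, fmt.Errorf("invalid leakspeed %q: %w", s, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("invalid leakspeed %q: must be positive", s)
	}
	return d, nil
}

func main() {
	for _, s := range []string{"10s", "5m", "1h30m", "10 seconds"} {
		d, err := parseLeakspeed(s)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(s, "=", d)
	}
}
```

time.ParseDuration accepts the units ns, us, ms, s, m and h, so a value such as "10 seconds" is rejected.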

Fields for special buckets
Uniq
  • uniq_filter: an expression that must comply with the syntax defined in https://github.com/antonmedv/expr and must return a string. All strings returned by this filter in the same bucket have to be different. Thus, if a string is seen twice, the event is dismissed.
Trigger

Capacity and leakspeed are not relevant for this kind of bucket.

Counter
  • duration: the Counter will be destroyed after this interval has elapsed since its creation. The duration must be parsable by https://golang.org/pkg/time/#ParseDuration. This kind of bucket is usually configured with an infinite leakspeed and an infinite capacity [capacity set to -1 for now].
Bayesian
  • bayesian_prior: The prior to start with
  • bayesian_threshold: The threshold for the posterior to trigger the overflow.
  • bayesian_conditions: List of Bayesian conditions with likelihoods

Bayesian Conditions are built from:

  • condition: The expr for this specific condition to be true
  • prob_given_evil: The likelihood an IP satisfies the condition given the fact that it is a malicious IP
  • prob_given_benign: The likelihood an IP satisfies the condition given the fact that it is a benign IP
  • guillotine: Bool to stop the condition from getting evaluated if it has evaluated to true once. This should be used if evaluating the condition is computationally expensive.
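The way a prior, the two likelihoods and a threshold fit together can be illustrated with Bayes' rule. This is a sketch under stated assumptions, not the package's implementation: the `condition` type and the `update` and `overflows` functions are hypothetical, and it is assumed that a condition evaluating to false updates the posterior with the complementary likelihoods.

```go
package main

import "fmt"

// condition is a hypothetical stand-in holding the two likelihoods.
type condition struct {
	ProbGivenEvil   float64
	ProbGivenBenign float64
}

// update applies Bayes' rule once. If the condition did not match, the
// complementary likelihoods are used (an assumption of this sketch).
// The likelihoods must not both be zero, or the division is undefined.
func update(prior float64, c condition, matched bool) float64 {
	pe, pb := c.ProbGivenEvil, c.ProbGivenBenign
	if !matched {
		pe, pb = 1-pe, 1-pb
	}
	return prior * pe / (prior*pe + (1-prior)*pb)
}

// overflows chains the updates, one per condition, and compares the final
// posterior to the threshold. matched must have the same length as conds.
func overflows(prior, threshold float64, conds []condition, matched []bool) bool {
	p := prior
	for i, c := range conds {
		p = update(p, c, matched[i])
	}
	return p > threshold
}

func main() {
	conds := []condition{{0.8, 0.2}, {0.8, 0.2}}
	fmt.Printf("%.3f\n", update(0.5, conds[0], true))
	fmt.Println(overflows(0.5, 0.9, conds, []bool{true, true}))
	fmt.Println(overflows(0.5, 0.9, conds, []bool{true, false}))
}
```

Starting from a prior of 0.5, one matching condition with likelihoods 0.8 and 0.2 raises the posterior to 0.8. A second match pushes it above a threshold of 0.9, while a miss brings it back to 0.5.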

Examples

# ssh bruteforce
- type: leaky
  name: ssh_bruteforce
  filter: "Meta.log_type == 'ssh_failed-auth'"
  leakspeed: "10s"
  capacity: 5
  stackkey: "source_ip"
  on_overflow: ban,1h

# reporting of src_ip,dest_port seen
- type: counter
  name: counter
  filter: "Meta.service == 'tcp' && Event.new_connection == 'true'"
  distinct: "Meta.source_ip + ':' + Meta.dest_port"
  duration: 5m
  capacity: -1

- type: trigger
  name: "New connection"
  filter: "Meta.service == 'tcp' && Event.new_connection == 'true'"
  on_overflow: Reprocess

Note on leakybuckets implementation

[This is not dry enough to have many details here, but:]

The bucket code is triggered by runPour in pour.go, by calling the leaky.PourItemToHolders function. There is one struct called buckets which is for now a map[string]interface{} that holds all buckets. The key of this map is derived from the filter configured for the bucket and its stackkey. This looks complicated, but it allows us to use only one struct. This is done in buckets.go.
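Deriving one map key from a filter and a stackkey value can be sketched with a hash. The `mapKey` function, the SHA-1 choice and the separator byte are assumptions made for this illustration; GetKey in this package may compute its key differently.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// mapKey is a hypothetical helper that derives a single map key from the
// bucket filter and the stackkey value of an event. The hash function and
// the separator are assumptions of this sketch.
func mapKey(filter, stackkey string) string {
	sum := sha1.Sum([]byte(filter + "\x00" + stackkey))
	return hex.EncodeToString(sum[:])
}

func main() {
	// One flat map can hold the buckets of every configuration.
	buckets := map[string]interface{}{}
	filter := "Meta.log_type == 'ssh_failed-auth'"
	for _, ip := range []string{"192.0.2.1", "192.0.2.2", "192.0.2.1"} {
		key := mapKey(filter, ip)
		if _, ok := buckets[key]; !ok {
			buckets[key] = struct{}{} // placeholder for a real bucket
		}
	}
	fmt.Println(len(buckets))
}
```

Because the key mixes the filter with the stackkey value, buckets belonging to different configurations cannot collide even when they track the same source.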

On top of that the implementation defines only the standard leaky bucket. A goroutine is launched for every bucket (bucket.go). This goroutine manages the life of the bucket.

For special buckets, hooks are defined at initialization time in manager.go. Hooks are called when relevant by the bucket goroutine when events are poured and/or when a bucket overflows.
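The hook mechanism can be sketched with a small interface and a no-op default that special buckets embed and selectively override. All names below (`event`, `bucket`, `processor`, `noopProcessor`, `dedup`, `pour`) are hypothetical and simplified. In particular, the package's Processor methods return closures, whereas this sketch calls the hooks directly.

```go
package main

import "fmt"

// event and bucket are hypothetical stand-ins for the real types.
type event struct{ Property string }
type bucket struct{ Name string }

// processor is a simplified, hypothetical hook interface.
type processor interface {
	// OnBucketPour returns the event to keep pouring, or nil to drop it.
	OnBucketPour(ev event, b *bucket) *event
}

// noopProcessor lets every event through, so that special buckets only need
// to override the hooks they care about.
type noopProcessor struct{}

func (noopProcessor) OnBucketPour(ev event, b *bucket) *event { return &ev }

// dedup embeds noopProcessor and overrides the pour hook to drop events
// whose property was already seen.
type dedup struct {
	noopProcessor
	seen map[string]bool
}

func (d *dedup) OnBucketPour(ev event, b *bucket) *event {
	if d.seen[ev.Property] {
		return nil
	}
	d.seen[ev.Property] = true
	return &ev
}

// pour runs the hooks in order and reports whether the event survived.
func pour(procs []processor, ev event, b *bucket) bool {
	cur := &ev
	for _, p := range procs {
		cur = p.OnBucketPour(*cur, b)
		if cur == nil {
			return false
		}
	}
	return true
}

func main() {
	procs := []processor{noopProcessor{}, &dedup{seen: map[string]bool{}}}
	b := &bucket{Name: "demo"}
	for _, prop := range []string{"root", "root", "admin"} {
		fmt.Println(prop, pour(procs, event{Property: prop}, b))
	}
}
```

Embedding a default implementation keeps each special bucket small: it only defines the hooks where its behaviour differs from the standard leaky bucket.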

Documentation

Index

Constants

This section is empty.

Variables

var BucketPourCache map[string][]types.Event
var BucketPourTrack bool
var BucketsCanceled = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_canceled_total",
		Help: "Total buckets canceled.",
	},
	[]string{"name"},
)
var BucketsCurrentCount = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "cs_buckets",
		Help: "Number of buckets that currently exist.",
	},
	[]string{"name"},
)
var BucketsInstantiation = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_created_total",
		Help: "Total buckets were instantiated.",
	},
	[]string{"name"},
)
var BucketsOverflow = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_overflowed_total",
		Help: "Total buckets overflowed.",
	},
	[]string{"name"},
)
var BucketsPour = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_poured_total",
		Help: "Total events were poured in bucket.",
	},
	[]string{"source", "type", "name"},
)
var BucketsUnderflow = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cs_bucket_underflowed_total",
		Help: "Total buckets underflowed.",
	},
	[]string{"name"},
)
var LeakyRoutineCount int64

Functions

func DumpBucketsStateAt

func DumpBucketsStateAt(deadline time.Time, outputdir string, buckets *Buckets) (string, error)

func EventsFromQueue added in v1.0.0

func EventsFromQueue(queue *types.Queue) []*models.Event

EventsFromQueue iterates over the queue to collect and prepare metadata from the alert.

func GarbageCollectBuckets

func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) error

The lifecycle of the leaky routines is based on "real" time. But when running in time-machine mode, the reference time comes from the logs and not from "real" time. Thus we need to garbage collect the buckets to avoid skyrocketing memory usage.
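The idea of collecting buckets against log time rather than wall-clock time can be sketched as below. This is an illustration under assumptions, not the package's logic: the `tmBucket` type, its fields and the helpers are hypothetical, and a bucket is considered collectable once all of its events would have leaked before the deadline.

```go
package main

import (
	"fmt"
	"time"
)

// tmBucket is a hypothetical time-machine bucket tracked by log timestamps.
type tmBucket struct {
	lastSeen  time.Time     // timestamp of the last poured event (from logs)
	level     int           // events held at that time
	leakspeed time.Duration // time needed for one event to leak
}

// at converts a Unix timestamp in seconds to a time.Time.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// newTMBucket builds a bucket from plain integers for convenience.
func newTMBucket(lastSeenUnix int64, level, leakSeconds int) *tmBucket {
	return &tmBucket{
		lastSeen:  at(lastSeenUnix),
		level:     level,
		leakspeed: time.Duration(leakSeconds) * time.Second,
	}
}

// garbageCollect deletes every bucket that would be empty before deadline
// and returns how many were removed.
func garbageCollect(buckets map[string]*tmBucket, deadline time.Time) int {
	removed := 0
	for key, b := range buckets {
		emptyAt := b.lastSeen.Add(time.Duration(b.level) * b.leakspeed)
		if emptyAt.Before(deadline) {
			delete(buckets, key) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

func main() {
	buckets := map[string]*tmBucket{
		"old":    newTMBucket(0, 2, 10),  // empty at t=20s
		"recent": newTMBucket(90, 3, 10), // empty at t=120s
	}
	fmt.Println(garbageCollect(buckets, at(100)), len(buckets))
}
```

The deadline would typically be the timestamp of the log line being replayed, so stale buckets are dropped as the replay advances.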

func GetKey

func GetKey(bucketCfg BucketFactory, stackkey string) string

func LeakRoutine

func LeakRoutine(leaky *Leaky) error
For now, this mimics a leak routine.

LeakRoutine manages the life of a bucket. It dies when the bucket underflows or overflows.

func LoadBucket

func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error

Init recursively processes yaml files from a directory and loads them as BucketFactory

func LoadBucketsState

func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFactory) error

func NewAlert added in v1.0.0

func NewAlert(leaky *Leaky, queue *types.Queue) (types.RuntimeAlert, error)

NewAlert will generate a RuntimeAlert and its APIAlert(s) from a bucket that overflowed

func Pour

func Pour(leaky *Leaky, msg types.Event)

func PourItemToBucket added in v1.2.3

func PourItemToBucket(bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *types.Event) (bool, error)

func PourItemToHolders

func PourItemToHolders(parsed types.Event, holders []BucketFactory, buckets *Buckets) (bool, error)

func ShutdownAllBuckets added in v0.2.0

func ShutdownAllBuckets(buckets *Buckets) error

func SourceFromEvent added in v1.0.0

func SourceFromEvent(evt types.Event, leaky *Leaky) (map[string]models.Source, error)

SourceFromEvent extracts and formats a valid models.Source object from an Event

func TimeMachinePour

func TimeMachinePour(l *Leaky, msg types.Event)

func ValidateFactory

func ValidateFactory(bucketFactory *BucketFactory) error

Types

type BayesianBucket added in v1.5.3

type BayesianBucket struct {
	DumbProcessor
	// contains filtered or unexported fields
}

func (*BayesianBucket) AfterBucketPour added in v1.5.3

func (c *BayesianBucket) AfterBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*BayesianBucket) OnBucketInit added in v1.5.3

func (c *BayesianBucket) OnBucketInit(g *BucketFactory) error

type BayesianEvent added in v1.5.3

type BayesianEvent struct {
	// contains filtered or unexported fields
}

type Blackhole

type Blackhole struct {
	DumbProcessor
	// contains filtered or unexported fields
}

func NewBlackhole

func NewBlackhole(bucketFactory *BucketFactory) (*Blackhole, error)

func (*Blackhole) OnBucketOverflow

func (bl *Blackhole) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)

type BucketFactory

type BucketFactory struct {
	FormatVersion string                 `yaml:"format"`
	Author        string                 `yaml:"author"`
	Description   string                 `yaml:"description"`
	References    []string               `yaml:"references"`
	Type          string                 `yaml:"type"`                // Type can be : leaky, counter, trigger. It determines the main bucket characteristics
	Name          string                 `yaml:"name"`                // Name of the bucket, used later in log and user-messages. Should be unique
	Capacity      int                    `yaml:"capacity"`            // Capacity is applicable to leaky buckets and determines the "burst" capacity
	LeakSpeed     string                 `yaml:"leakspeed"`           // LeakSpeed is a duration string (parsable by time.ParseDuration): the time needed for one event to leak out of the bucket
	Duration      string                 `yaml:"duration"`            // Duration allows 'counter' buckets to have a fixed life-time
	Filter        string                 `yaml:"filter"`              // Filter is an expr that determines if an event is eligible for said bucket. Filter is evaluated against the Event struct
	GroupBy       string                 `yaml:"groupby,omitempty"`   // GroupBy is an expr that determines the partitions of the bucket. A common example is the source_ip
	Distinct      string                 `yaml:"distinct"`            // Distinct, when present, adds a `Pour()` processor that will only pour uniq items (based on distinct expr result)
	Debug         bool                   `yaml:"debug"`               // Debug, when set to true, will enable debugging for _this_ scenario specifically
	Labels        map[string]interface{} `yaml:"labels"`              // Labels is a K:V list aiming at providing context for the overflow
	Blackhole     string                 `yaml:"blackhole,omitempty"` // Blackhole is a duration that, if present, will prevent same bucket partition to overflow more often than $duration

	Reprocess           bool                   `yaml:"reprocess"`       // Reprocess, if true, will force the bucket to be re-injected into processing chain
	CacheSize           int                    `yaml:"cache_size"`      // CacheSize, if > 0, limits the size of in-memory cache of the bucket
	Profiling           bool                   `yaml:"profiling"`       // Profiling, if true, will make the bucket record pours/overflows/etc.
	OverflowFilter      string                 `yaml:"overflow_filter"` // OverflowFilter if present, is a filter that must return true for the overflow to go through
	ConditionalOverflow string                 `yaml:"condition"`       // condition if present, is an expression that must return true for the bucket to overflow
	BayesianPrior       float32                `yaml:"bayesian_prior"`
	BayesianThreshold   float32                `yaml:"bayesian_threshold"`
	BayesianConditions  []RawBayesianCondition `yaml:"bayesian_conditions"` // conditions for the bayesian bucket
	ScopeType           types.ScopeType        `yaml:"scope,omitempty"`     // to enforce a different remediation than blocking an IP. Will default this to IP
	BucketName          string                 `yaml:"-"`
	Filename            string                 `yaml:"-"`
	RunTimeFilter       *vm.Program            `json:"-"`
	RunTimeGroupBy      *vm.Program            `json:"-"`
	Data                []*types.DataSource    `yaml:"data,omitempty"`
	DataDir             string                 `yaml:"-"`
	CancelOnFilter      string                 `yaml:"cancel_on,omitempty"` // a filter that, if matched, kills the bucket

	ScenarioVersion string `yaml:"version,omitempty"`

	Simulated bool `yaml:"simulated"` // Set to true if the scenario instantiating the bucket was in the exclusion list
	// contains filtered or unexported fields
}

BucketFactory struct holds all fields for any bucket configuration. This is to have a generic struct for buckets. This can be seen as a bucket factory.

func LoadBuckets

func LoadBuckets(cscfg *csconfig.CrowdsecServiceCfg, hub *cwhub.Hub, files []string, tomb *tomb.Tomb, buckets *Buckets, orderEvent bool) ([]BucketFactory, chan types.Event, error)

type Buckets

type Buckets struct {
	Bucket_map *sync.Map
	// contains filtered or unexported fields
}

Buckets is the struct used to hold buckets in the context of main.go. The idea is to have one struct to rule them all.

func NewBuckets

func NewBuckets() *Buckets

NewBuckets creates the Buckets struct

type CancelOnFilter added in v1.2.2

type CancelOnFilter struct {
	CancelOnFilter *vm.Program
	Debug          bool
}

func (*CancelOnFilter) AfterBucketPour added in v1.5.0

func (u *CancelOnFilter) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*CancelOnFilter) OnBucketInit added in v1.2.2

func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error

func (*CancelOnFilter) OnBucketOverflow added in v1.2.2

func (u *CancelOnFilter) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)

func (*CancelOnFilter) OnBucketPour added in v1.2.2

func (u *CancelOnFilter) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

type ConditionalOverflow added in v1.5.0

type ConditionalOverflow struct {
	ConditionalFilter        string
	ConditionalFilterRuntime *vm.Program
	DumbProcessor
}

func (*ConditionalOverflow) AfterBucketPour added in v1.5.0

func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*ConditionalOverflow) OnBucketInit added in v1.5.0

func (c *ConditionalOverflow) OnBucketInit(g *BucketFactory) error

type DumbProcessor

type DumbProcessor struct {
}

func (*DumbProcessor) AfterBucketPour added in v1.5.0

func (d *DumbProcessor) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*DumbProcessor) OnBucketInit

func (d *DumbProcessor) OnBucketInit(bucketFactory *BucketFactory) error

func (*DumbProcessor) OnBucketOverflow

func (d *DumbProcessor) OnBucketOverflow(b *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)

func (*DumbProcessor) OnBucketPour

func (d *DumbProcessor) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

type HiddenKey

type HiddenKey struct {
	// contains filtered or unexported fields
}

type Leaky

type Leaky struct {
	Name string
	Mode int //LIVE or TIMEMACHINE
	//the limiter is what holds the proper "leaky aspect", it determines when/if we can pour objects
	Limiter         rate.RateLimiter `json:"-"`
	SerializedState rate.Lstate
	//Queue is used to hold the cache of objects in the bucket, it is used to know 'how many' objects we have in buffer.
	Queue *types.Queue
	//Leaky buckets are receiving message through a chan
	In chan *types.Event `json:"-"`
	//Leaky buckets are pushing their overflows through a chan
	Out chan *types.Queue `json:"-"`
	// shared for all buckets (the idea is to kill this afterward)
	AllOut chan types.Event `json:"-"`
	//max capacity (for burst)
	Capacity int
	//CacheSize is the number of elements that should be kept in memory (compared to capacity)
	CacheSize int
	//the unique identifier of the bucket (a hash)
	Mapkey string
	// chan for signaling
	Signal       chan bool `json:"-"`
	Suicide      chan bool `json:"-"`
	Reprocess    bool
	Simulated    bool
	Uuid         string
	First_ts     time.Time
	Last_ts      time.Time
	Ovflw_ts     time.Time
	Total_count  int
	Leakspeed    time.Duration
	BucketConfig *BucketFactory
	Duration     time.Duration
	Pour         func(*Leaky, types.Event) `json:"-"`
	//Profiling when set to true enables profiling of bucket
	Profiling bool
	// contains filtered or unexported fields
}

Leaky represents one instance of a bucket

func FromFactory

func FromFactory(bucketFactory BucketFactory) *Leaky

func LoadOrStoreBucketFromHolder added in v1.2.3

func LoadOrStoreBucketFromHolder(partitionKey string, buckets *Buckets, holder BucketFactory, expectMode int) (*Leaky, error)

func NewLeaky

func NewLeaky(bucketFactory BucketFactory) *Leaky

NewLeaky creates a new leaky bucket from a BucketFactory. Events created by the bucket (overflow, bucket empty) are sent to a chan defined by BucketFactory. The leaky bucket implementation is based on a rate limiter (see https://godoc.org/golang.org/x/time/rate). There's a trick to have an event sent when the bucket gets empty, to allow its destruction.

func NewTimeMachine

func NewTimeMachine(g BucketFactory) *Leaky

type OverflowFilter

type OverflowFilter struct {
	Filter        string
	FilterRuntime *vm.Program
	DumbProcessor
}

func NewOverflowFilter

func NewOverflowFilter(g *BucketFactory) (*OverflowFilter, error)

func (*OverflowFilter) OnBucketOverflow

func (u *OverflowFilter) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)

type Processor

type Processor interface {
	OnBucketInit(Bucket *BucketFactory) error
	OnBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event
	OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)

	AfterBucketPour(Bucket *BucketFactory) func(types.Event, *Leaky) *types.Event
}

type RawBayesianCondition added in v1.5.3

type RawBayesianCondition struct {
	ConditionalFilterName string  `yaml:"condition"`
	ProbGivenEvil         float32 `yaml:"prob_given_evil"`
	ProbGivenBenign       float32 `yaml:"prob_given_benign"`
	Guillotine            bool    `yaml:"guillotine,omitempty"`
}

type Trigger

type Trigger struct {
	DumbProcessor
}

func (*Trigger) OnBucketPour

func (t *Trigger) OnBucketPour(b *BucketFactory) func(types.Event, *Leaky) *types.Event

type Uniq

type Uniq struct {
	DistinctCompiled *vm.Program
	KeyCache         map[string]bool
	CacheMutex       sync.Mutex
}

func (*Uniq) AfterBucketPour added in v1.5.0

func (u *Uniq) AfterBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event

func (*Uniq) OnBucketInit

func (u *Uniq) OnBucketInit(bucketFactory *BucketFactory) error

func (*Uniq) OnBucketOverflow

func (u *Uniq) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue)

func (*Uniq) OnBucketPour

func (u *Uniq) OnBucketPour(bucketFactory *BucketFactory) func(types.Event, *Leaky) *types.Event
