throttleplugin

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2019 License: MIT Imports: 21 Imported by: 0

README

Build Status Go Report Card

filebeat-throttle-plugin

This plugins allows to throttle beat events.

Throttle processor retrieves configuration from separate component called policy manager.

┌──────┐                    ┌──────┐                     ┌──────┐
│Node 1│                    │Node 2│                     │Node 3│
├──────┴─────────────┐      ├──────┴─────────────┐       ├──────┴─────────────┐
│ ┌──────┐  ┌──────┐ │      │ ┌──────┐  ┌──────┐ │       │ ┌──────┐  ┌──────┐ │
│ │      │  │      │ │      │ │      │  │      │ │       │ │      │  │      │ │
│ │      │  │      │ │      │ │      │  │      │ │       │ │      │  │      │ │
│ └──────┘  └──────┘ │      │ └──────┘  └──────┘ │       │ └──────┘  └──────┘ │
│ ┌──────┐  ┌──────┐ │      │ ┌──────┐  ┌──────┐ │       │ ┌──────┐  ┌──────┐ │
│ │      │  │      │ │      │ │      │  │      │ │       │ │      │  │      │ │
│ │      │  │      │ │      │ │      │  │      │ │       │ │      │  │      │ │
│ └──────┘  └──────┘ │      │ └──────┘  └──────┘ │       │ └──────┘  └──────┘ │
│ ┌──────┐  ┌──────┐ │      │ ┌──────┐  ┌──────┐ │       │ ┌──────┐  ┌──────┐ │
│ │      │  │      │ │      │ │      │  │      │ │       │ │      │  │      │ │
│ │      │  │      │ │      │ │      │  │      │ │       │ │      │  │      │ │
│ └──────┘  └──────┘ │      │ └──────┘  └──────┘ │       │ └──────┘  └──────┘ │
│                    │      │                    │       │                    │
│                    │      │                    │       │                    │
│ ┌────────────────┐ │      │ ┌────────────────┐ │       │ ┌────────────────┐ │
│ │    filebeat    │ │      │ │    filebeat    │ │       │ │    filebeat    │ │
│ └────────────────┘ │      │ └────────────────┘ │       │ └────────────────┘ │
│          │         │      │          │         │       │          │         │
└──────────┼─────────┘      └──────────┼─────────┘       └──────────┼─────────┘
           │                           │                            │
           └───────────────┐           │            ┌───────────────┘
                           │           │            │
                           │           │            │
                           ▼           ▼            ▼
                         ┌─────────────────────────────┐
                         │                             │
                         │       Policy Manager        │
                         │                             │
                         └─────────────────────────────┘
                              HTTP /policy endpoint

Configuration

To enable throttling you have to add throttle processor to configuration:

- throttle:
    prometheus_port: 9090
    metric_name: proccessed_records
    metric_labels:
        - from: kubernetes_container_name
            to: container_name
        - from: "labels.app"
            to: app
    policy_host: "http://policymanager.local:8080/policy"
    policy_update_interval: 1s
    bucket_size: 1
    buckets: 1000
  • prometheus_port - prometheus metrics handler to listen on
  • metric_name - name of counter metric with number of processed/throttled events
  • metric_labels - additional fields that will be converted to metric labels
  • policy_host - policy manager host
  • policy_update_interval - how often processor refresh policies
  • buckets - number of buckets
  • bucket_size - bucket duration (in seconds)

Policy Manager

Policy manager exposes configuration by /policy endpoint in following format:

---
limits:
  - value: 500
    conditions:
      kubernetes_container_name: "simple-generator"
  - value: 5000
    conditions:
      kubernetes_namespace: "bx"

value указывает на максимальное разрешенное кол-во сообщений в интервал времени, указанный в bucket_size.
В conditions можно использовать любые поля из событий. Все условия работают по принципу equals, т.е. по полному совпадению.

Throttling algorithm

Для реализации троттлинга используется имплементация token bucket алгоритма. В простейшем случае мы можем хранить только один текущий бакет и считать лимиты по ней. Однако, такой алгоритм плохо работает в случае, когда filebeat некоторое время не работает (плановое обновление, падение и т.д.): в этом случае все накопившиеся сообщения будут попадать в один бакет, что может привести к игнорированию тех сообщений, которые не должны были быть проигнорированы при обычной работе. Чтобы избежать таких проблем, нам нужно хранить информацию о N последних бакетах.

Представим, что processor имеет следующие настройки:

bucket_size: 1 // 1 секунда
buckets: 5

и имеем одно правило с limit: 10

Тогда в момент времени T для этого правила будут существовать такие бакеты:

T - 5        T - 4         T - 3         T - 2         T - 1          T
  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │   0/10   │  │   0/10   │  │   0/10   │  │   0/10   │  │   0/10   │
  └──────────┘  └──────────┘  └──────────┘  └──────────┘  └──────────┘

При добавление события со временем между T-3 и T-2 ситуация изменится на такую:

T - 5        T - 4         T - 3         T - 2         T - 1          T
  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │   0/10   │  │   0/10   │  │   1/10   │  │   0/10   │  │   0/10   │
  └──────────┘  └──────────┘  └──────────┘  └──────────┘  └──────────┘

При переполнении какого-то бакета, все события попадающие в этот временной интервал будут проигнорированы.

В момент времени T + 1 бакеты будут такие:

T - 4        T - 3         T - 2         T - 1           T          T + 1
  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐
  │   0/10   │  │   1/10   │  │   0/10   │  │   0/10   │  │   0/10   │
  └──────────┘  └──────────┘  └──────────┘  └──────────┘  └──────────┘

Т.е. все бакеты сдвинулись на одну влево. Все события со временем старше, чем T-4 будут проигнорированы.

Сборка

Filebeat не предоставляет механизма для динамического подключения плагинов, поэтому нужно пересобирать бинарник filebeat'a с нашим плагином.
Для этого в пакет main filebeat'a подкладывается файл beats/register_ratelimit_plugin.go, который импортирует пакет плагина.

Генератор логов

В generator лежит простой генератор логов, который удобно использовать для локального тестирования.

➜ go get -u gitlab.ozon.ru/sre/filebeat-ratelimit-plugin/generator
➜ generator -h
Usage of generator:
  -id id
        id field value (default "generator")
  -rate float
        records per second (default 10)
Usage of generator:
  -id id
        id field value (default "generator")
  -rate float
        records per second (default 10)
➜ generator -id foo
{"ts":"2019-01-09T18:59:56.109522+03:00", "message": "1", "id":"foo"}
{"ts":"2019-01-09T18:59:56.207788+03:00", "message": "2", "id":"foo"}
{"ts":"2019-01-09T18:59:56.310223+03:00", "message": "3", "id":"foo"}
{"ts":"2019-01-09T18:59:56.409879+03:00", "message": "4", "id":"foo"}
{"ts":"2019-01-09T18:59:56.509572+03:00", "message": "5", "id":"foo"}
{"ts":"2019-01-09T18:59:56.608653+03:00", "message": "6", "id":"foo"}
{"ts":"2019-01-09T18:59:56.708547+03:00", "message": "7", "id":"foo"}
{"ts":"2019-01-09T18:59:56.809872+03:00", "message": "8", "id":"foo"}
^C

С помощью ключа -rate можно управлять кол-вом сообщением в секунду, которое будет выдавать генератор.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewProcessor

func NewProcessor(cfg *common.Config) (processors.Processor, error)

NewProcessor returns new processor instance.

Types

type BucketLimiter

type BucketLimiter struct {
	// contains filtered or unexported fields
}

func NewBucketLimiter

func NewBucketLimiter(bucketInterval, limit, buckets int64, now time.Time) *BucketLimiter

func (*BucketLimiter) Allow

func (bl *BucketLimiter) Allow(t time.Time) bool

Allow returns TRUE if event is allowed to be processed.

func (*BucketLimiter) LastUpdate

func (bl *BucketLimiter) LastUpdate() time.Time

LastUpdate returns last Allow method call time.

func (*BucketLimiter) SetLimit

func (bl *BucketLimiter) SetLimit(limit int64)

SetLimit updates limit value. Note: it's allowed only to change limit, not bucketInterval.

func (*BucketLimiter) WriteStatus

func (bl *BucketLimiter) WriteStatus(w io.Writer) error

WriteStatus writes text based status into Writer.

type ConditionLimiter

type ConditionLimiter struct {
	// contains filtered or unexported fields
}

ConditionLimiter first checks if event is valid for specified conditions and then applies rate limiting.

func NewConditionLimiter

func NewConditionLimiter(fields map[string]string, bucketInterval, limit, buckets int64, now time.Time) (*ConditionLimiter, error)

NewConditionLimiter returns new ConditionLimiter instance.

func (*ConditionLimiter) Allow

func (cl *ConditionLimiter) Allow(t time.Time) bool

Allow returns TRUE if event is allowed to be processed.

func (*ConditionLimiter) Check

func (cl *ConditionLimiter) Check(e *beat.Event) bool

Check checks if event satisfies condition.

func (*ConditionLimiter) SetLimit

func (cl *ConditionLimiter) SetLimit(limit int64)

SetLimit updates limit value. Note: it's allowed only to change limit, not bucketInterval.

func (*ConditionLimiter) WriteStatus

func (cl *ConditionLimiter) WriteStatus(w io.Writer) error

WriteStatus writes text based status into Writer.

type Config

type Config struct {
	PolicyHost           string        `config:"policy_host"`
	PolicyUpdateInterval time.Duration `config:"policy_update_interval"`
	PrometheusPort       int           `config:"prometheus_port"`

	BucketSize int64 `config:"bucket_size"`
	Buckets    int64 `config:"buckets"`

	MetricName   string         `config:"metric_name"`
	MetricLabels []LabelMapping `config:"metric_labels"`
}

Config defines processor configuration.

func (Config) GetFields

func (c Config) GetFields() []string

func (Config) GetMetricLabels

func (c Config) GetMetricLabels() []string

type LabelMapping

type LabelMapping struct {
	From string `config:"from"`
	To   string `config:"to"`
}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor make rate-limiting for messages.

func (*Processor) Close

func (mp *Processor) Close() error

func (*Processor) Run

func (mp *Processor) Run(event *beat.Event) (*beat.Event, error)

func (*Processor) RunHTTPHandlers

func (mp *Processor) RunHTTPHandlers(port int)

RunHTTPHandlers runs prometheus handler on specified port.

func (*Processor) String

func (mp *Processor) String() string

type RemoteConfig

type RemoteConfig struct {
	Key          string       `yaml:"key"`
	DefaultLimit int64        `yaml:"default_limit"`
	Rules        []RuleConfig `yaml:"rules"`
}

type RemoteLimiter

type RemoteLimiter struct {
	// contains filtered or unexported fields
}

func NewRemoteLimiter

func NewRemoteLimiter(url string, bucketInterval, buckets int64) (*RemoteLimiter, error)

NewRemoteLimiter creates new remote limiter instance.

func (*RemoteLimiter) Allow

func (rl *RemoteLimiter) Allow(e *beat.Event) bool

Allow returns TRUE if event is allowed to be processed.

func (*RemoteLimiter) Update

func (rl *RemoteLimiter) Update(ctx context.Context) error

Update retrieves policies from Policy Manager.

func (*RemoteLimiter) UpdateWithInterval

func (rl *RemoteLimiter) UpdateWithInterval(ctx context.Context, interval time.Duration) error

UpdateWithInterval runs update with some interval.

func (*RemoteLimiter) WriteStatus

func (rl *RemoteLimiter) WriteStatus(w io.Writer) error

type Rule

type Rule struct {
	// contains filtered or unexported fields
}

func NewRule

func NewRule(fields map[string]string, limit int64) Rule

NewRule returns new Rule instance.

func (Rule) Limit

func (r Rule) Limit() int64

Limit returns current limit.

func (Rule) Match

func (r Rule) Match(e *beat.Event) (ok bool, key string)

Match checks if event has the same field values as expected.

type RuleConfig

type RuleConfig struct {
	Limit     int64             `yaml:"limit"`
	Selectors map[string]string `yaml:"selectors"`
}

Directories

Path Synopsis
register
compile
package main contains only import to register throttle plugin in filebeat.
package main contains only import to register throttle plugin in filebeat.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL