s3

package
v0.23.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: BSD-3-Clause Imports: 23 Imported by: 0

README

s3 output

Sends events to S3, into one or multiple buckets. `bucket` is the default bucket for events. Additional buckets can be described in the `multi_buckets` section (see the example below). `bucket_field_event` is the name of a field that is looked up in each event. If the field is present, the plugin tries to send the event to the bucket named by the field's value instead of the default bucket described here.

⚠ Currently bucket names for bucket and multi_buckets can't intersect.

⚠ If a dynamic bucket is moved to the config, it can leave some unsent data behind. To send this data to S3, move the bucket directory from /var/log/dynamic_buckets/bucketName to /var/log/static_buckets/bucketName (/var/log is the default path) and restart file.d.

Example Standard example:

pipelines:
  mkk:
    settings:
      capacity: 128
    # input plugin is not important in this case, let's emulate http input.
    input:
      type: http
      emulate_mode: "no"
      address: ":9200"
    actions:
      - type: json_decode
        field: message
    output:
      type: s3
      file_config:
        retention_interval: 10s
      # endpoint, access_key, secret_key, bucket are required.
      endpoint: "s3.fake_host.org:80"
      access_key: "access_key1"
      secret_key: "secret_key2"
      bucket: "bucket-logs"
      bucket_field_event: "bucket_name"

Example with fan-out buckets:

pipelines:
  mkk:
    settings:
      capacity: 128
    # input plugin is not important in this case, let's emulate http input.
    input:
      type: http
      emulate_mode: "no"
      address: ":9200"
    actions:
      - type: json_decode
        field: message
    output:
      type: s3
      file_config:
        retention_interval: 10s
      # endpoint, access_key, secret_key, bucket are required.
      endpoint: "s3.fake_host.org:80"
      access_key: "access_key1"
      secret_key: "secret_key2"
      bucket: "bucket-logs"
      # bucket_field_event - event with such field will be sent to bucket with its value
      # if such exists: {"bucket_name": "secret", "message": 123} to bucket "secret".
      bucket_field_event: "bucket_name"
      # multi_buckets is optional, contains array of buckets.
      multi_buckets:
        - endpoint: "otherS3.fake_host.org:80"
          access_key: "access_key2"
          secret_key: "secret_key2"
          bucket: "bucket-logs-2"
        - endpoint: "yet_anotherS3.fake_host.ru:80"
          access_key: "access_key3"
          secret_key: "secret_key3"
          bucket: "bucket-logs-3"
Config params

file_config file.Config

Under the hood this plugin uses /plugin/output/file/ to collect logs.


compression_type string default=zip options=zip

Compressed files format.


endpoint string required

Address of default bucket.


access_key string required

S3 access key.


secret_key string required

S3 secret key.


bucket string required

Main S3 bucket.


multi_buckets MultiBuckets

Additional buckets that can also receive events. An event with the bucket_name field is sent to the corresponding S3 bucket.


secure bool default=false

S3 connection secure option.


bucket_field_event string

Change destination bucket of event. Fallback to DefaultBucket if BucketEventField bucket doesn't exist.


dynamic_buckets_limit int default=32

Regulates number of buckets that can be created dynamically.


upload_timeout cfg.Duration default=1m

Sets upload timeout.


retry int default=10

Number of upload retries. If file.d cannot upload after this number of attempts, it will either exit with a non-zero exit code or skip the message (see fatal_on_failed_insert).


fatal_on_failed_insert bool default=false

Whether to exit with a non-zero exit code after an insert error. Experimental feature.


retention cfg.Duration default=1s

Retention (a duration, e.g. 1s) applied when retrying an upload.


retention_exponentially_multiplier int default=2

Multiplier for exponential increase of retention between retries



Generated using insane-doc

Documentation

Index

Constants

View Source
// Directory names used for per-bucket data. The README refers to them as
// sub-directories of the base path (/var/log by default), e.g.
// /var/log/static_buckets/bucketName and /var/log/dynamic_buckets/bucketName.
const (
	StaticBucketDir  = "static_buckets"
	DynamicBucketDir = "dynamic_buckets"
)

Variables

This section is empty.

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

Types

type Config

// Config holds the settings of the s3 output plugin.
//
// Comment lines starting with "// >" and the trailing "// *" markers appear to
// be consumed by the insane-doc generator to build the README's
// "Config params" section (NOTE(review): confirm before reformatting them).
type Config struct {
	// > @3@4@5@6
	// >
	// > Under the hood this plugin uses /plugin/output/file/ to collect logs.
	FileConfig file.Config `json:"file_config" child:"true"` // *

	// > @3@4@5@6
	// >
	// > Compressed files format.
	CompressionType string `json:"compression_type" default:"zip" options:"zip"` // *

	// s3 section
	// > @3@4@5@6
	// >
	// > Address of default bucket.
	Endpoint string `json:"endpoint" required:"true"` // *

	// > @3@4@5@6
	// >
	// > S3 access key.
	AccessKey string `json:"access_key" required:"true"` // *

	// > @3@4@5@6
	// >
	// > S3 secret key.
	SecretKey string `json:"secret_key" required:"true"` // *

	// > @3@4@5@6
	// >
	// > Main S3 bucket.
	DefaultBucket string `json:"bucket" required:"true"` // *

	// NOTE: this is an embedded field (type only, no field name).
	// > @3@4@5@6
	// >
	// > Additional buckets that can also receive events.
	// > An event with the bucket_name field is sent to the corresponding S3 bucket.
	MultiBuckets `json:"multi_buckets" required:"false"` // *

	// > @3@4@5@6
	// >
	// > S3 connection secure option.
	Secure bool `json:"secure" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Change destination bucket of event.
	// > Fallback to DefaultBucket if BucketEventField bucket doesn't exist.
	BucketEventField string `json:"bucket_field_event" default:""` // *

	// > @3@4@5@6
	// >
	// > Regulates number of buckets that can be created dynamically.
	DynamicBucketsLimit int `json:"dynamic_buckets_limit" default:"32"` // *

	// UploadTimeout_ carries no json tag; presumably it receives the parsed
	// value of UploadTimeout (see the parse:"duration" tag) — TODO confirm.
	// > @3@4@5@6
	// >
	// > Sets upload timeout.
	UploadTimeout  cfg.Duration `json:"upload_timeout" default:"1m" parse:"duration"` // *
	UploadTimeout_ time.Duration

	// > @3@4@5@6
	// >
	// > Number of upload retries. If File.d cannot upload after this number of attempts,
	// > File.d will exit with a non-zero exit code or skip the message (see fatal_on_failed_insert).
	Retry int `json:"retry" default:"10"` // *

	// > @3@4@5@6
	// >
	// > Whether to exit with a non-zero exit code after an insert error.
	// > **Experimental feature**
	FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

	// Retention_ carries no json tag; presumably it receives the parsed value
	// of Retention (see the parse:"duration" tag) — TODO confirm.
	// > @3@4@5@6
	// >
	// > Retention (a duration, e.g. 1s) applied when retrying an upload.
	Retention  cfg.Duration `json:"retention" default:"1s" parse:"duration"` // *
	Retention_ time.Duration

	// > @3@4@5@6
	// >
	// > Multiplier for exponential increase of retention between retries
	RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // *
}

! config-params ^ config-params

func (*Config) IsMultiBucketExists

func (c *Config) IsMultiBucketExists(bucketName string) bool

type MultiBuckets

// MultiBuckets is a list of singleBucketConfig entries. Config embeds it
// under the "multi_buckets" JSON key to describe additional buckets.
type MultiBuckets []singleBucketConfig

type ObjectStoreClient

// ObjectStoreClient is the set of object-storage operations declared by this
// package. NOTE(review): the minio.PutObjectOptions parameter suggests it
// mirrors the minio client API — confirm against the minio-go version in use.
type ObjectStoreClient interface {
	// MakeBucket takes a bucket name and a location and returns an error.
	MakeBucket(bucketName string, location string) (err error)
	// BucketExists takes a bucket name and returns a bool and an error.
	BucketExists(bucketName string) (bool, error)
	// FPutObjectWithContext takes a context, bucket and object names, a file
	// path and put options; n is presumably the uploaded size in bytes — TODO confirm.
	FPutObjectWithContext(ctx context.Context, bucketName, objectName, filePath string, opts minio.PutObjectOptions) (n int64, err error)
}

type ObjectStoreClientLimiter

type ObjectStoreClientLimiter struct {
	// contains filtered or unexported fields
}

func NewObjectStoreClientLimiter

func NewObjectStoreClientLimiter(limit int) *ObjectStoreClientLimiter

func (*ObjectStoreClientLimiter) CanCreate

func (limiter *ObjectStoreClientLimiter) CanCreate() bool

func (*ObjectStoreClientLimiter) Increment

func (limiter *ObjectStoreClientLimiter) Increment()

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Out

func (p *Plugin) Out(event *pipeline.Event)

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams)

func (*Plugin) StartWithMinio

func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.OutputPluginParams, factory objStoreFactory)

func (*Plugin) Stop

func (p *Plugin) Stop()

Directories

Path Synopsis
Package mock_s3 is a generated GoMock package.
Package mock_s3 is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL