postgres

package
v0.5.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2022 License: BSD-3-Clause Imports: 15 Imported by: 0

README

postgres output

It sends the event batches to postgres db using pgx.

Config params

strict bool default=false

In strict mode file.d will crash on events without required columns.


conn_string string required

PostgreSQL connection string in URL or DSN format.

Example DSN:

user=user password=secret host=pg.example.com port=5432 dbname=mydb sslmode=disable pool_max_conns=10


table string required

Pg target table.


columns []ConfigColumn required

Array of DB columns. Each column have: name, type (int, string, timestamp - which int that will be converted to timestamptz of rfc3339) and nullable options.


retry int default=3

Retries of insertion.


retention cfg.Duration default=50ms

Retention milliseconds for retry to DB.


db_request_timeout cfg.Duration default=3000ms

Timeout for DB requests in milliseconds.


db_health_check_period cfg.Duration default=60s

Timeout for DB health check.


workers_count cfg.Expression default=gomaxprocs*4

How much workers will be instantiated to send batches.


batch_size cfg.Expression default=capacity/4

Maximum quantity of events to pack into one batch.


batch_size

batch_size_bytes cfg.Expression default=0

A minimum size of events in a batch to send. If both batch_size and batch_size_bytes are set, they will work together.


batch_flush_timeout cfg.Duration default=200ms

After this timeout batch will be sent even if batch isn't completed.



Generated using insane-doc

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEventDoesntHaveField   = errors.New("event doesn't have field")
	ErrEventFieldHasWrongType = errors.New("event field has wrong type")
)
View Source
var ErrEmptyTableName = errors.New("table name can't be empty string")
View Source
var ErrNoColumns = errors.New("no pg columns in config")

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

Types

type Config

type Config struct {
	//> @3@4@5@6
	//>
	//> In strict mode file.d will crash on events without required columns.
	// Otherwise events will be discarded.
	Strict bool `json:"strict" default:"false"` //*

	//> @3@4@5@6
	//>
	//> PostgreSQL connection string in URL or DSN format.
	//>
	//> Example DSN:
	//>
	//> `user=user password=secret host=pg.example.com port=5432 dbname=mydb sslmode=disable pool_max_conns=10`
	ConnString string `json:"conn_string" required:"true"` //*

	//> @3@4@5@6
	//>
	//> Pg target table.
	Table string `json:"table" required:"true"` //*

	//> @3@4@5@6
	//>
	//> Array of DB columns. Each column have:
	//> name, type (int, string, timestamp - which int that will be converted to timestamptz of rfc3339)
	//> and nullable options.
	Columns []ConfigColumn `json:"columns" required:"true" slice:"true"` //*

	//> @3@4@5@6
	//>
	//> Retries of insertion.
	Retry int `json:"retry" default:"3"` //*

	//> @3@4@5@6
	//>
	//> Retention milliseconds for retry to DB.
	Retention  cfg.Duration `json:"retention" default:"50ms" parse:"duration"` //*
	Retention_ time.Duration

	//> @3@4@5@6
	//>
	//> Timeout for DB requests in milliseconds.
	DBRequestTimeout  cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` //*
	DBRequestTimeout_ time.Duration

	//> @3@4@5@6
	//>
	//> Timeout for DB health check.
	DBHealthCheckPeriod  cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` //*
	DBHealthCheckPeriod_ time.Duration

	//> @3@4@5@6
	//>
	//> How much workers will be instantiated to send batches.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` //*
	WorkersCount_ int

	//> @3@4@5@6
	//>
	//> Maximum quantity of events to pack into one batch.
	BatchSize cfg.Expression `json:"batch_size" default:"capacity/4"  parse:"expression"` //*
	//BatchSize  cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` //*
	BatchSize_ int

	//> @3@4@5@6
	//>
	//> A minimum size of events in a batch to send.
	//> If both batch_size and batch_size_bytes are set, they will work together.
	BatchSizeBytes  cfg.Expression `json:"batch_size_bytes" default:"0" parse:"expression"` //*
	BatchSizeBytes_ int

	//> @3@4@5@6
	//>
	//> After this timeout batch will be sent even if batch isn't completed.
	BatchFlushTimeout  cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` //*
	BatchFlushTimeout_ time.Duration
}

! config-params ^ config-params

type ConfigColumn

type ConfigColumn struct {
	Name       string `json:"name" required:"true"`
	ColumnType string `json:"type" required:"true" options:"int|string|bool|timestamp"`
	Unique     bool   `json:"unique" default:"false"`
}

type PgQueryBuilder

type PgQueryBuilder interface {
	GetPgFields() []column
	GetUniqueFields() map[string]pgType
	GetInsertBuilder() sq.InsertBuilder
	GetPostfix() string
}

func NewQueryBuilder

func NewQueryBuilder(cfgColumns []ConfigColumn, table string) (PgQueryBuilder, error)

NewQueryBuilder returns new instance of builder.

type PgxIface

type PgxIface interface {
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	Close()
}

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Out

func (p *Plugin) Out(event *pipeline.Event)

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()

Directories

Path Synopsis
Package mock_postgres is a generated GoMock package.
Package mock_postgres is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL