postgres

package
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2022 License: BSD-3-Clause Imports: 15 Imported by: 0

README

postgres output

It sends the event batches to postgres db using pgx.

Config params

strict bool default=false

In strict mode file.d will crash on events without required columns.


host string required

DB host.


port uint16 required

Db port.


dbname string required

Dbname in pg.


user string required

Pg user name.


password string required

Pg user pass.


table string required

Pg target table.


columns []ConfigColumn required

Array of DB columns. Each column have: name, type (int, string, timestamp - which int that will be converted to timestamptz of rfc3339) and nullable options.


retry int default=3

Retries of insertion.


retention cfg.Duration default=50ms

Retention milliseconds for retry to DB.


db_request_timeout cfg.Duration default=3000ms

Timeout for DB requests in milliseconds.


db_health_check_period cfg.Duration default=60s

Timeout for DB health check.


workers_count cfg.Expression default=gomaxprocs*4

How much workers will be instantiated to send batches.


batch_size cfg.Expression default=capacity/4

Maximum quantity of events to pack into one batch.


batch_size

batch_flush_timeout cfg.Duration default=200ms

After this timeout batch will be sent even if batch isn't completed.



Generated using insane-doc

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEventDoesntHaveField   = errors.New("event doesn't have field")
	ErrEventFieldHasWrongType = errors.New("event field has wrong type")
)
View Source
var ErrEmptyTableName = errors.New("table name can't be empty string")
View Source
var ErrNoColumns = errors.New("no pg columns in config")

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

Types

type Config

type Config struct {
	//> @3@4@5@6
	//>
	//> In strict mode file.d will crash on events without required columns.
	// Otherwise events will be discarded.
	Strict bool `json:"strict" default:"false"` //*

	//> @3@4@5@6
	//>
	//> DB host.
	Host string `json:"host" required:"true"` //*

	//> @3@4@5@6
	//>
	//> Db port.
	Port uint16 `json:"port" required:"true"` //*

	//> @3@4@5@6
	//>
	//> Dbname in pg.
	DBName string `json:"dbname" required:"true"` //*

	//> @3@4@5@6
	//>
	//> Pg user name.
	Username string `json:"user" required:"true"` //*

	//> @3@4@5@6
	//>
	//> Pg user pass.
	Pass string `json:"password" required:"true"` //*

	//> @3@4@5@6
	//>
	//> Pg target table.
	Table string `json:"table" required:"true"` //*

	//> @3@4@5@6
	//>
	//> Array of DB columns. Each column have:
	//> name, type (int, string, timestamp - which int that will be converted to timestamptz of rfc3339)
	//> and nullable options.
	Columns []ConfigColumn `json:"columns" required:"true" slice:"true"` //*

	//> @3@4@5@6
	//>
	//> Retries of insertion.
	Retry int `json:"retry" default:"3"` //*

	//> @3@4@5@6
	//>
	//> Retention milliseconds for retry to DB.
	Retention  cfg.Duration `json:"retention" default:"50ms" parse:"duration"` //*
	Retention_ time.Duration

	//> @3@4@5@6
	//>
	//> Timeout for DB requests in milliseconds.
	DBRequestTimeout  cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` //*
	DBRequestTimeout_ time.Duration

	//> @3@4@5@6
	//>
	//> Timeout for DB health check.
	DBHealthCheckPeriod  cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` //*
	DBHealthCheckPeriod_ time.Duration

	//> @3@4@5@6
	//>
	//> How much workers will be instantiated to send batches.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` //*
	WorkersCount_ int

	//> @3@4@5@6
	//>
	//> Maximum quantity of events to pack into one batch.
	BatchSize cfg.Expression `json:"batch_size" default:"capacity/4"  parse:"expression"` //*
	//BatchSize  cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` //*
	BatchSize_ int

	//> @3@4@5@6
	//>
	//> After this timeout batch will be sent even if batch isn't completed.
	BatchFlushTimeout  cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` //*
	BatchFlushTimeout_ time.Duration
}

! config-params ^ config-params

type ConfigColumn

type ConfigColumn struct {
	Name       string `json:"name" required:"true"`
	ColumnType string `json:"type" required:"true" options:"int|string|bool|timestamp"`
	Unique     bool   `json:"unique" default:"false"`
}

type PgQueryBuilder

type PgQueryBuilder interface {
	GetPgFields() []column
	GetUniqueFields() map[string]pgType
	GetInsertBuilder() sq.InsertBuilder
	GetPostfix() string
}

func NewQueryBuilder

func NewQueryBuilder(cfgColumns []ConfigColumn, table string) (PgQueryBuilder, error)

NewQueryBuilder returns new instance of builder.

type PgxIface

type PgxIface interface {
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Close()
}

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Out

func (p *Plugin) Out(event *pipeline.Event)

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()

Directories

Path Synopsis
Package mock_postgres is a generated GoMock package.
Package mock_postgres is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL