postgres

package
v0.13.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2023 License: BSD-3-Clause Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors describing problems with an event field:
//   - ErrEventDoesntHaveField: the event doesn't have the field;
//   - ErrEventFieldHasWrongType: the field has the wrong type;
//   - ErrTimestampFromDistantPastOrFuture: the field contains a timestamp
//     before the year 1970 or after the year 9000.
//
// Compare against them with errors.Is.
var (
	ErrEventDoesntHaveField             = errors.New("event doesn't have field")
	ErrEventFieldHasWrongType           = errors.New("event field has wrong type")
	ErrTimestampFromDistantPastOrFuture = errors.New("event field contains timestamp < 1970 or > 9000 year")
)
View Source
// ErrEmptyTableName reports that the table name is an empty string.
// NOTE(review): presumably returned by NewQueryBuilder — confirm.
var ErrEmptyTableName = errors.New("table name can't be empty string")
View Source
// ErrNoColumns reports that the config contains no pg columns.
// NOTE(review): presumably returned by NewQueryBuilder — confirm.
var ErrNoColumns = errors.New("no pg columns in config")

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

Types

type Config

// Config holds the settings of the postgres output plugin.
//
// The `// >` comment lines and the trailing `// *` markers follow a fixed
// pattern on every documented field; they look like input for a
// documentation generator, so keep that shape when editing (TODO confirm).
//
// Fields whose name ends with an underscore carry no json tag. Presumably
// they receive the parsed value of the preceding field (see its `parse`
// tag) — confirm against the cfg package.
type Config struct {
	// > @3@4@5@6
	// >
	// > Deprecated. Use `strict_fields` flag instead.
	Strict bool `json:"strict" default:"false"` // *

	// > @3@4@5@6
	// >
	// > In strict mode file.d will crash on events without required fields.
	// > Otherwise, events will be discarded.
	StrictFields bool `json:"strict_fields" default:"false"` // *

	// > @3@4@5@6
	// >
	// > PostgreSQL connection string in URL or DSN format.
	// >
	// > Example DSN:
	// >
	// > `user=user password=secret host=pg.example.com port=5432 dbname=mydb sslmode=disable pool_max_conns=10`
	ConnString string `json:"conn_string" required:"true"` // *

	// > @3@4@5@6
	// >
	// > Pg target table.
	Table string `json:"table" required:"true"` // *

	// > @3@4@5@6
	// >
	// > Array of DB columns. Each column has:
	// > a name, a type (int, string, bool, or timestamp - an int that will be converted to timestamptz of rfc3339),
	// > and an optional `unique` flag.
	Columns []ConfigColumn `json:"columns" required:"true" slice:"true"` // *

	// > @3@4@5@6
	// >
	// > Number of insertion retries.
	Retry int `json:"retry" default:"3"` // *

	// > @3@4@5@6
	// >
	// > Retention duration for retries to the DB (a duration string, e.g. `50ms`).
	Retention  cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
	Retention_ time.Duration

	// > @3@4@5@6
	// >
	// > Timeout for DB requests (a duration string, e.g. `3000ms`).
	DBRequestTimeout  cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` // *
	DBRequestTimeout_ time.Duration

	// > @3@4@5@6
	// >
	// > Period of the DB health check (a duration string, e.g. `60s`).
	DBHealthCheckPeriod  cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` // *
	DBHealthCheckPeriod_ time.Duration

	// > @3@4@5@6
	// >
	// > How many workers will be instantiated to send batches.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` // *
	WorkersCount_ int

	// > @3@4@5@6
	// >
	// > Maximum quantity of events to pack into one batch.
	BatchSize cfg.Expression `json:"batch_size" default:"capacity/4"  parse:"expression"` // *
	// BatchSize  cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` // *
	BatchSize_ int

	// > @3@4@5@6
	// >
	// > A minimum size, in bytes, of events in a batch to send.
	// > If both batch_size and batch_size_bytes are set, they will work together.
	BatchSizeBytes  cfg.Expression `json:"batch_size_bytes" default:"0" parse:"expression"` // *
	BatchSizeBytes_ int

	// > @3@4@5@6
	// >
	// > After this timeout the batch will be sent even if it isn't complete.
	BatchFlushTimeout  cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // *
	BatchFlushTimeout_ time.Duration
}

! config-params ^ config-params

type ConfigColumn

// ConfigColumn describes one column of the target table as written in the
// plugin config: its name (`name`, required), its type (`type`, required,
// one of int, string, bool or timestamp) and a `unique` flag that defaults
// to false.
type ConfigColumn struct {
	Name       string `json:"name" required:"true"`
	ColumnType string `json:"type" required:"true" options:"int|string|bool|timestamp"`
	Unique     bool   `json:"unique" default:"false"`
}

type PgQueryBuilder

// PgQueryBuilder is the interface of the value returned by NewQueryBuilder.
// It exposes the pieces of an insert query: the pg fields, the unique fields
// with their pg types, an insert builder and a postfix string.
// NOTE(review): the map key of GetUniqueFields is presumably the column
// name, and the postfix is presumably appended after the INSERT statement —
// confirm against the implementation.
type PgQueryBuilder interface {
	GetPgFields() []column
	GetUniqueFields() map[string]pgType
	GetInsertBuilder() sq.InsertBuilder
	GetPostfix() string
}

func NewQueryBuilder

func NewQueryBuilder(cfgColumns []ConfigColumn, table string) (PgQueryBuilder, error)

NewQueryBuilder returns new instance of builder.

type PgxIface

// PgxIface is the database handle contract: running a query with a context
// and closing the handle.
// NOTE(review): presumably kept this small so that it can be mocked (the
// module ships a generated GoMock package, mock_postgres) — confirm.
type PgxIface interface {
	// Query executes sql with the given args and returns the resulting rows.
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	// Close closes the handle.
	Close()
}

type Plugin

// Plugin is the postgres plugin type. Its exported methods are Start (takes
// the plugin config and the output plugin params), Out (takes a single
// pipeline event) and Stop.
type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Out

func (p *Plugin) Out(event *pipeline.Event)

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()

Directories

Path Synopsis
Package mock_postgres is a generated GoMock package.
Package mock_postgres is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL