postgres

package
v0.33.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2024 License: BSD-3-Clause Imports: 17 Imported by: 0

README

Postgres output

It sends the event batches to postgres db using pgx.

Config params

strict bool default=false

Deprecated. Use strict_fields flag instead.


strict_fields bool default=false

In strict mode file.d will crash on events without required fields.


conn_string string required

PostgreSQL connection string in URL or DSN format.

Example DSN:

user=user password=secret host=pg.example.com port=5432 dbname=mydb sslmode=disable pool_max_conns=10


table string required

Pg target table.


columns []ConfigColumn required

Array of DB columns. Each column have: name, type (int, string, timestamp - which int that will be converted to timestamptz of rfc3339) and nullable options.


retry int default=10

Retries of insertion. If File.d cannot insert for this number of attempts, File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).


fatal_on_failed_insert bool default=false

After an insert error, fall with a non-zero exit code or not Experimental feature


retention cfg.Duration default=50ms

Retention milliseconds for retry to DB.


db_request_timeout cfg.Duration default=3000ms

Multiplier for exponential increase of retention between retries cfg.Duration default=3000ms

Timeout for DB requests in milliseconds.


db_health_check_period cfg.Duration default=60s

Timeout for DB health check.


workers_count cfg.Expression default=gomaxprocs*4

How much workers will be instantiated to send batches.


batch_size cfg.Expression default=capacity/4

Maximum quantity of events to pack into one batch.


batch_size

batch_size_bytes cfg.Expression default=0

A minimum size of events in a batch to send. If both batch_size and batch_size_bytes are set, they will work together.


batch_flush_timeout cfg.Duration default=200ms

After this timeout batch will be sent even if batch isn't completed.


Example

Example Postgres output example:

pipelines:
  example_pipeline:
    input:
      type: file
      persistence_mode: async
      watching_dir: ./
      filename_pattern: input_example.json
      offsets_file: ./offsets.yaml
      offsets_op: reset
	output:
      type: postgres
      conn_string: "user=postgres host=localhost port=5432 dbname=postgres sslmode=disable pool_max_conns=10"
      table: events
      columns:
        - name: id
          type: int
        - name: name
          type: string
      retry: 10
      retention: 1s
      retention_exponentially_multiplier: 1.5

input_example.json

{"id":1,"name":"name1"}
{"id":2,"name":"name2"}


Generated using insane-doc

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEventDoesntHaveField             = errors.New("event doesn't have field")
	ErrEventFieldHasWrongType           = errors.New("event field has wrong type")
	ErrTimestampFromDistantPastOrFuture = errors.New("event field contains timestamp < 1970 or > 9000 year")
)
View Source
var ErrEmptyTableName = errors.New("table name can't be empty string")
View Source
var ErrNoColumns = errors.New("no pg columns in config")

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

Types

type Config

type Config struct {
	// > @3@4@5@6
	// >
	// > Deprecated. Use `strict_fields` flag instead.
	Strict bool `json:"strict" default:"false"` // *

	// > @3@4@5@6
	// >
	// > In strict mode file.d will crash on events without required fields.
	// Otherwise, events will be discarded.
	StrictFields bool `json:"strict_fields" default:"false"` // *

	// > @3@4@5@6
	// >
	// > PostgreSQL connection string in URL or DSN format.
	// >
	// > Example DSN:
	// >
	// > `user=user password=secret host=pg.example.com port=5432 dbname=mydb sslmode=disable pool_max_conns=10`
	ConnString string `json:"conn_string" required:"true"` // *

	// > @3@4@5@6
	// >
	// > Pg target table.
	Table string `json:"table" required:"true"` // *

	// > @3@4@5@6
	// >
	// > Array of DB columns. Each column have:
	// > name, type (int, string, timestamp - which int that will be converted to timestamptz of rfc3339)
	// > and nullable options.
	Columns []ConfigColumn `json:"columns" required:"true" slice:"true"` // *

	// > @3@4@5@6
	// >
	// > Retries of insertion. If File.d cannot insert for this number of attempts,
	// > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert).
	Retry int `json:"retry" default:"10"` // *

	// > @3@4@5@6
	// >
	// > After an insert error, fall with a non-zero exit code or not
	// > **Experimental feature**
	FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // *

	// > @3@4@5@6
	// >
	// > Retention milliseconds for retry to DB.
	Retention  cfg.Duration `json:"retention" default:"50ms" parse:"duration"` // *
	Retention_ time.Duration

	// > @3@4@5@6
	// >
	// > Multiplier for exponential increase of retention between retries
	RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"`

	// > @3@4@5@6
	// >
	// > Timeout for DB requests in milliseconds.
	DBRequestTimeout  cfg.Duration `json:"db_request_timeout" default:"3000ms" parse:"duration"` // *
	DBRequestTimeout_ time.Duration

	// > @3@4@5@6
	// >
	// > Timeout for DB health check.
	DBHealthCheckPeriod  cfg.Duration `json:"db_health_check_period" default:"60s" parse:"duration"` // *
	DBHealthCheckPeriod_ time.Duration

	// > @3@4@5@6
	// >
	// > How much workers will be instantiated to send batches.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` // *
	WorkersCount_ int

	// > @3@4@5@6
	// >
	// > Maximum quantity of events to pack into one batch.
	BatchSize cfg.Expression `json:"batch_size" default:"capacity/4"  parse:"expression"` // *
	// BatchSize  cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` // *
	BatchSize_ int

	// > @3@4@5@6
	// >
	// > A minimum size of events in a batch to send.
	// > If both batch_size and batch_size_bytes are set, they will work together.
	BatchSizeBytes  cfg.Expression `json:"batch_size_bytes" default:"0" parse:"expression"` // *
	BatchSizeBytes_ int

	// > @3@4@5@6
	// >
	// > After this timeout batch will be sent even if batch isn't completed.
	BatchFlushTimeout  cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // *
	BatchFlushTimeout_ time.Duration
}

! config-params ^ config-params

type ConfigColumn

type ConfigColumn struct {
	Name       string `json:"name" required:"true"`
	ColumnType string `json:"type" required:"true" options:"int|string|bool|timestamp"`
	Unique     bool   `json:"unique" default:"false"`
}

type PgQueryBuilder

type PgQueryBuilder interface {
	GetPgFields() []column
	GetUniqueFields() map[string]pgType
	GetInsertBuilder() sq.InsertBuilder
	GetPostfix() string
}

func NewQueryBuilder

func NewQueryBuilder(cfgColumns []ConfigColumn, table string) (PgQueryBuilder, error)

NewQueryBuilder returns new instance of builder.

type PgxIface

type PgxIface interface {
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	Close()
}

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Out

func (p *Plugin) Out(event *pipeline.Event)

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()

Directories

Path Synopsis
Package mock_postgres is a generated GoMock package.
Package mock_postgres is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL