testsupport

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Exported constants of the testsupport package.
const (
	// DatabaseSchema is the fixed string "tsdb".
	// NOTE(review): presumably the PostgreSQL schema name the test helpers
	// work in — confirm against the callers; this listing does not show usage.
	DatabaseSchema = "tsdb"
)

Variables

This section is empty.

Functions

func CreateHypertable

func CreateHypertable(
	pool *pgxpool.Pool, timeDimension string, chunkSize time.Duration, columns ...Column,
) (string, string, error)

func CreateTempFile

func CreateTempFile(
	pattern string,
) (string, error)

func RandomNumber added in v0.4.0

func RandomNumber(min, max int) int

Types

type CollectedEvent

// CollectedEvent bundles one timestamp, one topic name and one Envelope into
// a single record. EventCollectorSink.Events returns a slice of these values.
//
// The struct carries no JSON tags; only the nested Envelope declares a JSON
// mapping.
type CollectedEvent struct {
	Timestamp time.Time
	TopicName string
	Envelope  Envelope
}

type Column

// Column is an opaque value type: all of its fields are unexported. Values
// are built with NewColumn and read back through the value-receiver accessors
// Name, PgType, IsNullable, IsPrimaryKey and DefaultValue. CreateHypertable
// accepts a variadic list of Column values.
type Column struct {
	// contains filtered or unexported fields
}

func NewColumn

func NewColumn(
	name, pgType string, nullable, primaryKey bool, defaultValue *string,
) Column

func (Column) DefaultValue

func (c Column) DefaultValue() *string

func (Column) IsNullable

func (c Column) IsNullable() bool

func (Column) IsPrimaryKey

func (c Column) IsPrimaryKey() bool

func (Column) Name

func (c Column) Name() string

func (Column) PgType

func (c Column) PgType() string

type Envelope

// Envelope is the typed view of an event envelope. The JSON keys "payload"
// and "schema" map onto the Payload and Schema fields.
//
// Raw has no JSON tag, so encoding/json would match it by field name ("Raw",
// case-insensitively on decode) rather than by an explicit key.
// NOTE(review): presumably Raw holds the undecoded envelope as a generic map
// and is filled in separately from Payload/Schema — confirm in the
// implementation, which is not visible in this listing.
type Envelope struct {
	Raw     map[string]any
	Payload Payload `json:"payload"`
	Schema  Schema  `json:"schema"`
}

type EventCollectorSink

// EventCollectorSink is an opaque type (all fields unexported) created with
// NewEventCollectorSink, which accepts zero or more EventCollectorSinkOption
// values. All of its methods use a pointer receiver: Start, Stop, Emit,
// Events, NumOfEvents, Clear and SystemConfigConfigurator.
//
// NOTE(review): the name and the Events/NumOfEvents/Clear methods suggest it
// records emitted events in memory for later inspection by tests; whether it
// is safe for concurrent use cannot be determined from this listing.
type EventCollectorSink struct {
	// contains filtered or unexported fields
}

func NewEventCollectorSink

func NewEventCollectorSink(
	options ...EventCollectorSinkOption,
) *EventCollectorSink

func (*EventCollectorSink) Clear

func (t *EventCollectorSink) Clear()

func (*EventCollectorSink) Emit

func (t *EventCollectorSink) Emit(
	_ sink.Context, timestamp time.Time, topicName string, key, envelope schema.Struct,
) error

func (*EventCollectorSink) Events

func (t *EventCollectorSink) Events() []CollectedEvent

func (*EventCollectorSink) NumOfEvents

func (t *EventCollectorSink) NumOfEvents() int

func (*EventCollectorSink) Start

func (t *EventCollectorSink) Start() error

func (*EventCollectorSink) Stop

func (t *EventCollectorSink) Stop() error

func (*EventCollectorSink) SystemConfigConfigurator

func (t *EventCollectorSink) SystemConfigConfigurator(
	config *sysconfig.SystemConfig,
)

type EventCollectorSinkOption

// EventCollectorSinkOption is a type alias (note the "=") for a function that
// receives the *EventCollectorSink. Because it is an alias rather than a
// distinct named type, any func(*EventCollectorSink) value can be passed
// wherever an option is expected. WithFilter, WithPreHook and WithPostHook
// return values of this type; NewEventCollectorSink takes them variadically.
type EventCollectorSinkOption = func(eventCollectorSink *EventCollectorSink)

func WithFilter

func WithFilter(
	filter func(timestamp time.Time, topicName string, envelope Envelope) bool,
) EventCollectorSinkOption

func WithPostHook

func WithPostHook(
	fn func(sink *EventCollectorSink),
) EventCollectorSinkOption

func WithPreHook

func WithPreHook(
	fn func(sink *EventCollectorSink),
) EventCollectorSinkOption

type Field

// Field is one node of a schema description as laid out in JSON. The type is
// recursive: Fields nests further Field values, and KeySchema / ValueSchema
// point at Schema values, which in turn contain Field slices.
//
// Name and Field are two separate strings decoded from the distinct JSON keys
// "name" and "field". Default is declared as any, so its dynamic type depends
// on what the decoder produced.
// NOTE(review): KeySchema/ValueSchema are presumably only set for map-typed
// fields and nil otherwise — confirm against the schema producer.
type Field struct {
	Name        string      `json:"name"`
	Field       string      `json:"field"`
	Optional    bool        `json:"optional"`
	Type        schema.Type `json:"type"`
	Fields      []Field     `json:"fields"`
	Default     any         `json:"default"`
	KeySchema   *Schema     `json:"keySchema"`
	ValueSchema *Schema     `json:"valueSchema"`
}

func GetField

func GetField(
	name string, fields []Field,
) (Field, bool)

type ForwarderSink

// ForwarderSink is an opaque type (all fields unexported) created with
// NewForwarderSink from a callback of the form
// func(timestamp time.Time, topicName string, envelope schema.Struct) error.
// Its only listed method, Emit, takes the same three arguments and returns an
// error.
//
// NOTE(review): presumably Emit simply delegates to the callback — confirm in
// the implementation. Also note that this Emit takes neither the sink.Context
// nor the key argument that EventCollectorSink.Emit takes, so the two types
// do not share one Emit signature.
type ForwarderSink struct {
	// contains filtered or unexported fields
}

func NewForwarderSink

func NewForwarderSink(
	fn func(timestamp time.Time, topicName string, envelope schema.Struct) error,
) *ForwarderSink

func (*ForwarderSink) Emit

func (t *ForwarderSink) Emit(
	timestamp time.Time, topicName string, envelope schema.Struct,
) error

type Payload

// Payload is the typed form of the "payload" object of an Envelope. Each
// field is bound to a JSON key through its struct tag: "before", "after",
// "op", "tsdb_op", "source" and "ts_ms".
//
// Before and After are generic maps, so their value types depend on the JSON
// decoder. NOTE(review): presumably they hold the row image before and after
// the change, and TsMs is a millisecond timestamp — confirm against the event
// producer; neither is established by this listing.
type Payload struct {
	Before map[string]any            `json:"before"`
	After  map[string]any            `json:"after"`
	Op     schema.Operation          `json:"op"`
	TsdbOp schema.TimescaleOperation `json:"tsdb_op"`
	Source Source                    `json:"source"`
	TsMs   uint64                    `json:"ts_ms"`
}

type Schema

// Schema is the typed form of a JSON schema object: a name, a type, an
// optional flag, a list of Field entries and a length. It is used for the
// "schema" key of an Envelope and for Field.KeySchema / Field.ValueSchema.
//
// Length is a pointer, so it can be nil.
// NOTE(review): presumably nil means the JSON carried no length (or null) —
// confirm which schema types actually set it.
type Schema struct {
	Fields   []Field     `json:"fields"`
	Name     string      `json:"name"`
	Optional bool        `json:"optional"`
	Type     schema.Type `json:"type"`
	Length   *int        `json:"length"`
}

type Source

// Source is the typed form of the "source" object inside a Payload. Every
// field is bound to a JSON key by its struct tag; note that the transaction
// id uses the camelCase key "txId" while the timestamp uses the snake_case
// key "ts_ms".
//
// LSN is declared as a string, not a numeric type.
// NOTE(review): Go's initialism convention would spell the field TxID, but
// renaming an exported field is a breaking API change, so it is left as is.
// NOTE(review): TsMs is presumably a millisecond timestamp — confirm against
// the event producer.
type Source struct {
	Connector string `json:"connector"`
	DB        string `json:"db"`
	LSN       string `json:"lsn"`
	TxId      uint32 `json:"txId"`
	Name      string `json:"name"`
	Schema    string `json:"schema"`
	Snapshot  bool   `json:"snapshot"`
	Table     string `json:"table"`
	TsMs      uint64 `json:"ts_ms"`
	Version   string `json:"version"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL