sql

package
v0.5.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(connector Connector, pattern string, conf *stream.Config, options ...Option) (stream eventstream.Streamer, err error)

New stream for SQL type integrations

func NewStreamSQL

func NewStreamSQL(id string, connector Connector, options ...Option) (eventstream.Streamer, error)

NewStreamSQL creates streamer object for SQL based stream integration

Types

type Connector

type Connector interface {
	Connection() (*sql.DB, error)
}

Connector to DB

type Option

type Option func(opts *Options) error

Option type of the stream

func WithBlockSize

func WithBlockSize(size int) Option

WithBlockSize setup the size of writing block

func WithDebug

func WithDebug(debug bool) Option

WithDebug setup the debug mode ON/OFF

func WithFlushIntervals

func WithFlushIntervals(interval time.Duration) Option

WithFlushIntervals setup interval betwin flashes

func WithLogger added in v0.4.0

func WithLogger(logger *zap.Logger) Option

WithLogger setup the logging object

func WithQuery added in v0.4.3

func WithQuery(query string, qOpts ...QueryOption) Option

WithQuery setup query object

func WithQueryObject added in v0.2.1

func WithQueryObject(query *Query) Option

WithQueryObject setup query object

type Options added in v0.4.0

type Options struct {
	// Debug mode of the stream
	Debug bool

	// BlockSize of buffer to flushing data
	BlockSize int

	// FlushInterval between flushes
	FlushInterval time.Duration

	// QueryBuilder object of data prepare
	QueryBuilder *Query

	// Logger object of module debuging and logs
	Logger *zap.Logger
}

Options of the SQL stream

type Query

type Query struct {
	// contains filtered or unexported fields
}

Query extractor

func NewQuery added in v0.4.3

func NewQuery(query string, opts ...QueryOption) (*Query, error)

NewQuery returns query object from SQL query

func (*Query) Extract

func (q *Query) Extract(msg eventstream.Message) []map[string]any

Extract message by special fields and types

func (*Query) ParamsBy

func (q *Query) ParamsBy(msg eventstream.Message) paramsResult

ParamsBy by message

func (*Query) QueryString added in v0.4.0

func (q *Query) QueryString() string

QueryString - returns the SQL query string

type QueryConfig added in v0.4.3

type QueryConfig struct {
	FieldObject any
	Target      string
	IterateBy   string
}

QueryConfig extra information

type QueryOption added in v0.4.3

type QueryOption func(*QueryConfig)

QueryOption func

func QWithIterateBy added in v0.4.3

func QWithIterateBy(iterateBy string) QueryOption

QWithIterateBy message value

func QWithMessageTmpl added in v0.4.3

func QWithMessageTmpl(fields any) QueryOption

QWithMessageTmpl source info

func QWithTarget added in v0.4.3

func QWithTarget(target string) QueryOption

QWithTarget data collection

type StreamSQL

type StreamSQL struct {
	// contains filtered or unexported fields
}

StreamSQL stream

func (*StreamSQL) Check

func (s *StreamSQL) Check(ctx context.Context, msg eventstream.Message) bool

Check message value

func (*StreamSQL) Close

func (s *StreamSQL) Close() (err error)

Close implementation

func (*StreamSQL) ID

func (s *StreamSQL) ID() string

ID returns unical stream identificator

func (*StreamSQL) Put

func (s *StreamSQL) Put(ctx context.Context, msg eventstream.Message) error

Put message to stream

func (*StreamSQL) Run

func (s *StreamSQL) Run(ctx context.Context) error

Run SQL writer daemon

type Value

type Value struct {
	Key       string // Field key in the object
	TargetKey string // Target Key in the database
	Type      eventstream.FieldType
	Length    int
	Format    string
}

Value item

func ConvertObjectIntoQueryParams added in v0.4.0

func ConvertObjectIntoQueryParams(object any) (values []Value, fields, inserts []string, err error)

ConvertObjectIntoQueryParams returns object which links object Fields and Data Fields

func MapIntoQueryParams added in v0.4.0

func MapIntoQueryParams(object any) (values []Value, fieldNames, inserts []string, err error)

MapIntoQueryParams returns object which links map fields

func PrepareFields

func PrepareFields(fls any) (values []Value, fields, inserts []string, err error)

PrepareFields matching

func PrepareFieldsByArray

func PrepareFieldsByArray(fields []string) (values []Value, fieldNames, inserts []string)

PrepareFieldsByArray matching and returns raw fields for insert Example: [service=srv:int, name:string] Result: [srv:int, name:string], [service,name], [?,?]

func PrepareFieldsByString

func PrepareFieldsByString(fls string) (values []Value, fields, inserts []string)

PrepareFieldsByString matching and returns raw fields for insert

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL