Documentation ¶
Index ¶
- func New(connector Connector, pattern string, conf *stream.Config, options ...Option) (stream eventstream.Streamer, err error)
- func NewStreamSQL(id string, connector Connector, options ...Option) (eventstream.Streamer, error)
- type Connector
- type Option
- func WithBlockSize(size int) Option
- func WithDebug(debug bool) Option
- func WithFlushIntervals(interval time.Duration) Option
- func WithLogger(logger *zap.Logger) Option
- func WithQueryByPattern(pattern, target string, fields any) Option
- func WithQueryObject(query *Query) Option
- func WithQueryRawFields(query string, fields any) Option
- type Options
- type Query
- func (q *Query) Extract(msg eventstream.Message) map[string]any
- func (q *Query) ParamsBy(msg eventstream.Message) (params []any)
- func (q *Query) QueryString() string
- func (q *Query) StringByMessage(msg eventstream.Message) string
- func (q *Query) StringParamsBy(msg eventstream.Message) (params []string)
- type StreamSQL
- type Value
- func ConvertObjectIntoQueryParams(object any) (values []Value, fields, inserts []string, err error)
- func MapIntoQueryParams(object any) (values []Value, fieldNames, inserts []string, err error)
- func PrepareFields(fls any) (values []Value, fields, inserts []string, err error)
- func PrepareFieldsByArray(fields []string) (values []Value, fieldNames, inserts []string)
- func PrepareFieldsByString(fls string) (values []Value, fields, inserts []string)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func New ¶
func New(connector Connector, pattern string, conf *stream.Config, options ...Option) (stream eventstream.Streamer, err error)
New creates a stream for SQL type integrations
func NewStreamSQL ¶
NewStreamSQL creates streamer object for SQL based stream integration
Types ¶
type Option ¶
Option type of the stream
func WithBlockSize ¶
WithBlockSize sets the size of the writing block
func WithFlushIntervals ¶
WithFlushIntervals sets the interval between flushes
func WithLogger ¶ added in v0.4.0
WithLogger sets the logging object
func WithQueryByPattern ¶ added in v0.2.1
WithQueryByPattern sets up the query object from a query pattern
func WithQueryObject ¶ added in v0.2.1
WithQueryObject sets the query object
func WithQueryRawFields ¶ added in v0.2.1
WithQueryRawFields sets up the query object from a raw query and fields parameters
type Options ¶ added in v0.4.0
type Options struct { // Debug mode of the stream Debug bool // BlockSize of buffer for flushing data BlockSize int // FlushInterval between flushes FlushInterval time.Duration // QueryBuilder object of data prepare QueryBuilder *Query // Logger object for module debugging and logs Logger *zap.Logger }
Options of the SQL stream
type Query ¶
type Query struct {
// contains filtered or unexported fields
}
Query extractor
func NewQueryByPattern ¶
NewQueryByPattern returns query object
func NewQueryByRaw ¶
NewQueryByRaw returns query object from raw SQL query
func (*Query) Extract ¶
func (q *Query) Extract(msg eventstream.Message) map[string]any
Extract message by special fields and types
func (*Query) ParamsBy ¶
func (q *Query) ParamsBy(msg eventstream.Message) (params []any)
ParamsBy returns the query parameters for the given message
func (*Query) QueryString ¶ added in v0.4.0
QueryString - returns the SQL query string
func (*Query) StringByMessage ¶
func (q *Query) StringByMessage(msg eventstream.Message) string
StringByMessage prepares the string for the given message
func (*Query) StringParamsBy ¶
func (q *Query) StringParamsBy(msg eventstream.Message) (params []string)
StringParamsBy returns the query parameters for the given message as strings
type StreamSQL ¶
type StreamSQL struct {
// contains filtered or unexported fields
}
StreamSQL stream
type Value ¶
type Value struct { Key string // Field key in the object TargetKey string // Target Key in the database Type eventstream.FieldType Length int Format string }
Value item
func ConvertObjectIntoQueryParams ¶ added in v0.4.0
ConvertObjectIntoQueryParams returns an object which links object fields and data fields
func MapIntoQueryParams ¶ added in v0.4.0
MapIntoQueryParams returns an object which links map fields
func PrepareFields ¶
PrepareFields matches the fields and returns the values, field names and inserts
func PrepareFieldsByArray ¶
PrepareFieldsByArray matches the fields and returns raw fields for insert. Example: [service=srv:int, name:string]. Result: [srv:int, name:string], [service,name], [?,?]
func PrepareFieldsByString ¶
PrepareFieldsByString matches the fields and returns raw fields for insert