Documentation ¶
Index ¶
- func New(connector Connector, pattern string, conf *stream.Config, options ...Option) (stream eventstream.Streamer, err error)
- func NewStreamSQL(id string, connector Connector, options ...Option) (eventstream.Streamer, error)
- type Connector
- type Option
- type Options
- type Query
- type QueryConfig
- type QueryOption
- type StreamSQL
- type Value
- func ConvertObjectIntoQueryParams(object any) (values []Value, fields, inserts []string, err error)
- func MapIntoQueryParams(object any) (values []Value, fieldNames, inserts []string, err error)
- func PrepareFields(fls any) (values []Value, fields, inserts []string, err error)
- func PrepareFieldsByArray(fields []string) (values []Value, fieldNames, inserts []string)
- func PrepareFieldsByString(fls string) (values []Value, fields, inserts []string)
Functions ¶
func New ¶
func New(connector Connector, pattern string, conf *stream.Config, options ...Option) (stream eventstream.Streamer, err error)
New creates a stream for SQL-type integrations.
func NewStreamSQL ¶
NewStreamSQL creates a streamer object for an SQL-based stream integration.
Types ¶
type Option ¶
Option is the configuration option type of the stream.
func WithBlockSize ¶
WithBlockSize sets the size of the write block.
func WithFlushIntervals ¶
WithFlushIntervals sets the interval between flushes.
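The block size and the flush interval typically work together: rows are buffered until a block fills up, and a timer flushes partial blocks. The sketch below illustrates that general idea only. The `batcher` type and its methods are hypothetical and are not taken from this package; the timer is simulated by an explicit `flushNow` call.

```go
package main

import "fmt"

// batcher is a hypothetical stand-in for a stream's write buffer.
// It is NOT this package's implementation.
type batcher struct {
	blockSize int                 // flush as soon as this many rows are buffered
	buf       []string            // rows waiting to be written
	flush     func(rows []string) // sink invoked with each flushed block
}

// add buffers one row and flushes when the block is full.
func (b *batcher) add(row string) {
	b.buf = append(b.buf, row)
	if len(b.buf) >= b.blockSize {
		b.flushNow()
	}
}

// flushNow writes whatever is buffered. A timer firing once per flush
// interval would call this so that partial blocks do not wait forever.
func (b *batcher) flushNow() {
	if len(b.buf) == 0 {
		return
	}
	rows := b.buf
	b.buf = nil
	b.flush(rows)
}

func main() {
	var blocks [][]string
	b := &batcher{
		blockSize: 2,
		flush:     func(rows []string) { blocks = append(blocks, rows) },
	}
	for _, r := range []string{"a", "b", "c"} {
		b.add(r)
	}
	b.flushNow() // simulates the flush-interval tick
	fmt.Println(blocks) // prints [[a b] [c]]
}
```

A larger block size reduces the number of writes, while a shorter interval bounds how long a row can sit in the buffer.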
func WithLogger ¶ added in v0.4.0
WithLogger sets the logging object.
func WithQuery ¶ added in v0.4.3
func WithQuery(query string, qOpts ...QueryOption) Option
WithQuery sets the query object from an SQL query string and query options.
func WithQueryObject ¶ added in v0.2.1
WithQueryObject sets the query object.
type Options ¶ added in v0.4.0
type Options struct {
	// Debug mode of the stream
	Debug bool

	// BlockSize of the buffer used for flushing data
	BlockSize int

	// FlushInterval between flushes
	FlushInterval time.Duration

	// QueryBuilder is the object that prepares data
	QueryBuilder *Query

	// Logger is the object for module debugging and logs
	Logger *zap.Logger
}
Options of the SQL stream
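Option values such as those returned by WithBlockSize are commonly applied to an options struct using the functional options pattern. The sketch below shows that pattern in isolation. It assumes, without confirmation from this page, that an option is a function mutating the options struct; the lowercase names and the default values are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// options mirrors two fields of the documented Options struct.
type options struct {
	BlockSize     int
	FlushInterval time.Duration
}

// option is ASSUMED to be a function that mutates options; the real
// Option type is not shown in this reference.
type option func(*options)

// withBlockSize returns an option that overrides the block size.
func withBlockSize(n int) option {
	return func(o *options) { o.BlockSize = n }
}

// withFlushInterval returns an option that overrides the flush interval.
func withFlushInterval(d time.Duration) option {
	return func(o *options) { o.FlushInterval = d }
}

// buildOptions starts from hypothetical defaults and applies each
// option in order, so later options win over earlier ones.
func buildOptions(opts ...option) options {
	o := options{BlockSize: 1000, FlushInterval: time.Second}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := buildOptions(withBlockSize(500))
	fmt.Println(o.BlockSize, o.FlushInterval) // prints 500 1s
}
```

The pattern keeps constructors such as New stable: new settings can be added as new option functions without changing the constructor signature.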
type Query ¶
type Query struct {
// contains filtered or unexported fields
}
Query extractor
func NewQuery ¶ added in v0.4.3
func NewQuery(query string, opts ...QueryOption) (*Query, error)
NewQuery returns a query object built from an SQL query.
func (*Query) Extract ¶
func (q *Query) Extract(msg eventstream.Message) []map[string]any
Extract extracts values from the message according to the configured fields and types.
func (*Query) ParamsBy ¶
func (q *Query) ParamsBy(msg eventstream.Message) paramsResult
ParamsBy returns the query parameters for the given message.
func (*Query) QueryString ¶ added in v0.4.0
QueryString returns the SQL query string.
type QueryConfig ¶ added in v0.4.3
QueryConfig extra information
type QueryOption ¶ added in v0.4.3
type QueryOption func(*QueryConfig)
QueryOption func
func QWithIterateBy ¶ added in v0.4.3
func QWithIterateBy(iterateBy string) QueryOption
QWithIterateBy sets the message value to iterate by.
func QWithMessageTmpl ¶ added in v0.4.3
func QWithMessageTmpl(fields any) QueryOption
QWithMessageTmpl sets the message template (source field information).
func QWithTarget ¶ added in v0.4.3
func QWithTarget(target string) QueryOption
QWithTarget sets the target data collection.
type StreamSQL ¶
type StreamSQL struct {
// contains filtered or unexported fields
}
StreamSQL stream
type Value ¶
type Value struct {
	Key       string // Field key in the object
	TargetKey string // Target key in the database
	Type      eventstream.FieldType
	Length    int
	Format    string
}
Value item
func ConvertObjectIntoQueryParams ¶ added in v0.4.0
ConvertObjectIntoQueryParams returns the values that link object fields to data fields.
func MapIntoQueryParams ¶ added in v0.4.0
MapIntoQueryParams returns the values that link map fields to data fields.
func PrepareFields ¶
PrepareFields matching
func PrepareFieldsByArray ¶
PrepareFieldsByArray matches fields and returns the raw fields for insert.
Example: [service=srv:int, name:string]
Result: [srv:int, name:string], [service,name], [?,?]
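To make the documented example concrete, the sketch below re-creates the mapping from field specs to the three result lists. It is a hypothetical illustration, not the package's code: `prepareFields` returns plain strings instead of `[]Value`, the spec grammar (`target=source:type`, or `source:type` when the names match) is inferred from the example alone, and the `events` table name is made up.

```go
package main

import (
	"fmt"
	"strings"
)

// prepareFields is a HYPOTHETICAL re-creation of the documented example.
// For each spec it returns the source value spec, the target column name,
// and a "?" placeholder.
func prepareFields(specs []string) (values, names, inserts []string) {
	for _, spec := range specs {
		spec = strings.TrimSpace(spec)
		target, source, hasTarget := strings.Cut(spec, "=")
		if !hasTarget {
			// No explicit target: the column name is the source key
			// without its type suffix.
			source = spec
			target, _, _ = strings.Cut(spec, ":")
		}
		values = append(values, source)
		names = append(names, target)
		inserts = append(inserts, "?")
	}
	return values, names, inserts
}

func main() {
	values, names, inserts := prepareFields([]string{"service=srv:int", "name:string"})
	fmt.Println(values, names, inserts)
	// prints [srv:int name:string] [service name] [? ?]

	// The column names and placeholders slot directly into an INSERT statement.
	query := fmt.Sprintf("INSERT INTO events (%s) VALUES (%s)",
		strings.Join(names, ","), strings.Join(inserts, ","))
	fmt.Println(query)
	// prints INSERT INTO events (service,name) VALUES (?,?)
}
```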
func PrepareFieldsByString ¶
PrepareFieldsByString matches fields and returns the raw fields for insert.