Documentation ¶
Index ¶
- Variables
- func IsFormatSupported(format string) bool
- func IsLookupSource(s api.Source) bool
- func IsPullStreamSource(s api.Source) bool
- func IsPushStreamSource(s api.Source) bool
- func IsStreamSource(s api.Source) bool
- func RegisterConnection(name string, cp ConnectionProvider)
- func RegisterConverter(name string, provider ConverterProvider)
- func RegisterFileRollHook(name string, provider RollHookProvider)
- func RegisterFileStreamDecorator(name string, provider FileStreamDecoratorProvider)
- func RegisterFileStreamReader(name string, provider FileStreamReaderProvider)
- func RegisterFileStreamReaderAlias(alias string, ref string)
- func RegisterFunc(name string, f NewFuncFunc)
- func RegisterLookupSource(name string, f NewLookupSourceFunc)
- func RegisterMerger(name string, provider MergerProvider)
- func RegisterSink(name string, f NewSinkFunc)
- func RegisterSource(name string, f NewSourceFunc)
- type Connection
- type ConnectionProvider
- type ConnectionStatus
- type ConverterProvider
- type FileStreamDecorator
- type FileStreamDecoratorProvider
- type FileStreamReader
- type FileStreamReaderProvider
- type Merger
- type MergerProvider
- type NewFuncFunc
- type NewLookupSourceFunc
- type NewSinkFunc
- type NewSourceFunc
- type RollHook
- type RollHookProvider
- type StatefulDialer
Constants ¶
This section is empty.
Variables ¶
View Source
var ( Sources = map[string]NewSourceFunc{} Sinks = map[string]NewSinkFunc{} LookupSources = map[string]NewLookupSourceFunc{} )
View Source
var ConnectionRegister map[string]ConnectionProvider
View Source
var Converters = map[string]ConverterProvider{}
View Source
var Functions = map[string]NewFuncFunc{}
View Source
var Mergers = map[string]MergerProvider{}
Mergers is the list of registered merger providers. The key is the format name followed by the payload format name, such as "jsoncan".
Functions ¶
func IsFormatSupported ¶
func IsLookupSource ¶
func IsPullStreamSource ¶
func IsPushStreamSource ¶
func IsStreamSource ¶
func RegisterConnection ¶
func RegisterConnection(name string, cp ConnectionProvider)
func RegisterConverter ¶
func RegisterConverter(name string, provider ConverterProvider)
RegisterConverter registers a converter with the given name.
func RegisterFileRollHook ¶
func RegisterFileRollHook(name string, provider RollHookProvider)
func RegisterFileStreamDecorator ¶
func RegisterFileStreamDecorator(name string, provider FileStreamDecoratorProvider)
func RegisterFileStreamReader ¶
func RegisterFileStreamReader(name string, provider FileStreamReaderProvider)
func RegisterFunc ¶
func RegisterFunc(name string, f NewFuncFunc)
func RegisterLookupSource ¶
func RegisterLookupSource(name string, f NewLookupSourceFunc)
func RegisterMerger ¶
func RegisterMerger(name string, provider MergerProvider)
RegisterMerger registers a merger under a key made of the format name followed by the payload format name, such as "jsoncan".
func RegisterSink ¶
func RegisterSink(name string, f NewSinkFunc)
func RegisterSource ¶
func RegisterSource(name string, f NewSourceFunc)
Types ¶
type Connection ¶
type Connection interface { Provision(ctx api.StreamContext, conId string, props map[string]any) error Dial(ctx api.StreamContext) error GetId(ctx api.StreamContext) string Ping(ctx api.StreamContext) error api.Closable }
type ConnectionProvider ¶
type ConnectionProvider func(ctx api.StreamContext) Connection
type ConnectionStatus ¶
type ConverterProvider ¶
type ConverterProvider func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error)
ConverterProvider takes the following parameters: schemaId is the id of the schema; logicalSchema is the default schema; props holds extended properties, which an implementation can read and handle.
type FileStreamDecorator ¶
type FileStreamDecorator interface { // Provision Set up the static properties Provision(ctx api.StreamContext, props map[string]any) error // ReadMeta Read the metadata from the file source, and save in the decorator itself // It will receive lines, when receiving EOF, there will be no more lines. ReadMeta(ctx api.StreamContext, line []byte) // Decorate the file source Decorate(ctx api.StreamContext, data any) any }
func GetFileStreamDecorator ¶
func GetFileStreamDecorator(ctx api.StreamContext, name string) (FileStreamDecorator, bool)
type FileStreamDecoratorProvider ¶
type FileStreamDecoratorProvider func(ctx api.StreamContext) FileStreamDecorator
type FileStreamReader ¶
type FileStreamReader interface { // Provision Set up the static properties Provision(ctx api.StreamContext, props map[string]any) error // Bind set the file stream. Make sure the previous read has done Bind(ctx api.StreamContext, fileStream io.Reader, maxSize int) error // Read the next record. Returns EOF when the input has reached its end. Read(ctx api.StreamContext) (any, error) // IsBytesReader If is bytes reader, Read must return []byte, otherwise return map or []map IsBytesReader() bool api.Closable }
FileStreamReader reads a type of file line by line, so that the full file does not have to be loaded. If the full file needs to be loaded, extend a converter to decode the full bytes instead.
func GetFileStreamReader ¶
func GetFileStreamReader(ctx api.StreamContext, name string) (FileStreamReader, bool)
type FileStreamReaderProvider ¶
type FileStreamReaderProvider func(ctx api.StreamContext) FileStreamReader
type Merger ¶
type Merger interface { // Merging is called when read in a new frame Merging(ctx api.StreamContext, b []byte) error // Trigger is called when rate limiter is trigger to send out a message Trigger(ctx api.StreamContext) ([]any, bool) }
Merger is used to merge multiple frames. It is currently called by the rate limiter only.
type MergerProvider ¶
type MergerProvider func(ctx api.StreamContext, payloadSchema string, logicalSchema map[string]*ast.JsonStreamField) (Merger, error)
type NewFuncFunc ¶
type NewLookupSourceFunc ¶
type NewSinkFunc ¶
type NewSourceFunc ¶
type RollHook ¶
type RollHook interface { Provision(ctx api.StreamContext, props map[string]any) error RollDone(ctx api.StreamContext, filePath string) error api.Closable }
func GetFileRollHook ¶
type RollHookProvider ¶
type RollHookProvider func() RollHook
type StatefulDialer ¶
type StatefulDialer interface { SetStatusChangeHandler(ctx api.StreamContext, handler api.StatusChangeHandler) Status(ctx api.StreamContext) ConnectionStatus }
Click to show internal directories.
Click to hide internal directories.