Documentation ¶
Index ¶
- Variables
- func IsFormatSupported(format string) bool
- func IsLookupSource(s api.Source) bool
- func IsPullStreamSource(s api.Source) bool
- func IsPushStreamSource(s api.Source) bool
- func IsStreamSource(s api.Source) bool
- func RegisterConnection(name string, cp ConnectionProvider)
- func RegisterConverter(name string, provider ConverterProvider)
- func RegisterFileRollHook(name string, provider RollHookProvider)
- func RegisterFileStreamReader(name string, provider FileStreamReaderProvider)
- func RegisterFunc(name string, f NewFuncFunc)
- func RegisterLookupSource(name string, f NewLookupSourceFunc)
- func RegisterMerger(name string, provider MergerProvider)
- func RegisterSink(name string, f NewSinkFunc)
- func RegisterSource(name string, f NewSourceFunc)
- type Connection
- type ConnectionProvider
- type ConverterProvider
- type FileStreamReader
- type FileStreamReaderProvider
- type Merger
- type MergerProvider
- type NewFuncFunc
- type NewLookupSourceFunc
- type NewSinkFunc
- type NewSourceFunc
- type RollHook
- type RollHookProvider
Constants ¶
This section is empty.
Variables ¶
View Source
var ( Sources = map[string]NewSourceFunc{} Sinks = map[string]NewSinkFunc{} LookupSources = map[string]NewLookupSourceFunc{} )
View Source
var ConnectionRegister map[string]ConnectionProvider
View Source
var Converters = map[string]ConverterProvider{}
View Source
var Functions = map[string]NewFuncFunc{}
View Source
var Mergers = map[string]MergerProvider{}
Mergers lists the registered mergers. The key is the format name followed by the payload format name, such as "jsoncan".
Functions ¶
func IsFormatSupported ¶
func IsLookupSource ¶
func IsPullStreamSource ¶
func IsPushStreamSource ¶
func IsStreamSource ¶
func RegisterConnection ¶
func RegisterConnection(name string, cp ConnectionProvider)
func RegisterConverter ¶
func RegisterConverter(name string, provider ConverterProvider)
RegisterConverter registers a converter with the given name.
func RegisterFileRollHook ¶
func RegisterFileRollHook(name string, provider RollHookProvider)
func RegisterFileStreamReader ¶
func RegisterFileStreamReader(name string, provider FileStreamReaderProvider)
func RegisterFunc ¶
func RegisterFunc(name string, f NewFuncFunc)
func RegisterLookupSource ¶
func RegisterLookupSource(name string, f NewLookupSourceFunc)
func RegisterMerger ¶
func RegisterMerger(name string, provider MergerProvider)
RegisterMerger registers a merger under a name that combines the format name and the payload format name, such as "jsoncan".
func RegisterSink ¶
func RegisterSink(name string, f NewSinkFunc)
func RegisterSource ¶
func RegisterSource(name string, f NewSourceFunc)
Types ¶
type Connection ¶
type Connection interface { Ping(ctx api.StreamContext) error DetachSub(ctx api.StreamContext, props map[string]any) api.Closable }
type ConnectionProvider ¶
type ConnectionProvider func(ctx api.StreamContext, props map[string]any) (Connection, error)
type ConverterProvider ¶
type ConverterProvider func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error)
ConverterProvider - schemaId: the id of the schema - logicalSchema: the default schema - props: extended properties, which the implementation can read and handle.
type FileStreamReader ¶
type FileStreamReader interface { // Provision Set up the static properties Provision(ctx api.StreamContext, props map[string]any) error // Bind set the file stream. Make sure the previous read has done Bind(ctx api.StreamContext, fileStream io.Reader, maxSize int) error // Read the next record. Returns EOF when the input has reached its end. Read(ctx api.StreamContext) (any, error) // IsBytesReader If is bytes reader, Read must return []byte, otherwise return map or []map IsBytesReader() bool api.Closable }
FileStreamReader reads a type of file line by line, which avoids loading the full file. If the full file needs to be loaded, extend a converter to decode the full bytes instead.
func GetFileStreamReader ¶
func GetFileStreamReader(ctx api.StreamContext, name string) (FileStreamReader, bool)
type FileStreamReaderProvider ¶
type FileStreamReaderProvider func(ctx api.StreamContext) FileStreamReader
type Merger ¶
type Merger interface { // Merging is called when read in a new frame Merging(ctx api.StreamContext, b []byte) error // Trigger is called when rate limiter is trigger to send out a message Trigger(ctx api.StreamContext) ([]any, bool) }
Merger is used to merge multiple frames. It is currently called by the rate limiter only.
type MergerProvider ¶
type MergerProvider func(ctx api.StreamContext, payloadSchema string, logicalSchema map[string]*ast.JsonStreamField) (Merger, error)
type NewFuncFunc ¶
type NewLookupSourceFunc ¶
type NewSinkFunc ¶
type NewSourceFunc ¶
type RollHook ¶
type RollHook interface { Provision(ctx api.StreamContext, props map[string]any) error RollDone(ctx api.StreamContext, filePath string) error api.Closable }
func GetFileRollHook ¶
type RollHookProvider ¶
type RollHookProvider func() RollHook
Click to show internal directories.
Click to hide internal directories.