modules

package
v2.1.0-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2025 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level registries of constructor functions, each stored under a
// string key (presumably the name given to the matching Register* function —
// confirm against the implementation).
var (
	Sources       = make(map[string]NewSourceFunc)
	Sinks         = make(map[string]NewSinkFunc)
	LookupSources = make(map[string]NewLookupSourceFunc)
)
View Source
// ConnectionRegister holds ConnectionProvider values under string keys.
// NOTE(review): this declaration has no initializer, so the map is nil unless
// it is assigned elsewhere (not visible here). Writing to a nil map panics —
// confirm it is initialized before any registration happens.
var ConnectionRegister map[string]ConnectionProvider
View Source
// Converters holds ConverterProvider values under string keys; it starts out
// as an empty, non-nil map.
var Converters = make(map[string]ConverterProvider)
View Source
// Functions holds NewFuncFunc constructors under string keys; it starts out
// as an empty, non-nil map.
var Functions = make(map[string]NewFuncFunc)
View Source
// Mergers holds MergerProvider values under string keys; it starts out as an
// empty, non-nil map.
var Mergers = make(map[string]MergerProvider)

Mergers is the list of registered mergers. The key combines the format name and the payload format name, such as "jsoncan".

Functions

func IsFormatSupported

func IsFormatSupported(format string) bool

func IsLookupSource

func IsLookupSource(s api.Source) bool

func IsPullStreamSource

func IsPullStreamSource(s api.Source) bool

func IsPushStreamSource

func IsPushStreamSource(s api.Source) bool

func IsStreamSource

func IsStreamSource(s api.Source) bool

func RegisterConnection

func RegisterConnection(name string, cp ConnectionProvider)

func RegisterConverter

func RegisterConverter(name string, provider ConverterProvider)

RegisterConverter registers a converter with the given name.

func RegisterFileRollHook

func RegisterFileRollHook(name string, provider RollHookProvider)

func RegisterFileStreamDecorator

func RegisterFileStreamDecorator(name string, provider FileStreamDecoratorProvider)

func RegisterFileStreamReader

func RegisterFileStreamReader(name string, provider FileStreamReaderProvider)

func RegisterFileStreamReaderAlias

func RegisterFileStreamReaderAlias(alias string, ref string)

func RegisterFunc

func RegisterFunc(name string, f NewFuncFunc)

func RegisterLookupSource

func RegisterLookupSource(name string, f NewLookupSourceFunc)

func RegisterMerger

func RegisterMerger(name string, provider MergerProvider)

RegisterMerger registers a merger with the format name and payload format name such as "jsoncan"

func RegisterSink

func RegisterSink(name string, f NewSinkFunc)

func RegisterSource

func RegisterSource(name string, f NewSourceFunc)

Types

type Connection

// Connection is the interface returned by a ConnectionProvider. It embeds
// api.Closable in addition to the methods declared below.
type Connection interface {
	// Provision receives the connection id and a property map, and returns an
	// error on failure.
	// NOTE(review): the expected keys of props are not shown here — confirm
	// against each implementation.
	Provision(ctx api.StreamContext, conId string, props map[string]any) error
	// Dial presumably establishes the underlying connection — confirm against
	// implementations. Returns an error on failure.
	Dial(ctx api.StreamContext) error
	// GetId returns a string id (presumably the conId given to Provision —
	// verify).
	GetId(ctx api.StreamContext) string
	// Ping presumably checks that the connection is still usable — confirm.
	// Returns an error on failure.
	Ping(ctx api.StreamContext) error
	api.Closable
}

type ConnectionProvider

// ConnectionProvider is a factory function type: given a stream context it
// returns a Connection. It is the type accepted by RegisterConnection.
type ConnectionProvider func(ctx api.StreamContext) Connection

type ConnectionStatus

// ConnectionStatus is the value returned by StatefulDialer.Status. It is
// tagged for JSON encoding.
type ConnectionStatus struct {
	// Status is encoded under the JSON key "status".
	// NOTE(review): the set of valid status strings is not shown here.
	Status string `json:"status"`
	// ErrMsg is encoded under the JSON key "errMsg" and omitted when empty.
	ErrMsg string `json:"errMsg,omitempty"`
}

type ConverterProvider

// ConverterProvider is a factory function type that returns a
// message.Converter, or an error if one cannot be created. It is the type
// accepted by RegisterConverter.
type ConverterProvider func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error)

ConverterProvider creates a converter. Its parameters are: schemaId, the id of the schema; logicalSchema, the default schema; and props, the extended properties, which the implementation can read and handle.

type FileStreamDecorator

// FileStreamDecorator collects metadata from a file source and uses it to
// decorate the data read from that source. Instances are created by a
// FileStreamDecoratorProvider.
type FileStreamDecorator interface {
	// Provision sets up the static properties.
	Provision(ctx api.StreamContext, props map[string]any) error
	// ReadMeta reads metadata from the file source and saves it in the
	// decorator itself. It receives the file line by line; once EOF is
	// received there will be no more lines.
	ReadMeta(ctx api.StreamContext, line []byte)
	// Decorate decorates the data from the file source and returns the result.
	Decorate(ctx api.StreamContext, data any) any
}

func GetFileStreamDecorator

func GetFileStreamDecorator(ctx api.StreamContext, name string) (FileStreamDecorator, bool)

type FileStreamDecoratorProvider

// FileStreamDecoratorProvider is a factory function type: given a stream
// context it returns a FileStreamDecorator. It is the type accepted by
// RegisterFileStreamDecorator.
type FileStreamDecoratorProvider func(ctx api.StreamContext) FileStreamDecorator

type FileStreamReader

// FileStreamReader reads records from a bound file stream one at a time.
// It embeds api.Closable in addition to the methods declared below.
type FileStreamReader interface {
	// Provision sets up the static properties.
	Provision(ctx api.StreamContext, props map[string]any) error
	// Bind sets the file stream to read from. Make sure the previous read has
	// finished before binding a new stream.
	// NOTE(review): the meaning and unit of maxSize are not stated here —
	// confirm against implementations.
	Bind(ctx api.StreamContext, fileStream io.Reader, maxSize int) error
	// Read returns the next record. It returns EOF when the input has reached
	// its end.
	Read(ctx api.StreamContext) (any, error)
	// IsBytesReader reports whether this is a bytes reader. If it is, Read
	// must return []byte; otherwise Read returns a map or a slice of maps.
	IsBytesReader() bool
	api.Closable
}

FileStreamReader reads a type of file line by line, which avoids loading the full file. If you need to load the full file, extend the converter to decode the full bytes instead.

func GetFileStreamReader

func GetFileStreamReader(ctx api.StreamContext, name string) (FileStreamReader, bool)

type FileStreamReaderProvider

// FileStreamReaderProvider is a factory function type: given a stream context
// it returns a FileStreamReader. It is the type accepted by
// RegisterFileStreamReader.
type FileStreamReaderProvider func(ctx api.StreamContext) FileStreamReader

type Merger

// Merger merges multiple frames into outgoing messages. Instances are created
// by a MergerProvider.
type Merger interface {
	// Merging is called when a new frame is read in; b holds the frame bytes.
	Merging(ctx api.StreamContext, b []byte) error
	// Trigger is called when the rate limiter is triggered to send out a
	// message.
	// NOTE(review): the meaning of the bool result is not stated here
	// (presumably whether there is anything to send) — confirm.
	Trigger(ctx api.StreamContext) ([]any, bool)
}

Merger is used to merge multiple frames. It is currently called by the rate limiter only.

type MergerProvider

// MergerProvider is a factory function type that returns a Merger, or an
// error if one cannot be created. It is the type accepted by RegisterMerger.
// NOTE(review): payloadSchema is presumably a schema identifier for the
// payload format — confirm against callers.
type MergerProvider func(ctx api.StreamContext, payloadSchema string, logicalSchema map[string]*ast.JsonStreamField) (Merger, error)

type NewFuncFunc

// NewFuncFunc is a constructor function type that takes no arguments and
// returns an api.Function. It is the type accepted by RegisterFunc.
type NewFuncFunc func() api.Function

type NewLookupSourceFunc

// NewLookupSourceFunc is a constructor function type that takes no arguments
// and returns an api.Source. It is the type accepted by RegisterLookupSource.
type NewLookupSourceFunc func() api.Source

type NewSinkFunc

// NewSinkFunc is a constructor function type that takes no arguments and
// returns an api.Sink. It is the type accepted by RegisterSink.
type NewSinkFunc func() api.Sink

type NewSourceFunc

// NewSourceFunc is a constructor function type that takes no arguments and
// returns an api.Source. It is the type accepted by RegisterSource.
type NewSourceFunc func() api.Source

type RollHook

// RollHook is the interface returned by a RollHookProvider and by
// GetFileRollHook. It embeds api.Closable in addition to the methods below.
type RollHook interface {
	// Provision receives a property map and returns an error on failure.
	Provision(ctx api.StreamContext, props map[string]any) error
	// RollDone receives a file path and returns an error on failure.
	// NOTE(review): presumably invoked after a file has been rolled, with the
	// path of the rolled file — confirm against the caller.
	RollDone(ctx api.StreamContext, filePath string) error
	api.Closable
}

func GetFileRollHook

func GetFileRollHook(name string) (RollHook, bool)

type RollHookProvider

// RollHookProvider is a factory function type that takes no arguments and
// returns a RollHook. It is the type accepted by RegisterFileRollHook.
type RollHookProvider func() RollHook

type StatefulDialer

// StatefulDialer exposes a status and accepts a handler for status changes.
// NOTE(review): presumably implemented by Connection types that track their
// own connection state — confirm against implementations.
type StatefulDialer interface {
	// SetStatusChangeHandler registers the api.StatusChangeHandler to use.
	// NOTE(review): when and how the handler is invoked is not shown here.
	SetStatusChangeHandler(ctx api.StreamContext, handler api.StatusChangeHandler)
	// Status returns a ConnectionStatus value.
	Status(ctx api.StreamContext) ConnectionStatus
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL