runtime

package
v0.0.146 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2024 License: BSD-3-Clause Imports: 35 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsKeyValueType

func IsKeyValueType[T any]() bool
func MakeMultiJoinLink[K comparable, T1, T2, R any](
	multiJoin TypedStream[R],
	stream TypedStream[datastruct.KeyValue[K, T2]])

func MakeSerde

func MakeSerde[T any](runtime ServiceExecutionRuntime) serde.StreamSerde[T]

func MakeService

func MakeService[Environment ServiceExecutionEnvironment, Cfg config.Config](name string, configSettings *config.ConfigSettings) Environment

Types

type AppSinkStream

type AppSinkStream[T any] struct {
	StreamBase[T]
	// contains filtered or unexported fields
}

func MakeAppSinkStream

func MakeAppSinkStream[T any](name string, stream TypedStream[T], consumer ConsumerFunc[T]) *AppSinkStream[T]

func (*AppSinkStream[T]) Consume

func (s *AppSinkStream[T]) Consume(value T)

func (*AppSinkStream[T]) GetConsumers added in v0.0.134

func (s *AppSinkStream[T]) GetConsumers() []Stream

type BinaryConsumer

type BinaryConsumer interface {
	ConsumeBinary([]byte)
}

type BinaryConsumerFunc

type BinaryConsumerFunc func([]byte) error

func (BinaryConsumerFunc) Consume

func (f BinaryConsumerFunc) Consume(data []byte) error

type BinaryKVConsumer

type BinaryKVConsumer interface {
	ConsumeBinary([]byte, []byte)
}

type BinaryKVConsumerFunc

type BinaryKVConsumerFunc func([]byte, []byte) error

func (BinaryKVConsumerFunc) Consume

func (f BinaryKVConsumerFunc) Consume(key []byte, value []byte) error

type Caller

type Caller[T any] interface {
	Consumer[T]
}

type Collect

type Collect[T any] interface {
	Out(value T)
}

type ConsumeStatistics

type ConsumeStatistics interface {
	Count() int64
	LinkId() config.LinkId
}

type ConsumedStream

type ConsumedStream[T any] struct {
	StreamBase[T]
	// contains filtered or unexported fields
}

func (*ConsumedStream[T]) GetConsumer

func (s *ConsumedStream[T]) GetConsumer() TypedStreamConsumer[T]

func (*ConsumedStream[T]) GetConsumers added in v0.0.134

func (s *ConsumedStream[T]) GetConsumers() []Stream

func (*ConsumedStream[T]) GetSerde

func (s *ConsumedStream[T]) GetSerde() serde.StreamSerde[T]

func (*ConsumedStream[T]) SetConsumer

func (s *ConsumedStream[T]) SetConsumer(consumer TypedStreamConsumer[T])

type Consumer

type Consumer[T any] interface {
	Consume(T)
}

type ConsumerFunc

type ConsumerFunc[T any] func(T) error

func (ConsumerFunc[T]) Consume

func (f ConsumerFunc[T]) Consume(value T) error

type DataConnector

type DataConnector interface {
	GetName() string
	GetId() int
}

type DataSink

type DataSink interface {
	DataConnector
	Start(context.Context) error
	Stop(context.Context)
	GetDataConnector() *config.DataConnectorConfig
	GetEnvironment() ServiceExecutionEnvironment
	AddEndpoint(SinkEndpoint)
	GetEndpoint(id int) SinkEndpoint
	GetEndpoints() []SinkEndpoint
}

type DataSinkEndpoint

type DataSinkEndpoint struct {
	// contains filtered or unexported fields
}

func MakeDataSinkEndpoint

func MakeDataSinkEndpoint(dataSink DataSink, config *config.EndpointConfig, environment ServiceExecutionEnvironment) *DataSinkEndpoint

func (*DataSinkEndpoint) AddEndpointConsumer

func (ep *DataSinkEndpoint) AddEndpointConsumer(endpointConsumer OutputEndpointConsumer)

func (*DataSinkEndpoint) GetConfig

func (ep *DataSinkEndpoint) GetConfig() *config.EndpointConfig

func (*DataSinkEndpoint) GetDataConnector

func (ep *DataSinkEndpoint) GetDataConnector() DataConnector

func (*DataSinkEndpoint) GetDataSink

func (ep *DataSinkEndpoint) GetDataSink() DataSink

func (*DataSinkEndpoint) GetEndpointConsumers

func (ep *DataSinkEndpoint) GetEndpointConsumers() []OutputEndpointConsumer

func (*DataSinkEndpoint) GetEnvironment added in v0.0.134

func (ep *DataSinkEndpoint) GetEnvironment() ServiceExecutionEnvironment

func (*DataSinkEndpoint) GetId

func (ep *DataSinkEndpoint) GetId() int

func (*DataSinkEndpoint) GetName

func (ep *DataSinkEndpoint) GetName() string

type DataSinkEndpointConsumer

type DataSinkEndpointConsumer[T any] struct {
	// contains filtered or unexported fields
}

func MakeDataSinkEndpointConsumer

func MakeDataSinkEndpointConsumer[T any](endpoint SinkEndpoint, stream TypedSinkStream[T]) *DataSinkEndpointConsumer[T]

func (*DataSinkEndpointConsumer[T]) Endpoint

func (ec *DataSinkEndpointConsumer[T]) Endpoint() SinkEndpoint

func (*DataSinkEndpointConsumer[T]) GetEndpointWriter

func (ec *DataSinkEndpointConsumer[T]) GetEndpointWriter() TypedEndpointWriter[T]

type DataSource

type DataSource interface {
	DataConnector
	Start(context.Context) error
	Stop(context.Context)
	GetDataConnector() *config.DataConnectorConfig
	GetEnvironment() ServiceExecutionEnvironment
	AddEndpoint(InputEndpoint)
	GetEndpoint(id int) InputEndpoint
	GetEndpoints() []InputEndpoint
}

type DataSourceEndpoint

type DataSourceEndpoint struct {
	// contains filtered or unexported fields
}

func MakeDataSourceEndpoint

func MakeDataSourceEndpoint(dataSource DataSource, config *config.EndpointConfig, environment ServiceExecutionEnvironment) *DataSourceEndpoint

func (*DataSourceEndpoint) AddEndpointConsumer

func (ep *DataSourceEndpoint) AddEndpointConsumer(endpointConsumer InputEndpointConsumer)

func (*DataSourceEndpoint) GetConfig

func (ep *DataSourceEndpoint) GetConfig() *config.EndpointConfig

func (*DataSourceEndpoint) GetDataConnector

func (ep *DataSourceEndpoint) GetDataConnector() DataConnector

func (*DataSourceEndpoint) GetDataSource

func (ep *DataSourceEndpoint) GetDataSource() DataSource

func (*DataSourceEndpoint) GetEndpointConsumers

func (ep *DataSourceEndpoint) GetEndpointConsumers() []InputEndpointConsumer

func (*DataSourceEndpoint) GetEnvironment added in v0.0.134

func (ep *DataSourceEndpoint) GetEnvironment() ServiceExecutionEnvironment

func (*DataSourceEndpoint) GetId

func (ep *DataSourceEndpoint) GetId() int

func (*DataSourceEndpoint) GetName

func (ep *DataSourceEndpoint) GetName() string

type DataSourceEndpointConsumer

type DataSourceEndpointConsumer[T any] struct {
	// contains filtered or unexported fields
}

func MakeDataSourceEndpointConsumer

func MakeDataSourceEndpointConsumer[T any](endpoint InputEndpoint, inputStream TypedInputStream[T]) *DataSourceEndpointConsumer[T]

func (*DataSourceEndpointConsumer[T]) Consume

func (ec *DataSourceEndpointConsumer[T]) Consume(value T)

func (*DataSourceEndpointConsumer[T]) Endpoint

func (ec *DataSourceEndpointConsumer[T]) Endpoint() InputEndpoint

func (*DataSourceEndpointConsumer[T]) GetEndpointReader

func (ec *DataSourceEndpointConsumer[T]) GetEndpointReader() TypedEndpointReader[T]

type DelayFunc

type DelayFunc[T any] func(T) error

type DelayFunction

type DelayFunction[T any] interface {
	Duration(Stream, T) time.Duration
}

type DelayFunctionContext

type DelayFunctionContext[T any] struct {
	StreamFunction[T]
	// contains filtered or unexported fields
}

type DelayStream

type DelayStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeDelayStream

func MakeDelayStream[T any](name string, stream TypedStream[T], f DelayFunction[T]) *DelayStream[T]

func (*DelayStream[T]) Consume

func (s *DelayStream[T]) Consume(value T)

type Edge

type Edge struct {
	From   int    `json:"from"`
	To     int    `json:"to"`
	Arrows string `json:"arrows"`
	Length int    `json:"length"`
	Label  string `json:"label"`
	Color  struct {
		Opacity float32 `json:"opacity"`
		Color   string  `json:"color"`
	} `json:"color"`
}

type Endpoint

type Endpoint interface {
	GetName() string
	GetId() int
	GetDataConnector() DataConnector
}

type EndpointReader

type EndpointReader interface {
}

type EndpointWriter

type EndpointWriter interface {
}

type FilterFunction

type FilterFunction[T any] interface {
	Filter(Stream, T) bool
}

type FilterFunctionContext

type FilterFunctionContext[T any] struct {
	StreamFunction[T]
	// contains filtered or unexported fields
}

type FilterStream

type FilterStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeFilterStream

func MakeFilterStream[T any](name string, stream TypedStream[T], f FilterFunction[T]) *FilterStream[T]

func (*FilterStream[T]) Consume

func (s *FilterStream[T]) Consume(value T)

type FlatMapFunction

type FlatMapFunction[T, R any] interface {
	FlatMap(Stream, T, Collect[R])
}

type FlatMapFunctionContext

type FlatMapFunctionContext[T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type FlatMapIterableStream

type FlatMapIterableStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeFlatMapIterableStream

func MakeFlatMapIterableStream[T, R any](name string, stream TypedStream[T]) *FlatMapIterableStream[T, R]

func (*FlatMapIterableStream[T, R]) Consume

func (s *FlatMapIterableStream[T, R]) Consume(value T)

type FlatMapStream

type FlatMapStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeFlatMapStream

func MakeFlatMapStream[T, R any](name string, stream TypedStream[T], f FlatMapFunction[T, R]) *FlatMapStream[T, R]

func (*FlatMapStream[T, R]) Consume

func (s *FlatMapStream[T, R]) Consume(value T)

type ForEachFunction

type ForEachFunction[T any] interface {
	ForEach(Stream, T)
}

type ForEachFunctionContext

type ForEachFunctionContext[T any] struct {
	StreamFunction[T]
	// contains filtered or unexported fields
}

type ForEachStream

type ForEachStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeForEachStream

func MakeForEachStream[T any](name string, stream TypedStream[T], f ForEachFunction[T]) *ForEachStream[T]

func (*ForEachStream[T]) Consume

func (s *ForEachStream[T]) Consume(value T)

type InStubKVStream

type InStubKVStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func (*InStubKVStream[T]) Consume added in v0.0.134

func (s *InStubKVStream[T]) Consume(value T)

func (*InStubKVStream[T]) ConsumeBinary

func (s *InStubKVStream[T]) ConsumeBinary(key []byte, value []byte)

type InStubStream

type InStubStream[T any] struct {
	ConsumedStream[T]
}

func MakeInStubStream

func MakeInStubStream[T any](name string, env ServiceExecutionEnvironment) *InStubStream[T]

func (*InStubStream[T]) Consume added in v0.0.134

func (s *InStubStream[T]) Consume(value T)

func (*InStubStream[T]) ConsumeBinary

func (s *InStubStream[T]) ConsumeBinary(data []byte)

type InputDataSource

type InputDataSource struct {
	// contains filtered or unexported fields
}

func MakeInputDataSource

func MakeInputDataSource(dataConnector *config.DataConnectorConfig, environment ServiceExecutionEnvironment) *InputDataSource

func (*InputDataSource) AddEndpoint

func (ds *InputDataSource) AddEndpoint(endpoint InputEndpoint)

func (*InputDataSource) GetDataConnector

func (ds *InputDataSource) GetDataConnector() *config.DataConnectorConfig

func (*InputDataSource) GetEndpoint

func (ds *InputDataSource) GetEndpoint(id int) InputEndpoint

func (*InputDataSource) GetEndpoints

func (ds *InputDataSource) GetEndpoints() []InputEndpoint

func (*InputDataSource) GetEnvironment added in v0.0.134

func (ds *InputDataSource) GetEnvironment() ServiceExecutionEnvironment

func (*InputDataSource) GetId

func (ds *InputDataSource) GetId() int

func (*InputDataSource) GetName

func (ds *InputDataSource) GetName() string

type InputEndpoint

type InputEndpoint interface {
	Endpoint
	GetConfig() *config.EndpointConfig
	GetEnvironment() ServiceExecutionEnvironment
	GetDataSource() DataSource
	AddEndpointConsumer(consumer InputEndpointConsumer)
	GetEndpointConsumers() []InputEndpointConsumer
}

type InputEndpointConsumer

type InputEndpointConsumer interface {
	Endpoint() InputEndpoint
}

type InputKVSplitStream

type InputKVSplitStream[T any] struct {
	*SplitStream[T]
	// contains filtered or unexported fields
}

func (*InputKVSplitStream[T]) ConsumeBinary

func (s *InputKVSplitStream[T]) ConsumeBinary(key []byte, value []byte)

type InputSplitStream

type InputSplitStream[T any] struct {
	*SplitStream[T]
}

func MakeInputSplitStream

func MakeInputSplitStream[T any](name string, env ServiceExecutionEnvironment) *InputSplitStream[T]

func (*InputSplitStream[T]) ConsumeBinary

func (s *InputSplitStream[T]) ConsumeBinary(data []byte)

type InputStream

type InputStream[T any] struct {
	ConsumedStream[T]
}

func MakeInputStream

func MakeInputStream[T any](name string, env ServiceExecutionEnvironment) *InputStream[T]

func (*InputStream[T]) Consume added in v0.0.134

func (s *InputStream[T]) Consume(value T)

func (*InputStream[T]) GetEndpointId

func (s *InputStream[T]) GetEndpointId() int

type JoinFunction

type JoinFunction[K comparable, T1, T2, R any] interface {
	Join(Stream, K, []T1, []T2, Collect[R]) bool
}

type JoinFunctionContext

type JoinFunctionContext[K comparable, T1, T2, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}
type JoinLink[K comparable, T1, T2, R any] struct {
	// contains filtered or unexported fields
}

func (*JoinLink[K, T1, T2, R]) Consume

func (s *JoinLink[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])

func (*JoinLink[K, T1, T2, R]) GetConfig

func (s *JoinLink[K, T1, T2, R]) GetConfig() *config.StreamConfig

func (*JoinLink[K, T1, T2, R]) GetConsumers added in v0.0.134

func (s *JoinLink[K, T1, T2, R]) GetConsumers() []Stream

func (*JoinLink[K, T1, T2, R]) GetEnvironment added in v0.0.134

func (s *JoinLink[K, T1, T2, R]) GetEnvironment() ServiceExecutionEnvironment

func (*JoinLink[K, T1, T2, R]) GetId

func (s *JoinLink[K, T1, T2, R]) GetId() int

func (*JoinLink[K, T1, T2, R]) GetName

func (s *JoinLink[K, T1, T2, R]) GetName() string

func (*JoinLink[K, T1, T2, R]) GetTransformationName

func (s *JoinLink[K, T1, T2, R]) GetTransformationName() string

func (*JoinLink[K, T1, T2, R]) GetTypeName

func (s *JoinLink[K, T1, T2, R]) GetTypeName() string

type JoinStream

type JoinStream[K comparable, T1, T2, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeJoinStream

func MakeJoinStream[K comparable, T1, T2, R any](name string, stream TypedStream[datastruct.KeyValue[K, T1]],
	streamRight TypedStream[datastruct.KeyValue[K, T2]],
	f JoinFunction[K, T1, T2, R]) *JoinStream[K, T1, T2, R]

func (*JoinStream[K, T1, T2, R]) Consume

func (s *JoinStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T1])

func (*JoinStream[K, T1, T2, R]) ConsumeRight

func (s *JoinStream[K, T1, T2, R]) ConsumeRight(value datastruct.KeyValue[K, T2])

func (*JoinStream[K, T1, T2, R]) Out

func (s *JoinStream[K, T1, T2, R]) Out(value R)

type KeyByFunction

type KeyByFunction[T any, K comparable, V any] interface {
	KeyBy(Stream, T) datastruct.KeyValue[K, V]
}

type KeyByFunctionContext

type KeyByFunctionContext[T any, K comparable, V any] struct {
	StreamFunction[datastruct.KeyValue[K, V]]
	// contains filtered or unexported fields
}

type KeyByStream

type KeyByStream[T any, K comparable, V any] struct {
	ConsumedStream[datastruct.KeyValue[K, V]]
	// contains filtered or unexported fields
}

func MakeKeyByStream

func MakeKeyByStream[T any, K comparable, V any](name string, stream TypedStream[T], f KeyByFunction[T, K, V]) *KeyByStream[T, K, V]

func (*KeyByStream[T, K, V]) Consume

func (s *KeyByStream[T, K, V]) Consume(value T)

type LinkStream

type LinkStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeLinkStream

func MakeLinkStream[T any](name string, env ServiceExecutionEnvironment) *LinkStream[T]

func (*LinkStream[T]) Consume

func (s *LinkStream[T]) Consume(value T)

func (*LinkStream[T]) SetSource

func (s *LinkStream[T]) SetSource(stream TypedConsumedStream[T])

type MapFunction

type MapFunction[T, R any] interface {
	Map(Stream, T) R
}

type MapFunctionContext

type MapFunctionContext[T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type MapStream

type MapStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeMapStream

func MakeMapStream[T, R any](name string, stream TypedStream[T], f MapFunction[T, R]) *MapStream[T, R]

func (*MapStream[T, R]) Consume

func (s *MapStream[T, R]) Consume(value T)
type MergeLink[T any] struct {
	// contains filtered or unexported fields
}

func (*MergeLink[T]) Consume

func (s *MergeLink[T]) Consume(value T)

func (*MergeLink[T]) GetConfig

func (s *MergeLink[T]) GetConfig() *config.StreamConfig

func (*MergeLink[T]) GetConsumers added in v0.0.134

func (s *MergeLink[T]) GetConsumers() []Stream

func (*MergeLink[T]) GetEnvironment added in v0.0.134

func (s *MergeLink[T]) GetEnvironment() ServiceExecutionEnvironment

func (*MergeLink[T]) GetId

func (s *MergeLink[T]) GetId() int

func (*MergeLink[T]) GetName

func (s *MergeLink[T]) GetName() string

func (*MergeLink[T]) GetTransformationName

func (s *MergeLink[T]) GetTransformationName() string

func (*MergeLink[T]) GetTypeName

func (s *MergeLink[T]) GetTypeName() string

type MergeStream

type MergeStream[T any] struct {
	ConsumedStream[T]
}

func MakeMergeStream

func MakeMergeStream[T any](name string, streams ...TypedStream[T]) *MergeStream[T]

func (*MergeStream[T]) Consume added in v0.0.134

func (s *MergeStream[T]) Consume(value T)

type MultiJoinFunction

type MultiJoinFunction[K comparable, T, R any] interface {
	MultiJoin(Stream, K, [][]interface{}, Collect[R]) bool
}

type MultiJoinFunctionContext

type MultiJoinFunctionContext[K comparable, T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type MultiJoinLinkStream

type MultiJoinLinkStream[K comparable, T1, T2, R any] struct {
	// contains filtered or unexported fields
}

func (*MultiJoinLinkStream[K, T1, T2, R]) Consume

func (s *MultiJoinLinkStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])

func (*MultiJoinLinkStream[K, T1, T2, R]) GetConfig

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConfig() *config.StreamConfig

func (*MultiJoinLinkStream[K, T1, T2, R]) GetConsumers added in v0.0.134

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConsumers() []Stream

func (*MultiJoinLinkStream[K, T1, T2, R]) GetEnvironment added in v0.0.134

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetEnvironment() ServiceExecutionEnvironment

func (*MultiJoinLinkStream[K, T1, T2, R]) GetId

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetId() int

func (*MultiJoinLinkStream[K, T1, T2, R]) GetName

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetName() string

func (*MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName() string

func (*MultiJoinLinkStream[K, T1, T2, R]) GetTypeName

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTypeName() string

type MultiJoinStream

type MultiJoinStream[K comparable, T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeMultiJoinStream

func MakeMultiJoinStream[K comparable, T, R any](
	name string, leftStream TypedStream[datastruct.KeyValue[K, T]],
	f MultiJoinFunction[K, T, R]) *MultiJoinStream[K, T, R]

func (*MultiJoinStream[K, T, R]) Consume

func (s *MultiJoinStream[K, T, R]) Consume(value datastruct.KeyValue[K, T])

func (*MultiJoinStream[K, T, R]) ConsumeRight

func (s *MultiJoinStream[K, T, R]) ConsumeRight(index int, value datastruct.KeyValue[K, interface{}])

func (*MultiJoinStream[K, T, R]) Out

func (s *MultiJoinStream[K, T, R]) Out(value R)

type NetworkData

type NetworkData struct {
	Nodes []*Node `json:"nodes"`
	Edges []*Edge `json:"edges"`
}

type Node

type Node struct {
	Id    int `json:"id"`
	Color struct {
		Background string `json:"background"`
	} `json:"color"`
	Opacity float32 `json:"opacity"`
	Label   string  `json:"label"`
	X       int     `json:"x"`
	Y       int     `json:"y"`
}

type OutStubBinaryKVStream

type OutStubBinaryKVStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeOutStubBinaryKVStream

func MakeOutStubBinaryKVStream[T any](name string, stream TypedStream[T], consumer BinaryKVConsumerFunc) *OutStubBinaryKVStream[T]

func (*OutStubBinaryKVStream[T]) Consume

func (s *OutStubBinaryKVStream[T]) Consume(value T)

type OutStubBinaryStream

type OutStubBinaryStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeOutStubBinaryStream

func MakeOutStubBinaryStream[T any](name string, stream TypedStream[T], consumer BinaryConsumerFunc) *OutStubBinaryStream[T]

func (*OutStubBinaryStream[T]) Consume

func (s *OutStubBinaryStream[T]) Consume(value T)

type OutStubStream

type OutStubStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeOutStubStream

func MakeOutStubStream[T any](name string, stream TypedStream[T], consumer ConsumerFunc[T]) *OutStubStream[T]

func (*OutStubStream[T]) Consume

func (s *OutStubStream[T]) Consume(value T)

type OutputDataSink

type OutputDataSink struct {
	// contains filtered or unexported fields
}

func MakeOutputDataSink

func MakeOutputDataSink(dataConnector *config.DataConnectorConfig, environment ServiceExecutionEnvironment) *OutputDataSink

func (*OutputDataSink) AddEndpoint

func (ds *OutputDataSink) AddEndpoint(endpoint SinkEndpoint)

func (*OutputDataSink) GetDataConnector

func (ds *OutputDataSink) GetDataConnector() *config.DataConnectorConfig

func (*OutputDataSink) GetEndpoint

func (ds *OutputDataSink) GetEndpoint(id int) SinkEndpoint

func (*OutputDataSink) GetEndpoints

func (ds *OutputDataSink) GetEndpoints() []SinkEndpoint

func (*OutputDataSink) GetEnvironment added in v0.0.134

func (ds *OutputDataSink) GetEnvironment() ServiceExecutionEnvironment

func (*OutputDataSink) GetId

func (ds *OutputDataSink) GetId() int

func (*OutputDataSink) GetName

func (ds *OutputDataSink) GetName() string

type OutputEndpointConsumer

type OutputEndpointConsumer interface {
	Endpoint() SinkEndpoint
}

type ParallelsFunction

type ParallelsFunction[T, R any] interface {
	Parallels(Stream, T, Collect[R])
}

type ParallelsFunctionContext

type ParallelsFunctionContext[T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type ParallelsStream

type ParallelsStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeParallelsStream

func MakeParallelsStream[T, R any](name string, stream TypedStream[T], f ParallelsFunction[T, R]) *ParallelsStream[T, R]

func (*ParallelsStream[T, R]) Consume

func (s *ParallelsStream[T, R]) Consume(value T)

type ServiceApp

type ServiceApp struct {
	// contains filtered or unexported fields
}

func (*ServiceApp) AddDataSink

func (app *ServiceApp) AddDataSink(dataSink DataSink)

func (*ServiceApp) AddDataSource

func (app *ServiceApp) AddDataSource(dataSource DataSource)

func (*ServiceApp) Delay

func (app *ServiceApp) Delay(duration time.Duration, f func())

func (*ServiceApp) GetAppConfig added in v0.0.146

func (app *ServiceApp) GetAppConfig() *config.ServiceAppConfig

func (*ServiceApp) GetConsumeTimeout

func (app *ServiceApp) GetConsumeTimeout(from int, to int) time.Duration

func (*ServiceApp) GetDataSink

func (app *ServiceApp) GetDataSink(id int) DataSink

func (*ServiceApp) GetDataSource

func (app *ServiceApp) GetDataSource(id int) DataSource

func (*ServiceApp) GetEndpointReader

func (app *ServiceApp) GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader

func (*ServiceApp) GetEndpointWriter

func (app *ServiceApp) GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter

func (*ServiceApp) GetMetrics

func (app *ServiceApp) GetMetrics() metrics.Metrics

func (*ServiceApp) GetRuntime added in v0.0.134

func (app *ServiceApp) GetRuntime() ServiceExecutionRuntime

func (*ServiceApp) GetServiceConfig

func (app *ServiceApp) GetServiceConfig() *config.ServiceConfig

func (*ServiceApp) ReloadConfig

func (app *ServiceApp) ReloadConfig(config config.Config)

func (*ServiceApp) Start

func (app *ServiceApp) Start(ctx context.Context) error

func (*ServiceApp) Stop

func (app *ServiceApp) Stop(ctx context.Context)

type ServiceExecutionEnvironment added in v0.0.134

type ServiceExecutionEnvironment interface {
	config.ServiceEnvironment
	GetSerde(valueType reflect.Type) (serde.Serializer, error)
	StreamsInit(ctx context.Context)
	SetConfig(config config.Config)
	Start(context.Context) error
	Stop(context.Context)
	AddDataSource(dataSource DataSource)
	GetDataSource(id int) DataSource
	AddDataSink(dataSink DataSink)
	GetDataSink(id int) DataSink
	GetConsumeTimeout(from int, to int) time.Duration
	GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader
	GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter
	GetMetrics() metrics.Metrics
	Delay(duration time.Duration, f func())
	GetRuntime() ServiceExecutionRuntime
}

type ServiceExecutionRuntime added in v0.0.134

type ServiceExecutionRuntime interface {
	// contains filtered or unexported methods
}

type ServiceLoader added in v0.0.145

type ServiceLoader interface {
	Stop()
}

type SinkEndpoint

type SinkEndpoint interface {
	Endpoint
	GetConfig() *config.EndpointConfig
	GetEnvironment() ServiceExecutionEnvironment
	GetDataSink() DataSink
	AddEndpointConsumer(consumer OutputEndpointConsumer)
	GetEndpointConsumers() []OutputEndpointConsumer
}

type SinkStream

type SinkStream[T any] struct {
	StreamBase[T]
	// contains filtered or unexported fields
}

func MakeSinkStream

func MakeSinkStream[T any](name string, stream TypedStream[T]) *SinkStream[T]

func (*SinkStream[T]) Consume

func (s *SinkStream[T]) Consume(value T)

func (*SinkStream[T]) GetConsumers added in v0.0.134

func (s *SinkStream[T]) GetConsumers() []Stream

func (*SinkStream[T]) GetEndpointId

func (s *SinkStream[T]) GetEndpointId() int

func (*SinkStream[T]) SetConsumer

func (s *SinkStream[T]) SetConsumer(consumer Consumer[T])
type SplitLink[T any] struct {
	// contains filtered or unexported fields
}

func (*SplitLink[T]) Consume

func (s *SplitLink[T]) Consume(value T)

func (*SplitLink[T]) GetConfig

func (s *SplitLink[T]) GetConfig() *config.StreamConfig

func (*SplitLink[T]) GetConsumer

func (s *SplitLink[T]) GetConsumer() TypedStreamConsumer[T]

func (*SplitLink[T]) GetConsumers added in v0.0.134

func (s *SplitLink[T]) GetConsumers() []Stream

func (*SplitLink[T]) GetEnvironment added in v0.0.134

func (s *SplitLink[T]) GetEnvironment() ServiceExecutionEnvironment

func (*SplitLink[T]) GetId

func (s *SplitLink[T]) GetId() int

func (*SplitLink[T]) GetName

func (s *SplitLink[T]) GetName() string

func (*SplitLink[T]) GetSerde

func (s *SplitLink[T]) GetSerde() serde.StreamSerde[T]

func (*SplitLink[T]) GetTransformationName

func (s *SplitLink[T]) GetTransformationName() string

func (*SplitLink[T]) GetTypeName

func (s *SplitLink[T]) GetTypeName() string

func (*SplitLink[T]) SetConsumer

func (s *SplitLink[T]) SetConsumer(consumer TypedStreamConsumer[T])

type SplitStream

type SplitStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeSplitStream

func MakeSplitStream[T any](name string, stream TypedStream[T]) *SplitStream[T]

func (*SplitStream[T]) AddStream

func (s *SplitStream[T]) AddStream() TypedConsumedStream[T]

func (*SplitStream[T]) Consume

func (s *SplitStream[T]) Consume(value T)

func (*SplitStream[T]) GetConsumers added in v0.0.134

func (s *SplitStream[T]) GetConsumers() []Stream

type Stream

type Stream interface {
	GetName() string
	GetTransformationName() string
	GetTypeName() string
	GetId() int
	GetConfig() *config.StreamConfig
	GetEnvironment() ServiceExecutionEnvironment
	GetConsumers() []Stream
}

type StreamBase

type StreamBase[T any] struct {
	// contains filtered or unexported fields
}

func (*StreamBase[T]) GetConfig

func (s *StreamBase[T]) GetConfig() *config.StreamConfig

func (*StreamBase[T]) GetEnvironment added in v0.0.134

func (s *StreamBase[T]) GetEnvironment() ServiceExecutionEnvironment

func (*StreamBase[T]) GetId

func (s *StreamBase[T]) GetId() int

func (*StreamBase[T]) GetName

func (s *StreamBase[T]) GetName() string

func (*StreamBase[T]) GetTransformationName

func (s *StreamBase[T]) GetTransformationName() string

func (*StreamBase[T]) GetTypeName

func (s *StreamBase[T]) GetTypeName() string

type StreamFunction

type StreamFunction[T any] struct {
	// contains filtered or unexported fields
}

func (*StreamFunction[T]) AfterCall

func (f *StreamFunction[T]) AfterCall()

func (*StreamFunction[T]) BeforeCall

func (f *StreamFunction[T]) BeforeCall()

type TypedBinaryConsumedStream

type TypedBinaryConsumedStream[T any] interface {
	TypedConsumedStream[T]
	BinaryConsumer
}

type TypedBinaryKVConsumedStream

type TypedBinaryKVConsumedStream[T any] interface {
	TypedConsumedStream[T]
	BinaryKVConsumer
}

type TypedBinaryKVSplitStream

type TypedBinaryKVSplitStream[T any] interface {
	TypedBinaryKVConsumedStream[T]
	AddStream() TypedConsumedStream[T]
}

type TypedBinarySplitStream

type TypedBinarySplitStream[T any] interface {
	TypedBinaryConsumedStream[T]
	AddStream() TypedConsumedStream[T]
}

type TypedConsumedStream

type TypedConsumedStream[T any] interface {
	TypedStream[T]
	Consumer[T]
}

type TypedEndpointReader

type TypedEndpointReader[T any] interface {
	EndpointReader
	Read(io.Reader) (T, error)
}

type TypedEndpointWriter

type TypedEndpointWriter[T any] interface {
	EndpointWriter
	Write(T, io.Writer) error
}

type TypedInputStream

type TypedInputStream[T any] interface {
	TypedStream[T]
	Consumer[T]
	GetEndpointId() int
}

type TypedJoinConsumedStream

type TypedJoinConsumedStream[K comparable, T1, T2, R any] interface {
	TypedTransformConsumedStream[datastruct.KeyValue[K, T1], R]
	ConsumeRight(datastruct.KeyValue[K, T2])
}

type TypedLinkStream

type TypedLinkStream[T any] interface {
	TypedStream[T]
	Consumer[T]
	SetSource(TypedConsumedStream[T])
}

type TypedMultiJoinConsumedStream

type TypedMultiJoinConsumedStream[K comparable, T, R any] interface {
	TypedTransformConsumedStream[datastruct.KeyValue[K, T], R]
	ConsumeRight(int, datastruct.KeyValue[K, interface{}])
}

type TypedSinkStream

type TypedSinkStream[T any] interface {
	TypedStreamConsumer[T]
	GetEndpointId() int
	SetConsumer(Consumer[T])
}

type TypedSplitStream

type TypedSplitStream[T any] interface {
	TypedConsumedStream[T]
	AddStream() TypedConsumedStream[T]
}

type TypedStream

type TypedStream[T any] interface {
	Stream
	GetConsumer() TypedStreamConsumer[T]
	GetSerde() serde.StreamSerde[T]
	SetConsumer(TypedStreamConsumer[T])
}

type TypedStreamConsumer

type TypedStreamConsumer[T any] interface {
	Stream
	Consumer[T]
}

type TypedTransformConsumedStream

type TypedTransformConsumedStream[T, R any] interface {
	TypedStream[R]
	Consumer[T]
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL