Documentation ¶
Index ¶
- func IsKeyValueType[T any]() bool
- func MakeKeyValueSerde[K comparable, V any](runtime StreamExecutionRuntime) serde.StreamKeyValueSerde[datastruct.KeyValue[K, V]]
- func MakeMultiJoinLink[K comparable, T1, T2, R any](multiJoin TypedStream[R], stream TypedStream[datastruct.KeyValue[K, T2]])
- func MakeSerde[T any](runtime StreamExecutionRuntime) serde.StreamSerde[T]
- func MakeService[Runtime StreamExecutionRuntime, Cfg config.Config](name string, configSettings *config.ConfigSettings) Runtime
- func RegisterKeyValueSerde[K comparable, V any](runtime StreamExecutionRuntime) serde.StreamKeyValueSerde[datastruct.KeyValue[K, V]]
- func RegisterSerde[T any](runtime StreamExecutionRuntime) serde.StreamSerde[T]
- type AppSinkStream
- type BinaryConsumer
- type BinaryConsumerFunc
- type BinaryKVConsumer
- type BinaryKVConsumerFunc
- type Caller
- type Collect
- type ConsumeStatistics
- type ConsumedStream
- type Consumer
- type ConsumerFunc
- type DataConnector
- type DataSink
- type DataSinkEndpoint
- func (ep *DataSinkEndpoint) AddEndpointConsumer(endpointConsumer OutputEndpointConsumer)
- func (ep *DataSinkEndpoint) GetConfig() *config.EndpointConfig
- func (ep *DataSinkEndpoint) GetDataConnector() DataConnector
- func (ep *DataSinkEndpoint) GetDataSink() DataSink
- func (ep *DataSinkEndpoint) GetEndpointConsumers() []OutputEndpointConsumer
- func (ep *DataSinkEndpoint) GetId() int
- func (ep *DataSinkEndpoint) GetName() string
- func (ep *DataSinkEndpoint) GetRuntime() StreamExecutionRuntime
- type DataSinkEndpointConsumer
- type DataSource
- type DataSourceEndpoint
- func (ep *DataSourceEndpoint) AddEndpointConsumer(endpointConsumer InputEndpointConsumer)
- func (ep *DataSourceEndpoint) GetConfig() *config.EndpointConfig
- func (ep *DataSourceEndpoint) GetDataConnector() DataConnector
- func (ep *DataSourceEndpoint) GetDataSource() DataSource
- func (ep *DataSourceEndpoint) GetEndpointConsumers() []InputEndpointConsumer
- func (ep *DataSourceEndpoint) GetId() int
- func (ep *DataSourceEndpoint) GetName() string
- func (ep *DataSourceEndpoint) GetRuntime() StreamExecutionRuntime
- type DataSourceEndpointConsumer
- type DelayFunc
- type DelayFunction
- type DelayFunctionContext
- type DelayStream
- type Edge
- type Endpoint
- type EndpointReader
- type EndpointWriter
- type FilterFunction
- type FilterFunctionContext
- type FilterStream
- type FlatMapFunction
- type FlatMapFunctionContext
- type FlatMapIterableStream
- type FlatMapStream
- type ForEachFunction
- type ForEachFunctionContext
- type ForEachStream
- type InStubKVStream
- type InStubStream
- type InputDataSource
- func (ds *InputDataSource) AddEndpoint(endpoint InputEndpoint)
- func (ds *InputDataSource) GetDataConnector() *config.DataConnectorConfig
- func (ds *InputDataSource) GetEndpoint(id int) InputEndpoint
- func (ds *InputDataSource) GetEndpoints() []InputEndpoint
- func (ds *InputDataSource) GetId() int
- func (ds *InputDataSource) GetName() string
- func (ds *InputDataSource) GetRuntime() StreamExecutionRuntime
- type InputEndpoint
- type InputEndpointConsumer
- type InputKVSplitStream
- type InputSplitStream
- type InputStream
- type JoinFunction
- type JoinFunctionContext
- type JoinLink
- func (s *JoinLink[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])
- func (s *JoinLink[K, T1, T2, R]) GetConfig() *config.StreamConfig
- func (s *JoinLink[K, T1, T2, R]) GetId() int
- func (s *JoinLink[K, T1, T2, R]) GetName() string
- func (s *JoinLink[K, T1, T2, R]) GetRuntime() StreamExecutionRuntime
- func (s *JoinLink[K, T1, T2, R]) GetTransformationName() string
- func (s *JoinLink[K, T1, T2, R]) GetTypeName() string
- type JoinStream
- type KeyByFunction
- type KeyByFunctionContext
- type KeyByStream
- type LinkStream
- type MapFunction
- type MapFunctionContext
- type MapStream
- type MergeLink
- func (s *MergeLink[T]) Consume(value T)
- func (s *MergeLink[T]) GetConfig() *config.StreamConfig
- func (s *MergeLink[T]) GetId() int
- func (s *MergeLink[T]) GetName() string
- func (s *MergeLink[T]) GetRuntime() StreamExecutionRuntime
- func (s *MergeLink[T]) GetTransformationName() string
- func (s *MergeLink[T]) GetTypeName() string
- type MergeStream
- type MultiJoinFunction
- type MultiJoinFunctionContext
- type MultiJoinLinkStream
- func (s *MultiJoinLinkStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConfig() *config.StreamConfig
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetId() int
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetName() string
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetRuntime() StreamExecutionRuntime
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName() string
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTypeName() string
- type MultiJoinStream
- type NetworkData
- type Node
- type OutStubBinaryKVStream
- type OutStubBinaryStream
- type OutStubStream
- type OutputDataSink
- func (ds *OutputDataSink) AddEndpoint(endpoint SinkEndpoint)
- func (ds *OutputDataSink) GetDataConnector() *config.DataConnectorConfig
- func (ds *OutputDataSink) GetEndpoint(id int) SinkEndpoint
- func (ds *OutputDataSink) GetEndpoints() []SinkEndpoint
- func (ds *OutputDataSink) GetId() int
- func (ds *OutputDataSink) GetName() string
- func (ds *OutputDataSink) GetRuntime() StreamExecutionRuntime
- type OutputEndpointConsumer
- type ParallelsFunction
- type ParallelsFunctionContext
- type ParallelsStream
- type ServiceApp
- func (app *ServiceApp) AddDataSink(dataSink DataSink)
- func (app *ServiceApp) AddDataSource(dataSource DataSource)
- func (app *ServiceApp) Delay(duration time.Duration, f func())
- func (app *ServiceApp) GetConfig() *config.ServiceAppConfig
- func (app *ServiceApp) GetConsumeTimeout(from int, to int) time.Duration
- func (app *ServiceApp) GetDataSink(id int) DataSink
- func (app *ServiceApp) GetDataSource(id int) DataSource
- func (app *ServiceApp) GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader
- func (app *ServiceApp) GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter
- func (app *ServiceApp) GetMetrics() metrics.Metrics
- func (app *ServiceApp) GetServiceConfig() *config.ServiceConfig
- func (app *ServiceApp) ReloadConfig(config config.Config)
- func (app *ServiceApp) Start(ctx context.Context) error
- func (app *ServiceApp) Stop(ctx context.Context)
- type ServiceStream
- type SinkEndpoint
- type SinkStream
- type SplitLink
- func (s *SplitLink[T]) Consume(value T)
- func (s *SplitLink[T]) GetConfig() *config.StreamConfig
- func (s *SplitLink[T]) GetConsumer() TypedStreamConsumer[T]
- func (s *SplitLink[T]) GetId() int
- func (s *SplitLink[T]) GetName() string
- func (s *SplitLink[T]) GetRuntime() StreamExecutionRuntime
- func (s *SplitLink[T]) GetSerde() serde.StreamSerde[T]
- func (s *SplitLink[T]) GetTransformationName() string
- func (s *SplitLink[T]) GetTypeName() string
- func (s *SplitLink[T]) SetConsumer(consumer TypedStreamConsumer[T])
- type SplitStream
- type Stream
- type StreamBase
- type StreamExecutionEnvironment
- type StreamExecutionRuntime
- type StreamFunction
- type TypedBinaryConsumedStream
- type TypedBinaryKVConsumedStream
- type TypedBinaryKVSplitStream
- type TypedBinarySplitStream
- type TypedConsumedStream
- type TypedEndpointReader
- type TypedEndpointWriter
- type TypedInputStream
- type TypedJoinConsumedStream
- type TypedLinkStream
- type TypedMultiJoinConsumedStream
- type TypedSinkStream
- type TypedSplitStream
- type TypedStream
- type TypedStreamConsumer
- type TypedTransformConsumedStream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsKeyValueType ¶
func IsKeyValueType[T any]() bool
func MakeKeyValueSerde ¶
func MakeKeyValueSerde[K comparable, V any](runtime StreamExecutionRuntime) serde.StreamKeyValueSerde[datastruct.KeyValue[K, V]]
func MakeMultiJoinLink ¶
func MakeMultiJoinLink[K comparable, T1, T2, R any](multiJoin TypedStream[R], stream TypedStream[datastruct.KeyValue[K, T2]])
func MakeSerde ¶
func MakeSerde[T any](runtime StreamExecutionRuntime) serde.StreamSerde[T]
func MakeService ¶
func MakeService[Runtime StreamExecutionRuntime, Cfg config.Config](name string, configSettings *config.ConfigSettings) Runtime
func RegisterKeyValueSerde ¶
func RegisterKeyValueSerde[K comparable, V any](runtime StreamExecutionRuntime) serde.StreamKeyValueSerde[datastruct.KeyValue[K, V]]
func RegisterSerde ¶
func RegisterSerde[T any](runtime StreamExecutionRuntime) serde.StreamSerde[T]
Types ¶
type AppSinkStream ¶
type AppSinkStream[T any] struct { *StreamBase[T] // contains filtered or unexported fields }
func MakeAppSinkStream ¶
func MakeAppSinkStream[T any](name string, stream TypedStream[T], consumer ConsumerFunc[T]) *AppSinkStream[T]
func (*AppSinkStream[T]) Consume ¶
func (s *AppSinkStream[T]) Consume(value T)
type BinaryConsumer ¶
type BinaryConsumer interface {
ConsumeBinary([]byte)
}
type BinaryConsumerFunc ¶
func (BinaryConsumerFunc) Consume ¶
func (f BinaryConsumerFunc) Consume(data []byte) error
type BinaryKVConsumer ¶
type BinaryKVConsumerFunc ¶
type ConsumeStatistics ¶
type ConsumedStream ¶
type ConsumedStream[T any] struct { *StreamBase[T] // contains filtered or unexported fields }
func (*ConsumedStream[T]) Consume ¶
func (s *ConsumedStream[T]) Consume(value T)
func (*ConsumedStream[T]) GetConsumer ¶
func (s *ConsumedStream[T]) GetConsumer() TypedStreamConsumer[T]
func (*ConsumedStream[T]) GetSerde ¶
func (s *ConsumedStream[T]) GetSerde() serde.StreamSerde[T]
func (*ConsumedStream[T]) SetConsumer ¶
func (s *ConsumedStream[T]) SetConsumer(consumer TypedStreamConsumer[T])
type ConsumerFunc ¶
func (ConsumerFunc[T]) Consume ¶
func (f ConsumerFunc[T]) Consume(value T) error
type DataConnector ¶
type DataSink ¶
type DataSink interface { DataConnector Start(context.Context) error Stop(context.Context) GetDataConnector() *config.DataConnectorConfig GetRuntime() StreamExecutionRuntime AddEndpoint(SinkEndpoint) GetEndpoint(id int) SinkEndpoint GetEndpoints() []SinkEndpoint }
type DataSinkEndpoint ¶
type DataSinkEndpoint struct {
// contains filtered or unexported fields
}
func MakeDataSinkEndpoint ¶
func MakeDataSinkEndpoint(dataSink DataSink, config *config.EndpointConfig, runtime StreamExecutionRuntime) *DataSinkEndpoint
func (*DataSinkEndpoint) AddEndpointConsumer ¶
func (ep *DataSinkEndpoint) AddEndpointConsumer(endpointConsumer OutputEndpointConsumer)
func (*DataSinkEndpoint) GetConfig ¶
func (ep *DataSinkEndpoint) GetConfig() *config.EndpointConfig
func (*DataSinkEndpoint) GetDataConnector ¶
func (ep *DataSinkEndpoint) GetDataConnector() DataConnector
func (*DataSinkEndpoint) GetDataSink ¶
func (ep *DataSinkEndpoint) GetDataSink() DataSink
func (*DataSinkEndpoint) GetEndpointConsumers ¶
func (ep *DataSinkEndpoint) GetEndpointConsumers() []OutputEndpointConsumer
func (*DataSinkEndpoint) GetId ¶
func (ep *DataSinkEndpoint) GetId() int
func (*DataSinkEndpoint) GetName ¶
func (ep *DataSinkEndpoint) GetName() string
func (*DataSinkEndpoint) GetRuntime ¶
func (ep *DataSinkEndpoint) GetRuntime() StreamExecutionRuntime
type DataSinkEndpointConsumer ¶
type DataSinkEndpointConsumer[T any] struct { // contains filtered or unexported fields }
func MakeDataSinkEndpointConsumer ¶
func MakeDataSinkEndpointConsumer[T any](endpoint SinkEndpoint, stream TypedSinkStream[T]) *DataSinkEndpointConsumer[T]
func (*DataSinkEndpointConsumer[T]) Endpoint ¶
func (ec *DataSinkEndpointConsumer[T]) Endpoint() SinkEndpoint
func (*DataSinkEndpointConsumer[T]) GetEndpointWriter ¶
func (ec *DataSinkEndpointConsumer[T]) GetEndpointWriter() TypedEndpointWriter[T]
type DataSource ¶
type DataSource interface { DataConnector Start(context.Context) error Stop(context.Context) GetDataConnector() *config.DataConnectorConfig GetRuntime() StreamExecutionRuntime AddEndpoint(InputEndpoint) GetEndpoint(id int) InputEndpoint GetEndpoints() []InputEndpoint }
type DataSourceEndpoint ¶
type DataSourceEndpoint struct {
// contains filtered or unexported fields
}
func MakeDataSourceEndpoint ¶
func MakeDataSourceEndpoint(dataSource DataSource, config *config.EndpointConfig, runtime StreamExecutionRuntime) *DataSourceEndpoint
func (*DataSourceEndpoint) AddEndpointConsumer ¶
func (ep *DataSourceEndpoint) AddEndpointConsumer(endpointConsumer InputEndpointConsumer)
func (*DataSourceEndpoint) GetConfig ¶
func (ep *DataSourceEndpoint) GetConfig() *config.EndpointConfig
func (*DataSourceEndpoint) GetDataConnector ¶
func (ep *DataSourceEndpoint) GetDataConnector() DataConnector
func (*DataSourceEndpoint) GetDataSource ¶
func (ep *DataSourceEndpoint) GetDataSource() DataSource
func (*DataSourceEndpoint) GetEndpointConsumers ¶
func (ep *DataSourceEndpoint) GetEndpointConsumers() []InputEndpointConsumer
func (*DataSourceEndpoint) GetId ¶
func (ep *DataSourceEndpoint) GetId() int
func (*DataSourceEndpoint) GetName ¶
func (ep *DataSourceEndpoint) GetName() string
func (*DataSourceEndpoint) GetRuntime ¶
func (ep *DataSourceEndpoint) GetRuntime() StreamExecutionRuntime
type DataSourceEndpointConsumer ¶
type DataSourceEndpointConsumer[T any] struct { // contains filtered or unexported fields }
func MakeDataSourceEndpointConsumer ¶
func MakeDataSourceEndpointConsumer[T any](endpoint InputEndpoint, inputStream TypedInputStream[T]) *DataSourceEndpointConsumer[T]
func (*DataSourceEndpointConsumer[T]) Consume ¶
func (ec *DataSourceEndpointConsumer[T]) Consume(value T)
func (*DataSourceEndpointConsumer[T]) Endpoint ¶
func (ec *DataSourceEndpointConsumer[T]) Endpoint() InputEndpoint
func (*DataSourceEndpointConsumer[T]) GetEndpointReader ¶
func (ec *DataSourceEndpointConsumer[T]) GetEndpointReader() TypedEndpointReader[T]
type DelayFunctionContext ¶
type DelayFunctionContext[T any] struct { StreamFunction[T] // contains filtered or unexported fields }
type DelayStream ¶
type DelayStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeDelayStream ¶
func MakeDelayStream[T any](name string, stream TypedStream[T], f DelayFunction[T]) *DelayStream[T]
func (*DelayStream[T]) Consume ¶
func (s *DelayStream[T]) Consume(value T)
type Endpoint ¶
type Endpoint interface { GetName() string GetId() int GetDataConnector() DataConnector }
type EndpointReader ¶
type EndpointReader interface { }
type EndpointWriter ¶
type EndpointWriter interface { }
type FilterFunction ¶
type FilterFunctionContext ¶
type FilterFunctionContext[T any] struct { StreamFunction[T] // contains filtered or unexported fields }
type FilterStream ¶
type FilterStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeFilterStream ¶
func MakeFilterStream[T any](name string, stream TypedStream[T], f FilterFunction[T]) *FilterStream[T]
func (*FilterStream[T]) Consume ¶
func (s *FilterStream[T]) Consume(value T)
type FlatMapFunction ¶
type FlatMapFunctionContext ¶
type FlatMapFunctionContext[T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type FlatMapIterableStream ¶
type FlatMapIterableStream[T, R any] struct { *ConsumedStream[R] // contains filtered or unexported fields }
func MakeFlatMapIterableStream ¶
func MakeFlatMapIterableStream[T, R any](name string, stream TypedStream[T]) *FlatMapIterableStream[T, R]
func (*FlatMapIterableStream[T, R]) Consume ¶
func (s *FlatMapIterableStream[T, R]) Consume(value T)
type FlatMapStream ¶
type FlatMapStream[T, R any] struct { *ConsumedStream[R] // contains filtered or unexported fields }
func MakeFlatMapStream ¶
func MakeFlatMapStream[T, R any](name string, stream TypedStream[T], f FlatMapFunction[T, R]) *FlatMapStream[T, R]
func (*FlatMapStream[T, R]) Consume ¶
func (s *FlatMapStream[T, R]) Consume(value T)
type ForEachFunction ¶
type ForEachFunctionContext ¶
type ForEachFunctionContext[T any] struct { StreamFunction[T] // contains filtered or unexported fields }
type ForEachStream ¶
type ForEachStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeForEachStream ¶
func MakeForEachStream[T any](name string, stream TypedStream[T], f ForEachFunction[T]) *ForEachStream[T]
func (*ForEachStream[T]) Consume ¶
func (s *ForEachStream[T]) Consume(value T)
type InStubKVStream ¶
type InStubKVStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeInStubKVStream ¶
func MakeInStubKVStream[T any](name string, runtime StreamExecutionRuntime) *InStubKVStream[T]
func (*InStubKVStream[T]) ConsumeBinary ¶
func (s *InStubKVStream[T]) ConsumeBinary(key []byte, value []byte)
type InStubStream ¶
type InStubStream[T any] struct { *ConsumedStream[T] }
func MakeInStubStream ¶
func MakeInStubStream[T any](name string, runtime StreamExecutionRuntime) *InStubStream[T]
func (*InStubStream[T]) ConsumeBinary ¶
func (s *InStubStream[T]) ConsumeBinary(data []byte)
type InputDataSource ¶
type InputDataSource struct {
// contains filtered or unexported fields
}
func MakeInputDataSource ¶
func MakeInputDataSource(dataConnector *config.DataConnectorConfig, runtime StreamExecutionRuntime) *InputDataSource
func (*InputDataSource) AddEndpoint ¶
func (ds *InputDataSource) AddEndpoint(endpoint InputEndpoint)
func (*InputDataSource) GetDataConnector ¶
func (ds *InputDataSource) GetDataConnector() *config.DataConnectorConfig
func (*InputDataSource) GetEndpoint ¶
func (ds *InputDataSource) GetEndpoint(id int) InputEndpoint
func (*InputDataSource) GetEndpoints ¶
func (ds *InputDataSource) GetEndpoints() []InputEndpoint
func (*InputDataSource) GetId ¶
func (ds *InputDataSource) GetId() int
func (*InputDataSource) GetName ¶
func (ds *InputDataSource) GetName() string
func (*InputDataSource) GetRuntime ¶
func (ds *InputDataSource) GetRuntime() StreamExecutionRuntime
type InputEndpoint ¶
type InputEndpoint interface { Endpoint GetConfig() *config.EndpointConfig GetRuntime() StreamExecutionRuntime GetDataSource() DataSource AddEndpointConsumer(consumer InputEndpointConsumer) GetEndpointConsumers() []InputEndpointConsumer }
type InputEndpointConsumer ¶
type InputEndpointConsumer interface {
Endpoint() InputEndpoint
}
type InputKVSplitStream ¶
type InputKVSplitStream[T any] struct { *SplitStream[T] // contains filtered or unexported fields }
func MakeInputKVSplitStream ¶
func MakeInputKVSplitStream[T any](name string, runtime StreamExecutionRuntime) *InputKVSplitStream[T]
func (*InputKVSplitStream[T]) ConsumeBinary ¶
func (s *InputKVSplitStream[T]) ConsumeBinary(key []byte, value []byte)
type InputSplitStream ¶
type InputSplitStream[T any] struct { *SplitStream[T] }
func MakeInputSplitStream ¶
func MakeInputSplitStream[T any](name string, runtime StreamExecutionRuntime) *InputSplitStream[T]
func (*InputSplitStream[T]) ConsumeBinary ¶
func (s *InputSplitStream[T]) ConsumeBinary(data []byte)
type InputStream ¶
type InputStream[T any] struct { *ConsumedStream[T] }
func MakeInputStream ¶
func MakeInputStream[T any](name string, streamExecutionRuntime StreamExecutionRuntime) *InputStream[T]
func (*InputStream[T]) GetEndpointId ¶
func (s *InputStream[T]) GetEndpointId() int
type JoinFunction ¶
type JoinFunction[K comparable, T1, T2, R any] interface { Join(Stream, K, []T1, []T2, Collect[R]) bool }
type JoinFunctionContext ¶
type JoinFunctionContext[K comparable, T1, T2, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type JoinLink ¶
type JoinLink[K comparable, T1, T2, R any] struct { // contains filtered or unexported fields }
func (*JoinLink[K, T1, T2, R]) Consume ¶
func (s *JoinLink[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])
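func (*JoinLink[K, T1, T2, R]) GetConfig ¶
func (s *JoinLink[K, T1, T2, R]) GetConfig() *config.StreamConfig
func (*JoinLink[K, T1, T2, R]) GetId ¶
func (s *JoinLink[K, T1, T2, R]) GetId() int
func (*JoinLink[K, T1, T2, R]) GetName ¶
func (s *JoinLink[K, T1, T2, R]) GetName() string
func (*JoinLink[K, T1, T2, R]) GetRuntime ¶
func (s *JoinLink[K, T1, T2, R]) GetRuntime() StreamExecutionRuntime
func (*JoinLink[K, T1, T2, R]) GetTransformationName ¶
func (s *JoinLink[K, T1, T2, R]) GetTransformationName() string
func (*JoinLink[K, T1, T2, R]) GetTypeName ¶
func (s *JoinLink[K, T1, T2, R]) GetTypeName() string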
type JoinStream ¶
type JoinStream[K comparable, T1, T2, R any] struct { *ConsumedStream[R] // contains filtered or unexported fields }
func MakeJoinStream ¶
func MakeJoinStream[K comparable, T1, T2, R any](name string, stream TypedStream[datastruct.KeyValue[K, T1]], streamRight TypedStream[datastruct.KeyValue[K, T2]], f JoinFunction[K, T1, T2, R]) *JoinStream[K, T1, T2, R]
func (*JoinStream[K, T1, T2, R]) Consume ¶
func (s *JoinStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T1])
func (*JoinStream[K, T1, T2, R]) ConsumeRight ¶
func (s *JoinStream[K, T1, T2, R]) ConsumeRight(value datastruct.KeyValue[K, T2])
func (*JoinStream[K, T1, T2, R]) Out ¶
func (s *JoinStream[K, T1, T2, R]) Out(value R)
type KeyByFunction ¶
type KeyByFunction[T any, K comparable, V any] interface { KeyBy(Stream, T) datastruct.KeyValue[K, V] }
type KeyByFunctionContext ¶
type KeyByFunctionContext[T any, K comparable, V any] struct { StreamFunction[datastruct.KeyValue[K, V]] // contains filtered or unexported fields }
type KeyByStream ¶
type KeyByStream[T any, K comparable, V any] struct { *ConsumedStream[datastruct.KeyValue[K, V]] // contains filtered or unexported fields }
func MakeKeyByStream ¶
func MakeKeyByStream[T any, K comparable, V any](name string, stream TypedStream[T], f KeyByFunction[T, K, V]) *KeyByStream[T, K, V]
func (*KeyByStream[T, K, V]) Consume ¶
func (s *KeyByStream[T, K, V]) Consume(value T)
type LinkStream ¶
type LinkStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeLinkStream ¶
func MakeLinkStream[T any](name string, runtime StreamExecutionRuntime) *LinkStream[T]
func (*LinkStream[T]) Consume ¶
func (s *LinkStream[T]) Consume(value T)
func (*LinkStream[T]) SetSource ¶
func (s *LinkStream[T]) SetSource(stream TypedConsumedStream[T])
type MapFunction ¶
type MapFunctionContext ¶
type MapFunctionContext[T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type MapStream ¶
type MapStream[T, R any] struct { *ConsumedStream[R] // contains filtered or unexported fields }
func MakeMapStream ¶
func MakeMapStream[T, R any](name string, stream TypedStream[T], f MapFunction[T, R]) *MapStream[T, R]
type MergeLink ¶
type MergeLink[T any] struct { // contains filtered or unexported fields }
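func (*MergeLink[T]) Consume ¶
func (s *MergeLink[T]) Consume(value T)
func (*MergeLink[T]) GetConfig ¶
func (s *MergeLink[T]) GetConfig() *config.StreamConfig
func (*MergeLink[T]) GetId ¶
func (s *MergeLink[T]) GetId() int
func (*MergeLink[T]) GetName ¶
func (s *MergeLink[T]) GetName() string
func (*MergeLink[T]) GetRuntime ¶
func (s *MergeLink[T]) GetRuntime() StreamExecutionRuntime
func (*MergeLink[T]) GetTransformationName ¶
func (s *MergeLink[T]) GetTransformationName() string
func (*MergeLink[T]) GetTypeName ¶
func (s *MergeLink[T]) GetTypeName() string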
type MergeStream ¶
type MergeStream[T any] struct { *ConsumedStream[T] }
func MakeMergeStream ¶
func MakeMergeStream[T any](name string, streams ...TypedStream[T]) *MergeStream[T]
type MultiJoinFunction ¶
type MultiJoinFunction[K comparable, T, R any] interface { MultiJoin(Stream, K, [][]interface{}, Collect[R]) bool }
type MultiJoinFunctionContext ¶
type MultiJoinFunctionContext[K comparable, T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type MultiJoinLinkStream ¶
type MultiJoinLinkStream[K comparable, T1, T2, R any] struct { // contains filtered or unexported fields }
func (*MultiJoinLinkStream[K, T1, T2, R]) Consume ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])
func (*MultiJoinLinkStream[K, T1, T2, R]) GetConfig ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConfig() *config.StreamConfig
func (*MultiJoinLinkStream[K, T1, T2, R]) GetId ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetId() int
func (*MultiJoinLinkStream[K, T1, T2, R]) GetName ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetName() string
func (*MultiJoinLinkStream[K, T1, T2, R]) GetRuntime ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetRuntime() StreamExecutionRuntime
func (*MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName() string
func (*MultiJoinLinkStream[K, T1, T2, R]) GetTypeName ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTypeName() string
type MultiJoinStream ¶
type MultiJoinStream[K comparable, T, R any] struct { *ConsumedStream[R] // contains filtered or unexported fields }
func MakeMultiJoinStream ¶
func MakeMultiJoinStream[K comparable, T, R any](name string, leftStream TypedStream[datastruct.KeyValue[K, T]], f MultiJoinFunction[K, T, R]) *MultiJoinStream[K, T, R]
func (*MultiJoinStream[K, T, R]) Consume ¶
func (s *MultiJoinStream[K, T, R]) Consume(value datastruct.KeyValue[K, T])
func (*MultiJoinStream[K, T, R]) ConsumeRight ¶
func (s *MultiJoinStream[K, T, R]) ConsumeRight(index int, value datastruct.KeyValue[K, interface{}])
func (*MultiJoinStream[K, T, R]) Out ¶
func (s *MultiJoinStream[K, T, R]) Out(value R)
type NetworkData ¶
type OutStubBinaryKVStream ¶
type OutStubBinaryKVStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeOutStubBinaryKVStream ¶
func MakeOutStubBinaryKVStream[T any](name string, stream TypedStream[T], consumer BinaryKVConsumerFunc) *OutStubBinaryKVStream[T]
func (*OutStubBinaryKVStream[T]) Consume ¶
func (s *OutStubBinaryKVStream[T]) Consume(value T)
type OutStubBinaryStream ¶
type OutStubBinaryStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeOutStubBinaryStream ¶
func MakeOutStubBinaryStream[T any](name string, stream TypedStream[T], consumer BinaryConsumerFunc) *OutStubBinaryStream[T]
func (*OutStubBinaryStream[T]) Consume ¶
func (s *OutStubBinaryStream[T]) Consume(value T)
type OutStubStream ¶
type OutStubStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeOutStubStream ¶
func MakeOutStubStream[T any](name string, stream TypedStream[T], consumer ConsumerFunc[T]) *OutStubStream[T]
func (*OutStubStream[T]) Consume ¶
func (s *OutStubStream[T]) Consume(value T)
type OutputDataSink ¶
type OutputDataSink struct {
// contains filtered or unexported fields
}
func MakeOutputDataSink ¶
func MakeOutputDataSink(dataConnector *config.DataConnectorConfig, runtime StreamExecutionRuntime) *OutputDataSink
func (*OutputDataSink) AddEndpoint ¶
func (ds *OutputDataSink) AddEndpoint(endpoint SinkEndpoint)
func (*OutputDataSink) GetDataConnector ¶
func (ds *OutputDataSink) GetDataConnector() *config.DataConnectorConfig
func (*OutputDataSink) GetEndpoint ¶
func (ds *OutputDataSink) GetEndpoint(id int) SinkEndpoint
func (*OutputDataSink) GetEndpoints ¶
func (ds *OutputDataSink) GetEndpoints() []SinkEndpoint
func (*OutputDataSink) GetId ¶
func (ds *OutputDataSink) GetId() int
func (*OutputDataSink) GetName ¶
func (ds *OutputDataSink) GetName() string
func (*OutputDataSink) GetRuntime ¶
func (ds *OutputDataSink) GetRuntime() StreamExecutionRuntime
type OutputEndpointConsumer ¶
type OutputEndpointConsumer interface {
Endpoint() SinkEndpoint
}
type ParallelsFunction ¶
type ParallelsFunctionContext ¶
type ParallelsFunctionContext[T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type ParallelsStream ¶
type ParallelsStream[T, R any] struct { *ConsumedStream[R] // contains filtered or unexported fields }
func MakeParallelsStream ¶
func MakeParallelsStream[T, R any](name string, stream TypedStream[T], f ParallelsFunction[T, R]) *ParallelsStream[T, R]
func (*ParallelsStream[T, R]) Consume ¶
func (s *ParallelsStream[T, R]) Consume(value T)
type ServiceApp ¶
type ServiceApp struct {
// contains filtered or unexported fields
}
func (*ServiceApp) AddDataSink ¶
func (app *ServiceApp) AddDataSink(dataSink DataSink)
func (*ServiceApp) AddDataSource ¶
func (app *ServiceApp) AddDataSource(dataSource DataSource)
func (*ServiceApp) Delay ¶
func (app *ServiceApp) Delay(duration time.Duration, f func())
func (*ServiceApp) GetConfig ¶
func (app *ServiceApp) GetConfig() *config.ServiceAppConfig
func (*ServiceApp) GetConsumeTimeout ¶
func (app *ServiceApp) GetConsumeTimeout(from int, to int) time.Duration
func (*ServiceApp) GetDataSink ¶
func (app *ServiceApp) GetDataSink(id int) DataSink
func (*ServiceApp) GetDataSource ¶
func (app *ServiceApp) GetDataSource(id int) DataSource
func (*ServiceApp) GetEndpointReader ¶
func (app *ServiceApp) GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader
func (*ServiceApp) GetEndpointWriter ¶
func (app *ServiceApp) GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter
func (*ServiceApp) GetMetrics ¶
func (app *ServiceApp) GetMetrics() metrics.Metrics
func (*ServiceApp) GetServiceConfig ¶
func (app *ServiceApp) GetServiceConfig() *config.ServiceConfig
func (*ServiceApp) ReloadConfig ¶
func (app *ServiceApp) ReloadConfig(config config.Config)
func (*ServiceApp) Start ¶
func (app *ServiceApp) Start(ctx context.Context) error
func (*ServiceApp) Stop ¶
func (app *ServiceApp) Stop(ctx context.Context)
type ServiceStream ¶
type ServiceStream interface { Stream // contains filtered or unexported methods }
type SinkEndpoint ¶
type SinkEndpoint interface { Endpoint GetConfig() *config.EndpointConfig GetRuntime() StreamExecutionRuntime GetDataSink() DataSink AddEndpointConsumer(consumer OutputEndpointConsumer) GetEndpointConsumers() []OutputEndpointConsumer }
type SinkStream ¶
type SinkStream[T any] struct { *StreamBase[T] // contains filtered or unexported fields }
func MakeSinkStream ¶
func MakeSinkStream[T any](name string, stream TypedStream[T]) *SinkStream[T]
func (*SinkStream[T]) Consume ¶
func (s *SinkStream[T]) Consume(value T)
func (*SinkStream[T]) GetEndpointId ¶
func (s *SinkStream[T]) GetEndpointId() int
func (*SinkStream[T]) SetConsumer ¶
func (s *SinkStream[T]) SetConsumer(consumer Consumer[T])
type SplitLink ¶
type SplitLink[T any] struct { // contains filtered or unexported fields }
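func (*SplitLink[T]) Consume ¶
func (s *SplitLink[T]) Consume(value T)
func (*SplitLink[T]) GetConfig ¶
func (s *SplitLink[T]) GetConfig() *config.StreamConfig
func (*SplitLink[T]) GetConsumer ¶
func (s *SplitLink[T]) GetConsumer() TypedStreamConsumer[T]
func (*SplitLink[T]) GetId ¶
func (s *SplitLink[T]) GetId() int
func (*SplitLink[T]) GetName ¶
func (s *SplitLink[T]) GetName() string
func (*SplitLink[T]) GetRuntime ¶
func (s *SplitLink[T]) GetRuntime() StreamExecutionRuntime
func (*SplitLink[T]) GetSerde ¶
func (s *SplitLink[T]) GetSerde() serde.StreamSerde[T]
func (*SplitLink[T]) GetTransformationName ¶
func (s *SplitLink[T]) GetTransformationName() string
func (*SplitLink[T]) GetTypeName ¶
func (s *SplitLink[T]) GetTypeName() string
func (*SplitLink[T]) SetConsumer ¶
func (s *SplitLink[T]) SetConsumer(consumer TypedStreamConsumer[T])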
type SplitStream ¶
type SplitStream[T any] struct { *ConsumedStream[T] // contains filtered or unexported fields }
func MakeSplitStream ¶
func MakeSplitStream[T any](name string, stream TypedStream[T]) *SplitStream[T]
func (*SplitStream[T]) AddStream ¶
func (s *SplitStream[T]) AddStream() TypedConsumedStream[T]
func (*SplitStream[T]) Consume ¶
func (s *SplitStream[T]) Consume(value T)
type Stream ¶
type Stream interface { GetName() string GetTransformationName() string GetTypeName() string GetId() int GetConfig() *config.StreamConfig GetRuntime() StreamExecutionRuntime }
type StreamBase ¶
type StreamBase[T any] struct { // contains filtered or unexported fields }
func (*StreamBase[T]) GetConfig ¶
func (s *StreamBase[T]) GetConfig() *config.StreamConfig
func (*StreamBase[T]) GetId ¶
func (s *StreamBase[T]) GetId() int
func (*StreamBase[T]) GetName ¶
func (s *StreamBase[T]) GetName() string
func (*StreamBase[T]) GetRuntime ¶
func (s *StreamBase[T]) GetRuntime() StreamExecutionRuntime
func (*StreamBase[T]) GetTransformationName ¶
func (s *StreamBase[T]) GetTransformationName() string
func (*StreamBase[T]) GetTypeName ¶
func (s *StreamBase[T]) GetTypeName() string
type StreamExecutionEnvironment ¶
type StreamExecutionEnvironment interface { config.ServiceEnvironmentConfig GetSerde(valueType reflect.Type) (serde.Serializer, error) StreamsInit(ctx context.Context) SetConfig(config config.Config) Start(context.Context) error Stop(context.Context) AddDataSource(dataSource DataSource) GetDataSource(id int) DataSource AddDataSink(dataSink DataSink) GetDataSink(id int) DataSink GetConsumeTimeout(from int, to int) time.Duration GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter GetMetrics() metrics.Metrics Delay(duration time.Duration, f func()) }
type StreamExecutionRuntime ¶
type StreamExecutionRuntime interface { StreamExecutionEnvironment // contains filtered or unexported methods }
type StreamFunction ¶
type StreamFunction[T any] struct { // contains filtered or unexported fields }
func (*StreamFunction[T]) AfterCall ¶
func (f *StreamFunction[T]) AfterCall()
func (*StreamFunction[T]) BeforeCall ¶
func (f *StreamFunction[T]) BeforeCall()
type TypedBinaryConsumedStream ¶
type TypedBinaryConsumedStream[T any] interface { TypedConsumedStream[T] BinaryConsumer }
type TypedBinaryKVConsumedStream ¶
type TypedBinaryKVConsumedStream[T any] interface { TypedConsumedStream[T] BinaryKVConsumer }
type TypedBinaryKVSplitStream ¶
type TypedBinaryKVSplitStream[T any] interface { TypedBinaryKVConsumedStream[T] AddStream() TypedConsumedStream[T] }
type TypedBinarySplitStream ¶
type TypedBinarySplitStream[T any] interface { TypedBinaryConsumedStream[T] AddStream() TypedConsumedStream[T] }
type TypedConsumedStream ¶
type TypedConsumedStream[T any] interface { TypedStream[T] Consumer[T] }
type TypedEndpointReader ¶
type TypedEndpointReader[T any] interface { EndpointReader Read(io.Reader) (T, error) }
type TypedEndpointWriter ¶
type TypedEndpointWriter[T any] interface { EndpointWriter Write(T, io.Writer) error }
type TypedInputStream ¶
type TypedInputStream[T any] interface { TypedStream[T] Consumer[T] GetEndpointId() int }
type TypedJoinConsumedStream ¶
type TypedJoinConsumedStream[K comparable, T1, T2, R any] interface { TypedTransformConsumedStream[datastruct.KeyValue[K, T1], R] ConsumeRight(datastruct.KeyValue[K, T2]) }
type TypedLinkStream ¶
type TypedLinkStream[T any] interface { TypedStream[T] Consumer[T] SetSource(TypedConsumedStream[T]) }
type TypedMultiJoinConsumedStream ¶
type TypedMultiJoinConsumedStream[K comparable, T, R any] interface { TypedTransformConsumedStream[datastruct.KeyValue[K, T], R] ConsumeRight(int, datastruct.KeyValue[K, interface{}]) }
type TypedSinkStream ¶
type TypedSinkStream[T any] interface { TypedStreamConsumer[T] GetEndpointId() int SetConsumer(Consumer[T]) }
type TypedSplitStream ¶
type TypedSplitStream[T any] interface { TypedConsumedStream[T] AddStream() TypedConsumedStream[T] }
type TypedStream ¶
type TypedStream[T any] interface { Stream GetConsumer() TypedStreamConsumer[T] GetSerde() serde.StreamSerde[T] SetConsumer(TypedStreamConsumer[T]) }
type TypedStreamConsumer ¶
type TypedTransformConsumedStream ¶
type TypedTransformConsumedStream[T any, R any] interface { TypedStream[R] Consumer[T] }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.