Documentation ¶
Index ¶
- func IsKeyValueType[T any]() bool
- func MakeKeyValueSerde[K comparable, V any](runtime ServiceExecutionRuntime) serde.StreamKeyValueSerde[datastruct.KeyValue[K, V]]
- func MakeMultiJoinLink[K comparable, T1, T2, R any](multiJoinStream TypedMultiJoinConsumedStream[K, T1, R], ...)
- func MakeSerde[T any](runtime ServiceExecutionRuntime) serde.StreamSerde[T]
- func MakeService[Environment ServiceExecutionEnvironment, Cfg config.Config](name string, dep environment.ServiceDependency, ...) (Environment, error)
- type AppInputStream
- type AppSinkStream
- type BinaryConsumer
- type BinaryConsumerFunc
- type BinaryKVConsumer
- type BinaryKVConsumerFunc
- type Caller
- type Collect
- type Collection
- type ConsumeStatistics
- type ConsumedStream
- type Consumer
- type ConsumerFunc
- type DataConnector
- type DataSink
- type DataSinkEndpoint
- func (ep *DataSinkEndpoint) AddEndpointConsumer(endpointConsumer OutputEndpointConsumer)
- func (ep *DataSinkEndpoint) GetConfig() *config.EndpointConfig
- func (ep *DataSinkEndpoint) GetDataConnector() DataConnector
- func (ep *DataSinkEndpoint) GetDataSink() DataSink
- func (ep *DataSinkEndpoint) GetEndpointConsumers() Collection[OutputEndpointConsumer]
- func (ep *DataSinkEndpoint) GetEnvironment() ServiceExecutionEnvironment
- func (ep *DataSinkEndpoint) GetId() int
- func (ep *DataSinkEndpoint) GetName() string
- type DataSinkEndpointConsumer
- type DataSource
- type DataSourceEndpoint
- func (ep *DataSourceEndpoint) AddEndpointConsumer(endpointConsumer InputEndpointConsumer)
- func (ep *DataSourceEndpoint) GetConfig() *config.EndpointConfig
- func (ep *DataSourceEndpoint) GetDataConnector() DataConnector
- func (ep *DataSourceEndpoint) GetDataSource() DataSource
- func (ep *DataSourceEndpoint) GetEndpointConsumers() Collection[InputEndpointConsumer]
- func (ep *DataSourceEndpoint) GetEnvironment() ServiceExecutionEnvironment
- func (ep *DataSourceEndpoint) GetId() int
- func (ep *DataSourceEndpoint) GetName() string
- type DataSourceEndpointConsumer
- type DelayFunc
- type DelayFunction
- type DelayFunctionContext
- type DelayStream
- type Edge
- type Endpoint
- type EndpointReader
- type EndpointWriter
- type FilterFunction
- type FilterFunctionContext
- type FilterStream
- type FlatMapFunction
- type FlatMapFunctionContext
- type FlatMapIterableStream
- type FlatMapStream
- type ForEachFunction
- type ForEachFunctionContext
- type ForEachStream
- type InStubKVStream
- type InStubStream
- type InputDataSource
- func (ds *InputDataSource) AddEndpoint(endpoint InputEndpoint)
- func (ds *InputDataSource) GetDataConnector() *config.DataConnectorConfig
- func (ds *InputDataSource) GetEndpoint(id int) InputEndpoint
- func (ds *InputDataSource) GetEndpoints() Collection[InputEndpoint]
- func (ds *InputDataSource) GetEnvironment() ServiceExecutionEnvironment
- func (ds *InputDataSource) GetId() int
- func (ds *InputDataSource) GetName() string
- type InputEndpoint
- type InputEndpointConsumer
- type InputKVSplitStream
- type InputSplitStream
- type InputStream
- type JoinFunction
- type JoinFunctionContext
- type JoinLink
- func (s *JoinLink[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])
- func (s *JoinLink[K, T1, T2, R]) GetConfig() *config.StreamConfig
- func (s *JoinLink[K, T1, T2, R]) GetConsumers() []Stream
- func (s *JoinLink[K, T1, T2, R]) GetEnvironment() ServiceExecutionEnvironment
- func (s *JoinLink[K, T1, T2, R]) GetId() int
- func (s *JoinLink[K, T1, T2, R]) GetName() string
- func (s *JoinLink[K, T1, T2, R]) GetTransformationName() string
- func (s *JoinLink[K, T1, T2, R]) GetTypeName() string
- func (s *JoinLink[K, T1, T2, R]) Validate() error
- type JoinStream
- type KeyByFunction
- type KeyByFunctionContext
- type KeyByStream
- type LinkStream
- type MapFunction
- type MapFunctionContext
- type MapStream
- type MergeLink
- func (s *MergeLink[T]) Consume(value T)
- func (s *MergeLink[T]) GetConfig() *config.StreamConfig
- func (s *MergeLink[T]) GetConsumers() []Stream
- func (s *MergeLink[T]) GetEnvironment() ServiceExecutionEnvironment
- func (s *MergeLink[T]) GetId() int
- func (s *MergeLink[T]) GetName() string
- func (s *MergeLink[T]) GetTransformationName() string
- func (s *MergeLink[T]) GetTypeName() string
- func (s *MergeLink[T]) Validate() error
- type MergeStream
- type MultiJoinFunction
- type MultiJoinFunctionContext
- type MultiJoinLinkStream
- func (s *MultiJoinLinkStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConfig() *config.StreamConfig
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConsumers() []Stream
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetEnvironment() ServiceExecutionEnvironment
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetId() int
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetName() string
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName() string
- func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTypeName() string
- func (s *MultiJoinLinkStream[K, T1, T2, R]) Validate() error
- type MultiJoinStream
- type NetworkData
- type Node
- type OutStubBinaryKVStream
- type OutStubBinaryStream
- type OutStubStream
- type OutputDataSink
- func (ds *OutputDataSink) AddEndpoint(endpoint SinkEndpoint)
- func (ds *OutputDataSink) GetDataConnector() *config.DataConnectorConfig
- func (ds *OutputDataSink) GetEndpoint(id int) SinkEndpoint
- func (ds *OutputDataSink) GetEndpoints() Collection[SinkEndpoint]
- func (ds *OutputDataSink) GetEnvironment() ServiceExecutionEnvironment
- func (ds *OutputDataSink) GetId() int
- func (ds *OutputDataSink) GetName() string
- type OutputEndpointConsumer
- type ParallelsFunction
- type ParallelsFunctionContext
- type ParallelsStream
- type ServiceApp
- func (app *ServiceApp) AddDataSink(dataSink DataSink)
- func (app *ServiceApp) AddDataSource(dataSource DataSource)
- func (app *ServiceApp) AppConfig() *config.ServiceAppConfig
- func (app *ServiceApp) Delay(duration time.Duration, f func())
- func (app *ServiceApp) GetConsumeTimeout(from int, to int) time.Duration
- func (app *ServiceApp) GetDataSink(id int) DataSink
- func (app *ServiceApp) GetDataSource(id int) DataSource
- func (app *ServiceApp) GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader
- func (app *ServiceApp) GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter
- func (app *ServiceApp) GetHttpRoute() httproute.HttpRoute
- func (app *ServiceApp) GetPriorityTaskPool(name string) pool.PriorityTaskPool
- func (app *ServiceApp) GetRuntime() ServiceExecutionRuntime
- func (app *ServiceApp) GetTaskPool(name string) pool.TaskPool
- func (app *ServiceApp) Log() log.Logger
- func (app *ServiceApp) Metrics() metrics.Metrics
- func (app *ServiceApp) Release()
- func (app *ServiceApp) ReloadConfig(config config.Config)
- func (app *ServiceApp) ServiceConfig() *config.ServiceConfig
- func (app *ServiceApp) ServiceDependency() environment.ServiceDependency
- func (app *ServiceApp) ServiceInit() error
- func (app *ServiceApp) Start(ctx context.Context) error
- func (app *ServiceApp) Stop(ctx context.Context)
- type ServiceExecutionEnvironment
- type ServiceExecutionRuntime
- type ServiceLoader
- type ServiceStream
- func (s *ServiceStream[T]) GetConfig() *config.StreamConfig
- func (s *ServiceStream[T]) GetEnvironment() ServiceExecutionEnvironment
- func (s *ServiceStream[T]) GetId() int
- func (s *ServiceStream[T]) GetName() string
- func (s *ServiceStream[T]) GetTransformationName() string
- func (s *ServiceStream[T]) GetTypeName() string
- func (s *ServiceStream[T]) Validate() error
- type SinkCallback
- type SinkConsumer
- type SinkEndpoint
- type SinkFunction
- type SinkFunctionContext
- type SinkStream
- type SplitLink
- func (s *SplitLink[T]) Consume(value T)
- func (s *SplitLink[T]) GetConfig() *config.StreamConfig
- func (s *SplitLink[T]) GetConsumer() TypedStreamConsumer[T]
- func (s *SplitLink[T]) GetConsumers() []Stream
- func (s *SplitLink[T]) GetEnvironment() ServiceExecutionEnvironment
- func (s *SplitLink[T]) GetId() int
- func (s *SplitLink[T]) GetName() string
- func (s *SplitLink[T]) GetSerde() serde.StreamSerde[T]
- func (s *SplitLink[T]) GetTransformationName() string
- func (s *SplitLink[T]) GetTypeName() string
- func (s *SplitLink[T]) SetConsumer(consumer TypedStreamConsumer[T])
- func (s *SplitLink[T]) Validate() error
- type SplitStream
- type Stream
- type StreamFunction
- type TypedBinaryConsumedStream
- type TypedBinaryKVConsumedStream
- type TypedBinaryKVSplitStream
- type TypedBinarySplitStream
- type TypedConsumedStream
- type TypedEndpointReader
- type TypedEndpointWriter
- type TypedInputStream
- type TypedJoinConsumedStream
- type TypedLinkStream
- type TypedMultiJoinConsumedStream
- type TypedSinkStream
- type TypedSplitStream
- type TypedStream
- type TypedStreamConsumer
- type TypedTransformConsumedStream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsKeyValueType ¶
func MakeKeyValueSerde ¶
func MakeKeyValueSerde[K comparable, V any](runtime ServiceExecutionRuntime) serde.StreamKeyValueSerde[datastruct.KeyValue[K, V]]
func MakeMultiJoinLink ¶
func MakeMultiJoinLink[K comparable, T1, T2, R any]( multiJoinStream TypedMultiJoinConsumedStream[K, T1, R], rightStream TypedStream[datastruct.KeyValue[K, T2]])
func MakeSerde ¶
func MakeSerde[T any](runtime ServiceExecutionRuntime) serde.StreamSerde[T]
func MakeService ¶
func MakeService[Environment ServiceExecutionEnvironment, Cfg config.Config](name string, dep environment.ServiceDependency, configSettings *config.ConfigSettings) (Environment, error)
Types ¶
type AppInputStream ¶ added in v0.0.180
type AppInputStream[T any] struct { ConsumedStream[T] }
func MakeAppInputStream ¶ added in v0.0.180
func MakeAppInputStream[T any](name string, env ServiceExecutionEnvironment) *AppInputStream[T]
func (*AppInputStream[T]) Consume ¶ added in v0.0.180
func (s *AppInputStream[T]) Consume(value T)
type AppSinkStream ¶
type AppSinkStream[T any] struct { ServiceStream[T] // contains filtered or unexported fields }
func MakeAppSinkStream ¶
func MakeAppSinkStream[T any](name string, stream TypedStream[T], consumer ConsumerFunc[T]) *AppSinkStream[T]
func (*AppSinkStream[T]) Consume ¶
func (s *AppSinkStream[T]) Consume(value T)
func (*AppSinkStream[T]) GetConsumers ¶ added in v0.0.134
func (s *AppSinkStream[T]) GetConsumers() []Stream
type BinaryConsumer ¶
type BinaryConsumer interface {
ConsumeBinary([]byte)
}
type BinaryConsumerFunc ¶
func (BinaryConsumerFunc) Consume ¶
func (f BinaryConsumerFunc) Consume(data []byte) error
type BinaryKVConsumer ¶
type BinaryKVConsumerFunc ¶
type Collection ¶ added in v0.0.173
type Collection[T any] struct { // contains filtered or unexported fields }
func NewCollection ¶ added in v0.0.173
func NewCollection[T any](data []T) Collection[T]
func (Collection[T]) At ¶ added in v0.0.173
func (s Collection[T]) At(i int) T
func (Collection[T]) Len ¶ added in v0.0.173
func (s Collection[T]) Len() int
type ConsumeStatistics ¶
type ConsumeStatistics interface {
Count() int64
}
type ConsumedStream ¶
type ConsumedStream[T any] struct { ServiceStream[T] // contains filtered or unexported fields }
func (*ConsumedStream[T]) GetConsumer ¶
func (s *ConsumedStream[T]) GetConsumer() TypedStreamConsumer[T]
func (*ConsumedStream[T]) GetConsumers ¶ added in v0.0.134
func (s *ConsumedStream[T]) GetConsumers() []Stream
func (*ConsumedStream[T]) GetSerde ¶
func (s *ConsumedStream[T]) GetSerde() serde.StreamSerde[T]
func (*ConsumedStream[T]) SetConsumer ¶
func (s *ConsumedStream[T]) SetConsumer(consumer TypedStreamConsumer[T])
type ConsumerFunc ¶
func (ConsumerFunc[T]) Consume ¶
func (f ConsumerFunc[T]) Consume(value T) error
type DataConnector ¶
type DataSink ¶
type DataSink interface { DataConnector Start(context.Context) error Stop(context.Context) GetDataConnector() *config.DataConnectorConfig GetEnvironment() ServiceExecutionEnvironment AddEndpoint(SinkEndpoint) GetEndpoint(id int) SinkEndpoint GetEndpoints() Collection[SinkEndpoint] }
type DataSinkEndpoint ¶
type DataSinkEndpoint struct {
// contains filtered or unexported fields
}
func MakeDataSinkEndpoint ¶
func MakeDataSinkEndpoint(dataSink DataSink, id int, environment ServiceExecutionEnvironment) *DataSinkEndpoint
func (*DataSinkEndpoint) AddEndpointConsumer ¶
func (ep *DataSinkEndpoint) AddEndpointConsumer(endpointConsumer OutputEndpointConsumer)
func (*DataSinkEndpoint) GetConfig ¶
func (ep *DataSinkEndpoint) GetConfig() *config.EndpointConfig
func (*DataSinkEndpoint) GetDataConnector ¶
func (ep *DataSinkEndpoint) GetDataConnector() DataConnector
func (*DataSinkEndpoint) GetDataSink ¶
func (ep *DataSinkEndpoint) GetDataSink() DataSink
func (*DataSinkEndpoint) GetEndpointConsumers ¶
func (ep *DataSinkEndpoint) GetEndpointConsumers() Collection[OutputEndpointConsumer]
func (*DataSinkEndpoint) GetEnvironment ¶ added in v0.0.134
func (ep *DataSinkEndpoint) GetEnvironment() ServiceExecutionEnvironment
func (*DataSinkEndpoint) GetId ¶
func (ep *DataSinkEndpoint) GetId() int
func (*DataSinkEndpoint) GetName ¶
func (ep *DataSinkEndpoint) GetName() string
type DataSinkEndpointConsumer ¶
type DataSinkEndpointConsumer[T, R any] struct { // contains filtered or unexported fields }
func MakeDataSinkEndpointConsumer ¶
func MakeDataSinkEndpointConsumer[T, R any](endpoint SinkEndpoint, stream TypedSinkStream[T, R]) *DataSinkEndpointConsumer[T, R]
func (*DataSinkEndpointConsumer[T, R]) Endpoint ¶
func (ec *DataSinkEndpointConsumer[T, R]) Endpoint() SinkEndpoint
func (*DataSinkEndpointConsumer[T, R]) Stream ¶ added in v0.0.175
func (ec *DataSinkEndpointConsumer[T, R]) Stream() TypedSinkStream[T, R]
type DataSource ¶
type DataSource interface { DataConnector Start(context.Context) error Stop(context.Context) GetDataConnector() *config.DataConnectorConfig GetEnvironment() ServiceExecutionEnvironment AddEndpoint(InputEndpoint) GetEndpoint(id int) InputEndpoint GetEndpoints() Collection[InputEndpoint] }
type DataSourceEndpoint ¶
type DataSourceEndpoint struct {
// contains filtered or unexported fields
}
func MakeDataSourceEndpoint ¶
func MakeDataSourceEndpoint(dataSource DataSource, id int, environment ServiceExecutionEnvironment) *DataSourceEndpoint
func (*DataSourceEndpoint) AddEndpointConsumer ¶
func (ep *DataSourceEndpoint) AddEndpointConsumer(endpointConsumer InputEndpointConsumer)
func (*DataSourceEndpoint) GetConfig ¶
func (ep *DataSourceEndpoint) GetConfig() *config.EndpointConfig
func (*DataSourceEndpoint) GetDataConnector ¶
func (ep *DataSourceEndpoint) GetDataConnector() DataConnector
func (*DataSourceEndpoint) GetDataSource ¶
func (ep *DataSourceEndpoint) GetDataSource() DataSource
func (*DataSourceEndpoint) GetEndpointConsumers ¶
func (ep *DataSourceEndpoint) GetEndpointConsumers() Collection[InputEndpointConsumer]
func (*DataSourceEndpoint) GetEnvironment ¶ added in v0.0.134
func (ep *DataSourceEndpoint) GetEnvironment() ServiceExecutionEnvironment
func (*DataSourceEndpoint) GetId ¶
func (ep *DataSourceEndpoint) GetId() int
func (*DataSourceEndpoint) GetName ¶
func (ep *DataSourceEndpoint) GetName() string
type DataSourceEndpointConsumer ¶
type DataSourceEndpointConsumer[T any] struct { // contains filtered or unexported fields }
func MakeDataSourceEndpointConsumer ¶
func MakeDataSourceEndpointConsumer[T any](endpoint InputEndpoint, inputStream TypedInputStream[T]) *DataSourceEndpointConsumer[T]
func (*DataSourceEndpointConsumer[T]) Consume ¶
func (ec *DataSourceEndpointConsumer[T]) Consume(value T)
func (*DataSourceEndpointConsumer[T]) Endpoint ¶
func (ec *DataSourceEndpointConsumer[T]) Endpoint() InputEndpoint
func (*DataSourceEndpointConsumer[T]) Stream ¶ added in v0.0.175
func (ec *DataSourceEndpointConsumer[T]) Stream() TypedInputStream[T]
type DelayFunctionContext ¶
type DelayFunctionContext[T any] struct { StreamFunction[T] // contains filtered or unexported fields }
type DelayStream ¶
type DelayStream[T any] struct { ConsumedStream[T] // contains filtered or unexported fields }
func MakeDelayStream ¶
func MakeDelayStream[T any](name string, stream TypedStream[T], f DelayFunction[T]) *DelayStream[T]
func (*DelayStream[T]) Consume ¶
func (s *DelayStream[T]) Consume(value T)
type Endpoint ¶
type Endpoint interface { GetName() string GetId() int GetDataConnector() DataConnector }
type EndpointReader ¶
type EndpointReader interface { }
type EndpointWriter ¶
type EndpointWriter interface { }
type FilterFunction ¶
type FilterFunctionContext ¶
type FilterFunctionContext[T any] struct { StreamFunction[T] // contains filtered or unexported fields }
type FilterStream ¶
type FilterStream[T any] struct { ConsumedStream[T] // contains filtered or unexported fields }
func MakeFilterStream ¶
func MakeFilterStream[T any](name string, stream TypedStream[T], f FilterFunction[T]) *FilterStream[T]
func (*FilterStream[T]) Consume ¶
func (s *FilterStream[T]) Consume(value T)
type FlatMapFunction ¶
type FlatMapFunctionContext ¶
type FlatMapFunctionContext[T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type FlatMapIterableStream ¶
type FlatMapIterableStream[T, R any] struct { ConsumedStream[R] // contains filtered or unexported fields }
func MakeFlatMapIterableStream ¶
func MakeFlatMapIterableStream[T, R any](name string, stream TypedStream[T]) *FlatMapIterableStream[T, R]
func (*FlatMapIterableStream[T, R]) Consume ¶
func (s *FlatMapIterableStream[T, R]) Consume(value T)
type FlatMapStream ¶
type FlatMapStream[T, R any] struct { ConsumedStream[R] // contains filtered or unexported fields }
func MakeFlatMapStream ¶
func MakeFlatMapStream[T, R any](name string, stream TypedStream[T], f FlatMapFunction[T, R]) *FlatMapStream[T, R]
func (*FlatMapStream[T, R]) Consume ¶
func (s *FlatMapStream[T, R]) Consume(value T)
type ForEachFunction ¶
type ForEachFunctionContext ¶
type ForEachFunctionContext[T any] struct { StreamFunction[T] // contains filtered or unexported fields }
type ForEachStream ¶
type ForEachStream[T any] struct { ConsumedStream[T] // contains filtered or unexported fields }
func MakeForEachStream ¶
func MakeForEachStream[T any](name string, stream TypedStream[T], f ForEachFunction[T]) *ForEachStream[T]
func (*ForEachStream[T]) Consume ¶
func (s *ForEachStream[T]) Consume(value T)
type InStubKVStream ¶
type InStubKVStream[T any] struct { ConsumedStream[T] // contains filtered or unexported fields }
func MakeInStubKVStream ¶
func MakeInStubKVStream[K comparable, V any](name string, env ServiceExecutionEnvironment) *InStubKVStream[datastruct.KeyValue[K, V]]
func (*InStubKVStream[T]) Consume ¶ added in v0.0.134
func (s *InStubKVStream[T]) Consume(value T)
func (*InStubKVStream[T]) ConsumeBinary ¶
func (s *InStubKVStream[T]) ConsumeBinary(key []byte, value []byte)
type InStubStream ¶
type InStubStream[T any] struct { ConsumedStream[T] }
func MakeInStubStream ¶
func MakeInStubStream[T any](name string, env ServiceExecutionEnvironment) *InStubStream[T]
func (*InStubStream[T]) Consume ¶ added in v0.0.134
func (s *InStubStream[T]) Consume(value T)
func (*InStubStream[T]) ConsumeBinary ¶
func (s *InStubStream[T]) ConsumeBinary(data []byte)
type InputDataSource ¶
type InputDataSource struct {
// contains filtered or unexported fields
}
func MakeInputDataSource ¶
func MakeInputDataSource(dataConnector *config.DataConnectorConfig, environment ServiceExecutionEnvironment) *InputDataSource
func (*InputDataSource) AddEndpoint ¶
func (ds *InputDataSource) AddEndpoint(endpoint InputEndpoint)
func (*InputDataSource) GetDataConnector ¶
func (ds *InputDataSource) GetDataConnector() *config.DataConnectorConfig
func (*InputDataSource) GetEndpoint ¶
func (ds *InputDataSource) GetEndpoint(id int) InputEndpoint
func (*InputDataSource) GetEndpoints ¶
func (ds *InputDataSource) GetEndpoints() Collection[InputEndpoint]
func (*InputDataSource) GetEnvironment ¶ added in v0.0.134
func (ds *InputDataSource) GetEnvironment() ServiceExecutionEnvironment
func (*InputDataSource) GetId ¶
func (ds *InputDataSource) GetId() int
func (*InputDataSource) GetName ¶
func (ds *InputDataSource) GetName() string
type InputEndpoint ¶
type InputEndpoint interface { Endpoint GetConfig() *config.EndpointConfig GetEnvironment() ServiceExecutionEnvironment GetDataSource() DataSource AddEndpointConsumer(consumer InputEndpointConsumer) GetEndpointConsumers() Collection[InputEndpointConsumer] }
type InputEndpointConsumer ¶
type InputEndpointConsumer interface {
Endpoint() InputEndpoint
}
type InputKVSplitStream ¶
type InputKVSplitStream[T any] struct { *SplitStream[T] // contains filtered or unexported fields }
func MakeInputKVSplitStream ¶
func MakeInputKVSplitStream[K comparable, V any](name string, env ServiceExecutionEnvironment) *InputKVSplitStream[datastruct.KeyValue[K, V]]
func (*InputKVSplitStream[T]) ConsumeBinary ¶
func (s *InputKVSplitStream[T]) ConsumeBinary(key []byte, value []byte)
type InputSplitStream ¶
type InputSplitStream[T any] struct { *SplitStream[T] }
func MakeInputSplitStream ¶
func MakeInputSplitStream[T any](name string, env ServiceExecutionEnvironment) *InputSplitStream[T]
func (*InputSplitStream[T]) ConsumeBinary ¶
func (s *InputSplitStream[T]) ConsumeBinary(data []byte)
type InputStream ¶
type InputStream[T any] struct { ConsumedStream[T] }
func MakeInputStream ¶
func MakeInputStream[T any](name string, env ServiceExecutionEnvironment) *InputStream[T]
func (*InputStream[T]) Consume ¶ added in v0.0.134
func (s *InputStream[T]) Consume(value T)
func (*InputStream[T]) GetEndpointId ¶
func (s *InputStream[T]) GetEndpointId() int
type JoinFunction ¶
type JoinFunction[K comparable, T1, T2, R any] interface { Join(Stream, K, []T1, []T2, Collect[R]) bool }
type JoinFunctionContext ¶
type JoinFunctionContext[K comparable, T1, T2, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type JoinLink ¶
type JoinLink[K comparable, T1, T2, R any] struct { // contains filtered or unexported fields }
func (*JoinLink[K, T1, T2, R]) Consume ¶
func (s *JoinLink[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])
func (*JoinLink[K, T1, T2, R]) GetConfig ¶
func (s *JoinLink[K, T1, T2, R]) GetConfig() *config.StreamConfig
func (*JoinLink[K, T1, T2, R]) GetConsumers ¶ added in v0.0.134
func (*JoinLink[K, T1, T2, R]) GetEnvironment ¶ added in v0.0.134
func (s *JoinLink[K, T1, T2, R]) GetEnvironment() ServiceExecutionEnvironment
func (*JoinLink[K, T1, T2, R]) GetTransformationName ¶
func (*JoinLink[K, T1, T2, R]) GetTypeName ¶
type JoinStream ¶
type JoinStream[K comparable, T1, T2, R any] struct { ConsumedStream[R] // contains filtered or unexported fields }
func MakeJoinStream ¶
func MakeJoinStream[K comparable, T1, T2, R any](name string, stream TypedStream[datastruct.KeyValue[K, T1]], streamRight TypedStream[datastruct.KeyValue[K, T2]], f JoinFunction[K, T1, T2, R]) *JoinStream[K, T1, T2, R]
func (*JoinStream[K, T1, T2, R]) Consume ¶
func (s *JoinStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T1])
func (*JoinStream[K, T1, T2, R]) ConsumeRight ¶
func (s *JoinStream[K, T1, T2, R]) ConsumeRight(value datastruct.KeyValue[K, T2])
func (*JoinStream[K, T1, T2, R]) Out ¶
func (s *JoinStream[K, T1, T2, R]) Out(value R)
type KeyByFunction ¶
type KeyByFunction[T any, K comparable, V any] interface { KeyBy(Stream, T) datastruct.KeyValue[K, V] }
type KeyByFunctionContext ¶
type KeyByFunctionContext[T any, K comparable, V any] struct { StreamFunction[datastruct.KeyValue[K, V]] // contains filtered or unexported fields }
type KeyByStream ¶
type KeyByStream[T any, K comparable, V any] struct { ConsumedStream[datastruct.KeyValue[K, V]] // contains filtered or unexported fields }
func MakeKeyByStream ¶
func MakeKeyByStream[T any, K comparable, V any](name string, stream TypedStream[T], f KeyByFunction[T, K, V]) *KeyByStream[T, K, V]
func (*KeyByStream[T, K, V]) Consume ¶
func (s *KeyByStream[T, K, V]) Consume(value T)
type LinkStream ¶
type LinkStream[T any] struct { ConsumedStream[T] // contains filtered or unexported fields }
func MakeLinkStream ¶
func MakeLinkStream[T any](name string, env ServiceExecutionEnvironment) *LinkStream[T]
func (*LinkStream[T]) Consume ¶
func (s *LinkStream[T]) Consume(value T)
func (*LinkStream[T]) SetSource ¶
func (s *LinkStream[T]) SetSource(stream TypedConsumedStream[T])
type MapFunction ¶
type MapFunctionContext ¶
type MapFunctionContext[T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type MapStream ¶
type MapStream[T, R any] struct { ConsumedStream[R] // contains filtered or unexported fields }
func MakeMapStream ¶
func MakeMapStream[T, R any](name string, stream TypedStream[T], f MapFunction[T, R]) *MapStream[T, R]
type MergeLink ¶
type MergeLink[T any] struct { // contains filtered or unexported fields }
func (*MergeLink[T]) GetConfig ¶
func (s *MergeLink[T]) GetConfig() *config.StreamConfig
func (*MergeLink[T]) GetConsumers ¶ added in v0.0.134
func (*MergeLink[T]) GetEnvironment ¶ added in v0.0.134
func (s *MergeLink[T]) GetEnvironment() ServiceExecutionEnvironment
func (*MergeLink[T]) GetTransformationName ¶
func (*MergeLink[T]) GetTypeName ¶
type MergeStream ¶
type MergeStream[T any] struct { ConsumedStream[T] // contains filtered or unexported fields }
func MakeMergeStream ¶
func MakeMergeStream[T any](name string, stream TypedStream[T], streams ...TypedStream[T]) *MergeStream[T]
func (*MergeStream[T]) Consume ¶ added in v0.0.134
func (s *MergeStream[T]) Consume(value T)
type MultiJoinFunction ¶
type MultiJoinFunction[K comparable, T, R any] interface { MultiJoin(Stream, K, [][]interface{}, Collect[R]) bool }
type MultiJoinFunctionContext ¶
type MultiJoinFunctionContext[K comparable, T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type MultiJoinLinkStream ¶
type MultiJoinLinkStream[K comparable, T1, T2, R any] struct { // contains filtered or unexported fields }
func (*MultiJoinLinkStream[K, T1, T2, R]) Consume ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])
func (*MultiJoinLinkStream[K, T1, T2, R]) GetConfig ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConfig() *config.StreamConfig
func (*MultiJoinLinkStream[K, T1, T2, R]) GetConsumers ¶ added in v0.0.134
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConsumers() []Stream
func (*MultiJoinLinkStream[K, T1, T2, R]) GetEnvironment ¶ added in v0.0.134
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetEnvironment() ServiceExecutionEnvironment
func (*MultiJoinLinkStream[K, T1, T2, R]) GetId ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetId() int
func (*MultiJoinLinkStream[K, T1, T2, R]) GetName ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetName() string
func (*MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName() string
func (*MultiJoinLinkStream[K, T1, T2, R]) GetTypeName ¶
func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTypeName() string
func (*MultiJoinLinkStream[K, T1, T2, R]) Validate ¶ added in v0.0.167
func (s *MultiJoinLinkStream[K, T1, T2, R]) Validate() error
type MultiJoinStream ¶
type MultiJoinStream[K comparable, T, R any] struct { ConsumedStream[R] // contains filtered or unexported fields }
func MakeMultiJoinStream ¶
func MakeMultiJoinStream[K comparable, T, R any]( name string, leftStream TypedStream[datastruct.KeyValue[K, T]], f MultiJoinFunction[K, T, R]) *MultiJoinStream[K, T, R]
func (*MultiJoinStream[K, T, R]) Consume ¶
func (s *MultiJoinStream[K, T, R]) Consume(value datastruct.KeyValue[K, T])
func (*MultiJoinStream[K, T, R]) ConsumeRight ¶
func (s *MultiJoinStream[K, T, R]) ConsumeRight(index int, value datastruct.KeyValue[K, interface{}])
func (*MultiJoinStream[K, T, R]) Out ¶
func (s *MultiJoinStream[K, T, R]) Out(value R)
type NetworkData ¶
type OutStubBinaryKVStream ¶
type OutStubBinaryKVStream[T any] struct { ServiceStream[T] // contains filtered or unexported fields }
func MakeOutStubBinaryKVStream ¶
func MakeOutStubBinaryKVStream[T any](name string, stream TypedStream[T], consumer BinaryKVConsumerFunc) *OutStubBinaryKVStream[T]
func (*OutStubBinaryKVStream[T]) Consume ¶
func (s *OutStubBinaryKVStream[T]) Consume(value T)
func (*OutStubBinaryKVStream[T]) GetConsumers ¶ added in v0.0.170
func (s *OutStubBinaryKVStream[T]) GetConsumers() []Stream
type OutStubBinaryStream ¶
type OutStubBinaryStream[T any] struct { ServiceStream[T] // contains filtered or unexported fields }
func MakeOutStubBinaryStream ¶
func MakeOutStubBinaryStream[T any](name string, stream TypedStream[T], consumer BinaryConsumerFunc) *OutStubBinaryStream[T]
func (*OutStubBinaryStream[T]) Consume ¶
func (s *OutStubBinaryStream[T]) Consume(value T)
func (*OutStubBinaryStream[T]) GetConsumers ¶ added in v0.0.170
func (s *OutStubBinaryStream[T]) GetConsumers() []Stream
type OutStubStream ¶
type OutStubStream[T any] struct { ServiceStream[T] // contains filtered or unexported fields }
func MakeOutStubStream ¶
func MakeOutStubStream[T any](name string, stream TypedStream[T], consumer ConsumerFunc[T]) *OutStubStream[T]
func (*OutStubStream[T]) Consume ¶
func (s *OutStubStream[T]) Consume(value T)
func (*OutStubStream[T]) GetConsumers ¶ added in v0.0.170
func (s *OutStubStream[T]) GetConsumers() []Stream
type OutputDataSink ¶
type OutputDataSink struct {
// contains filtered or unexported fields
}
func MakeOutputDataSink ¶
func MakeOutputDataSink(dataConnector *config.DataConnectorConfig, environment ServiceExecutionEnvironment) *OutputDataSink
func (*OutputDataSink) AddEndpoint ¶
func (ds *OutputDataSink) AddEndpoint(endpoint SinkEndpoint)
func (*OutputDataSink) GetDataConnector ¶
func (ds *OutputDataSink) GetDataConnector() *config.DataConnectorConfig
func (*OutputDataSink) GetEndpoint ¶
func (ds *OutputDataSink) GetEndpoint(id int) SinkEndpoint
func (*OutputDataSink) GetEndpoints ¶
func (ds *OutputDataSink) GetEndpoints() Collection[SinkEndpoint]
func (*OutputDataSink) GetEnvironment ¶ added in v0.0.134
func (ds *OutputDataSink) GetEnvironment() ServiceExecutionEnvironment
func (*OutputDataSink) GetId ¶
func (ds *OutputDataSink) GetId() int
func (*OutputDataSink) GetName ¶
func (ds *OutputDataSink) GetName() string
type OutputEndpointConsumer ¶
type OutputEndpointConsumer interface {
Endpoint() SinkEndpoint
}
type ParallelsFunction ¶
type ParallelsFunctionContext ¶
type ParallelsFunctionContext[T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type ParallelsStream ¶
type ParallelsStream[T, R any] struct { ConsumedStream[R] // contains filtered or unexported fields }
func MakeParallelsStream ¶
func MakeParallelsStream[T, R any](name string, stream TypedStream[T], f ParallelsFunction[T, R]) *ParallelsStream[T, R]
func (*ParallelsStream[T, R]) Consume ¶
func (s *ParallelsStream[T, R]) Consume(value T)
type ServiceApp ¶
type ServiceApp struct {
// contains filtered or unexported fields
}
func (*ServiceApp) AddDataSink ¶
func (app *ServiceApp) AddDataSink(dataSink DataSink)
func (*ServiceApp) AddDataSource ¶
func (app *ServiceApp) AddDataSource(dataSource DataSource)
func (*ServiceApp) AppConfig ¶ added in v0.0.161
func (app *ServiceApp) AppConfig() *config.ServiceAppConfig
func (*ServiceApp) Delay ¶
func (app *ServiceApp) Delay(duration time.Duration, f func())
func (*ServiceApp) GetConsumeTimeout ¶
func (app *ServiceApp) GetConsumeTimeout(from int, to int) time.Duration
func (*ServiceApp) GetDataSink ¶
func (app *ServiceApp) GetDataSink(id int) DataSink
func (*ServiceApp) GetDataSource ¶
func (app *ServiceApp) GetDataSource(id int) DataSource
func (*ServiceApp) GetEndpointReader ¶
func (app *ServiceApp) GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader
func (*ServiceApp) GetEndpointWriter ¶
func (app *ServiceApp) GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter
func (*ServiceApp) GetHttpRoute ¶ added in v0.0.184
func (app *ServiceApp) GetHttpRoute() httproute.HttpRoute
func (*ServiceApp) GetPriorityTaskPool ¶ added in v0.0.180
func (app *ServiceApp) GetPriorityTaskPool(name string) pool.PriorityTaskPool
func (*ServiceApp) GetRuntime ¶ added in v0.0.134
func (app *ServiceApp) GetRuntime() ServiceExecutionRuntime
func (*ServiceApp) GetTaskPool ¶ added in v0.0.180
func (app *ServiceApp) GetTaskPool(name string) pool.TaskPool
func (*ServiceApp) Log ¶ added in v0.0.161
func (app *ServiceApp) Log() log.Logger
func (*ServiceApp) Metrics ¶ added in v0.0.161
func (app *ServiceApp) Metrics() metrics.Metrics
func (*ServiceApp) Release ¶ added in v0.0.161
func (app *ServiceApp) Release()
func (*ServiceApp) ReloadConfig ¶
func (app *ServiceApp) ReloadConfig(config config.Config)
func (*ServiceApp) ServiceConfig ¶ added in v0.0.161
func (app *ServiceApp) ServiceConfig() *config.ServiceConfig
func (*ServiceApp) ServiceDependency ¶ added in v0.0.176
func (app *ServiceApp) ServiceDependency() environment.ServiceDependency
func (*ServiceApp) ServiceInit ¶ added in v0.0.184
func (app *ServiceApp) ServiceInit() error
func (*ServiceApp) Stop ¶
func (app *ServiceApp) Stop(ctx context.Context)
type ServiceExecutionEnvironment ¶ added in v0.0.134
type ServiceExecutionEnvironment interface { environment.ServiceEnvironment GetSerde(valueType reflect.Type) (serde.Serializer, error) ServiceInit() error StreamsInit(ctx context.Context) SetConfig(config config.Config) Start(context.Context) error Stop(context.Context) AddDataSource(dataSource DataSource) GetDataSource(id int) DataSource GetTaskPool(name string) pool.TaskPool GetPriorityTaskPool(name string) pool.PriorityTaskPool AddDataSink(dataSink DataSink) GetDataSink(id int) DataSink GetConsumeTimeout(from int, to int) time.Duration GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter Delay(duration time.Duration, f func()) GetRuntime() ServiceExecutionRuntime Release() GetHttpRoute() httproute.HttpRoute }
type ServiceExecutionRuntime ¶ added in v0.0.134
type ServiceExecutionRuntime interface {
// contains filtered or unexported methods
}
type ServiceLoader ¶ added in v0.0.145
type ServiceLoader interface {
Stop()
}
type ServiceStream ¶
type ServiceStream[T any] struct { // contains filtered or unexported fields }
func (*ServiceStream[T]) GetConfig ¶ added in v0.0.163
func (s *ServiceStream[T]) GetConfig() *config.StreamConfig
func (*ServiceStream[T]) GetEnvironment ¶ added in v0.0.163
func (s *ServiceStream[T]) GetEnvironment() ServiceExecutionEnvironment
func (*ServiceStream[T]) GetId ¶ added in v0.0.163
func (s *ServiceStream[T]) GetId() int
func (*ServiceStream[T]) GetName ¶ added in v0.0.163
func (s *ServiceStream[T]) GetName() string
func (*ServiceStream[T]) GetTransformationName ¶ added in v0.0.163
func (s *ServiceStream[T]) GetTransformationName() string
func (*ServiceStream[T]) GetTypeName ¶ added in v0.0.163
func (s *ServiceStream[T]) GetTypeName() string
func (*ServiceStream[T]) Validate ¶ added in v0.0.167
func (s *ServiceStream[T]) Validate() error
type SinkCallback ¶ added in v0.0.186
type SinkConsumer ¶ added in v0.0.186
type SinkConsumer[T any] interface { Consumer[T] SetSinkCallback(SinkCallback[T]) }
type SinkEndpoint ¶
type SinkEndpoint interface { Endpoint GetConfig() *config.EndpointConfig GetEnvironment() ServiceExecutionEnvironment GetDataSink() DataSink AddEndpointConsumer(consumer OutputEndpointConsumer) GetEndpointConsumers() Collection[OutputEndpointConsumer] }
type SinkFunction ¶ added in v0.0.190
type SinkFunctionContext ¶ added in v0.0.190
type SinkFunctionContext[T, R any] struct { StreamFunction[R] // contains filtered or unexported fields }
type SinkStream ¶
type SinkStream[T, R any] struct { ConsumedStream[R] // contains filtered or unexported fields }
func MakeSinkStream ¶
func MakeSinkStream[T, R any](name string, stream TypedStream[T], f SinkFunction[T, R]) *SinkStream[T, R]
func (*SinkStream[T, R]) Consume ¶
func (s *SinkStream[T, R]) Consume(value T)
func (*SinkStream[T, R]) Done ¶ added in v0.0.186
func (s *SinkStream[T, R]) Done(value T, err error)
func (*SinkStream[T, R]) GetEndpointId ¶
func (s *SinkStream[T, R]) GetEndpointId() int
func (*SinkStream[T, R]) SetSinkConsumer ¶ added in v0.0.186
func (s *SinkStream[T, R]) SetSinkConsumer(sinkConsumer SinkConsumer[T])
type SplitLink ¶
type SplitLink[T any] struct { // contains filtered or unexported fields }
func (*SplitLink[T]) GetConfig ¶
func (s *SplitLink[T]) GetConfig() *config.StreamConfig
func (*SplitLink[T]) GetConsumer ¶
func (s *SplitLink[T]) GetConsumer() TypedStreamConsumer[T]
func (*SplitLink[T]) GetConsumers ¶ added in v0.0.134
func (*SplitLink[T]) GetEnvironment ¶ added in v0.0.134
func (s *SplitLink[T]) GetEnvironment() ServiceExecutionEnvironment
func (*SplitLink[T]) GetSerde ¶
func (s *SplitLink[T]) GetSerde() serde.StreamSerde[T]
func (*SplitLink[T]) GetTransformationName ¶
func (*SplitLink[T]) GetTypeName ¶
func (*SplitLink[T]) SetConsumer ¶
func (s *SplitLink[T]) SetConsumer(consumer TypedStreamConsumer[T])
type SplitStream ¶
type SplitStream[T any] struct { ConsumedStream[T] // contains filtered or unexported fields }
func MakeSplitStream ¶
func MakeSplitStream[T any](name string, stream TypedStream[T]) *SplitStream[T]
func (*SplitStream[T]) AddStream ¶
func (s *SplitStream[T]) AddStream() TypedConsumedStream[T]
func (*SplitStream[T]) Consume ¶
func (s *SplitStream[T]) Consume(value T)
func (*SplitStream[T]) GetConsumers ¶ added in v0.0.134
func (s *SplitStream[T]) GetConsumers() []Stream
func (*SplitStream[T]) Validate ¶ added in v0.0.167
func (s *SplitStream[T]) Validate() error
type Stream ¶
type Stream interface { GetName() string GetTransformationName() string GetTypeName() string GetId() int GetConfig() *config.StreamConfig GetEnvironment() ServiceExecutionEnvironment GetConsumers() []Stream Validate() error }
type StreamFunction ¶
type StreamFunction[T any] struct { // contains filtered or unexported fields }
func (*StreamFunction[T]) AfterCall ¶
func (f *StreamFunction[T]) AfterCall()
func (*StreamFunction[T]) BeforeCall ¶
func (f *StreamFunction[T]) BeforeCall()
type TypedBinaryConsumedStream ¶
type TypedBinaryConsumedStream[T any] interface { TypedConsumedStream[T] BinaryConsumer }
type TypedBinaryKVConsumedStream ¶
type TypedBinaryKVConsumedStream[T any] interface { TypedConsumedStream[T] BinaryKVConsumer }
type TypedBinaryKVSplitStream ¶
type TypedBinaryKVSplitStream[T any] interface { TypedBinaryKVConsumedStream[T] AddStream() TypedConsumedStream[T] }
type TypedBinarySplitStream ¶
type TypedBinarySplitStream[T any] interface { TypedBinaryConsumedStream[T] AddStream() TypedConsumedStream[T] }
type TypedConsumedStream ¶
type TypedConsumedStream[T any] interface { TypedStream[T] Consumer[T] }
type TypedEndpointReader ¶
type TypedEndpointReader[T any] interface { EndpointReader Read(io.Reader) (T, error) }
type TypedEndpointWriter ¶
type TypedEndpointWriter[T any] interface { EndpointWriter Write(T, io.Writer) error }
type TypedInputStream ¶
type TypedInputStream[T any] interface { TypedConsumedStream[T] GetEndpointId() int }
type TypedJoinConsumedStream ¶
type TypedJoinConsumedStream[K comparable, T1, T2, R any] interface { TypedTransformConsumedStream[datastruct.KeyValue[K, T1], R] ConsumeRight(datastruct.KeyValue[K, T2]) }
type TypedLinkStream ¶
type TypedLinkStream[T any] interface { TypedConsumedStream[T] SetSource(TypedConsumedStream[T]) }
type TypedMultiJoinConsumedStream ¶
type TypedMultiJoinConsumedStream[K comparable, T, R any] interface { TypedTransformConsumedStream[datastruct.KeyValue[K, T], R] ConsumeRight(int, datastruct.KeyValue[K, interface{}]) }
type TypedSinkStream ¶
type TypedSinkStream[T, R any] interface { TypedTransformConsumedStream[T, R] GetEndpointId() int SetSinkConsumer(SinkConsumer[T]) }
type TypedSplitStream ¶
type TypedSplitStream[T any] interface { TypedConsumedStream[T] AddStream() TypedConsumedStream[T] }
type TypedStream ¶
type TypedStream[T any] interface { Stream GetConsumer() TypedStreamConsumer[T] GetSerde() serde.StreamSerde[T] SetConsumer(TypedStreamConsumer[T]) }
type TypedStreamConsumer ¶
type TypedTransformConsumedStream ¶
type TypedTransformConsumedStream[T, R any] interface { TypedStream[R] Consumer[T] }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.