runtime

package
v0.0.191 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2024 License: BSD-3-Clause Imports: 37 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsKeyValueType

func IsKeyValueType[T any]() bool

func MakeMultiJoinLink

func MakeMultiJoinLink[K comparable, T1, T2, R any](
	multiJoinStream TypedMultiJoinConsumedStream[K, T1, R],
	rightStream TypedStream[datastruct.KeyValue[K, T2]])

func MakeSerde

func MakeSerde[T any](runtime ServiceExecutionRuntime) serde.StreamSerde[T]

func MakeService

func MakeService[Environment ServiceExecutionEnvironment, Cfg config.Config](name string,
	dep environment.ServiceDependency,
	configSettings *config.ConfigSettings) (Environment, error)

Types

type AppInputStream added in v0.0.180

// AppInputStream is a generic stream type parameterized by the element type T.
// It embeds ConsumedStream[T] and declares no further exported fields.
type AppInputStream[T any] struct {
	ConsumedStream[T]
}

func MakeAppInputStream added in v0.0.180

func MakeAppInputStream[T any](name string, env ServiceExecutionEnvironment) *AppInputStream[T]

func (*AppInputStream[T]) Consume added in v0.0.180

func (s *AppInputStream[T]) Consume(value T)

type AppSinkStream

type AppSinkStream[T any] struct {
	ServiceStream[T]
	// contains filtered or unexported fields
}

func MakeAppSinkStream

func MakeAppSinkStream[T any](name string, stream TypedStream[T], consumer ConsumerFunc[T]) *AppSinkStream[T]

func (*AppSinkStream[T]) Consume

func (s *AppSinkStream[T]) Consume(value T)

func (*AppSinkStream[T]) GetConsumers added in v0.0.134

func (s *AppSinkStream[T]) GetConsumers() []Stream

type BinaryConsumer

// BinaryConsumer is implemented by types that accept a single byte slice
// through ConsumeBinary. The method returns nothing, so no error is reported
// to the caller through this interface.
type BinaryConsumer interface {
	ConsumeBinary([]byte)
}

type BinaryConsumerFunc

// BinaryConsumerFunc is a function type that takes a byte slice and returns
// an error.
type BinaryConsumerFunc func([]byte) error

func (BinaryConsumerFunc) Consume

func (f BinaryConsumerFunc) Consume(data []byte) error

type BinaryKVConsumer

// BinaryKVConsumer is implemented by types that accept two byte slices
// through ConsumeBinary.
//
// NOTE(review): the parameters are unnamed here; the ConsumeBinary methods
// listed for the KV stream types name them key and value, in that order —
// confirm against the implementation.
type BinaryKVConsumer interface {
	ConsumeBinary([]byte, []byte)
}

type BinaryKVConsumerFunc

// BinaryKVConsumerFunc is a function type that takes two byte slices and
// returns an error.
type BinaryKVConsumerFunc func([]byte, []byte) error

func (BinaryKVConsumerFunc) Consume

func (f BinaryKVConsumerFunc) Consume(key []byte, value []byte) error

type Caller

// Caller embeds Consumer[T] and adds no methods of its own, so its method set
// is exactly that of Consumer[T].
type Caller[T any] interface {
	Consumer[T]
}

type Collect

// Collect is implemented by types that receive values of type T through Out.
// It appears as the last parameter of several *Function interfaces in this
// package (for example FlatMapFunction and JoinFunction).
type Collect[T any] interface {
	Out(value T)
}

type Collection added in v0.0.173

type Collection[T any] struct {
	// contains filtered or unexported fields
}

func NewCollection added in v0.0.173

func NewCollection[T any](data []T) Collection[T]

func (Collection[T]) At added in v0.0.173

func (s Collection[T]) At(i int) T

func (Collection[T]) Len added in v0.0.173

func (s Collection[T]) Len() int

type ConsumeStatistics

// ConsumeStatistics exposes a single counter as an int64 through Count.
// What is being counted is not visible from this declaration.
type ConsumeStatistics interface {
	Count() int64
}

type ConsumedStream

type ConsumedStream[T any] struct {
	ServiceStream[T]
	// contains filtered or unexported fields
}

func (*ConsumedStream[T]) GetConsumer

func (s *ConsumedStream[T]) GetConsumer() TypedStreamConsumer[T]

func (*ConsumedStream[T]) GetConsumers added in v0.0.134

func (s *ConsumedStream[T]) GetConsumers() []Stream

func (*ConsumedStream[T]) GetSerde

func (s *ConsumedStream[T]) GetSerde() serde.StreamSerde[T]

func (*ConsumedStream[T]) SetConsumer

func (s *ConsumedStream[T]) SetConsumer(consumer TypedStreamConsumer[T])

type Consumer

// Consumer is implemented by types that accept a value of type T through
// Consume. The method has no return value.
type Consumer[T any] interface {
	Consume(T)
}

type ConsumerFunc

// ConsumerFunc is a generic function type that takes a value of type T and
// returns an error.
type ConsumerFunc[T any] func(T) error

func (ConsumerFunc[T]) Consume

func (f ConsumerFunc[T]) Consume(value T) error

type DataConnector

// DataConnector exposes a string name and an integer id. It is embedded by
// both DataSink and DataSource.
type DataConnector interface {
	GetName() string
	GetId() int
}

type DataSink

// DataSink is a DataConnector that additionally declares:
//   - Start and Stop, each taking a context.Context (only Start returns an error);
//   - accessors for its *config.DataConnectorConfig and its
//     ServiceExecutionEnvironment;
//   - AddEndpoint, GetEndpoint (lookup by integer id) and GetEndpoints for
//     SinkEndpoint values.
type DataSink interface {
	DataConnector
	Start(context.Context) error
	Stop(context.Context)
	GetDataConnector() *config.DataConnectorConfig
	GetEnvironment() ServiceExecutionEnvironment
	AddEndpoint(SinkEndpoint)
	GetEndpoint(id int) SinkEndpoint
	GetEndpoints() Collection[SinkEndpoint]
}

type DataSinkEndpoint

type DataSinkEndpoint struct {
	// contains filtered or unexported fields
}

func MakeDataSinkEndpoint

func MakeDataSinkEndpoint(dataSink DataSink, id int, environment ServiceExecutionEnvironment) *DataSinkEndpoint

func (*DataSinkEndpoint) AddEndpointConsumer

func (ep *DataSinkEndpoint) AddEndpointConsumer(endpointConsumer OutputEndpointConsumer)

func (*DataSinkEndpoint) GetConfig

func (ep *DataSinkEndpoint) GetConfig() *config.EndpointConfig

func (*DataSinkEndpoint) GetDataConnector

func (ep *DataSinkEndpoint) GetDataConnector() DataConnector

func (*DataSinkEndpoint) GetDataSink

func (ep *DataSinkEndpoint) GetDataSink() DataSink

func (*DataSinkEndpoint) GetEndpointConsumers

func (ep *DataSinkEndpoint) GetEndpointConsumers() Collection[OutputEndpointConsumer]

func (*DataSinkEndpoint) GetEnvironment added in v0.0.134

func (ep *DataSinkEndpoint) GetEnvironment() ServiceExecutionEnvironment

func (*DataSinkEndpoint) GetId

func (ep *DataSinkEndpoint) GetId() int

func (*DataSinkEndpoint) GetName

func (ep *DataSinkEndpoint) GetName() string

type DataSinkEndpointConsumer

type DataSinkEndpointConsumer[T, R any] struct {
	// contains filtered or unexported fields
}

func MakeDataSinkEndpointConsumer

func MakeDataSinkEndpointConsumer[T, R any](endpoint SinkEndpoint, stream TypedSinkStream[T, R]) *DataSinkEndpointConsumer[T, R]

func (*DataSinkEndpointConsumer[T, R]) Endpoint

func (ec *DataSinkEndpointConsumer[T, R]) Endpoint() SinkEndpoint

func (*DataSinkEndpointConsumer[T, R]) Stream added in v0.0.175

func (ec *DataSinkEndpointConsumer[T, R]) Stream() TypedSinkStream[T, R]

type DataSource

// DataSource is a DataConnector that additionally declares:
//   - Start and Stop, each taking a context.Context (only Start returns an error);
//   - accessors for its *config.DataConnectorConfig and its
//     ServiceExecutionEnvironment;
//   - AddEndpoint, GetEndpoint (lookup by integer id) and GetEndpoints for
//     InputEndpoint values.
//
// Its shape mirrors DataSink, with InputEndpoint in place of SinkEndpoint.
type DataSource interface {
	DataConnector
	Start(context.Context) error
	Stop(context.Context)
	GetDataConnector() *config.DataConnectorConfig
	GetEnvironment() ServiceExecutionEnvironment
	AddEndpoint(InputEndpoint)
	GetEndpoint(id int) InputEndpoint
	GetEndpoints() Collection[InputEndpoint]
}

type DataSourceEndpoint

type DataSourceEndpoint struct {
	// contains filtered or unexported fields
}

func MakeDataSourceEndpoint

func MakeDataSourceEndpoint(dataSource DataSource, id int, environment ServiceExecutionEnvironment) *DataSourceEndpoint

func (*DataSourceEndpoint) AddEndpointConsumer

func (ep *DataSourceEndpoint) AddEndpointConsumer(endpointConsumer InputEndpointConsumer)

func (*DataSourceEndpoint) GetConfig

func (ep *DataSourceEndpoint) GetConfig() *config.EndpointConfig

func (*DataSourceEndpoint) GetDataConnector

func (ep *DataSourceEndpoint) GetDataConnector() DataConnector

func (*DataSourceEndpoint) GetDataSource

func (ep *DataSourceEndpoint) GetDataSource() DataSource

func (*DataSourceEndpoint) GetEndpointConsumers

func (ep *DataSourceEndpoint) GetEndpointConsumers() Collection[InputEndpointConsumer]

func (*DataSourceEndpoint) GetEnvironment added in v0.0.134

func (ep *DataSourceEndpoint) GetEnvironment() ServiceExecutionEnvironment

func (*DataSourceEndpoint) GetId

func (ep *DataSourceEndpoint) GetId() int

func (*DataSourceEndpoint) GetName

func (ep *DataSourceEndpoint) GetName() string

type DataSourceEndpointConsumer

type DataSourceEndpointConsumer[T any] struct {
	// contains filtered or unexported fields
}

func MakeDataSourceEndpointConsumer

func MakeDataSourceEndpointConsumer[T any](endpoint InputEndpoint, inputStream TypedInputStream[T]) *DataSourceEndpointConsumer[T]

func (*DataSourceEndpointConsumer[T]) Consume

func (ec *DataSourceEndpointConsumer[T]) Consume(value T)

func (*DataSourceEndpointConsumer[T]) Endpoint

func (ec *DataSourceEndpointConsumer[T]) Endpoint() InputEndpoint

func (*DataSourceEndpointConsumer[T]) Stream added in v0.0.175

func (ec *DataSourceEndpointConsumer[T]) Stream() TypedInputStream[T]

type DelayFunc

// DelayFunc is a generic function type that takes a value of type T and
// returns an error. Its signature is identical to ConsumerFunc[T].
type DelayFunc[T any] func(T) error

type DelayFunction

// DelayFunction is implemented by types that, given a Stream and a value of
// type T, return a time.Duration. MakeDelayStream takes a DelayFunction[T]
// as its function argument.
type DelayFunction[T any] interface {
	Duration(Stream, T) time.Duration
}

type DelayFunctionContext

type DelayFunctionContext[T any] struct {
	StreamFunction[T]
	// contains filtered or unexported fields
}

type DelayStream

type DelayStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeDelayStream

func MakeDelayStream[T any](name string, stream TypedStream[T], f DelayFunction[T]) *DelayStream[T]

func (*DelayStream[T]) Consume

func (s *DelayStream[T]) Consume(value T)

type Edge

// Edge is a JSON-serializable record describing one graph edge. Every field
// carries an explicit lowercase json tag. NetworkData holds a slice of *Edge
// under the "edges" key.
//
// NOTE(review): From and To are plain ints; presumably they refer to Node.Id
// values — confirm against the code that builds NetworkData.
type Edge struct {
	From   int    `json:"from"`
	To     int    `json:"to"`
	Arrows string `json:"arrows"`
	Length int    `json:"length"`
	Label  string `json:"label"`
	// Color is an anonymous nested struct serialized as the "color" object
	// with "opacity" and "color" members.
	Color  struct {
		Opacity float32 `json:"opacity"`
		Color   string  `json:"color"`
	} `json:"color"`
}

type Endpoint

// Endpoint exposes a string name, an integer id, and the DataConnector the
// endpoint is associated with. It is embedded by InputEndpoint and
// SinkEndpoint.
type Endpoint interface {
	GetName() string
	GetId() int
	GetDataConnector() DataConnector
}

type EndpointReader

// EndpointReader is an interface with no methods. It is the return type of
// GetEndpointReader and is embedded by TypedEndpointReader[T], which adds the
// typed Read method.
type EndpointReader interface {
}

type EndpointWriter

// EndpointWriter is an interface with no methods. It is the return type of
// GetEndpointWriter and is embedded by TypedEndpointWriter[T], which adds the
// typed Write method.
type EndpointWriter interface {
}

type FilterFunction

// FilterFunction is implemented by types that, given a Stream and a value of
// type T, return a bool. MakeFilterStream takes a FilterFunction[T] as its
// function argument.
//
// NOTE(review): whether true means "keep" or "drop" is not visible from this
// declaration — confirm against FilterStream.Consume.
type FilterFunction[T any] interface {
	Filter(Stream, T) bool
}

type FilterFunctionContext

type FilterFunctionContext[T any] struct {
	StreamFunction[T]
	// contains filtered or unexported fields
}

type FilterStream

type FilterStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeFilterStream

func MakeFilterStream[T any](name string, stream TypedStream[T], f FilterFunction[T]) *FilterStream[T]

func (*FilterStream[T]) Consume

func (s *FilterStream[T]) Consume(value T)

type FlatMapFunction

// FlatMapFunction is implemented by types that, given a Stream and a value of
// type T, emit results of type R through the supplied Collect[R]. The method
// itself returns nothing. MakeFlatMapStream takes a FlatMapFunction[T, R] as
// its function argument.
type FlatMapFunction[T, R any] interface {
	FlatMap(Stream, T, Collect[R])
}

type FlatMapFunctionContext

type FlatMapFunctionContext[T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type FlatMapIterableStream

type FlatMapIterableStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeFlatMapIterableStream

func MakeFlatMapIterableStream[T, R any](name string, stream TypedStream[T]) *FlatMapIterableStream[T, R]

func (*FlatMapIterableStream[T, R]) Consume

func (s *FlatMapIterableStream[T, R]) Consume(value T)

type FlatMapStream

type FlatMapStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeFlatMapStream

func MakeFlatMapStream[T, R any](name string, stream TypedStream[T], f FlatMapFunction[T, R]) *FlatMapStream[T, R]

func (*FlatMapStream[T, R]) Consume

func (s *FlatMapStream[T, R]) Consume(value T)

type ForEachFunction

// ForEachFunction is implemented by types that are invoked with a Stream and
// a value of type T and return nothing. MakeForEachStream takes a
// ForEachFunction[T] as its function argument.
type ForEachFunction[T any] interface {
	ForEach(Stream, T)
}

type ForEachFunctionContext

type ForEachFunctionContext[T any] struct {
	StreamFunction[T]
	// contains filtered or unexported fields
}

type ForEachStream

type ForEachStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeForEachStream

func MakeForEachStream[T any](name string, stream TypedStream[T], f ForEachFunction[T]) *ForEachStream[T]

func (*ForEachStream[T]) Consume

func (s *ForEachStream[T]) Consume(value T)

type InStubKVStream

type InStubKVStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func (*InStubKVStream[T]) Consume added in v0.0.134

func (s *InStubKVStream[T]) Consume(value T)

func (*InStubKVStream[T]) ConsumeBinary

func (s *InStubKVStream[T]) ConsumeBinary(key []byte, value []byte)

type InStubStream

// InStubStream is a generic stream type parameterized by the element type T.
// It embeds ConsumedStream[T] and declares no further exported fields.
type InStubStream[T any] struct {
	ConsumedStream[T]
}

func MakeInStubStream

func MakeInStubStream[T any](name string, env ServiceExecutionEnvironment) *InStubStream[T]

func (*InStubStream[T]) Consume added in v0.0.134

func (s *InStubStream[T]) Consume(value T)

func (*InStubStream[T]) ConsumeBinary

func (s *InStubStream[T]) ConsumeBinary(data []byte)

type InputDataSource

type InputDataSource struct {
	// contains filtered or unexported fields
}

func MakeInputDataSource

func MakeInputDataSource(dataConnector *config.DataConnectorConfig, environment ServiceExecutionEnvironment) *InputDataSource

func (*InputDataSource) AddEndpoint

func (ds *InputDataSource) AddEndpoint(endpoint InputEndpoint)

func (*InputDataSource) GetDataConnector

func (ds *InputDataSource) GetDataConnector() *config.DataConnectorConfig

func (*InputDataSource) GetEndpoint

func (ds *InputDataSource) GetEndpoint(id int) InputEndpoint

func (*InputDataSource) GetEndpoints

func (ds *InputDataSource) GetEndpoints() Collection[InputEndpoint]

func (*InputDataSource) GetEnvironment added in v0.0.134

func (ds *InputDataSource) GetEnvironment() ServiceExecutionEnvironment

func (*InputDataSource) GetId

func (ds *InputDataSource) GetId() int

func (*InputDataSource) GetName

func (ds *InputDataSource) GetName() string

type InputEndpoint

// InputEndpoint is an Endpoint that additionally exposes its
// *config.EndpointConfig, its ServiceExecutionEnvironment and its owning
// DataSource, and allows InputEndpointConsumer values to be added and listed.
type InputEndpoint interface {
	Endpoint
	GetConfig() *config.EndpointConfig
	GetEnvironment() ServiceExecutionEnvironment
	GetDataSource() DataSource
	AddEndpointConsumer(consumer InputEndpointConsumer)
	GetEndpointConsumers() Collection[InputEndpointConsumer]
}

type InputEndpointConsumer

// InputEndpointConsumer is implemented by types that can report the
// InputEndpoint they belong to through Endpoint.
type InputEndpointConsumer interface {
	Endpoint() InputEndpoint
}

type InputKVSplitStream

type InputKVSplitStream[T any] struct {
	*SplitStream[T]
	// contains filtered or unexported fields
}

func (*InputKVSplitStream[T]) ConsumeBinary

func (s *InputKVSplitStream[T]) ConsumeBinary(key []byte, value []byte)

type InputSplitStream

// InputSplitStream is a generic stream type that embeds a *SplitStream[T]
// pointer and declares no further exported fields.
type InputSplitStream[T any] struct {
	*SplitStream[T]
}

func MakeInputSplitStream

func MakeInputSplitStream[T any](name string, env ServiceExecutionEnvironment) *InputSplitStream[T]

func (*InputSplitStream[T]) ConsumeBinary

func (s *InputSplitStream[T]) ConsumeBinary(data []byte)

type InputStream

// InputStream is a generic stream type parameterized by the element type T.
// It embeds ConsumedStream[T] and declares no further exported fields.
type InputStream[T any] struct {
	ConsumedStream[T]
}

func MakeInputStream

func MakeInputStream[T any](name string, env ServiceExecutionEnvironment) *InputStream[T]

func (*InputStream[T]) Consume added in v0.0.134

func (s *InputStream[T]) Consume(value T)

func (*InputStream[T]) GetEndpointId

func (s *InputStream[T]) GetEndpointId() int

type JoinFunction

// JoinFunction is implemented by types whose Join method receives a Stream, a
// key of type K, a slice of T1 values, a slice of T2 values and a Collect[R]
// for emitting results, and returns a bool. MakeJoinStream takes a
// JoinFunction[K, T1, T2, R] as its function argument.
//
// NOTE(review): the meaning of the bool result is not visible from this
// declaration — confirm against JoinStream.
type JoinFunction[K comparable, T1, T2, R any] interface {
	Join(Stream, K, []T1, []T2, Collect[R]) bool
}

type JoinFunctionContext

type JoinFunctionContext[K comparable, T1, T2, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type JoinLink

type JoinLink[K comparable, T1, T2, R any] struct {
	// contains filtered or unexported fields
}

func (*JoinLink[K, T1, T2, R]) Consume

func (s *JoinLink[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])

func (*JoinLink[K, T1, T2, R]) GetConfig

func (s *JoinLink[K, T1, T2, R]) GetConfig() *config.StreamConfig

func (*JoinLink[K, T1, T2, R]) GetConsumers added in v0.0.134

func (s *JoinLink[K, T1, T2, R]) GetConsumers() []Stream

func (*JoinLink[K, T1, T2, R]) GetEnvironment added in v0.0.134

func (s *JoinLink[K, T1, T2, R]) GetEnvironment() ServiceExecutionEnvironment

func (*JoinLink[K, T1, T2, R]) GetId

func (s *JoinLink[K, T1, T2, R]) GetId() int

func (*JoinLink[K, T1, T2, R]) GetName

func (s *JoinLink[K, T1, T2, R]) GetName() string

func (*JoinLink[K, T1, T2, R]) GetTransformationName

func (s *JoinLink[K, T1, T2, R]) GetTransformationName() string

func (*JoinLink[K, T1, T2, R]) GetTypeName

func (s *JoinLink[K, T1, T2, R]) GetTypeName() string

func (*JoinLink[K, T1, T2, R]) Validate added in v0.0.167

func (s *JoinLink[K, T1, T2, R]) Validate() error

type JoinStream

type JoinStream[K comparable, T1, T2, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeJoinStream

func MakeJoinStream[K comparable, T1, T2, R any](name string, stream TypedStream[datastruct.KeyValue[K, T1]],
	streamRight TypedStream[datastruct.KeyValue[K, T2]],
	f JoinFunction[K, T1, T2, R]) *JoinStream[K, T1, T2, R]

func (*JoinStream[K, T1, T2, R]) Consume

func (s *JoinStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T1])

func (*JoinStream[K, T1, T2, R]) ConsumeRight

func (s *JoinStream[K, T1, T2, R]) ConsumeRight(value datastruct.KeyValue[K, T2])

func (*JoinStream[K, T1, T2, R]) Out

func (s *JoinStream[K, T1, T2, R]) Out(value R)

type KeyByFunction

// KeyByFunction is implemented by types that, given a Stream and a value of
// type T, return a datastruct.KeyValue[K, V]. MakeKeyByStream takes a
// KeyByFunction[T, K, V] as its function argument.
type KeyByFunction[T any, K comparable, V any] interface {
	KeyBy(Stream, T) datastruct.KeyValue[K, V]
}

type KeyByFunctionContext

type KeyByFunctionContext[T any, K comparable, V any] struct {
	StreamFunction[datastruct.KeyValue[K, V]]
	// contains filtered or unexported fields
}

type KeyByStream

type KeyByStream[T any, K comparable, V any] struct {
	ConsumedStream[datastruct.KeyValue[K, V]]
	// contains filtered or unexported fields
}

func MakeKeyByStream

func MakeKeyByStream[T any, K comparable, V any](name string, stream TypedStream[T], f KeyByFunction[T, K, V]) *KeyByStream[T, K, V]

func (*KeyByStream[T, K, V]) Consume

func (s *KeyByStream[T, K, V]) Consume(value T)

type LinkStream

type LinkStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeLinkStream

func MakeLinkStream[T any](name string, env ServiceExecutionEnvironment) *LinkStream[T]

func (*LinkStream[T]) Consume

func (s *LinkStream[T]) Consume(value T)

func (*LinkStream[T]) SetSource

func (s *LinkStream[T]) SetSource(stream TypedConsumedStream[T])

type MapFunction

// MapFunction is implemented by types that, given a Stream and a value of
// type T, return a value of type R. MakeMapStream takes a MapFunction[T, R]
// as its function argument.
type MapFunction[T, R any] interface {
	Map(Stream, T) R
}

type MapFunctionContext

type MapFunctionContext[T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type MapStream

type MapStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeMapStream

func MakeMapStream[T, R any](name string, stream TypedStream[T], f MapFunction[T, R]) *MapStream[T, R]

func (*MapStream[T, R]) Consume

func (s *MapStream[T, R]) Consume(value T)

type MergeLink

type MergeLink[T any] struct {
	// contains filtered or unexported fields
}

func (*MergeLink[T]) Consume

func (s *MergeLink[T]) Consume(value T)

func (*MergeLink[T]) GetConfig

func (s *MergeLink[T]) GetConfig() *config.StreamConfig

func (*MergeLink[T]) GetConsumers added in v0.0.134

func (s *MergeLink[T]) GetConsumers() []Stream

func (*MergeLink[T]) GetEnvironment added in v0.0.134

func (s *MergeLink[T]) GetEnvironment() ServiceExecutionEnvironment

func (*MergeLink[T]) GetId

func (s *MergeLink[T]) GetId() int

func (*MergeLink[T]) GetName

func (s *MergeLink[T]) GetName() string

func (*MergeLink[T]) GetTransformationName

func (s *MergeLink[T]) GetTransformationName() string

func (*MergeLink[T]) GetTypeName

func (s *MergeLink[T]) GetTypeName() string

func (*MergeLink[T]) Validate added in v0.0.167

func (s *MergeLink[T]) Validate() error

type MergeStream

type MergeStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeMergeStream

func MakeMergeStream[T any](name string, stream TypedStream[T], streams ...TypedStream[T]) *MergeStream[T]

func (*MergeStream[T]) Consume added in v0.0.134

func (s *MergeStream[T]) Consume(value T)

type MultiJoinFunction

// MultiJoinFunction is implemented by types whose MultiJoin method receives a
// Stream, a key of type K, a slice of untyped value slices and a Collect[R]
// for emitting results, and returns a bool. MakeMultiJoinStream takes a
// MultiJoinFunction[K, T, R] as its function argument.
//
// The type parameter T does not appear in the method signature; values are
// passed as interface{}.
//
// NOTE(review): the layout of the [][]interface{} argument (which index holds
// which input) and the meaning of the bool result are not visible from this
// declaration — confirm against MultiJoinStream.
type MultiJoinFunction[K comparable, T, R any] interface {
	MultiJoin(Stream, K, [][]interface{}, Collect[R]) bool
}

type MultiJoinFunctionContext

type MultiJoinFunctionContext[K comparable, T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type MultiJoinLinkStream

type MultiJoinLinkStream[K comparable, T1, T2, R any] struct {
	// contains filtered or unexported fields
}

func (*MultiJoinLinkStream[K, T1, T2, R]) Consume

func (s *MultiJoinLinkStream[K, T1, T2, R]) Consume(value datastruct.KeyValue[K, T2])

func (*MultiJoinLinkStream[K, T1, T2, R]) GetConfig

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConfig() *config.StreamConfig

func (*MultiJoinLinkStream[K, T1, T2, R]) GetConsumers added in v0.0.134

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetConsumers() []Stream

func (*MultiJoinLinkStream[K, T1, T2, R]) GetEnvironment added in v0.0.134

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetEnvironment() ServiceExecutionEnvironment

func (*MultiJoinLinkStream[K, T1, T2, R]) GetId

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetId() int

func (*MultiJoinLinkStream[K, T1, T2, R]) GetName

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetName() string

func (*MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTransformationName() string

func (*MultiJoinLinkStream[K, T1, T2, R]) GetTypeName

func (s *MultiJoinLinkStream[K, T1, T2, R]) GetTypeName() string

func (*MultiJoinLinkStream[K, T1, T2, R]) Validate added in v0.0.167

func (s *MultiJoinLinkStream[K, T1, T2, R]) Validate() error

type MultiJoinStream

type MultiJoinStream[K comparable, T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeMultiJoinStream

func MakeMultiJoinStream[K comparable, T, R any](
	name string, leftStream TypedStream[datastruct.KeyValue[K, T]],
	f MultiJoinFunction[K, T, R]) *MultiJoinStream[K, T, R]

func (*MultiJoinStream[K, T, R]) Consume

func (s *MultiJoinStream[K, T, R]) Consume(value datastruct.KeyValue[K, T])

func (*MultiJoinStream[K, T, R]) ConsumeRight

func (s *MultiJoinStream[K, T, R]) ConsumeRight(index int, value datastruct.KeyValue[K, interface{}])

func (*MultiJoinStream[K, T, R]) Out

func (s *MultiJoinStream[K, T, R]) Out(value R)

type NetworkData

// NetworkData is a JSON-serializable container holding a slice of *Node under
// the "nodes" key and a slice of *Edge under the "edges" key.
type NetworkData struct {
	Nodes []*Node `json:"nodes"`
	Edges []*Edge `json:"edges"`
}

type Node

// Node is a JSON-serializable record describing one graph node. Every field
// carries an explicit lowercase json tag. NetworkData holds a slice of *Node
// under the "nodes" key.
type Node struct {
	Id    int `json:"id"`
	// Color is an anonymous nested struct serialized as the "color" object
	// with a single "background" member.
	Color struct {
		Background string `json:"background"`
	} `json:"color"`
	Opacity float32 `json:"opacity"`
	Label   string  `json:"label"`
	X       int     `json:"x"`
	Y       int     `json:"y"`
}

type OutStubBinaryKVStream

type OutStubBinaryKVStream[T any] struct {
	ServiceStream[T]
	// contains filtered or unexported fields
}

func MakeOutStubBinaryKVStream

func MakeOutStubBinaryKVStream[T any](name string, stream TypedStream[T],
	consumer BinaryKVConsumerFunc) *OutStubBinaryKVStream[T]

func (*OutStubBinaryKVStream[T]) Consume

func (s *OutStubBinaryKVStream[T]) Consume(value T)

func (*OutStubBinaryKVStream[T]) GetConsumers added in v0.0.170

func (s *OutStubBinaryKVStream[T]) GetConsumers() []Stream

type OutStubBinaryStream

type OutStubBinaryStream[T any] struct {
	ServiceStream[T]
	// contains filtered or unexported fields
}

func MakeOutStubBinaryStream

func MakeOutStubBinaryStream[T any](name string, stream TypedStream[T],
	consumer BinaryConsumerFunc) *OutStubBinaryStream[T]

func (*OutStubBinaryStream[T]) Consume

func (s *OutStubBinaryStream[T]) Consume(value T)

func (*OutStubBinaryStream[T]) GetConsumers added in v0.0.170

func (s *OutStubBinaryStream[T]) GetConsumers() []Stream

type OutStubStream

type OutStubStream[T any] struct {
	ServiceStream[T]
	// contains filtered or unexported fields
}

func MakeOutStubStream

func MakeOutStubStream[T any](name string, stream TypedStream[T],
	consumer ConsumerFunc[T]) *OutStubStream[T]

func (*OutStubStream[T]) Consume

func (s *OutStubStream[T]) Consume(value T)

func (*OutStubStream[T]) GetConsumers added in v0.0.170

func (s *OutStubStream[T]) GetConsumers() []Stream

type OutputDataSink

type OutputDataSink struct {
	// contains filtered or unexported fields
}

func MakeOutputDataSink

func MakeOutputDataSink(dataConnector *config.DataConnectorConfig, environment ServiceExecutionEnvironment) *OutputDataSink

func (*OutputDataSink) AddEndpoint

func (ds *OutputDataSink) AddEndpoint(endpoint SinkEndpoint)

func (*OutputDataSink) GetDataConnector

func (ds *OutputDataSink) GetDataConnector() *config.DataConnectorConfig

func (*OutputDataSink) GetEndpoint

func (ds *OutputDataSink) GetEndpoint(id int) SinkEndpoint

func (*OutputDataSink) GetEndpoints

func (ds *OutputDataSink) GetEndpoints() Collection[SinkEndpoint]

func (*OutputDataSink) GetEnvironment added in v0.0.134

func (ds *OutputDataSink) GetEnvironment() ServiceExecutionEnvironment

func (*OutputDataSink) GetId

func (ds *OutputDataSink) GetId() int

func (*OutputDataSink) GetName

func (ds *OutputDataSink) GetName() string

type OutputEndpointConsumer

// OutputEndpointConsumer is implemented by types that can report the
// SinkEndpoint they belong to through Endpoint.
type OutputEndpointConsumer interface {
	Endpoint() SinkEndpoint
}

type ParallelsFunction

// ParallelsFunction is implemented by types that, given a Stream and a value
// of type T, emit results of type R through the supplied Collect[R]. The
// method itself returns nothing. Its signature has the same shape as
// FlatMapFunction.FlatMap. MakeParallelsStream takes a
// ParallelsFunction[T, R] as its function argument.
type ParallelsFunction[T, R any] interface {
	Parallels(Stream, T, Collect[R])
}

type ParallelsFunctionContext

type ParallelsFunctionContext[T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type ParallelsStream

type ParallelsStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeParallelsStream

func MakeParallelsStream[T, R any](name string, stream TypedStream[T], f ParallelsFunction[T, R]) *ParallelsStream[T, R]

func (*ParallelsStream[T, R]) Consume

func (s *ParallelsStream[T, R]) Consume(value T)

type ServiceApp

type ServiceApp struct {
	// contains filtered or unexported fields
}

func (*ServiceApp) AddDataSink

func (app *ServiceApp) AddDataSink(dataSink DataSink)

func (*ServiceApp) AddDataSource

func (app *ServiceApp) AddDataSource(dataSource DataSource)

func (*ServiceApp) AppConfig added in v0.0.161

func (app *ServiceApp) AppConfig() *config.ServiceAppConfig

func (*ServiceApp) Delay

func (app *ServiceApp) Delay(duration time.Duration, f func())

func (*ServiceApp) GetConsumeTimeout

func (app *ServiceApp) GetConsumeTimeout(from int, to int) time.Duration

func (*ServiceApp) GetDataSink

func (app *ServiceApp) GetDataSink(id int) DataSink

func (*ServiceApp) GetDataSource

func (app *ServiceApp) GetDataSource(id int) DataSource

func (*ServiceApp) GetEndpointReader

func (app *ServiceApp) GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader

func (*ServiceApp) GetEndpointWriter

func (app *ServiceApp) GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter

func (*ServiceApp) GetHttpRoute added in v0.0.184

func (app *ServiceApp) GetHttpRoute() httproute.HttpRoute

func (*ServiceApp) GetPriorityTaskPool added in v0.0.180

func (app *ServiceApp) GetPriorityTaskPool(name string) pool.PriorityTaskPool

func (*ServiceApp) GetRuntime added in v0.0.134

func (app *ServiceApp) GetRuntime() ServiceExecutionRuntime

func (*ServiceApp) GetTaskPool added in v0.0.180

func (app *ServiceApp) GetTaskPool(name string) pool.TaskPool

func (*ServiceApp) Log added in v0.0.161

func (app *ServiceApp) Log() log.Logger

func (*ServiceApp) Metrics added in v0.0.161

func (app *ServiceApp) Metrics() metrics.Metrics

func (*ServiceApp) Release added in v0.0.161

func (app *ServiceApp) Release()

func (*ServiceApp) ReloadConfig

func (app *ServiceApp) ReloadConfig(config config.Config)

func (*ServiceApp) ServiceConfig added in v0.0.161

func (app *ServiceApp) ServiceConfig() *config.ServiceConfig

func (*ServiceApp) ServiceDependency added in v0.0.176

func (app *ServiceApp) ServiceDependency() environment.ServiceDependency

func (*ServiceApp) ServiceInit added in v0.0.184

func (app *ServiceApp) ServiceInit() error

func (*ServiceApp) Start

func (app *ServiceApp) Start(ctx context.Context) error

func (*ServiceApp) Stop

func (app *ServiceApp) Stop(ctx context.Context)

type ServiceExecutionEnvironment added in v0.0.134

// ServiceExecutionEnvironment extends environment.ServiceEnvironment with the
// operations declared below. Grouped by what the signatures show:
//   - lifecycle: ServiceInit, StreamsInit, Start, Stop, Release;
//   - configuration: SetConfig;
//   - serialization: GetSerde, keyed by reflect.Type;
//   - data sources and sinks: AddDataSource/GetDataSource and
//     AddDataSink/GetDataSink, looked up by integer id;
//   - task pools: GetTaskPool and GetPriorityTaskPool, looked up by name;
//   - endpoint I/O: GetEndpointReader and GetEndpointWriter;
//   - misc: GetConsumeTimeout, Delay, GetRuntime, GetHttpRoute.
//
// It is the type constraint of the Environment parameter of MakeService and
// is returned by the GetEnvironment methods throughout this package.
type ServiceExecutionEnvironment interface {
	environment.ServiceEnvironment
	GetSerde(valueType reflect.Type) (serde.Serializer, error)
	ServiceInit() error
	StreamsInit(ctx context.Context)
	SetConfig(config config.Config)
	Start(context.Context) error
	Stop(context.Context)
	AddDataSource(dataSource DataSource)
	GetDataSource(id int) DataSource
	GetTaskPool(name string) pool.TaskPool
	GetPriorityTaskPool(name string) pool.PriorityTaskPool
	AddDataSink(dataSink DataSink)
	GetDataSink(id int) DataSink
	// NOTE(review): from and to are plain ints; presumably stream ids — confirm.
	GetConsumeTimeout(from int, to int) time.Duration
	GetEndpointReader(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointReader
	GetEndpointWriter(endpoint Endpoint, stream Stream, valueType reflect.Type) EndpointWriter
	Delay(duration time.Duration, f func())
	GetRuntime() ServiceExecutionRuntime
	Release()
	GetHttpRoute() httproute.HttpRoute
}

type ServiceExecutionRuntime added in v0.0.134

type ServiceExecutionRuntime interface {
	// contains filtered or unexported methods
}

type ServiceLoader added in v0.0.145

// ServiceLoader declares a single Stop method that takes no arguments and
// returns nothing.
type ServiceLoader interface {
	Stop()
}

type ServiceStream

type ServiceStream[T any] struct {
	// contains filtered or unexported fields
}

func (*ServiceStream[T]) GetConfig added in v0.0.163

func (s *ServiceStream[T]) GetConfig() *config.StreamConfig

func (*ServiceStream[T]) GetEnvironment added in v0.0.163

func (s *ServiceStream[T]) GetEnvironment() ServiceExecutionEnvironment

func (*ServiceStream[T]) GetId added in v0.0.163

func (s *ServiceStream[T]) GetId() int

func (*ServiceStream[T]) GetName added in v0.0.163

func (s *ServiceStream[T]) GetName() string

func (*ServiceStream[T]) GetTransformationName added in v0.0.163

func (s *ServiceStream[T]) GetTransformationName() string

func (*ServiceStream[T]) GetTypeName added in v0.0.163

func (s *ServiceStream[T]) GetTypeName() string

func (*ServiceStream[T]) Validate added in v0.0.167

func (s *ServiceStream[T]) Validate() error

type SinkCallback added in v0.0.186

// SinkCallback is implemented by types that receive a value of type T
// together with an error through Done. A SinkConsumer[T] is handed a
// SinkCallback[T] via SetSinkCallback.
type SinkCallback[T any] interface {
	Done(T, error)
}

type SinkConsumer added in v0.0.186

// SinkConsumer is a Consumer[T] that can additionally be given a
// SinkCallback[T] through SetSinkCallback.
type SinkConsumer[T any] interface {
	Consumer[T]
	SetSinkCallback(SinkCallback[T])
}

type SinkEndpoint

// SinkEndpoint is an Endpoint that additionally exposes its
// *config.EndpointConfig, its ServiceExecutionEnvironment and its owning
// DataSink, and allows OutputEndpointConsumer values to be added and listed.
// Its shape mirrors InputEndpoint, with DataSink and OutputEndpointConsumer
// in place of DataSource and InputEndpointConsumer.
type SinkEndpoint interface {
	Endpoint
	GetConfig() *config.EndpointConfig
	GetEnvironment() ServiceExecutionEnvironment
	GetDataSink() DataSink
	AddEndpointConsumer(consumer OutputEndpointConsumer)
	GetEndpointConsumers() Collection[OutputEndpointConsumer]
}

type SinkFunction added in v0.0.190

// SinkFunction is implemented by types whose Sink method receives a Stream, a
// value of type T, an error and a Collect[R] for emitting results of type R.
// The method itself returns nothing. MakeSinkStream takes a
// SinkFunction[T, R] as its function argument.
//
// NOTE(review): the (T, error) pair matches the parameters of
// SinkCallback.Done; presumably Sink is invoked with the outcome of a sink
// write — confirm against SinkStream.Done.
type SinkFunction[T, R any] interface {
	Sink(Stream, T, error, Collect[R])
}

type SinkFunctionContext added in v0.0.190

type SinkFunctionContext[T, R any] struct {
	StreamFunction[R]
	// contains filtered or unexported fields
}

type SinkStream

type SinkStream[T, R any] struct {
	ConsumedStream[R]
	// contains filtered or unexported fields
}

func MakeSinkStream

func MakeSinkStream[T, R any](name string, stream TypedStream[T], f SinkFunction[T, R]) *SinkStream[T, R]

func (*SinkStream[T, R]) Consume

func (s *SinkStream[T, R]) Consume(value T)

func (*SinkStream[T, R]) Done added in v0.0.186

func (s *SinkStream[T, R]) Done(value T, err error)

func (*SinkStream[T, R]) GetEndpointId

func (s *SinkStream[T, R]) GetEndpointId() int

func (*SinkStream[T, R]) SetSinkConsumer added in v0.0.186

func (s *SinkStream[T, R]) SetSinkConsumer(sinkConsumer SinkConsumer[T])

type SplitLink

type SplitLink[T any] struct {
	// contains filtered or unexported fields
}

func (*SplitLink[T]) Consume

func (s *SplitLink[T]) Consume(value T)

func (*SplitLink[T]) GetConfig

func (s *SplitLink[T]) GetConfig() *config.StreamConfig

func (*SplitLink[T]) GetConsumer

func (s *SplitLink[T]) GetConsumer() TypedStreamConsumer[T]

func (*SplitLink[T]) GetConsumers added in v0.0.134

func (s *SplitLink[T]) GetConsumers() []Stream

func (*SplitLink[T]) GetEnvironment added in v0.0.134

func (s *SplitLink[T]) GetEnvironment() ServiceExecutionEnvironment

func (*SplitLink[T]) GetId

func (s *SplitLink[T]) GetId() int

func (*SplitLink[T]) GetName

func (s *SplitLink[T]) GetName() string

func (*SplitLink[T]) GetSerde

func (s *SplitLink[T]) GetSerde() serde.StreamSerde[T]

func (*SplitLink[T]) GetTransformationName

func (s *SplitLink[T]) GetTransformationName() string

func (*SplitLink[T]) GetTypeName

func (s *SplitLink[T]) GetTypeName() string

func (*SplitLink[T]) SetConsumer

func (s *SplitLink[T]) SetConsumer(consumer TypedStreamConsumer[T])

func (*SplitLink[T]) Validate added in v0.0.167

func (s *SplitLink[T]) Validate() error

type SplitStream

type SplitStream[T any] struct {
	ConsumedStream[T]
	// contains filtered or unexported fields
}

func MakeSplitStream

func MakeSplitStream[T any](name string, stream TypedStream[T]) *SplitStream[T]

func (*SplitStream[T]) AddStream

func (s *SplitStream[T]) AddStream() TypedConsumedStream[T]

func (*SplitStream[T]) Consume

func (s *SplitStream[T]) Consume(value T)

func (*SplitStream[T]) GetConsumers added in v0.0.134

func (s *SplitStream[T]) GetConsumers() []Stream

func (*SplitStream[T]) Validate added in v0.0.167

func (s *SplitStream[T]) Validate() error

type Stream

// Stream is the untyped base interface for streams in this package. It
// exposes identifying information (name, transformation name, type name,
// integer id), the stream's *config.StreamConfig and
// ServiceExecutionEnvironment, the list of downstream Stream consumers, and a
// Validate method that returns an error.
//
// It is embedded by TypedStream[T] and TypedStreamConsumer[T].
type Stream interface {
	GetName() string
	GetTransformationName() string
	GetTypeName() string
	GetId() int
	GetConfig() *config.StreamConfig
	GetEnvironment() ServiceExecutionEnvironment
	GetConsumers() []Stream
	Validate() error
}

type StreamFunction

type StreamFunction[T any] struct {
	// contains filtered or unexported fields
}

func (*StreamFunction[T]) AfterCall

func (f *StreamFunction[T]) AfterCall()

func (*StreamFunction[T]) BeforeCall

func (f *StreamFunction[T]) BeforeCall()

type TypedBinaryConsumedStream

// TypedBinaryConsumedStream combines TypedConsumedStream[T] with
// BinaryConsumer: in addition to typed values it accepts a single byte slice
// through ConsumeBinary.
type TypedBinaryConsumedStream[T any] interface {
	TypedConsumedStream[T]
	BinaryConsumer
}

type TypedBinaryKVConsumedStream

// TypedBinaryKVConsumedStream combines TypedConsumedStream[T] with
// BinaryKVConsumer: in addition to typed values it accepts a pair of byte
// slices through ConsumeBinary.
type TypedBinaryKVConsumedStream[T any] interface {
	TypedConsumedStream[T]
	BinaryKVConsumer
}

type TypedBinaryKVSplitStream

// TypedBinaryKVSplitStream is a TypedBinaryKVConsumedStream[T] that also
// declares AddStream, which returns a TypedConsumedStream[T].
type TypedBinaryKVSplitStream[T any] interface {
	TypedBinaryKVConsumedStream[T]
	AddStream() TypedConsumedStream[T]
}

type TypedBinarySplitStream

// TypedBinarySplitStream is a TypedBinaryConsumedStream[T] that also declares
// AddStream, which returns a TypedConsumedStream[T].
type TypedBinarySplitStream[T any] interface {
	TypedBinaryConsumedStream[T]
	AddStream() TypedConsumedStream[T]
}

type TypedConsumedStream

// TypedConsumedStream combines TypedStream[T] with Consumer[T]: it both
// accepts values of type T through Consume and exposes the TypedStream[T]
// accessors for the same element type.
type TypedConsumedStream[T any] interface {
	TypedStream[T]
	Consumer[T]
}

type TypedEndpointReader

// TypedEndpointReader is an EndpointReader whose Read method takes an
// io.Reader and returns a value of type T and an error.
type TypedEndpointReader[T any] interface {
	EndpointReader
	Read(io.Reader) (T, error)
}

type TypedEndpointWriter

// TypedEndpointWriter is an EndpointWriter whose Write method takes a value
// of type T and an io.Writer and returns an error.
type TypedEndpointWriter[T any] interface {
	EndpointWriter
	Write(T, io.Writer) error
}

type TypedInputStream

// TypedInputStream is a TypedConsumedStream[T] that also reports an integer
// endpoint id through GetEndpointId.
type TypedInputStream[T any] interface {
	TypedConsumedStream[T]
	GetEndpointId() int
}

type TypedJoinConsumedStream

// TypedJoinConsumedStream is a
// TypedTransformConsumedStream[datastruct.KeyValue[K, T1], R] — it consumes
// KeyValue[K, T1] values and is a TypedStream[R] — that additionally accepts
// KeyValue[K, T2] values through ConsumeRight.
type TypedJoinConsumedStream[K comparable, T1, T2, R any] interface {
	TypedTransformConsumedStream[datastruct.KeyValue[K, T1], R]
	ConsumeRight(datastruct.KeyValue[K, T2])
}

type TypedLinkStream

// TypedLinkStream is a TypedConsumedStream[T] that can additionally be given
// a source TypedConsumedStream[T] through SetSource.
type TypedLinkStream[T any] interface {
	TypedConsumedStream[T]
	SetSource(TypedConsumedStream[T])
}

type TypedMultiJoinConsumedStream

// TypedMultiJoinConsumedStream is a
// TypedTransformConsumedStream[datastruct.KeyValue[K, T], R] — it consumes
// KeyValue[K, T] values and is a TypedStream[R] — that additionally accepts
// KeyValue[K, interface{}] values through ConsumeRight, together with an int.
//
// NOTE(review): the int parameter is unnamed here; MultiJoinStream.ConsumeRight
// names it index — presumably it selects which right-hand input the value
// belongs to; confirm against the implementation.
type TypedMultiJoinConsumedStream[K comparable, T, R any] interface {
	TypedTransformConsumedStream[datastruct.KeyValue[K, T], R]
	ConsumeRight(int, datastruct.KeyValue[K, interface{}])
}

type TypedSinkStream

// TypedSinkStream is a TypedTransformConsumedStream[T, R] that also reports
// an integer endpoint id through GetEndpointId and can be given a
// SinkConsumer[T] through SetSinkConsumer.
type TypedSinkStream[T, R any] interface {
	TypedTransformConsumedStream[T, R]
	GetEndpointId() int
	SetSinkConsumer(SinkConsumer[T])
}

type TypedSplitStream

// TypedSplitStream is a TypedConsumedStream[T] that also declares AddStream,
// which returns a TypedConsumedStream[T].
type TypedSplitStream[T any] interface {
	TypedConsumedStream[T]
	AddStream() TypedConsumedStream[T]
}

type TypedStream

// TypedStream is a Stream with element type T. Beyond the Stream methods it
// exposes a getter and setter for a single TypedStreamConsumer[T] and an
// accessor for the stream's serde.StreamSerde[T].
type TypedStream[T any] interface {
	Stream
	GetConsumer() TypedStreamConsumer[T]
	GetSerde() serde.StreamSerde[T]
	SetConsumer(TypedStreamConsumer[T])
}

type TypedStreamConsumer

// TypedStreamConsumer combines Stream with Consumer[T]. It is the type
// accepted by TypedStream[T].SetConsumer and returned by
// TypedStream[T].GetConsumer.
type TypedStreamConsumer[T any] interface {
	Stream
	Consumer[T]
}

type TypedTransformConsumedStream

// TypedTransformConsumedStream combines TypedStream[R] with Consumer[T]: it
// accepts values of type T through Consume while exposing the TypedStream
// accessors for element type R. Unlike TypedConsumedStream, the consumed and
// exposed element types may differ.
type TypedTransformConsumedStream[T, R any] interface {
	TypedStream[R]
	Consumer[T]
}

Directories

Path Synopsis
log

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL