Documentation ¶
Index ¶
- Variables
- func NewCollect(configurations ...runtime.Configuration[*Collect]) stateless.BatchFunction
- func WithCollectAggregator(next Aggregator[structure.Bytes, structure.Bytes, structure.Bytes]) runtime.Configuration[*Collect]
- func WithCollectCollector(collect Collector) runtime.Configuration[*Collect]
- func WithCollectPersistenceIdFunc(...) runtime.Configuration[*Collect]
- type Aggregator
- func ConvertAggregator[S any, IK any, IV any](source Aggregator[S, IK, IV], s format.Format[S], ik format.Format[IK], ...) Aggregator[structure.Bytes, structure.Bytes, structure.Bytes]
- func ConvertTopicAggregator[S any, IK any, IV any](source Aggregator[S, IK, IV], s format.Format[S], ...) Aggregator[structure.Bytes, structure.Bytes, structure.Bytes]
- type Collect
- type Collector
- type OneToOneCollector
- type OneToTwoCollector
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrBatchCollect = errors.New("batch collect") ErrCollectStateMap = errors.New("collect map message from state") ErrCollectStateCreate = errors.New("collect map state from input message") )
View Source
var (
ErrPersistenceId = errors.New("persistence id")
)
Functions ¶
func NewCollect ¶
func NewCollect(configurations ...runtime.Configuration[*Collect]) stateless.BatchFunction
constructor
func WithCollectAggregator ¶
func WithCollectAggregator(next Aggregator[structure.Bytes, structure.Bytes, structure.Bytes]) runtime.Configuration[*Collect]
func WithCollectCollector ¶
func WithCollectCollector(collect Collector) runtime.Configuration[*Collect]
Types ¶
type Aggregator ¶
type Aggregator[S any, K any, V any] func(context.Context, flow.Message[K, V], stateful.State[S]) (stateful.State[S], error)
func ConvertAggregator ¶
type Collector ¶
type Collector func(ctx context.Context, persistenceId string, s stateful.State[structure.Bytes]) ([]flow.Message[structure.Bytes, structure.Bytes], error)
type OneToOneCollector ¶
Click to show internal directories.
Click to hide internal directories.