collect

package
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2024 License: GPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrBatchCollect       = errors.New("batch collect")
	ErrCollectStateMap    = errors.New("collect map message from state")
	ErrCollectStateCreate = errors.New("collect map state from input message")
)
View Source
var (
	ErrPersistenceId = errors.New("persistence id")
)

Functions

func NewCollect

func NewCollect(configurations ...runtime.Configuration[*Collect]) stateless.BatchFunction

constructor

func WithCollectCollector

func WithCollectCollector(collect Collector) runtime.Configuration[*Collect]

func WithCollectPersistenceIdFunc

func WithCollectPersistenceIdFunc(persistenceIdFunc func(context.Context, flow.Message[structure.Bytes, structure.Bytes]) (string, error)) runtime.Configuration[*Collect]

configuration

Types

type Aggregator

type Aggregator[S any, K any, V any] func(context.Context, flow.Message[K, V], stateful.State[S]) (stateful.State[S], error)

func ConvertAggregator

func ConvertAggregator[S any, IK any, IV any](
	source Aggregator[S, IK, IV],
	s format.Format[S],
	ik format.Format[IK],
	iv format.Format[IV],
) Aggregator[structure.Bytes, structure.Bytes, structure.Bytes]

func ConvertTopicAggregator

func ConvertTopicAggregator[S any, IK any, IV any](
	source Aggregator[S, IK, IV],
	s format.Format[S],
	inputTopic flow.Topic[IK, IV],
) Aggregator[structure.Bytes, structure.Bytes, structure.Bytes]

type Collect

type Collect struct {
	// contains filtered or unexported fields
}

type Collector

type Collector func(ctx context.Context, persistenceId string, s stateful.State[structure.Bytes]) ([]flow.Message[structure.Bytes, structure.Bytes], error)

func ConvertOneToOneCollector

func ConvertOneToOneCollector[S any, OK any, OV any](
	source OneToOneCollector[S, OK, OV],
	s format.Format[S],
	ok format.Format[OK],
	ov format.Format[OV],
) Collector

func ConvertTopicOneToOneCollector

func ConvertTopicOneToOneCollector[S any, OK any, OV any](
	source OneToOneCollector[S, OK, OV],
	s format.Format[S],
	outputTopic flow.Topic[OK, OV],
) Collector

type OneToOneCollector

type OneToOneCollector[S any, OK any, OV any] func(ctx context.Context, persistenceId string, s stateful.State[S]) (*flow.Message[OK, OV], error)

type OneToTwoCollector

type OneToTwoCollector[S any, OK1 any, OV1 any, OK2 any, OV2 any] func(ctx context.Context, persistenceId string, s stateful.State[S]) (*flow.Message[OK1, OV1], *flow.Message[OK2, OV2], error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL