Documentation ¶
Index ¶
- Constants
- Variables
- func GetKafkaProducer(ctx context.Context, ci inverse.Container) (flow.Producer, error)
- func GetPostgresqlConnection(ctx context.Context, ci inverse.Container) (runtime_bun.BunConnection, error)
- func GetRetry(ctx context.Context, ci inverse.Container) (*runtime_retry.Retry, error)
- func InjectedRuntimes(ci inverse.Container) []runtime.Runtime
- func RegisterCollectorFunction(ci inverse.Container, topic string, ...)
- func RegisterConsumer(container inverse.Container, name string, broker string, ...)
- func RegisterConsumerConfig(ci inverse.Container, ...)
- func RegisterConsumerFunctionInjector(injector inverse.Injector[ConsumerFunction], ci inverse.Container)
- func RegisterConsumerFunctionInstance(instance ConsumerFunction, ci inverse.Container)
- func RegisterConsumerKeyedHandlerConfig(ci inverse.Container, ...)
- func RegisterJoinStatefulFunction(ci inverse.Container, topic string, tableName string, ...)
- func RegisterMaterialiseFunction[T any](ci inverse.Container, topic string, ...)
- func RegisterPostgresql(container inverse.Container, name string, connectionString string, ...)
- func RegisterPostgresqlConfig(ci inverse.Container, ...)
- func RegisterProducer(container inverse.Container, broker string, ...)
- func RegisterProducerConfig(ci inverse.Container, ...)
- func RegisterRetry(container inverse.Container, ...)
- func RegisterRoute(container inverse.Container, port int, ...)
- func RegisterRouteConfig(ci inverse.Container, ...)
- func RegisterRuntime(qualifier string, ci inverse.Container)
- func RegisterStatefulFunction(ci inverse.Container, topic string, tableName string, ...)
- func RegisterStatelessBatchFunction(ci inverse.Container, topic string, fn stateless.BatchFunction)
- func RegisterStatelessSingleFunction(ci inverse.Container, topic string, fn stateless.SingleFunction)
- func RegisterStatelessSingleFunctionWithKey(ci inverse.Container, topic string, fn stateless.SingleFunction, ...)
- func ResolveConsumerBatchFunction(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.KeyedHandler], error)
- func ResolveConsumerHandlerConfiguration(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.Consumer], error)
- func ResolveConsumerKeyFunction(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.KeyedHandler], error)
- func ResolveConsumerTopics(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.Consumer], error)
- func ResolveRouteProducer(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_bunrouter.Router], error)
- type CollectorOneToOneConfiguration
- type ConsumerFunction
- type JoinPostgresqlFunctionConfiguration
- type Main
- type MaterialisePostgresqlOneToOneFunctionConfiguration
- type Prebuilt
- type RouterAdapterConfiguration
- type RuntimeFacade
- type StatefulPostgresqlOneToOneFunctionConfiguration
- type StatefulPostgresqlOneToTwoFunctionConfiguration
- type StatelessOneToOneConfiguration
- type StatelessOneToOneExplodeConfiguration
- type StatelessOneToTwoConfiguration
Constants ¶
View Source
const ( QualifierKafkaConsumer = "QualifierKafkaConsumer" QualifierKafkaConsumerHandler = "QualifierKafkaConsumerHandler" QualifierKafkaConsumerBatchFunction = "QualifierKafkaConsumerBatchFunction" QualifierKafkaConsumerKeyFunction = "QualifierKafkaConsumerKeyFunction" QualifierConsumerFunction = "QualifierFlowStateless" )
View Source
const (
AllInstances = "all_instances"
)
View Source
const (
QualifierPostgresqlConnection = "QualifierPostgresqlConnection"
)
View Source
const (
QualifierRetry = "QualifierRetry"
)
View Source
const (
QualifierRoute = "QualifierRoute"
)
View Source
const (
QualifierRuntime = "QualifierRuntime"
)
Variables ¶
View Source
var (
QualifierKafkaProducer = "QualifierKafkaProducer"
)
Functions ¶
func GetKafkaProducer ¶ added in v0.0.20
func GetPostgresqlConnection ¶ added in v0.2.3
func GetPostgresqlConnection(ctx context.Context, ci inverse.Container) (runtime_bun.BunConnection, error)
func InjectedRuntimes ¶ added in v0.0.20
func RegisterCollectorFunction ¶ added in v0.2.4
func RegisterConsumer ¶ added in v0.1.0
func RegisterConsumer( container inverse.Container, name string, broker string, configs []runtime.Configuration[*runtime_sarama.Consumer], )
func RegisterConsumerConfig ¶ added in v0.1.4
func RegisterConsumerConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_sarama.Consumer])
func RegisterConsumerFunctionInjector ¶ added in v0.2.4
func RegisterConsumerFunctionInjector(injector inverse.Injector[ConsumerFunction], ci inverse.Container)
func RegisterConsumerFunctionInstance ¶ added in v0.2.4
func RegisterConsumerFunctionInstance(instance ConsumerFunction, ci inverse.Container)
func RegisterConsumerKeyedHandlerConfig ¶ added in v0.2.6
func RegisterConsumerKeyedHandlerConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_sarama.KeyedHandler])
func RegisterJoinStatefulFunction ¶ added in v0.2.6
func RegisterMaterialiseFunction ¶ added in v0.2.4
func RegisterMaterialiseFunction[T any]( ci inverse.Container, topic string, fn materialise.MapFunction[structure.Bytes, structure.Bytes, T], )
func RegisterPostgresql ¶ added in v0.0.20
func RegisterPostgresql( container inverse.Container, name string, connectionString string, configs []runtime.Configuration[*runtime_bun.PostgresqlConnection], )
func RegisterPostgresqlConfig ¶ added in v0.1.4
func RegisterPostgresqlConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_bun.PostgresqlConnection])
func RegisterProducer ¶ added in v0.0.20
func RegisterProducer( container inverse.Container, broker string, configs []runtime.Configuration[*runtime_sarama.Producer], )
func RegisterProducerConfig ¶ added in v0.0.20
func RegisterProducerConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_sarama.Producer])
func RegisterRetry ¶ added in v0.0.20
func RegisterRetry( container inverse.Container, configs []runtime.Configuration[*runtime_retry.Retry], )
Retry
func RegisterRoute ¶ added in v0.0.20
func RegisterRoute( container inverse.Container, port int, configs []runtime.Configuration[*runtime_bunrouter.Router], )
func RegisterRouteConfig ¶ added in v0.1.4
func RegisterRouteConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_bunrouter.Router])
func RegisterRuntime ¶ added in v0.2.4
func RegisterStatefulFunction ¶ added in v0.2.4
func RegisterStatelessBatchFunction ¶ added in v0.2.5
func RegisterStatelessBatchFunction( ci inverse.Container, topic string, fn stateless.BatchFunction, )
func RegisterStatelessSingleFunction ¶ added in v0.2.4
func RegisterStatelessSingleFunction( ci inverse.Container, topic string, fn stateless.SingleFunction, )
func RegisterStatelessSingleFunctionWithKey ¶ added in v0.2.4
func ResolveConsumerBatchFunction ¶ added in v0.2.4
func ResolveConsumerBatchFunction(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.KeyedHandler], error)
func ResolveConsumerHandlerConfiguration ¶ added in v0.2.4
func ResolveConsumerHandlerConfiguration(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.Consumer], error)
func ResolveConsumerKeyFunction ¶ added in v0.2.4
func ResolveConsumerKeyFunction(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.KeyedHandler], error)
func ResolveConsumerTopics ¶ added in v0.2.4
func ResolveConsumerTopics(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.Consumer], error)
func ResolveRouteProducer ¶ added in v0.2.4
func ResolveRouteProducer(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_bunrouter.Router], error)
Types ¶
type CollectorOneToOneConfiguration ¶ added in v0.1.3
type CollectorOneToOneConfiguration[S any, IK any, IV any, OK any, OV any] struct { Name string InputTopic flow.Topic[IK, IV] OutputTopic flow.Topic[OK, OV] Aggregator collect.Aggregator[S, IK, IV] Collector collect.OneToOneCollector[S, OK, OV] InputBroker string OutputBroker string HttpPort int StateFormat format.Format[S] StateKeyFunction stateful.PersistenceIdFunction[IK, IV] KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer] RetryConfiguration []runtime.Configuration[*runtime_retry.Retry] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
Wiring configuration
func (CollectorOneToOneConfiguration[S, IK, IV, OK, OV]) Register ¶ added in v0.1.3
func (c CollectorOneToOneConfiguration[S, IK, IV, OK, OV]) Register(ci inverse.Container)
type ConsumerFunction ¶ added in v0.2.4
type ConsumerFunction struct { Topic string Fn stateless.BatchFunction Key stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes] }
type JoinPostgresqlFunctionConfiguration ¶
type JoinPostgresqlFunctionConfiguration struct { Name string StatefulFunctions map[string]stateful.SingleFunction PersistenceIdFunctions map[string]stateful.PersistenceIdFunction[[]byte, []byte] IntermediateTopicName string InputBroker string OutputBroker string HttpPort int PostgresConnectionString string PostgresTable string PostgresqlConfiguration []runtime.Configuration[*runtime_bun.PostgresqlConnection] KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer] RetryConfiguration []runtime.Configuration[*runtime_retry.Retry] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
func (JoinPostgresqlFunctionConfiguration) Register ¶ added in v0.1.1
func (c JoinPostgresqlFunctionConfiguration) Register(ci inverse.Container)
type Main ¶
type Main interface { Runtimes(i string, r func(inverse.Container) []runtime.Runtime) error Prebuilt(i string, rf func(ci inverse.Container) Prebuilt) error Start(i string) error }
func NewMainAllInstance ¶ added in v0.2.7
type MaterialisePostgresqlOneToOneFunctionConfiguration ¶ added in v0.0.17
type MaterialisePostgresqlOneToOneFunctionConfiguration[S any, IK any, IV any] struct { Name string InputTopic flow.Topic[IK, IV] Function materialise.MapFunction[IK, IV, S] InputBroker string OutputBroker string HttpPort int PostgresConnectionString string PostgresqlConfiguration []runtime.Configuration[*runtime_bun.PostgresqlConnection] KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer] KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] RetryConfiguration []runtime.Configuration[*runtime_retry.Retry] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
Wiring configuration
func (MaterialisePostgresqlOneToOneFunctionConfiguration[S, IK, IV]) Register ¶ added in v0.1.1
func (c MaterialisePostgresqlOneToOneFunctionConfiguration[S, IK, IV]) Register(ci inverse.Container)
type RouterAdapterConfiguration ¶ added in v0.0.16
type RouterAdapterConfiguration[Request any, InputKey any, InputValue any] struct { Name string ProduceTopic flow.Topic[InputKey, InputValue] ProduceBroker string RequestBodyFormat format.Format[Request] RequestMapFunction stateless.OneToOneFunction[structure.Bytes, Request, InputKey, InputValue] HttpPort int KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
Wiring configuration
func (RouterAdapterConfiguration[Request, InputKey, InputValue]) Register ¶ added in v0.1.1
func (c RouterAdapterConfiguration[Request, InputKey, InputValue]) Register(ci inverse.Container)
type RuntimeFacade ¶ added in v0.0.13
func (*RuntimeFacade) Start ¶ added in v0.0.13
func (r *RuntimeFacade) Start() error
func (*RuntimeFacade) Stop ¶ added in v0.0.13
func (r *RuntimeFacade) Stop()
type StatefulPostgresqlOneToOneFunctionConfiguration ¶ added in v0.0.15
type StatefulPostgresqlOneToOneFunctionConfiguration[S any, IK any, IV any, OK any, OV any] struct { Name string InputTopic flow.Topic[IK, IV] OutputTopic flow.Topic[OK, OV] Function stateful.OneToOneFunction[S, IK, IV, OK, OV] InputBroker string OutputBroker string HttpPort int StateFormat format.Format[S] StateKeyFunction stateful.PersistenceIdFunction[IK, IV] PostgresTable string PostgresConnectionString string PostgresqlConfiguration []runtime.Configuration[*runtime_bun.PostgresqlConnection] KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer] RetryConfiguration []runtime.Configuration[*runtime_retry.Retry] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
Wiring configuration
func (StatefulPostgresqlOneToOneFunctionConfiguration[S, IK, IV, OK, OV]) Register ¶ added in v0.1.1
func (c StatefulPostgresqlOneToOneFunctionConfiguration[S, IK, IV, OK, OV]) Register(ci inverse.Container)
type StatefulPostgresqlOneToTwoFunctionConfiguration ¶ added in v0.0.15
type StatefulPostgresqlOneToTwoFunctionConfiguration[S any, IK any, IV any, OK1 any, OV1 any, OK2 any, OV2 any] struct { Name string InputTopic flow.Topic[IK, IV] OutputTopicOne flow.Topic[OK1, OV1] OutputTopicTwo flow.Topic[OK2, OV2] Function stateful.OneToTwoFunction[S, IK, IV, OK1, OV1, OK2, OV2] InputBroker string OutputBroker string HttpPort int StateFormat format.Format[S] StateKeyFunction stateful.PersistenceIdFunction[IK, IV] PostgresTable string PostgresConnectionString string PostgresqlConfiguration []runtime.Configuration[*runtime_bun.PostgresqlConnection] KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer] RetryConfiguration []runtime.Configuration[*runtime_retry.Retry] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
Wiring configuration
func (StatefulPostgresqlOneToTwoFunctionConfiguration[S, IK, IV, OK1, OV1, OK2, OV2]) Register ¶ added in v0.1.1
func (c StatefulPostgresqlOneToTwoFunctionConfiguration[S, IK, IV, OK1, OV1, OK2, OV2]) Register(ci inverse.Container)
type StatelessOneToOneConfiguration ¶ added in v0.0.15
type StatelessOneToOneConfiguration[IK any, IV any, OK any, OV any] struct { Name string InputTopic flow.Topic[IK, IV] OutputTopic flow.Topic[OK, OV] Function stateless.OneToOneFunction[IK, IV, OK, OV] ErrorHandler stateless.ErrorHandlerFunction InputBroker string OutputBroker string HttpPort int KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer] RetryConfiguration []runtime.Configuration[*runtime_retry.Retry] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
func (StatelessOneToOneConfiguration[IK, IV, OK, OV]) Register ¶ added in v0.1.1
func (c StatelessOneToOneConfiguration[IK, IV, OK, OV]) Register(ci inverse.Container)
type StatelessOneToOneExplodeConfiguration ¶ added in v0.0.15
type StatelessOneToOneExplodeConfiguration[IK any, IV any, OK any, OV any] struct { Name string InputTopic flow.Topic[IK, IV] OutputTopic flow.Topic[OK, OV] Function stateless.OneToOneExplodeFunction[IK, IV, OK, OV] InputBroker string OutputBroker string HttpPort int KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer] RetryConfiguration []runtime.Configuration[*runtime_retry.Retry] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
func (StatelessOneToOneExplodeConfiguration[IK, IV, OK, OV]) Register ¶ added in v0.1.1
func (c StatelessOneToOneExplodeConfiguration[IK, IV, OK, OV]) Register(ci inverse.Container)
type StatelessOneToTwoConfiguration ¶ added in v0.0.15
type StatelessOneToTwoConfiguration[IK any, IV any, OK1 any, OV1 any, OK2 any, OV2 any] struct { Name string InputTopic flow.Topic[IK, IV] OutputTopicOne flow.Topic[OK1, OV1] OutputTopicTwo flow.Topic[OK2, OV2] Function stateless.OneToTwoFunction[IK, IV, OK1, OV1, OK2, OV2] InputBroker string OutputBroker string HttpPort int KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer] KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer] RetryConfiguration []runtime.Configuration[*runtime_retry.Retry] RouteConfiguration []runtime.Configuration[*runtime_bunrouter.Router] }
func (StatelessOneToTwoConfiguration[IK, IV, OK1, OV1, OK2, OV2]) Register ¶ added in v0.1.1
func (c StatelessOneToTwoConfiguration[IK, IV, OK1, OV1, OK2, OV2]) Register(ci inverse.Container)
Source Files ¶
- register_consumer.go
- register_consumer_function.go
- register_postgresql.go
- register_producer.go
- register_retry.go
- register_route.go
- register_runtime.go
- runtime_collector_one_to_one.go
- runtime_postgresql_join.go
- runtime_postgresql_materialise_one_to_one.go
- runtime_postgresql_stateful_one_to_one.go
- runtime_postgresql_stateful_one_to_two.go
- runtime_router_adapter.go
- runtime_stateless_one_to_one.go
- runtime_stateless_one_to_one_explode.go
- runtime_stateless_one_to_two.go
- util_runtime_facade.go
- util_runtime_inverse.go
- util_runtime_main.go
Directories ¶
Path | Synopsis |
---|---|
Package mock is a generated GoMock package.
|
Package mock is a generated GoMock package. |
Click to show internal directories.
Click to hide internal directories.