flows

package module
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2024 License: GPL-3.0 Imports: 34 Imported by: 10

README

Flows

Simple declarative and functional dataflow with Kafka and RabbitMQ.

Developing

Tools Used

Please follow the tools setup guide in its respective repository or documentation.

  • Golang
  • Gomock
  • Podman / docker
  • Protoc
  • Protoc-gen-go
  • Psql
Commands

Makefile is heavily used.

Commands used when coding:

make test
make tidy
make update
make mocks
make proto
make cov
make htmlcov

Commands used when running examples locally:

make reset
make run
make listen
make group-delete

Main func is in main folder instead of the project root, as the project root is used for library package.

Running Example

See the source in example folder

docker-compose up -d
make reset
make run
  1. docker-compose up -d will start zookeeper, kafka, and postgresql with ports exposed to your host network detached from your terminal
  2. make reset will clean up the topics on kafka and postgresql table, and add some example events
  3. make run will start the example word count application

There are three examples, its manually switched via comments in the main folder in main.go

Test

Some tests uses testcontainers-go which are set up for rootless Podman based context. Switching between different container technology will be done some time in the future.

Principles

  • Container first
  • Do one thing, and only one thing well
  • Ease to change integrations
  • Protobuf as primary bytes encoding
  • Sane defaults
  • Simple format helpers, with bytes as default
  • Idiomatic golang

Semantics

At least once publishing with effectively once state update. Additional application based deduplication is recommended (request id header deduplication for instance).

Integrations

  • Kafka using Sarama
  • Postgresql using Bun

Patterns

Flows sits at the core of the Kappa Architecture, where it tackles four elements:

  1. Stateless functions
  2. Stateful functions
  3. Join as combination of stateless and stateful functions
  4. Materialisation
  5. Collector
  6. Long running tasks
Stateless

Stateless functions can be used to perform simple operations such as:

  1. Basic event filtering
  2. Event mirroring
  3. Merging multiple topics into one
  4. Exploding events
  5. Interfacing with external parties with at least once semantic
Stateful

Stateful functions can be used to perform state machine operations on exactly one topic to perform operations such as:

  1. Reduce or aggregate
  2. Validation
Join

To ensure events are published for the multiple topics that are being joined, there are two options:

  1. Maintain publishing state for all topics (currently, the last result state is global per key, this can be changed to be per topic per key)
  2. Merge topics into one intermediate topic, and perform a stateful function

In this codebase, option two is the chosen option for reasons of:

  1. Avoiding to impose limits on the number of messages being published, which can increase the state size written into the store, which implication should be obvious
  2. Stateless map that can be used to merge topics are cheaper in terms of time latency than transaction locking failure
  3. Kafka can be configured to be scalable enough in terms of throughput
  4. Parallelism of the intermediate topic (partition count) can be higher than the source topics
  5. Avoiding transaction will reduce cost and increase speed especially for cloud services
  6. Avoiding distributed data contention allows local state caching, reducing data query latency for cache hits

Join Pattern

It is recommended to use your own custom intermediate topic mapper for better control of your dataflow. However, a standard implementation is provided as a reference.

Materialiser

Materialise function batch upserts into database.

Collector

In an extremely high throughput situation, it makes a lot of sense to:

  1. Pre-aggregate input topics
  2. Perform stateful operation against pre-aggregated output

This reduces the amount of state writes performed. Even when RAM based reliable redundant storage is used, it is still wise to pre-aggregate in a very high throughput situation against stateful operations.

Example:

  1. In an ecommerce inventory handling for orders, the orders can be pre-aggregated (to a certain extent where the message size does not blow up the Kafka cluster), and final inventory check is performed one time for all the pre-aggregated orders
  2. In an event space management system, the updates against seat booking can be pre-aggregated before checking against the entire section
Tasks

Long running actions are tricky to deal with in Kafka, because partitions can be blocked for uneven long running task duration. The solution is to use other types of message queue, such as RabbitMQ with AMQP.

Limitations

To keep the simplicity of implementation, temporal operations are not yet considered in this project. Examples of temporal operations that are not considered for implementation yet:

  1. End of time window only publishing. With states, a window can be emulated, but an output will be published for each message received instead of only at the end of the window.
  2. Per-key publication rate limiter. Combining state storage, commit offset, and real time ticks can be implemented, however that complicates the interfaces needed.

To Do

  1. Integrations
    1. MongoDB
    2. Cassandra
    3. AWS DynamoDB
    4. GCP Bigtable
  2. Local state caching
  3. Unit test coverage
  4. Replace prometheus with otel

Notes

This project solves dataflow in a very specific way. If you are interested to improve this project more or have some feedback, please contact me.

Kafka Migration

The way to migrate stateful functions to a mirrored Kafka cluster are by:

  1. Stopping the job
  2. Clearing the internal column / field
  3. Starting the job

Due to the fact that offset numbers are different in mirrored Kafka cluster, an additional application functionality side deduplication will be required to ensure that stateful operation does not get executed twice. Such deduplication can be peformed using a unique Kafka header identifier.

However, if the application functionality can already tolerate at least once execution, then there will be no problems with migration.

Functions

The functions are used as is, because function for pointer struct can be used as is. As proof, the following unit test will pass.

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

type TestStruct struct {
	Test int
}

func (t *TestStruct) Inc() {
	t.Test += 1
}

type TestStructTwo struct {
	Test int
}

func (t TestStructTwo) Get() int {
	return t.Test
}

func Inc(fn func()) {
	fn()
}

func Get(fn func() int) int {
	return fn()
}

func TestPointerStuff(t *testing.T) {
	assert := assert.New(t)
	testOne := TestStruct{
		Test: 1,
	}
	Inc(testOne.Inc)
	assert.Equal(2, testOne.Test)

	testTwo := &TestStruct{
		Test: 4,
	}
	Inc(testTwo.Inc)
	assert.Equal(5, testTwo.Test)

	testThree := TestStructTwo{
		Test: 100,
	}
	assert.Equal(100, Get(testThree.Get))

	testFour := &TestStructTwo{
		Test: 200,
	}
	assert.Equal(200, Get(testFour.Get))
}

Rootless Podman

You only need to add DOCKER_HOST according to your podman info. Example:

export DOCKER_HOST=unix:///run/user/1000/podman/podman.sock

Microbatching

Microbatching is applied in this repository to achieve better throughput. Maximum batching wait time can be configured. Per message semantics can be achieved by configuration, however throughput will suffer without increasing compute resources.

WHY?

Why do I build this instead of using tools like Spark, Flink, Kafka Streams?

This is my personal view based on experience with those three. Note that I have not tested things like Pulsar functions or NATS jetstream, those might well be solving the same thing.

The three I mentioned are heavyweight data engineering tools. It can continously process data flow with a special DSL, with exactly once semantics (at a cost) and very high throughput (also at a cost).

However, in a complex backend data flow changes, constantly. Often times, its just one step in the middle of the flow. Sometimes its removing a step, sometimes its adding steps, sometimes its reusing steps. Heavyweight tools just doesn't work well with that kind of constant change. Deploy a flow that is too small its costly, deploy a flow that is too big it constantly changes.

So the idea of a lightweight flow comes in. Its similar to Kafka streams, but every single step is independently deployed, every intermediate data types are independently designed. With a schema management system, Kafka, and Kubernetes, its the right balance of performance, ease of deployement, and flexibility. Its also written in golang, so that the resource use of each lightweight flow step is small, yet it can be scaled both horizontally and vertically very well. In theory other language like C++ and Rust will also work, but at the time of implementation I am far more familiar with golang and Java. So golang it is.

This is how I would build and deploy flows:

  1. One repository per bounded context, so that every bounded context is isolated without having hundreds of repositories
  2. One schema management repository (if no tools are used)
  3. "app-domain-function" naming convention everywhere (consumer group, kubernetes deployment, etc)

Documentation

Index

Constants

View Source
const (
	QualifierKafkaConsumer              = "QualifierKafkaConsumer"
	QualifierKafkaConsumerHandler       = "QualifierKafkaConsumerHandler"
	QualifierKafkaConsumerBatchFunction = "QualifierKafkaConsumerBatchFunction"
	QualifierKafkaConsumerKeyFunction   = "QualifierKafkaConsumerKeyFunction"
	QualifierConsumerFunction           = "QualifierFlowStateless"
)
View Source
const (
	QualifierRabbitProducer         = "QualifierRabbitProducer"
	QualifierRabbitConsumerExecutor = "QualifierRabbitConsumerExecutor"
	QualifierRabbitConsumer         = "QualifierRabbitConsumer"
)
View Source
const (
	AllInstances = "all_instances"
)
View Source
const (
	QualifierCron = "QualifierCron"
)
View Source
const (
	QualifierPostgresqlConnection = "QualifierPostgresqlConnection"
)
View Source
const (
	QualifierRetry = "QualifierRetry"
)
View Source
const (
	QualifierRoute = "QualifierRoute"
)
View Source
const (
	QualifierRuntime = "QualifierRuntime"
)

Variables

View Source
var (
	QualifierKafkaProducer = "QualifierKafkaProducer"
)

Functions

func GetKafkaProducer added in v0.0.20

func GetKafkaProducer(ctx context.Context, ci inverse.Container) (flow.Producer, error)

func GetPostgresqlConnection added in v0.2.3

func GetPostgresqlConnection(ctx context.Context, ci inverse.Container) (runtime_bun.BunConnection, error)

func GetRabbitProducer added in v0.3.1

func GetRabbitProducer(ctx context.Context, ci inverse.Container) (task.Producer, error)

func GetRetry added in v0.0.20

func GetRetry(ctx context.Context, ci inverse.Container) (*runtime.Retry, error)

func InjectedRuntimes added in v0.0.20

func InjectedRuntimes(ci inverse.Container) []runtime.Runtime

func InjectorRabbitConsumerExecutorConfiguration added in v0.3.1

func InjectorRabbitConsumerExecutorConfiguration(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_rabbit.Consumer], error)

func RegisterCollectorFunction added in v0.2.4

func RegisterCollectorFunction(
	ci inverse.Container,
	topic string,
	persistenceIdFunction stateful.PersistenceIdFunction[[]byte, []byte],
	aggregator collect.Aggregator[structure.Bytes, structure.Bytes, structure.Bytes],
	collector collect.Collector,
)

func RegisterCron added in v0.3.1

func RegisterCron(
	container inverse.Container,
	configs []runtime.Configuration[*runtime_cron.Cron],
)

func RegisterCronConfig added in v0.3.1

func RegisterCronConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_cron.Cron])

func RegisterJoinStatefulFunction added in v0.2.6

func RegisterJoinStatefulFunction(
	ci inverse.Container,
	topic string,
	tableName string,
	fn stateful.SingleFunction,
	key stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes],
)

func RegisterKafkaConsumer added in v0.3.1

func RegisterKafkaConsumer(
	container inverse.Container,
	name string,
	broker string,
	configs []runtime.Configuration[*runtime_sarama.Consumer],
)

func RegisterKafkaConsumerConfig added in v0.3.1

func RegisterKafkaConsumerConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_sarama.Consumer])

func RegisterKafkaConsumerFunctionInjector added in v0.3.1

func RegisterKafkaConsumerFunctionInjector(injector inverse.Injector[KafkaConsumerFunction], ci inverse.Container)

func RegisterKafkaConsumerFunctionInstance added in v0.3.1

func RegisterKafkaConsumerFunctionInstance(instance KafkaConsumerFunction, ci inverse.Container)

func RegisterKafkaConsumerKeyedHandlerConfig added in v0.3.1

func RegisterKafkaConsumerKeyedHandlerConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_sarama.KeyedHandler])

func RegisterKafkaProducer added in v0.3.1

func RegisterKafkaProducer(
	container inverse.Container,
	broker string,
	configs []runtime.Configuration[*runtime_sarama.Producer],
)

func RegisterKafkaProducerConfig added in v0.3.1

func RegisterKafkaProducerConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_sarama.Producer])

func RegisterMaterialiseFunction added in v0.2.4

func RegisterMaterialiseFunction[T any](
	ci inverse.Container,
	topic string,
	fn materialise.MapFunction[structure.Bytes, structure.Bytes, T],
)

func RegisterPostgresql added in v0.0.20

func RegisterPostgresql(
	container inverse.Container,
	name string,
	connectionString string,
	configs []runtime.Configuration[*runtime_bun.PostgresqlConnection],
)

func RegisterPostgresqlConfig added in v0.1.4

func RegisterPostgresqlConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_bun.PostgresqlConnection])

func RegisterProducerRoute added in v0.3.0

func RegisterProducerRoute(ci inverse.Container, method string, path string, bodyMap stateless.OneToOneFunction[structure.Bytes, structure.Bytes, structure.Bytes, structure.Bytes])

func RegisterRabbitConsumer added in v0.3.1

func RegisterRabbitConsumer(
	container inverse.Container,
	name string,
	rabbitConnectionString string,
	channel string,
	configs []runtime.Configuration[*runtime_rabbit.Consumer],
)

func RegisterRabbitConsumerConfig added in v0.3.1

func RegisterRabbitConsumerConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_rabbit.Consumer])

func RegisterRabbitConsumerExecutor added in v0.3.1

func RegisterRabbitConsumerExecutor(ci inverse.Container, injector func(ctx context.Context, ci inverse.Container) (task.Executor[structure.Bytes], error))

func RegisterRabbitProducer added in v0.3.1

func RegisterRabbitProducer(
	container inverse.Container,
	name string,
	rabbitConnectionString string,
	configs []runtime.Configuration[*runtime_rabbit.Producer],
)

func RegisterRabbitProducerConfig added in v0.3.1

func RegisterRabbitProducerConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_rabbit.Producer])

func RegisterRetry added in v0.0.20

func RegisterRetry(
	container inverse.Container,
	configs []runtime.Configuration[*runtime.Retry],
)

Retry

func RegisterRoute added in v0.0.20

func RegisterRoute(
	container inverse.Container,
	port int,
	configs []runtime.Configuration[*runtime_chi.Runtime[context.Context]],
)

func RegisterRouteConfig added in v0.1.4

func RegisterRouteConfig(ci inverse.Container, configs ...runtime.Configuration[*runtime_chi.Runtime[context.Context]])

func RegisterRuntime added in v0.2.4

func RegisterRuntime(qualifier string, ci inverse.Container)

func RegisterStatefulFunction added in v0.2.4

func RegisterStatefulFunction(
	ci inverse.Container,
	topic string,
	tableName string,
	fn stateful.SingleFunction,
	key stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes],
)

func RegisterStatelessBatchFunction added in v0.2.5

func RegisterStatelessBatchFunction(
	ci inverse.Container,
	topic string,
	fn stateless.BatchFunction,
)

func RegisterStatelessSingleFunction added in v0.2.4

func RegisterStatelessSingleFunction(
	ci inverse.Container,
	topic string,
	fn stateless.SingleFunction,
)

func RegisterStatelessSingleFunctionWithKey added in v0.2.4

func RegisterStatelessSingleFunctionWithKey(
	ci inverse.Container,
	topic string,
	fn stateless.SingleFunction,
	key stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes],
)

func ResolveCronConfigTaskProducer added in v0.3.1

func ResolveCronConfigTaskProducer(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_cron.Cron], error)

func ResolveKafkaConsumerBatchFunction added in v0.3.1

func ResolveKafkaConsumerBatchFunction(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.KeyedHandler], error)

func ResolveKafkaConsumerHandlerConfiguration added in v0.3.1

func ResolveKafkaConsumerHandlerConfiguration(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.Consumer], error)

func ResolveKafkaConsumerKeyFunction added in v0.3.1

func ResolveKafkaConsumerKeyFunction(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.KeyedHandler], error)

func ResolveKafkaConsumerTopics added in v0.3.1

func ResolveKafkaConsumerTopics(ctx context.Context, ci inverse.Container) (runtime.Configuration[*runtime_sarama.Consumer], error)

Types

type CollectorOneToOneConfiguration added in v0.1.3

type CollectorOneToOneConfiguration[S any, IK any, IV any, OK any, OV any] struct {
	Name                       string
	InputTopic                 flow.Topic[IK, IV]
	OutputTopic                flow.Topic[OK, OV]
	Aggregator                 collect.Aggregator[S, IK, IV]
	Collector                  collect.OneToOneCollector[S, OK, OV]
	InputBroker                string
	OutputBroker               string
	HttpPort                   int
	StateFormat                format.Format[S]
	StateKeyFunction           stateful.PersistenceIdFunction[IK, IV]
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer]
	RetryConfiguration         []runtime.Configuration[*runtime.Retry]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
	Neo4jConfiguration         []runtime.Configuration[*runtime_neo4j.Neo4JConnectionBasicAuth]
}

Wiring configuration

func (CollectorOneToOneConfiguration[S, IK, IV, OK, OV]) Register added in v0.1.3

func (c CollectorOneToOneConfiguration[S, IK, IV, OK, OV]) Register(ci inverse.Container)

type CronConfiguration added in v0.3.1

type CronConfiguration[T any] struct {
	Name                        string
	TaskChannel                 task.Channel[T]
	Scheduler                   task.Scheduler[T]
	Schedules                   []string
	TaskConnectionString        string
	HttpPort                    int
	RabbitProducerConfiguration []runtime.Configuration[*runtime_rabbit.Producer]
	RetryConfiguration          []runtime.Configuration[*runtime.Retry]
	RouteConfiguration          []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
}

func (CronConfiguration[T]) Register added in v0.3.1

func (c CronConfiguration[T]) Register(ci inverse.Container)

type ExecutorConfiguration added in v0.3.1

type ExecutorConfiguration[T any] struct {
	Name                        string
	TaskChannel                 task.Channel[T]
	TaskExecutor                task.Executor[T]
	TaskConnectionString        string
	HttpPort                    int
	RabbitConsumerConfiguration []runtime.Configuration[*runtime_rabbit.Consumer]
	RetryConfiguration          []runtime.Configuration[*runtime.Retry]
	RouteConfiguration          []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
}

func (ExecutorConfiguration[T]) Register added in v0.3.1

func (c ExecutorConfiguration[T]) Register(ci inverse.Container)

type JoinPostgresqlFunctionConfiguration

type JoinPostgresqlFunctionConfiguration struct {
	Name                       string
	StatefulFunctions          map[string]stateful.SingleFunction
	PersistenceIdFunctions     map[string]stateful.PersistenceIdFunction[[]byte, []byte]
	IntermediateTopicName      string
	InputBroker                string
	OutputBroker               string
	HttpPort                   int
	PostgresConnectionString   string
	PostgresTable              string
	PostgresqlConfiguration    []runtime.Configuration[*runtime_bun.PostgresqlConnection]
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer]
	RetryConfiguration         []runtime.Configuration[*runtime.Retry]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
	Neo4jConfiguration         []runtime.Configuration[*runtime_neo4j.Neo4JConnectionBasicAuth]
}

func (JoinPostgresqlFunctionConfiguration) Register added in v0.1.1

type KafkaConsumerFunction added in v0.3.1

type KafkaConsumerFunction struct {
	Topic string
	Fn    stateless.BatchFunction
	Key   stateful.PersistenceIdFunction[structure.Bytes, structure.Bytes]
}

type Main

type Main interface {
	Runtimes(i string, r func(inverse.Container) []runtime.Runtime) error
	Prebuilt(i string, rf func(ci inverse.Container) Prebuilt) error
	Registrar(i string, rf func(ci inverse.Container)) error
	Start(i string) error
}

func NewMain added in v0.0.10

func NewMain() Main

func NewMainAllInstance added in v0.2.7

func NewMainAllInstance(allInstanceName string) Main

type MaterialisePostgresqlOneToOneFunctionConfiguration added in v0.0.17

type MaterialisePostgresqlOneToOneFunctionConfiguration[S any, IK any, IV any] struct {
	Name                       string
	InputTopic                 flow.Topic[IK, IV]
	Function                   materialise.MapFunction[IK, IV, S]
	InputBroker                string
	OutputBroker               string
	HttpPort                   int
	PostgresConnectionString   string
	PostgresqlConfiguration    []runtime.Configuration[*runtime_bun.PostgresqlConnection]
	KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer]
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	RetryConfiguration         []runtime.Configuration[*runtime.Retry]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
	Neo4jConfiguration         []runtime.Configuration[*runtime_neo4j.Neo4JConnectionBasicAuth]
}

Wiring configuration

func (MaterialisePostgresqlOneToOneFunctionConfiguration[S, IK, IV]) Register added in v0.1.1

type Prebuilt added in v0.2.7

type Prebuilt interface {
	Register(ci inverse.Container)
}

type RouterAdapterConfiguration added in v0.0.16

type RouterAdapterConfiguration[Request any, InputKey any, InputValue any] struct {
	Name                       string
	Path                       string
	ProduceTopic               flow.Topic[InputKey, InputValue]
	ProduceBroker              string
	RequestBodyFormat          format.Format[Request]
	RequestMapFunction         stateless.OneToOneFunction[structure.Bytes, Request, InputKey, InputValue]
	HttpPort                   int
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
}

Wiring configuration

func (RouterAdapterConfiguration[Request, InputKey, InputValue]) Register added in v0.1.1

func (c RouterAdapterConfiguration[Request, InputKey, InputValue]) Register(ci inverse.Container)

type RuntimeFacade added in v0.0.13

type RuntimeFacade struct {
	Runtimes []runtime.Runtime
}

func (*RuntimeFacade) Start added in v0.0.13

func (r *RuntimeFacade) Start() error

func (*RuntimeFacade) Stop added in v0.0.13

func (r *RuntimeFacade) Stop()

type StatefulPostgresqlOneToOneFunctionConfiguration added in v0.0.15

type StatefulPostgresqlOneToOneFunctionConfiguration[S any, IK any, IV any, OK any, OV any] struct {
	Name                       string
	InputTopic                 flow.Topic[IK, IV]
	OutputTopic                flow.Topic[OK, OV]
	Function                   stateful.OneToOneFunction[S, IK, IV, OK, OV]
	InputBroker                string
	OutputBroker               string
	HttpPort                   int
	StateFormat                format.Format[S]
	StateKeyFunction           stateful.PersistenceIdFunction[IK, IV]
	PostgresTable              string
	PostgresConnectionString   string
	PostgresqlConfiguration    []runtime.Configuration[*runtime_bun.PostgresqlConnection]
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer]
	RetryConfiguration         []runtime.Configuration[*runtime.Retry]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
	Neo4jConfiguration         []runtime.Configuration[*runtime_neo4j.Neo4JConnectionBasicAuth]
}

Wiring configuration

func (StatefulPostgresqlOneToOneFunctionConfiguration[S, IK, IV, OK, OV]) Register added in v0.1.1

func (c StatefulPostgresqlOneToOneFunctionConfiguration[S, IK, IV, OK, OV]) Register(ci inverse.Container)

type StatefulPostgresqlOneToTwoFunctionConfiguration added in v0.0.15

type StatefulPostgresqlOneToTwoFunctionConfiguration[S any, IK any, IV any, OK1 any, OV1 any, OK2 any, OV2 any] struct {
	Name                       string
	InputTopic                 flow.Topic[IK, IV]
	OutputTopicOne             flow.Topic[OK1, OV1]
	OutputTopicTwo             flow.Topic[OK2, OV2]
	Function                   stateful.OneToTwoFunction[S, IK, IV, OK1, OV1, OK2, OV2]
	InputBroker                string
	OutputBroker               string
	HttpPort                   int
	StateFormat                format.Format[S]
	StateKeyFunction           stateful.PersistenceIdFunction[IK, IV]
	PostgresTable              string
	PostgresConnectionString   string
	PostgresqlConfiguration    []runtime.Configuration[*runtime_bun.PostgresqlConnection]
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer]
	RetryConfiguration         []runtime.Configuration[*runtime.Retry]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
	Neo4jConfiguration         []runtime.Configuration[*runtime_neo4j.Neo4JConnectionBasicAuth]
}

Wiring configuration

func (StatefulPostgresqlOneToTwoFunctionConfiguration[S, IK, IV, OK1, OV1, OK2, OV2]) Register added in v0.1.1

func (c StatefulPostgresqlOneToTwoFunctionConfiguration[S, IK, IV, OK1, OV1, OK2, OV2]) Register(ci inverse.Container)

type StatelessOneToOneConfiguration added in v0.0.15

type StatelessOneToOneConfiguration[IK any, IV any, OK any, OV any] struct {
	Name                       string
	InputTopic                 flow.Topic[IK, IV]
	OutputTopic                flow.Topic[OK, OV]
	Function                   stateless.OneToOneFunction[IK, IV, OK, OV]
	ErrorHandler               stateless.ErrorHandlerFunction
	InputBroker                string
	OutputBroker               string
	HttpPort                   int
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer]
	RetryConfiguration         []runtime.Configuration[*runtime.Retry]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
	Neo4jConfiguration         []runtime.Configuration[*runtime_neo4j.Neo4JConnectionBasicAuth]
}

func (StatelessOneToOneConfiguration[IK, IV, OK, OV]) Register added in v0.1.1

func (c StatelessOneToOneConfiguration[IK, IV, OK, OV]) Register(ci inverse.Container)

type StatelessOneToOneExplodeConfiguration added in v0.0.15

type StatelessOneToOneExplodeConfiguration[IK any, IV any, OK any, OV any] struct {
	Name                       string
	InputTopic                 flow.Topic[IK, IV]
	OutputTopic                flow.Topic[OK, OV]
	Function                   stateless.OneToOneExplodeFunction[IK, IV, OK, OV]
	InputBroker                string
	OutputBroker               string
	HttpPort                   int
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer]
	RetryConfiguration         []runtime.Configuration[*runtime.Retry]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
	Neo4jConfiguration         []runtime.Configuration[*runtime_neo4j.Neo4JConnectionBasicAuth]
}

func (StatelessOneToOneExplodeConfiguration[IK, IV, OK, OV]) Register added in v0.1.1

func (c StatelessOneToOneExplodeConfiguration[IK, IV, OK, OV]) Register(ci inverse.Container)

type StatelessOneToTwoConfiguration added in v0.0.15

type StatelessOneToTwoConfiguration[IK any, IV any, OK1 any, OV1 any, OK2 any, OV2 any] struct {
	Name                       string
	InputTopic                 flow.Topic[IK, IV]
	OutputTopicOne             flow.Topic[OK1, OV1]
	OutputTopicTwo             flow.Topic[OK2, OV2]
	Function                   stateless.OneToTwoFunction[IK, IV, OK1, OV1, OK2, OV2]
	InputBroker                string
	OutputBroker               string
	HttpPort                   int
	KafkaProducerConfiguration []runtime.Configuration[*runtime_sarama.Producer]
	KafkaConsumerConfiguration []runtime.Configuration[*runtime_sarama.Consumer]
	RetryConfiguration         []runtime.Configuration[*runtime.Retry]
	RouteConfiguration         []runtime.Configuration[*runtime_chi.Runtime[context.Context]]
	Neo4jConfiguration         []runtime.Configuration[*runtime_neo4j.Neo4JConnectionBasicAuth]
}

func (StatelessOneToTwoConfiguration[IK, IV, OK1, OV1, OK2, OV2]) Register added in v0.1.1

func (c StatelessOneToTwoConfiguration[IK, IV, OK1, OV1, OK2, OV2]) Register(ci inverse.Container)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL