transport

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2021 License: MIT Imports: 7 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EncodeJSONRequest added in v0.2.0

func EncodeJSONRequest(_ context.Context, msg *kafka.Message, request interface{}) error

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(
	e endpoint.Endpoint,
	dec DecodeRequestFunc,
	opts ...ConsumerOption,
) *Consumer

func (Consumer) Handle

func (c Consumer) Handle(ctx context.Context, msg kafka.Message) (err error)

type ConsumerFinalizerFunc

type ConsumerFinalizerFunc func(ctx context.Context, msg *kafka.Message, err error)

type ConsumerOption

type ConsumerOption func(consumer *Consumer)

func ConsumerAfter

func ConsumerAfter(after ...ConsumerResponseFunc) ConsumerOption

func ConsumerBefore

func ConsumerBefore(before ...RequestFunc) ConsumerOption

func ConsumerErrorHandler

func ConsumerErrorHandler(errorHandler transport.ErrorHandler) ConsumerOption

func ConsumerErrorLogger

func ConsumerErrorLogger(logger log.Logger) ConsumerOption

func ConsumerFinalizer added in v0.2.0

func ConsumerFinalizer(f ...ConsumerFinalizerFunc) ConsumerOption

type ConsumerResponseFunc

type ConsumerResponseFunc func(ctx context.Context, response interface{}) context.Context

type DecodeRequestFunc

type DecodeRequestFunc func(ctx context.Context, msg kafka.Message) (request interface{}, err error)

type EncodeRequestFunc added in v0.2.0

type EncodeRequestFunc func(context.Context, *kafka.Message, interface{}) error

type EncodeResponseFunc added in v0.2.0

type EncodeResponseFunc func(context.Context, interface{}) error

type Handlers added in v0.1.5

type Handlers map[string][]kafka.Handler

Handlers represents Topic -> []Handler mapping

type Producer added in v0.2.0

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer added in v0.2.0

func NewProducer(
	handler kafka.Handler,
	topic string,
	enc EncodeRequestFunc,
	options ...ProducerOption,
) *Producer

func (Producer) Endpoint added in v0.2.0

func (p Producer) Endpoint() endpoint.Endpoint

type ProducerFinalizerFunc added in v0.2.0

type ProducerFinalizerFunc func(ctx context.Context, err error)

type ProducerOption added in v0.2.0

type ProducerOption func(consumer *Producer)

func ProducerAfter added in v0.2.0

func ProducerAfter(after ...ProducerResponseFunc) ProducerOption

func ProducerBefore added in v0.2.0

func ProducerBefore(before ...RequestFunc) ProducerOption

func ProducerFinalizer added in v0.2.0

func ProducerFinalizer(f ...ProducerFinalizerFunc) ProducerOption

func ProducerResponse added in v0.2.1

func ProducerResponse(response interface{}) ProducerOption

type ProducerResponseFunc added in v0.2.0

type ProducerResponseFunc func(ctx context.Context) context.Context

type RequestFunc added in v0.2.0

type RequestFunc func(ctx context.Context, msg *kafka.Message) context.Context

type Router added in v0.1.4

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter added in v0.1.4

func NewRouter(opts ...RouterOption) *Router

func (*Router) AddHandler added in v0.1.4

func (r *Router) AddHandler(topic string, handler kafka.Handler) *Router

func (Router) Handle added in v0.1.4

func (r Router) Handle(ctx context.Context, msg kafka.Message) error

func (Router) Handlers added in v0.1.6

func (r Router) Handlers() Handlers

type RouterOption added in v0.1.4

type RouterOption func(*Router)

func RouterWithHandler added in v0.1.4

func RouterWithHandler(topic string, handler kafka.Handler) RouterOption

func RouterWithHandlers added in v0.1.5

func RouterWithHandlers(handlers Handlers) RouterOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL