Documentation ¶
Overview ¶
Package transport provides a Kafka transport abstraction.
Index ¶
- func EncodeJSONRequest(_ context.Context, msg *kafka.Message, request interface{}) error
- type Consumer
- type ConsumerFinalizerFunc
- type ConsumerOption
- type ConsumerResponseFunc
- type DecodeRequestFunc
- type EncodeRequestFunc
- type EncodeResponseFunc
- type Producer
- type ProducerFinalizerFunc
- type ProducerOption
- type ProducerResponseFunc
- type RequestFunc
- type Router
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EncodeJSONRequest ¶ added in v0.2.0
EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a JSON object to the Message value. Many services can use it as a sensible default.
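The behavior described above can be sketched as follows. This is a minimal illustration, not the package's source: `Message` is a hypothetical stand-in for `kafka.Message` that models only a `Value` field, and `encodeJSONRequest` and `encodeToString` are illustrative names.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message is a hypothetical stand-in for kafka.Message. Only the Value field
// is modeled, because that is the field the description refers to.
type Message struct {
	Value []byte
}

// encodeJSONRequest marshals the request to JSON and stores the bytes in the
// message value, mirroring the documented behavior.
func encodeJSONRequest(_ context.Context, msg *Message, request interface{}) error {
	b, err := json.Marshal(request)
	if err != nil {
		return err
	}
	msg.Value = b
	return nil
}

// encodeToString is a small helper that encodes a request into a fresh
// message and returns the resulting value as a string.
func encodeToString(request interface{}) (string, error) {
	msg := &Message{}
	if err := encodeJSONRequest(context.Background(), msg, request); err != nil {
		return "", err
	}
	return string(msg.Value), nil
}

func main() {
	out, err := encodeToString(map[string]string{"name": "alice"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

A service whose wire format is not JSON would supply its own EncodeRequestFunc with the same shape.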
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer wraps an endpoint and implements kafka.Handler.
func NewConsumer ¶
func NewConsumer(e endpoint.Endpoint, dec DecodeRequestFunc, opts ...ConsumerOption) *Consumer
NewConsumer constructs a new consumer, which implements kafka.Handler and wraps the provided endpoint.
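The core idea of a consumer (decode the message, then invoke the endpoint) can be sketched as below. This is an assumption-laden illustration rather than the package's implementation: `Message` and `Handler` are hypothetical stand-ins for `kafka.Message` and `kafka.Handler` (the `Handle` method shape is assumed), and `consumer` and `runConsumer` are illustrative names. Options such as before/after hooks are omitted.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
)

// Message and Handler are hypothetical stand-ins for kafka.Message and
// kafka.Handler; the real definitions may differ.
type Message struct {
	Topic string
	Value []byte
}

type Handler interface {
	Handle(ctx context.Context, msg *Message) error
}

// Endpoint has the same shape as go-kit's endpoint.Endpoint.
type Endpoint func(ctx context.Context, request interface{}) (interface{}, error)

// DecodeRequestFunc follows the signature documented for this package,
// using the stand-in Message type.
type DecodeRequestFunc func(ctx context.Context, msg *Message) (interface{}, error)

// consumer is an illustrative adapter from a message handler to an endpoint.
type consumer struct {
	e   Endpoint
	dec DecodeRequestFunc
}

// Handle decodes the message and passes the request to the endpoint.
func (c *consumer) Handle(ctx context.Context, msg *Message) error {
	request, err := c.dec(ctx, msg)
	if err != nil {
		return err
	}
	_, err = c.e(ctx, request)
	return err
}

// runConsumer feeds one JSON string payload through a consumer whose
// endpoint upper-cases the decoded string, and returns what the endpoint saw.
func runConsumer(payload string) (string, error) {
	var seen string
	upper := func(_ context.Context, request interface{}) (interface{}, error) {
		seen = strings.ToUpper(request.(string))
		return seen, nil
	}
	dec := func(_ context.Context, msg *Message) (interface{}, error) {
		var s string
		err := json.Unmarshal(msg.Value, &s)
		return s, err
	}
	var h Handler = &consumer{e: upper, dec: dec}
	err := h.Handle(context.Background(), &Message{Topic: "greetings", Value: []byte(payload)})
	return seen, err
}

func main() {
	out, err := runConsumer(`"hello"`)
	fmt.Println(out, err)
}
```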
type ConsumerFinalizerFunc ¶
ConsumerFinalizerFunc can be used to perform work at the end of message processing, after the response has been constructed. The principal intended use is for request logging.
type ConsumerOption ¶
type ConsumerOption func(consumer *Consumer)
ConsumerOption sets an optional parameter for a Consumer.
func ConsumerAfter ¶
func ConsumerAfter(after ...ConsumerResponseFunc) ConsumerOption
ConsumerAfter functions are executed on the consumer reply after the endpoint is invoked, but before the reply is published.
func ConsumerBefore ¶
func ConsumerBefore(before ...RequestFunc) ConsumerOption
ConsumerBefore functions are executed on the consumer message object before the request is decoded.
func ConsumerErrorHandler ¶
func ConsumerErrorHandler(errorHandler transport.ErrorHandler) ConsumerOption
ConsumerErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure.
func ConsumerFinalizer ¶ added in v0.2.0
func ConsumerFinalizer(f ...ConsumerFinalizerFunc) ConsumerOption
ConsumerFinalizer registers one or more ConsumerFinalizerFuncs to be executed at the end of processing each message. By default, no finalizer is registered.
type ConsumerResponseFunc ¶
ConsumerResponseFunc may take information from a request context and use it to manipulate a Producer. ConsumerResponseFuncs are only executed in consumers, after invoking the endpoint but prior to publishing a reply.
type DecodeRequestFunc ¶
type DecodeRequestFunc func(ctx context.Context, msg *kafka.Message) (request interface{}, err error)
DecodeRequestFunc extracts a user-domain request object from a Kafka message. It is designed to be used in Kafka Consumers.
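As an illustration, a decoder for JSON payloads might look like the sketch below. `Message` is a hypothetical stand-in for `kafka.Message` with only a `Value` field, and `createUserRequest`, `decodeCreateUserRequest`, and `decodeName` are invented names for the example.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message is a hypothetical stand-in for kafka.Message.
type Message struct {
	Value []byte
}

// createUserRequest is a hypothetical user-domain request type.
type createUserRequest struct {
	Name string `json:"name"`
}

// decodeCreateUserRequest has the DecodeRequestFunc shape: it turns the raw
// message value into a typed request object.
func decodeCreateUserRequest(_ context.Context, msg *Message) (interface{}, error) {
	var req createUserRequest
	if err := json.Unmarshal(msg.Value, &req); err != nil {
		return nil, err
	}
	return req, nil
}

// decodeName decodes a payload and returns the Name field of the request.
func decodeName(payload string) (string, error) {
	request, err := decodeCreateUserRequest(context.Background(), &Message{Value: []byte(payload)})
	if err != nil {
		return "", err
	}
	return request.(createUserRequest).Name, nil
}

func main() {
	name, err := decodeName(`{"name":"alice"}`)
	fmt.Println(name, err)
}
```

Returning a concrete request type lets the endpoint use a single type assertion instead of inspecting raw bytes.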
type EncodeRequestFunc ¶ added in v0.2.0
EncodeRequestFunc encodes the passed request object into a Kafka message object. It is designed to be used in Kafka Producers.
type EncodeResponseFunc ¶ added in v0.2.0
EncodeResponseFunc encodes the passed response object into a Kafka message object. It is designed to be used in Kafka Consumers.
type Producer ¶ added in v0.2.0
type Producer struct {
// contains filtered or unexported fields
}
Producer wraps a single Kafka topic for producing messages and implements endpoint.Endpoint.
func NewProducer ¶ added in v0.2.0
func NewProducer(handler kafka.Handler, topic string, enc EncodeRequestFunc, options ...ProducerOption) *Producer
NewProducer constructs a new producer for a single Kafka topic.
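One plausible reading of the constructor's parameters is sketched below: the producer builds a message for its topic, encodes the request into it, hands it to the handler, and returns a preconfigured response. This flow is an assumption inferred from the signatures, not the package's source. `Message`, `Handler`, `recorder`, `producer`, and `produceOne` are hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message and Handler are hypothetical stand-ins for kafka.Message and
// kafka.Handler; the real definitions may differ.
type Message struct {
	Topic string
	Value []byte
}

type Handler interface {
	Handle(ctx context.Context, msg *Message) error
}

// EncodeRequestFunc follows the shape of EncodeJSONRequest in this package.
type EncodeRequestFunc func(ctx context.Context, msg *Message, request interface{}) error

// recorder is a hypothetical Handler that remembers the messages it receives,
// standing in for an adapter around a real Kafka client.
type recorder struct {
	messages []*Message
}

func (r *recorder) Handle(_ context.Context, msg *Message) error {
	r.messages = append(r.messages, msg)
	return nil
}

// producer is an illustrative single-topic producer.
type producer struct {
	handler  Handler
	topic    string
	enc      EncodeRequestFunc
	response interface{}
}

// Endpoint has go-kit's endpoint signature: encode, hand off, respond.
func (p *producer) Endpoint(ctx context.Context, request interface{}) (interface{}, error) {
	msg := &Message{Topic: p.topic}
	if err := p.enc(ctx, msg, request); err != nil {
		return nil, err
	}
	if err := p.handler.Handle(ctx, msg); err != nil {
		return nil, err
	}
	return p.response, nil
}

func encodeJSON(_ context.Context, msg *Message, request interface{}) error {
	b, err := json.Marshal(request)
	if err != nil {
		return err
	}
	msg.Value = b
	return nil
}

// produceOne sends one request and returns the recorded message and response.
func produceOne(topic string, request interface{}) (*Message, interface{}, error) {
	rec := &recorder{}
	p := &producer{handler: rec, topic: topic, enc: encodeJSON, response: "ok"}
	resp, err := p.Endpoint(context.Background(), request)
	if err != nil {
		return nil, nil, err
	}
	return rec.messages[0], resp, nil
}

func main() {
	msg, resp, err := produceOne("orders", map[string]int{"id": 7})
	if err != nil {
		fmt.Println("produce failed:", err)
		return
	}
	fmt.Println(msg.Topic, string(msg.Value), resp)
}
```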
type ProducerFinalizerFunc ¶ added in v0.2.0
ProducerFinalizerFunc can be used to perform work at the end of producing a Kafka message, after the response is returned. The principal intended use is for error logging.
type ProducerOption ¶ added in v0.2.0
type ProducerOption func(producer *Producer)
ProducerOption sets an optional parameter for a Producer.
func ProducerAfter ¶ added in v0.2.0
func ProducerAfter(after ...ProducerResponseFunc) ProducerOption
ProducerAfter adds one or more ProducerResponseFuncs, which are applied to the context after a message has been produced successfully. This is useful for context-manipulation operations.
func ProducerBefore ¶ added in v0.2.0
func ProducerBefore(before ...RequestFunc) ProducerOption
ProducerBefore sets the RequestFuncs that are applied to the outgoing producer request before it's invoked.
func ProducerFinalizer ¶ added in v0.2.0
func ProducerFinalizer(f ...ProducerFinalizerFunc) ProducerOption
ProducerFinalizer adds one or more ProducerFinalizerFuncs to be executed at the end of producing a Kafka message. Finalizers are executed in the order in which they were added. By default, no finalizer is registered.
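The ordering guarantee follows naturally from the functional-option pattern used by ProducerOption. The sketch below shows that pattern with simplified, hypothetical types: `FinalizerFunc` takes only an error here, because the real ProducerFinalizerFunc signature is not shown in this documentation, and `withFinalizer`, `newProducer`, and `finalizerOrder` are illustrative names.

```go
package main

import (
	"fmt"
	"strings"
)

// FinalizerFunc is a simplified, hypothetical finalizer signature.
type FinalizerFunc func(err error)

// producer holds the finalizers in registration order.
type producer struct {
	finalizers []FinalizerFunc
}

// Option mirrors the shape of ProducerOption: a function that mutates the
// producer being constructed.
type Option func(p *producer)

// withFinalizer appends finalizers, so earlier registrations run first.
func withFinalizer(f ...FinalizerFunc) Option {
	return func(p *producer) { p.finalizers = append(p.finalizers, f...) }
}

// newProducer applies the options in the order they were passed.
func newProducer(opts ...Option) *producer {
	p := &producer{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

// finish runs every finalizer in the order it was added.
func (p *producer) finish(err error) {
	for _, f := range p.finalizers {
		f(err)
	}
}

// finalizerOrder registers three finalizers across two options and reports
// the order in which they ran.
func finalizerOrder() string {
	var calls []string
	record := func(name string) FinalizerFunc {
		return func(error) { calls = append(calls, name) }
	}
	p := newProducer(
		withFinalizer(record("first")),
		withFinalizer(record("second"), record("third")),
	)
	p.finish(nil)
	return strings.Join(calls, ",")
}

func main() {
	fmt.Println(finalizerOrder())
}
```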
func ProducerResponse ¶ added in v0.2.1
func ProducerResponse(response interface{}) ProducerOption
ProducerResponse sets the successful response value for a Producer.
type ProducerResponseFunc ¶ added in v0.2.0
ProducerResponseFunc may take information from a request context. ProducerResponseFuncs are only executed in producers, after a request has been produced.
type RequestFunc ¶ added in v0.2.0
RequestFunc may take information from a Kafka message and put it into a request context. In Consumers, RequestFuncs are executed prior to invoking the endpoint.
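A typical use is copying a message header into the context so the endpoint can read it. The sketch below assumes a signature of the form `func(ctx, msg) context.Context`, which is how comparable go-kit transports define their RequestFunc; the signature is not shown in this documentation. `Message`, `Header`, the `correlation-id` header name, and the helper names are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Header and Message are hypothetical stand-ins for the Kafka message types.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Headers []Header
}

// RequestFunc is an assumed signature modeled on other go-kit transports.
type RequestFunc func(ctx context.Context, msg *Message) context.Context

// correlationIDKey is an unexported context key type, which avoids
// collisions with keys defined by other packages.
type correlationIDKey struct{}

// correlationIDToContext copies the "correlation-id" header, if present,
// into the returned context.
func correlationIDToContext(ctx context.Context, msg *Message) context.Context {
	for _, h := range msg.Headers {
		if h.Key == "correlation-id" {
			return context.WithValue(ctx, correlationIDKey{}, string(h.Value))
		}
	}
	return ctx
}

// correlationID applies the RequestFunc and reads the value back, the way an
// endpoint would after the before-hooks have run.
func correlationID(msg *Message) (string, bool) {
	var before RequestFunc = correlationIDToContext
	ctx := before(context.Background(), msg)
	id, ok := ctx.Value(correlationIDKey{}).(string)
	return id, ok
}

func main() {
	msg := &Message{Headers: []Header{{Key: "correlation-id", Value: []byte("abc-123")}}}
	id, ok := correlationID(msg)
	fmt.Println(id, ok)
}
```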
type Router ¶ added in v0.1.4
Router maps each topic to a list of handlers (topic -> []kafka.Handler) and implements kafka.Handler by routing each message to the handlers registered for its topic.
func (Router) AddHandler ¶ added in v0.1.4
AddHandler appends a kafka.Handler for the specified topic.
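A router of this kind can be sketched as a map from topic to a slice of handlers. The code below is an illustration under several assumptions: `Message`, `Handler`, and `HandlerFunc` are hypothetical stand-ins, the `AddHandler` parameter list is guessed, and the choices to stop at the first handler error and to ignore messages for unregistered topics are assumptions rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Message and Handler are hypothetical stand-ins for kafka.Message and
// kafka.Handler; the real definitions may differ.
type Message struct {
	Topic string
}

type Handler interface {
	Handle(ctx context.Context, msg *Message) error
}

// HandlerFunc adapts a plain function to the Handler interface.
type HandlerFunc func(ctx context.Context, msg *Message) error

func (f HandlerFunc) Handle(ctx context.Context, msg *Message) error { return f(ctx, msg) }

// Router maps a topic to the handlers registered for it.
type Router map[string][]Handler

// AddHandler appends a handler for the given topic. A value receiver is
// enough because a map value refers to shared underlying storage.
func (r Router) AddHandler(topic string, h Handler) {
	r[topic] = append(r[topic], h)
}

// Handle dispatches the message to every handler registered for its topic,
// in registration order, stopping at the first error (an assumption).
// Messages for unregistered topics are ignored (also an assumption).
func (r Router) Handle(ctx context.Context, msg *Message) error {
	for _, h := range r[msg.Topic] {
		if err := h.Handle(ctx, msg); err != nil {
			return err
		}
	}
	return nil
}

// route sends one message per topic through a small router and reports
// which handlers ran, as "handler:topic" pairs.
func route(topics ...string) (string, error) {
	var calls []string
	record := func(name string) Handler {
		return HandlerFunc(func(_ context.Context, msg *Message) error {
			calls = append(calls, name+":"+msg.Topic)
			return nil
		})
	}
	r := Router{}
	r.AddHandler("orders", record("a"))
	r.AddHandler("orders", record("b"))
	r.AddHandler("payments", record("c"))
	for _, t := range topics {
		if err := r.Handle(context.Background(), &Message{Topic: t}); err != nil {
			return strings.Join(calls, ","), err
		}
	}
	return strings.Join(calls, ","), nil
}

func main() {
	out, err := route("orders", "payments", "unknown")
	fmt.Println(out, err)
}
```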