Documentation ¶
Overview ¶
Package kitkafka provides a kafka transport for go kit.
Introduction ¶
Go kit has some great properties, such as allowing multiple transports to be used simultaneously. Sadly, it limits itself to supporting RPCs only. In real projects with many decoupled components, messaging is a path we inevitably go down.
Go kit models the RPCs as:
func(context.Context, request interface{}) (response interface{}, err error)
Package kitkafka treats messaging as a special case of RPC, where the response is always ignored. By using the same model, package kitkafka brings all go kit endpoints into the fold.
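The idea of "an RPC whose response is ignored" can be sketched with the standard library alone. This is an illustration, not kitkafka's implementation: `outbox`, `publishEndpoint`, and `publishAll` are hypothetical names, and the in-memory slice merely stands in for a kafka topic.

```go
package main

import (
	"context"
	"fmt"
)

// Endpoint mirrors the go kit RPC signature quoted above.
type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)

// outbox is a hypothetical in-memory stand-in for a kafka topic.
type outbox struct{ messages []string }

// publishEndpoint returns an Endpoint that appends the request to the outbox.
// The response is always nil, so callers only need to inspect the error.
func publishEndpoint(o *outbox) Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		s, ok := request.(string)
		if !ok {
			return nil, fmt.Errorf("unsupported request type %T", request)
		}
		o.messages = append(o.messages, s)
		return nil, nil
	}
}

// publishAll sends every payload through the endpoint and reports how many were stored.
func publishAll(payloads ...string) (int, error) {
	o := &outbox{}
	ep := publishEndpoint(o)
	for _, p := range payloads {
		// The response value is discarded: messaging is fire-and-forget.
		if _, err := ep(context.Background(), p); err != nil {
			return len(o.messages), err
		}
	}
	return len(o.messages), nil
}

func main() {
	n, err := publishAll("order created", "order shipped")
	fmt.Println(n, err)
}
```

Because the publishing side keeps the endpoint signature, go kit endpoint middleware can wrap it exactly as it would wrap an RPC endpoint.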
See the examples for a go kit project that uses kafka as a transport.
Integration ¶
kitkafka exports the configuration in this format:
    kafka:
      writer:
        foo:
          brokers:
            - localhost:9092
          topic: foo
      reader:
        bar:
          brokers:
            - localhost:9092
          topic: bar
          groupID: bar-group
For a complete overview of all available options, call the config init command.
To use package kitkafka with package core, add:
    var c *core.C = core.New()
    c.Provide(kitkafka.Providers())
The reader and writer factories are bundled into that single provider.
Standalone Usage ¶
In some scenarios, the whole go kit family might be overkill. To interact with kafka directly, use the factory to make writers and readers. Those writers and readers are provided by github.com/segmentio/kafka-go.
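    c.Invoke(func(writer *kafka.Writer) {
        // kafka-go's Writer sends messages with WriteMessages, which takes a context.
        _ = writer.WriteMessages(context.Background(), kafka.Message{})
    })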
Index ¶
- func EncodeMarshaller(ctx context.Context, msg *kafka.Message, request interface{}) error
- func ErrHandler(logger log.Logger) transport.ErrorHandler
- func MakeClient(writer *kafka.Writer) (*writerHandle, error)
- type DecodeRequestFunc
- type DecodeResponseFunc
- type EncodeRequestFunc
- type EncodeResponseFunc
- type HandleFunc
- type Handler
- type Publisher
- type PublisherOpt
- type PublisherOption
- type PublisherService
- type Reader
- type ReaderOpt
- type RequestResponseFunc
- type Server
- type Subscriber
- type SubscriberClientMux
- type SubscriberOption
- type SubscriberServer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EncodeMarshaller ¶
EncodeMarshaller encodes the user-domain request object into a *kafka.Message. The request object must implement contract.Marshaller. Protobuf objects implement this interface out of the box.
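The encoding step can be illustrated as follows. This sketch assumes contract.Marshaller has a single `Marshal() ([]byte, error)` method, which the documentation above does not confirm; `marshaller`, `message`, `greeting`, and `encode` are hypothetical stand-ins, not the package's types.

```go
package main

import (
	"errors"
	"fmt"
)

// marshaller is an ASSUMED shape for contract.Marshaller.
type marshaller interface {
	Marshal() ([]byte, error)
}

// message is a hypothetical stand-in for kafka.Message.
type message struct{ Value []byte }

// greeting is a sample user-domain request that knows how to marshal itself.
type greeting struct{ Name string }

func (g greeting) Marshal() ([]byte, error) { return []byte("hello " + g.Name), nil }

var errNotMarshaller = errors.New("request does not implement marshaller")

// encode writes the marshalled request into msg.Value,
// rejecting requests that do not implement marshaller.
func encode(msg *message, request interface{}) error {
	m, ok := request.(marshaller)
	if !ok {
		return errNotMarshaller
	}
	b, err := m.Marshal()
	if err != nil {
		return err
	}
	msg.Value = b
	return nil
}

// encodeGreeting is a small helper that encodes a greeting and returns the payload.
func encodeGreeting(name string) (string, error) {
	var msg message
	err := encode(&msg, greeting{Name: name})
	return string(msg.Value), err
}

func main() {
	fmt.Println(encodeGreeting("kafka"))
	fmt.Println(encode(&message{}, 42))
}
```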
func ErrHandler ¶
func ErrHandler(logger log.Logger) transport.ErrorHandler
ErrHandler is a transport handler that logs the kafka error message at warning level.
func MakeClient ¶
func MakeClient(writer *kafka.Writer) (*writerHandle, error)
MakeClient creates a Handler. This handler can write a *kafka.Message to the kafka broker. The Handler is meant to be consumed by NewPublisher.
Types ¶
type DecodeRequestFunc ¶
DecodeRequestFunc extracts a user-domain request object from a *kafka.Message. It is designed to be used in Kafka Subscribers.
type DecodeResponseFunc ¶
DecodeResponseFunc extracts a user-domain response object from a *kafka.Message. It is designed to be used in Kafka Publishers.
type EncodeRequestFunc ¶
EncodeRequestFunc encodes the passed request object into a *kafka.Message. It is designed to be used in Kafka Publishers.
type EncodeResponseFunc ¶
EncodeResponseFunc encodes the passed response object to a *kafka.Message. It is designed to be used in Kafka Subscribers.
type HandleFunc ¶
HandleFunc is a functional Handler.
type Handler ¶
Handler is a symmetric interface for both kafka publication and subscription. As a publisher handler, it is responsible for writing the kafka message to the kafka brokers. As a subscriber handler, it is responsible for piping the kafka message to the endpoints layer. In go kit terms, this is a go kit transport.
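A functional adapter of this kind usually follows the same pattern as http.HandlerFunc. The sketch below assumes the interface has a single `Handle(ctx, msg) error` method; that signature and the `Message` type are assumptions made for illustration, since the documentation above does not show them.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a hypothetical stand-in for kafka.Message.
type Message struct {
	Key, Value []byte
}

// Handler is an ASSUMED shape for the interface described above.
type Handler interface {
	Handle(ctx context.Context, msg Message) error
}

// HandleFunc adapts an ordinary function to the Handler interface.
type HandleFunc func(ctx context.Context, msg Message) error

// Handle calls f(ctx, msg).
func (f HandleFunc) Handle(ctx context.Context, msg Message) error {
	return f(ctx, msg)
}

// collect passes each value through a Handler and returns what the handler saw.
func collect(values ...string) []string {
	var seen []string
	var h Handler = HandleFunc(func(ctx context.Context, msg Message) error {
		seen = append(seen, string(msg.Value))
		return nil
	})
	for _, v := range values {
		_ = h.Handle(context.Background(), Message{Value: []byte(v)})
	}
	return seen
}

func main() {
	fmt.Println(collect("a", "b"))
}
```

The same interface serves both directions: a publisher-side handler would write the message to a broker, while a subscriber-side handler would decode it and call an endpoint.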
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher wraps a kafka client, and provides a method that implements endpoint.Endpoint.
func NewPublisher ¶
func NewPublisher( handler Handler, enc EncodeRequestFunc, options ...PublisherOption, ) *Publisher
NewPublisher constructs a usable Publisher for a single remote method.
type PublisherOpt ¶
type PublisherOpt func(config *publisherConfig)
A PublisherOpt is an option that configures the publisher.
type PublisherOption ¶
type PublisherOption func(*Publisher)
PublisherOption sets an optional parameter for clients.
func PublisherAfter ¶
func PublisherAfter(after ...RequestResponseFunc) PublisherOption
PublisherAfter sets the ClientResponseFuncs applied to the incoming kafka request prior to it being decoded. This is useful for obtaining anything off of the response and adding onto the context prior to decoding.
func PublisherBefore ¶
func PublisherBefore(before ...RequestResponseFunc) PublisherOption
PublisherBefore sets the RequestFuncs that are applied to the outgoing kafka request before it's invoked.
func PublisherTimeout ¶
func PublisherTimeout(timeout time.Duration) PublisherOption
PublisherTimeout sets the available timeout for a kafka request.
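PublisherOption follows the functional options pattern. A minimal sketch of that pattern is shown below; the `publisher` struct, its fields, the option constructors, and the 10-second default are all hypothetical and chosen only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// publisher is a hypothetical stand-in for the package's Publisher.
type publisher struct {
	timeout time.Duration
	before  []string // hook names, standing in for RequestResponseFuncs
}

// option has the same shape as PublisherOption: a function that mutates the publisher.
type option func(*publisher)

// withTimeout plays the role of PublisherTimeout.
func withTimeout(d time.Duration) option {
	return func(p *publisher) { p.timeout = d }
}

// withBefore plays the role of PublisherBefore and appends hooks in call order.
func withBefore(names ...string) option {
	return func(p *publisher) { p.before = append(p.before, names...) }
}

// newPublisher applies the options in order on top of an assumed default timeout.
func newPublisher(opts ...option) *publisher {
	p := &publisher{timeout: 10 * time.Second}
	for _, o := range opts {
		o(p)
	}
	return p
}

// configuredTimeoutMillis builds a publisher with the given timeout and reads it back.
func configuredTimeoutMillis(ms int64) int64 {
	p := newPublisher(withTimeout(time.Duration(ms) * time.Millisecond))
	return p.timeout.Milliseconds()
}

// hookCount builds a publisher with two withBefore options and counts the hooks.
func hookCount() int {
	return len(newPublisher(withBefore("trace"), withBefore("auth", "log")).before)
}

func main() {
	fmt.Println(configuredTimeoutMillis(250), hookCount())
}
```

Options are applied in the order given, so a later option overrides an earlier one that sets the same field.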
type PublisherService ¶
type PublisherService struct {
// contains filtered or unexported fields
}
PublisherService is a go kit service with one method, publish.
func MakePublisherService ¶
func MakePublisherService(endpoint endpoint.Endpoint, opt ...PublisherOpt) *PublisherService
MakePublisherService returns a *PublisherService that can publish user-domain messages to kafka brokers. In go kit terms, this is a service with one method, publish.
type Reader ¶
    type Reader interface {
        Close() error
        ReadMessage(ctx context.Context) (kafka.Message, error)
        FetchMessage(ctx context.Context) (kafka.Message, error)
        CommitMessages(ctx context.Context, msgs ...kafka.Message) error
    }
Reader models a kafka.Reader.
type ReaderOpt ¶
type ReaderOpt func(config *subscriberConfig)
A ReaderOpt is an option that configures the kafka reader.
func WithParallelism ¶
WithParallelism configures the parallelism of fan out workers.
func WithSyncCommit ¶
func WithSyncCommit() ReaderOpt
WithSyncCommit is a kafka option that, when enabled, commits each message synchronously, and only if no error is returned from the endpoint.
type RequestResponseFunc ¶
RequestResponseFunc may take information from a publisher request and put it into a request context. In Subscribers, RequestResponseFuncs are executed prior to invoking the endpoint.
func ContextToKafka ¶
func ContextToKafka(tracer opentracing.Tracer, logger log.Logger) RequestResponseFunc
ContextToKafka returns a kafka RequestResponseFunc that injects an OpenTracing Span found in `ctx` into the kafka message headers. If no such Span can be found, the RequestResponseFunc is a noop.
func KafkaToContext ¶
func KafkaToContext(tracer opentracing.Tracer, operationName string, logger log.Logger) RequestResponseFunc
KafkaToContext returns a kafka RequestResponseFunc that tries to join with an OpenTracing trace found in the kafka message and starts a new Span called `operationName` accordingly. If no trace could be found in the message, the Span will be a trace root. The Span is incorporated in the returned Context and can be retrieved with opentracing.SpanFromContext(ctx).
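The round trip that ContextToKafka and KafkaToContext perform can be pictured with a simplified stand-in. The sketch below does not use OpenTracing: it propagates a plain string trace ID through message headers, and `header`, `message`, `inject`, `extract`, and the `trace-id` header name are all hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// header and message are hypothetical stand-ins for kafka-go's header and message types.
type header struct {
	Key   string
	Value []byte
}

type message struct{ Headers []header }

type traceKey struct{}

const traceHeader = "trace-id" // hypothetical header name

// inject copies the trace ID from ctx into the message headers.
// It does nothing when ctx carries no trace ID.
func inject(ctx context.Context, msg *message) {
	id, ok := ctx.Value(traceKey{}).(string)
	if !ok {
		return
	}
	msg.Headers = append(msg.Headers, header{Key: traceHeader, Value: []byte(id)})
}

// extract returns a context carrying the trace ID found in the headers,
// or ctx unchanged when the header is missing.
func extract(ctx context.Context, msg *message) context.Context {
	for _, h := range msg.Headers {
		if h.Key == traceHeader {
			return context.WithValue(ctx, traceKey{}, string(h.Value))
		}
	}
	return ctx
}

// roundTrip simulates publisher -> message -> subscriber and
// returns the trace ID visible on the subscriber side.
func roundTrip(id string) string {
	ctx := context.Background()
	if id != "" {
		ctx = context.WithValue(ctx, traceKey{}, id)
	}
	msg := &message{}
	inject(ctx, msg) // publisher side
	got, _ := extract(context.Background(), msg).Value(traceKey{}).(string) // subscriber side
	return got
}

func main() {
	fmt.Printf("%q %q\n", roundTrip("abc123"), roundTrip(""))
}
```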
type Server ¶
Server models a kafka server. It blocks until the context is canceled. A Server usually starts serving when the application boots up.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber is a go kit transport layer that wraps an endpoint. It is a handler for SubscriberServer.
func NewSubscriber ¶
func NewSubscriber( e endpoint.Endpoint, dec DecodeRequestFunc, options ...SubscriberOption, ) *Subscriber
NewSubscriber constructs a new subscriber, which provides a handler for Kafka messages.
type SubscriberClientMux ¶
type SubscriberClientMux struct {
// contains filtered or unexported fields
}
SubscriberClientMux is a group of kafka Servers. Useful when consuming from multiple topics.
func NewMux ¶
func NewMux(servers ...Server) SubscriberClientMux
NewMux creates a SubscriberClientMux, which is a group of kafka servers.
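One way to picture a group of servers is a type that starts every member concurrently and waits for all of them. The sketch below assumes Server exposes `Serve(ctx) error`, as SubscriberServer does; `serverFunc`, `mux`, and `serveAndCancel` are hypothetical, and the real SubscriberClientMux may treat errors differently, for example by stopping its siblings when one server fails.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Server is the assumed shape of the interface: it blocks until ctx is canceled.
type Server interface {
	Serve(ctx context.Context) error
}

// serverFunc adapts a function to Server.
type serverFunc func(ctx context.Context) error

func (f serverFunc) Serve(ctx context.Context) error { return f(ctx) }

// mux is a hypothetical group of servers.
type mux struct{ servers []Server }

// Serve runs every server in its own goroutine, waits for all of them,
// and returns the first non-nil error it finds, if any.
func (m mux) Serve(ctx context.Context) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(m.servers))
	for _, s := range m.servers {
		wg.Add(1)
		go func(s Server) {
			defer wg.Done()
			errs <- s.Serve(ctx)
		}(s)
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

// serveAndCancel starts n blocking servers, cancels the context once all are
// running, and returns how many servers observed the cancellation.
func serveAndCancel(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	var started sync.WaitGroup
	var stopped int64
	started.Add(n)
	servers := make([]Server, n)
	for i := range servers {
		servers[i] = serverFunc(func(ctx context.Context) error {
			started.Done()
			<-ctx.Done() // block until canceled, like a consuming server
			atomic.AddInt64(&stopped, 1)
			return nil
		})
	}
	go func() {
		started.Wait()
		cancel()
	}()
	_ = mux{servers: servers}.Serve(ctx)
	return int(atomic.LoadInt64(&stopped))
}

func main() {
	fmt.Println(serveAndCancel(3))
}
```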
type SubscriberOption ¶
type SubscriberOption func(*Subscriber)
SubscriberOption sets an optional parameter for subscribers.
func SubscriberAfter ¶
func SubscriberAfter(after ...RequestResponseFunc) SubscriberOption
SubscriberAfter functions are executed on the subscriber reply after the endpoint is invoked, but before anything is published to the reply.
func SubscriberBefore ¶
func SubscriberBefore(before ...RequestResponseFunc) SubscriberOption
SubscriberBefore functions are executed on the publisher delivery object before the request is decoded.
func SubscriberErrorHandler ¶
func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption
SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors are ignored. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context.
func SubscriberErrorLogger ¶
func SubscriberErrorLogger(logger log.Logger) SubscriberOption
SubscriberErrorLogger is used to log non-terminal errors. By default, no errors are logged. This is intended as a diagnostic measure. Finer-grained control of error handling, including logging in more detail, should be performed in a custom SubscriberErrorEncoder which has access to the context. Deprecated: Use SubscriberErrorHandler instead.
type SubscriberServer ¶
type SubscriberServer struct {
// contains filtered or unexported fields
}
SubscriberServer is a kafka server that continuously consumes messages from kafka. It implements Server. Internally, the SubscriberServer uses a fan-out model: only one goroutine communicates with kafka, and it distributes messages to many parallel worker goroutines. However, this means manual offset commit is impossible, which makes it unsuitable for tasks that demand strict consistency. The WithSyncCommit option is provided for such high-consistency tasks. In sync commit mode, the Server synchronously commits the offset to kafka when the error returned by the Handler is nil.
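The difference between the two modes can be sketched without kafka. In the example below, integers stand in for messages, `fanOut` models the single feeder with parallel workers, and `syncCommit` models committing only after a nil error. All names are hypothetical, and the sketch assumes sync commit mode keeps consuming after a failed message, which the description above does not state.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// handler stands in for the subscriber Handler; an int stands in for a message.
type handler func(msg int) error

var errOdd = errors.New("odd message rejected")

// rejectOdd is a sample handler that fails on odd messages.
func rejectOdd(msg int) error {
	if msg%2 != 0 {
		return errOdd
	}
	return nil
}

// fanOut models the default mode: the caller is the only goroutine that
// "reads", and it hands messages to `parallelism` workers over a channel.
// It returns how many messages were handled without error.
func fanOut(msgs []int, parallelism int, h handler) int {
	ch := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	handled := 0
	for i := 0; i < parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range ch {
				if h(m) == nil {
					mu.Lock()
					handled++
					mu.Unlock()
				}
			}
		}()
	}
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	wg.Wait()
	return handled
}

// syncCommit models sync commit mode: messages are handled one at a time and
// an offset (here, the slice index) is committed only when the handler returns nil.
func syncCommit(msgs []int, h handler) []int {
	var committed []int
	for offset, m := range msgs {
		if h(m) == nil {
			committed = append(committed, offset)
		}
	}
	return committed
}

func main() {
	fmt.Println(fanOut([]int{1, 2, 3, 4, 5, 6}, 3, rejectOdd))
	fmt.Println(syncCommit([]int{2, 3, 4}, rejectOdd))
}
```

In the fan-out version the order in which workers finish is not deterministic, which is why offsets cannot be committed per message there.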
func MakeSubscriberServer ¶
func MakeSubscriberServer(reader *kafka.Reader, subscriber Handler, opt ...ReaderOpt) (*SubscriberServer, error)
MakeSubscriberServer creates a *SubscriberServer. Subscriber is the go kit transport layer equivalent.
func (*SubscriberServer) Serve ¶
func (s *SubscriberServer) Serve(ctx context.Context) error
Serve starts the Server. *SubscriberServer connects to kafka immediately and continuously consumes messages from it. Note that Serve uses consumer groups, so Serve can be called on multiple nodes for the same topic without manually balancing partitions.