Documentation ¶
Overview ¶
Package otkafka contains the opentracing integrated a kafka transport for package Core. The underlying kafka library is kafka-go: https://github.com/segmentio/kafka-go.
Integration ¶
otkafka exports the configuration in this format:
kafka: writer: foo: brokers: - localhost:9092 topic: foo reader: bar: brokers: - localhost:9092 topic: bar groupID: bar-group
For a complete overview of all available options, call the config init command.
To use package otkafka with package core, add:
var c *core.C = core.New() c.Provide(otkafka.Providers())
The reader and writer factories are bundled into that single provider.
Standalone Usage ¶
in some scenarios, the whole go kit family might be overkill. To directly interact with kafka, use the factory to make writers and readers. Those writers/readers are provided by github.com/segmentio/kafka-go.
c.Invoke(func(writer *kafka.Writer) { writer.WriteMessage(kafka.Message{}) })
Example (Reader) ¶
var config = ` log: level: none kafka: reader: default: brokers: - localhost:9092 topic: example writer: default: brokers: - localhost:9092 topic: example ` c := core.Default(core.WithConfigStack(rawbytes.Provider([]byte(config)), yaml.Parser())) c.Provide(otkafka.Providers()) c.Invoke(func(writer *kafka.Writer) { err := writer.WriteMessages(context.Background(), kafka.Message{Value: []byte(`hello`)}) if err != nil { panic(err) } }) c.Invoke(func(reader *kafka.Reader) { msg, err := reader.ReadMessage(context.Background()) if err != nil { panic(err) } fmt.Println(string(msg.Value)) })
Output: hello
Index ¶
- func Providers() []interface{}
- func SpanFromMessage(ctx context.Context, tracer opentracing.Tracer, message *kafka.Message) (opentracing.Span, context.Context, error)
- type KafkaLogAdapter
- type ReaderConfig
- type ReaderFactory
- type ReaderInterceptor
- type ReaderMaker
- type Transport
- type Writer
- type WriterConfig
- type WriterFactory
- type WriterInterceptor
- type WriterMaker
- type WriterOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Providers ¶
func Providers() []interface{}
Providers is a set of dependencies including ReaderMaker, WriterMaker and exported configs.
Depends On: ReaderInterceptor `optional:"true"` WriterInterceptor `optional:"true"` contract.ConfigAccessor log.Logger Provide: ReaderFactory WriterFactory ReaderMaker WriterMaker
Types ¶
type KafkaLogAdapter ¶
KafkaLogAdapter is an log adapter bridging kitlog and kafka.
func (KafkaLogAdapter) Printf ¶
func (k KafkaLogAdapter) Printf(s string, i ...interface{})
Printf implements kafka log interface.
type ReaderConfig ¶
type ReaderConfig struct { // The list of broker addresses used to connect to the kafka cluster. Brokers []string `json:"brokers" yaml:"brokers"` // GroupID holds the optional consumer group id. If GroupID is specified, then // Partition should NOT be specified e.g. 0 GroupID string `json:"groupId" yaml:"groupID"` // The topic to read messages from. Topic string `json:"topic" yaml:"topic"` // Partition to read messages from. Either Partition or GroupID may // be assigned, but not both Partition int `json:"partition" yaml:"partition"` // The capacity of the internal message queue, defaults to 100 if none is // set. QueueCapacity int `json:"queue_capacity" yaml:"queue_capacity"` // Min and max number of bytes to fetch from kafka in each request. MinBytes int `json:"minBytes" yaml:"minBytes"` MaxBytes int `json:"maxBytes" yaml:"maxBytes"` // Maximum amount of time to wait for new data to come when fetching batches // of messages from kafka. MaxWait time.Duration `json:"maxWait" yaml:"maxWait"` // ReadLagInterval sets the frequency at which the reader lag is updated. // Setting this field to a negative value disables lag reporting. ReadLagInterval time.Duration `json:"readLagInterval" yaml:"readLagInterval"` // HeartbeatInterval sets the optional frequency at which the reader sends the consumer // group heartbeat update. // // Default: 3s // // Only used when GroupID is set HeartbeatInterval time.Duration `json:"heartbeatInterval" yaml:"heartbeatInterval"` // CommitInterval indicates the interval at which offsets are committed to // the broker. If 0, commits will be handled synchronously. // // Default: 0 // // Only used when GroupID is set CommitInterval time.Duration `json:"commitInterval" yaml:"commitInterval"` // PartitionWatchInterval indicates how often a reader checks for partition changes. // If a reader sees a partition change (such as a partition add) it will rebalance the group // picking up new partitions. // // Default: 5s // // Only used when GroupID is set and WatchPartitionChanges is set. PartitionWatchInterval time.Duration `json:"partitionWatchInterval" yaml:"partitionWatchInterval"` // WatchForPartitionChanges is used to inform kafka-go that a consumer group should be // polling the brokers and rebalancing if any partition changes happen to the topic. WatchPartitionChanges bool `json:"watchPartitionChanges" yaml:"watchPartitionChanges"` // SessionTimeout optionally sets the length of time that may pass without a heartbeat // before the coordinator considers the consumer dead and initiates a rebalance. // // Default: 30s // // Only used when GroupID is set SessionTimeout time.Duration `json:"sessionTimeout" yaml:"sessionTimeout"` // RebalanceTimeout optionally sets the length of time the coordinator will wait // for members to join as part of a rebalance. For kafka servers under higher // load, it may be useful to set this value higher. // // Default: 30s // // Only used when GroupID is set RebalanceTimeout time.Duration `json:"rebalanceTimeout" yaml:"rebalanceTimeout"` // JoinGroupBackoff optionally sets the length of time to wait between re-joining // the consumer group after an error. // // Default: 5s JoinGroupBackoff time.Duration `json:"joinGroupBackoff" yaml:"joinGroupBackoff"` // RetentionTime optionally sets the length of time the consumer group will be saved // by the broker // // Default: 24h // // Only used when GroupID is set RetentionTime time.Duration `json:"retentionTime" yaml:"retentionTime"` // StartOffset determines from whence the consumer group should begin // consuming when it finds a partition without a committed offset. If // non-zero, it must be set to one of FirstOffset or LastOffset. // // Default: FirstOffset // // Only used when GroupID is set StartOffset int64 `json:"startOffset" yaml:"startOffset"` // BackoffDelayMin optionally sets the smallest amount of time the reader will wait before // polling for new messages // // Default: 100ms ReadBackoffMin time.Duration `json:"readBackoffMin" yaml:"readBackoffMin"` // BackoffDelayMax optionally sets the maximum amount of time the reader will wait before // polling for new messages // // Default: 1s ReadBackoffMax time.Duration `json:"readBackoffMax" yaml:"readBackoffMax"` // Limit of how many attempts will be made before delivering the error. // // The default is to try 3 times. MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"` }
ReaderConfig is a configuration object used to create new instances of Reader.
type ReaderFactory ¶
ReaderFactory is a *di.Factory that creates *kafka.Reader.
Unlike other database providers, the kafka factories don't bundle a default kafka reader/writer. It is suggested to use Topic name as the identifier of kafka config rather than an opaque name such as default.
func (ReaderFactory) Make ¶
func (k ReaderFactory) Make(name string) (*kafka.Reader, error)
Make returns a *kafka.Reader under the provided configuration entry.
type ReaderInterceptor ¶
type ReaderInterceptor func(name string, reader *kafka.ReaderConfig)
ReaderInterceptor is an interceptor that makes last minute change to a *kafka.ReaderConfig during kafka.Reader's creation
type ReaderMaker ¶
ReaderMaker models a ReaderFactory
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is a type which traces the interacting with kafka brokers.
func NewTransport ¶
func NewTransport(underlying kafka.RoundTripper, tracer opentracing.Tracer) *Transport
NewTransport creates a new kafka transport
type Writer ¶
type Writer struct { *kafka.Writer // contains filtered or unexported fields }
Writer is a decorator around kafka.Writer that provides tracing capabilities.
func Trace ¶
func Trace(writer *kafka.Writer, tracer opentracing.Tracer, opts ...WriterOption) *Writer
Trace takes a kafka.Writer and returns a decorated Writer.
func (*Writer) WriteMessages ¶
WriteMessages writes a batch of messages to the kafka topic configured on this writer. Each message written has been injected tracing headers. The upstream consumer can extract tracing spans from kafka headers, forming a distributed tracing via messaging.
type WriterConfig ¶
type WriterConfig struct { // The list of brokers used to discover the partitions available on the // kafka cluster. // // This field is required, attempting to create a writer with an empty list // of brokers will panic. Brokers []string `json:"brokers" yaml:"brokers"` // The topic that the writer will produce messages to. // // If provided, this will be used to set the topic for all produced messages. // If not provided, each Message must specify a topic for itself. This must be // mutually exclusive, otherwise the Writer will return an error. Topic string `json:"topic" yaml:"topic"` // Limit on how many attempts will be made to deliver a message. // // The default is to try at most 10 times. MaxAttempts int `json:"maxAttempts" yaml:"maxAttempts"` // Limit on how many messages will be buffered before being sent to a // partition. // // The default is to use a target batch size of 100 messages. BatchSize int `json:"batchSize" yaml:"batchSize"` // Limit the maximum size of a request in bytes before being sent to // a partition. // // The default is to use a kafka default value of 1048576. BatchBytes int `json:"batchBytes" yaml:"batchBytes"` // Time limit on how often incomplete message batches will be flushed to // kafka. // // The default is to flush at least every second. BatchTimeout time.Duration `json:"batchTimeout" yaml:"batchTimeout"` // Timeout for read operations performed by the Writer. // // Defaults to 10 seconds. ReadTimeout time.Duration `json:"readTimeout" yaml:"readTimeout"` // Timeout for write operation performed by the Writer. // // Defaults to 10 seconds. WriteTimeout time.Duration `json:"writeTimeout" yaml:"writeTimeout"` // DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache // the topic layout. With the change to use a transport to manage connections, // the responsibility of syncing the cluster layout has been delegated to the // transport. RebalanceInterval time.Duration `json:"rebalanceInterval" yaml:"rebalanceInterval"` // Number of acknowledges from partition replicas required before receiving // a response to a produce request. The default is -1, which means to wait for // all replicas, and a value above 0 is required to indicate how many replicas // should acknowledge a message to be considered successful. // // This version of kafka-go (v0.3) does not support 0 required acks, due to // some internal complexity implementing this with the Kafka protocol. If you // need that functionality specifically, you'll need to upgrade to v0.4. RequiredAcks int `json:"requiredAcks" yaml:"requiredAcks"` // Setting this flag to true causes the WriteMessages method to never block. // It also means that errors are ignored since the caller will not receive // the returned value. Use this only if you don't care about guarantees of // whether the messages were written to kafka. Async bool `json:"async" yaml:"async"` }
WriterConfig is a configuration type used to create new instances of Writer.
type WriterFactory ¶
WriterFactory is a *di.Factory that creates *kafka.Writer.
Unlike other database providers, the kafka factories don't bundle a default kafka reader/writer. It is suggested to use Topic name as the identifier of kafka config rather than an opaque name such as default.
func (WriterFactory) Make ¶
func (k WriterFactory) Make(name string) (*kafka.Writer, error)
Make returns a *kafka.Writer under the provided configuration entry.
type WriterInterceptor ¶
type WriterInterceptor func(name string, writer *kafka.Writer)
WriterInterceptor is an interceptor that makes last minute change to a *kafka.Writer during its creation
type WriterMaker ¶
WriterMaker models a WriterFactory
type WriterOption ¶
type WriterOption func(writer *Writer)
WriterOption is type that configures the Writer.
func WithLogger ¶
func WithLogger(logger log.Logger) WriterOption
WithLogger is an option that provides logging to writer.