Documentation ¶
Overview ¶
Package kafka provides a Kafka abstraction through github.com/Shopify/sarama.
WARNING: the Subscriber performs no message ack-ing, so there is no automatic resumption from the last-processed message. Subscribing starts only from the oldest available message, the newest message, or a manually specified numeric offset.
Both bps.NewPublisher (`kafka` + `kafka+sync` schemes) and bps.NewSubscriber (`kafka` scheme) support the following query parameters, illustrated in the sketch after the list:
client.id
    A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
rack.id
    A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config 'broker.rack'.
net.max.requests
    How many outstanding requests a connection is allowed to have before sending on it blocks (default 5).
net.dial.timeout
    How long to wait for the initial connection (default 30s).
net.read.timeout
    How long to wait for a response (default 30s).
net.write.timeout
    How long to wait for a transmit (default 30s).
net.tls.enable
    Whether or not to use TLS when connecting to the broker (default false).
kafka.version
    The version of the Kafka server.
channel.buffer.size
    The number of events to buffer in internal and external channels. This permits the producer to continue processing some messages in the background while user code is working, greatly improving throughput (default 256).
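For illustration, here is a minimal sketch of passing connection-level parameters via the URL; the broker address and all parameter values are placeholders, not recommendations:

package main

import (
    "context"

    "github.com/bsm/bps"
    _ "github.com/bsm/bps/kafka" // registers the kafka/kafka+sync schemes
)

func main() {
    ctx := context.TODO()

    // client.id, net.tls.enable and net.dial.timeout are the parameters
    // documented above; the values here are purely illustrative.
    pub, err := bps.NewPublisher(ctx, "kafka://10.0.0.1:9092/?client.id=my-client&net.tls.enable=true&net.dial.timeout=10s")
    if err != nil {
        panic(err.Error())
    }
    defer pub.Close()
}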
bps.NewPublisher supports the `kafka` + `kafka+sync` schemes and the following query parameters, illustrated in the sketch after the list:
acks
    The number of acks required before considering a request complete. When acks=0, the producer will not wait for any acknowledgement. When acks=1, the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. When acks=all, the leader will wait for the full set of in-sync replicas to acknowledge the record.
message.max.bytes
    The maximum permitted size of a message (default 1,000,000). Should be set equal to or smaller than the broker's `message.max.bytes`.
compression.type
    The compression type for all data generated by the producer. Valid values are: none, gzip, snappy, lz4 (default none).
partitioner
    The partitioner used to partition messages (defaults to hashing the message ID). Valid values are: hash, random, roundrobin.
timeout
    The maximum duration the broker will wait for the receipt of the required number of acks (default 10s). This is only relevant when acks=all or a number > 1.
flush.bytes
    The best-effort number of bytes needed to trigger a flush.
flush.messages
    The best-effort number of messages needed to trigger a flush.
flush.frequency
    The best-effort frequency of flushes.
retry.max
    The total number of times to retry sending a message (default 3).
retry.backoff
    How long to wait for the cluster to settle between retries (default 100ms).
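As a sketch, publisher-specific parameters combine with the connection-level ones in the same URL; the `kafka+sync` scheme below selects the synchronous publisher, and all values are illustrative:

package main

import (
    "context"

    "github.com/bsm/bps"
    _ "github.com/bsm/bps/kafka"
)

func main() {
    ctx := context.TODO()

    // acks=all waits for the full in-sync replica set, snappy compression
    // is enabled, and sends are retried up to 5 times (illustrative values).
    pub, err := bps.NewPublisher(ctx, "kafka+sync://10.0.0.1:9092/?acks=all&compression.type=snappy&retry.max=5")
    if err != nil {
        panic(err.Error())
    }
    defer pub.Close()

    if err := pub.Topic("topic").Publish(ctx, &bps.PubMessage{Data: []byte("message")}); err != nil {
        panic(err.Error())
    }
}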
bps.NewSubscriber (`kafka` scheme) supports the following query parameter, illustrated in the sketch after the list:

offsets.initial
    The offset to start consuming from. Can be "oldest" (the oldest available message), "newest" (only new messages, produced after subscribing), or a numeric offset value.
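A brief sketch of setting the initial offset via the URL; the broker address and values are placeholders:

package main

import (
    "context"
    "fmt"

    "github.com/bsm/bps"
    _ "github.com/bsm/bps/kafka"
)

func main() {
    ctx := context.TODO()

    // offsets.initial=oldest makes subscriptions start from the oldest
    // available message; a numeric value such as offsets.initial=42 would
    // start from that offset instead (values are illustrative).
    sub, err := bps.NewSubscriber(ctx, "kafka://10.0.0.1:9092/?offsets.initial=oldest")
    if err != nil {
        panic(err.Error())
    }
    defer sub.Close()

    subscription, err := sub.Topic("topic").Subscribe(bps.HandlerFunc(func(msg bps.SubMessage) {
        fmt.Println(string(msg.Data()))
    }))
    if err != nil {
        panic(err.Error())
    }
    defer subscription.Close()
}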
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher wraps a kafka producer and implements the bps.Publisher interface.
Example ¶
package main

import (
    "context"

    "github.com/bsm/bps"
    _ "github.com/bsm/bps/kafka"
)

func main() {
    ctx := context.TODO()
    pub, err := bps.NewPublisher(ctx, "kafka://10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092/?client.id=my-client&kafka.version=2.3.0&channel.buffer.size=1024")
    if err != nil {
        panic(err.Error())
    }
    defer pub.Close()

    if err := pub.Topic("topic").Publish(ctx, &bps.PubMessage{Data: []byte("message")}); err != nil {
        panic(err.Error())
    }
}
Output:
func NewPublisher ¶
func NewPublisher(addrs []string, config *sarama.Config) (*Publisher, error)
NewPublisher inits a new async publisher.
func (*Publisher) Producer ¶
func (p *Publisher) Producer() sarama.AsyncProducer
Producer exposes the native producer. Use at your own risk!
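For example, the native producer could be used to watch sarama's Errors() channel, which the bps interface does not surface. A minimal sketch, assuming the NewPublisher(addrs, config) constructor shown above and assuming the wrapper does not already drain that channel itself:

package main

import (
    "log"

    "github.com/Shopify/sarama"
    "github.com/bsm/bps/kafka"
)

func main() {
    pub, err := kafka.NewPublisher([]string{"10.0.0.1:9092"}, sarama.NewConfig())
    if err != nil {
        log.Fatal(err)
    }
    defer pub.Close()

    // sarama.AsyncProducer reports failed deliveries on Errors();
    // logging them is just one possible policy.
    go func() {
        for perr := range pub.Producer().Errors() {
            log.Printf("kafka publish failed: %v", perr.Err)
        }
    }()
}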
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber wraps a kafka consumer and implements the bps.Subscriber interface.
Example ¶
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/bsm/bps"
    _ "github.com/bsm/bps/kafka"
)

func main() {
    subscriber, err := bps.NewSubscriber(context.TODO(), "kafka://10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092/?client.id=my-client&kafka.version=2.3.0&channel.buffer.size=1024")
    if err != nil {
        panic(err.Error())
    }
    defer subscriber.Close()

    subscription, err := subscriber.Topic("topic").Subscribe(
        bps.HandlerFunc(func(msg bps.SubMessage) {
            _, _ = fmt.Printf("%s\n", string(msg.Data()))
        }),
        bps.StartAt(bps.PositionOldest),
    )
    if err != nil {
        panic(err.Error())
    }
    defer subscription.Close()

    time.Sleep(time.Second) // wait to receive some messages
}
Output:
func NewSubscriber ¶
func NewSubscriber(addrs []string, config *sarama.Config) (*Subscriber, error)
NewSubscriber inits a new subscriber. By default, it starts handling from the newest available message (published after subscribing).
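When constructing the subscriber directly, the initial position can be changed through the sarama config. A minimal sketch, assuming the wrapper honors sarama's Consumer.Offsets.Initial (which the offsets.initial URL parameter appears to map to):

package main

import (
    "log"

    "github.com/Shopify/sarama"
    "github.com/bsm/bps/kafka"
)

func main() {
    config := sarama.NewConfig()
    // Start from the oldest available message rather than the default
    // (newest only); sarama.OffsetOldest is a stock sarama constant.
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    sub, err := kafka.NewSubscriber([]string{"10.0.0.1:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Close()
}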
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close implements the bps.Subscriber interface.
type SyncPublisher ¶
type SyncPublisher struct {
// contains filtered or unexported fields
}
SyncPublisher wraps a synchronous kafka producer and implements the bps.Publisher interface.
func NewSyncPublisher ¶
func NewSyncPublisher(addrs []string, config *sarama.Config) (*SyncPublisher, error)
NewSyncPublisher inits a new synchronous publisher.
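A minimal construction sketch. Note that sarama's sync producer requires Producer.Return.Successes to be enabled, so the sketch sets it explicitly in case the wrapper does not do so itself:

package main

import (
    "log"

    "github.com/Shopify/sarama"
    "github.com/bsm/bps/kafka"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // equivalent of acks=all
    config.Producer.Return.Successes = true          // required by sarama.SyncProducer

    pub, err := kafka.NewSyncPublisher([]string{"10.0.0.1:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer pub.Close()
}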
func (*SyncPublisher) Close ¶
func (p *SyncPublisher) Close() error
Close implements the bps.Publisher interface.
func (*SyncPublisher) Producer ¶
func (p *SyncPublisher) Producer() sarama.SyncProducer
Producer exposes the native producer. Use at your own risk!
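The native sync producer can be useful when the partition and offset of a published message are needed, which the bps interface does not expose. A sketch under the same assumptions as above (broker address and topic are placeholders):

package main

import (
    "log"

    "github.com/Shopify/sarama"
    "github.com/bsm/bps/kafka"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required by sarama.SyncProducer

    pub, err := kafka.NewSyncPublisher([]string{"10.0.0.1:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer pub.Close()

    // SendMessage returns the partition and offset the message was stored at.
    partition, offset, err := pub.Producer().SendMessage(&sarama.ProducerMessage{
        Topic: "topic",
        Value: sarama.ByteEncoder("message"),
    })
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("stored at partition=%d, offset=%d", partition, offset)
}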