Documentation ¶
Index ¶
- func ClusterConsumer(cfg *Config, consumerGroup string, topics []string, opts ...Option) (*cluster.Consumer, error)
- func Consumer(cfg *Config, opts ...Option) (sarama.Consumer, error)
- func MakeTopicName(prefix, env, boundedContext string, args ...string) string
- func NewPublisher(ctx context.Context, producer sarama.SyncProducer, topic string) eventsourcex.PublisherFunc
- func Producer(cfg *Config, opts ...Option) (sarama.SyncProducer, error)
- type Config
- type Option
- type Subscription
- func SubscribeStream(ctx context.Context, consumer sarama.Consumer, topic string, h eventsourcex.Handler) (*Subscription, error)
- func (s *Subscription) Done() <-chan struct{}
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ClusterConsumer ¶
func ClusterConsumer(cfg *Config, consumerGroup string, topics []string, opts ...Option) (*cluster.Consumer, error)
ClusterConsumer creates a clustered Kafka consumer that uses Kafka's built-in offset tracking mechanism to manage offsets.
func MakeTopicName ¶
func MakeTopicName(prefix, env, boundedContext string, args ...string) string
MakeTopicName returns the topic name for the arguments provided; prefix is a convenience for the Heroku Kafka topic prefix.
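The exact format of the returned name is not documented here. As a rough illustration of how such a name might be assembled, the sketch below joins the non-empty parts with dots. The helper name `makeTopicName`, the dot separator, the rule of skipping empty parts, and the sample values are all assumptions made for illustration, not the package's confirmed behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// makeTopicName is a hypothetical stand-in for MakeTopicName. It assumes
// the parts are joined with "." and that empty parts are skipped; the real
// package may format topic names differently.
func makeTopicName(prefix, env, boundedContext string, args ...string) string {
	all := append([]string{prefix, env, boundedContext}, args...)
	parts := make([]string, 0, len(all))
	for _, p := range all {
		if p != "" {
			parts = append(parts, p)
		}
	}
	return strings.Join(parts, ".")
}

func main() {
	// With a topic prefix (hypothetical value).
	fmt.Println(makeTopicName("myprefix", "prod", "orders", "events"))
	// Without a prefix, for example on a self-hosted cluster.
	fmt.Println(makeTopicName("", "dev", "orders"))
}
```

Building names in one place keeps producers and consumers in agreement about which topic a given environment and bounded context map to.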
func NewPublisher ¶
func NewPublisher(ctx context.Context, producer sarama.SyncProducer, topic string) eventsourcex.PublisherFunc
NewPublisher creates a Kafka publisher that sends to the given topic using the supplied producer.
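The signature suggests a common Go pattern: bind a producer and a topic into a single function value that callers can invoke without knowing about Kafka. The sketch below shows that pattern using only the standard library. The `sender` interface, the `publisherFunc` type, and the in-memory `memorySender` are hypothetical simplifications; they do not reproduce the real `sarama.SyncProducer` or `eventsourcex.PublisherFunc` signatures.

```go
package main

import "fmt"

// sender is a simplified, hypothetical stand-in for a synchronous producer.
type sender interface {
	Send(topic string, payload []byte) error
}

// publisherFunc is a hypothetical publisher signature used for illustration.
type publisherFunc func(payload []byte) error

// newPublisher binds a producer and a topic into one function value, so
// callers only supply the payload.
func newPublisher(p sender, topic string) publisherFunc {
	return func(payload []byte) error {
		if err := p.Send(topic, payload); err != nil {
			return fmt.Errorf("publish to %s: %w", topic, err)
		}
		return nil
	}
}

// memorySender records messages in memory instead of talking to a broker.
type memorySender struct {
	sent map[string][][]byte
}

func (m *memorySender) Send(topic string, payload []byte) error {
	if m.sent == nil {
		m.sent = make(map[string][][]byte)
	}
	m.sent[topic] = append(m.sent[topic], payload)
	return nil
}

func main() {
	s := &memorySender{}
	publish := newPublisher(s, "dev.orders")
	if err := publish([]byte("order-created")); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("messages sent:", len(s.sent["dev.orders"]))
}
```

Because the publisher is just a function, tests can swap in an in-memory sender like the one above without a running broker.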
Types ¶
type Config ¶
Config contains the configuration parameters for the Kafka producer.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription provides a reference to a running stream
func SubscribeStream ¶
func SubscribeStream(ctx context.Context, consumer sarama.Consumer, topic string, h eventsourcex.Handler) (*Subscription, error)
SubscribeStream subscribes a message handler to the specified Kafka topic
func (*Subscription) Done ¶
func (s *Subscription) Done() <-chan struct{}
Done returns a channel that callers can receive from to wait for the subscription to finish.