Documentation ¶
Index ¶
- Constants
- Variables
- func SetMaxMessageSize(maxSize int32)
- type AsyncProducer
- type ConsumerGroup
- func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels
- func (cg *ConsumerGroup) Checker(ctx context.Context, state *health.CheckState) error
- func (cg *ConsumerGroup) Close(ctx context.Context) (err error)
- func (cg *ConsumerGroup) CommitAndRelease(msg Message)
- func (cg *ConsumerGroup) Initialise(ctx context.Context) error
- func (cg *ConsumerGroup) IsInitialised() bool
- func (cg *ConsumerGroup) Release()
- func (cg *ConsumerGroup) StopListeningToConsumer(ctx context.Context) (err error)
- type ConsumerGroupChannels
- type ErrBrokersNotReachable
- type ErrInvalidBrokers
- type ErrNoChannel
- type IConsumerGroup
- type IProducer
- type Message
- type MessageConsumer
- type MessageProducer
- type Producer
- type ProducerChannels
- type Sarama
- type SaramaClient
- type SaramaCluster
- type SaramaClusterClient
- type SaramaClusterConsumer
- type SaramaMessage
Constants ¶
const ( Errors = "Errors" Init = "Init" Closer = "Closer" Closed = "Closed" Upstream = "Upstream" UpstreamDone = "UpstreamDone" Output = "Output" )
channel names
const ( OffsetNewest = sarama.OffsetNewest OffsetOldest = sarama.OffsetOldest )
const MsgHealthyConsumerGroup = "kafka consumer group is healthy"
MsgHealthyConsumerGroup is the Check message returned when the Kafka consumer group is healthy.
const MsgHealthyProducer = "kafka producer is healthy"
MsgHealthyProducer is the Check message returned when the Kafka producer is healthy.
const ServiceName = "Kafka"
ServiceName is the name of this service: Kafka.
Variables ¶
var ErrInitSarama = errors.New("Failed to initialise client")
ErrInitSarama is used when Sarama client cannot be initialised
var ErrShutdownTimedOut = errors.New("Shutdown context timed out")
ErrShutdownTimedOut represents an error received due to the context deadline being exceeded
var ErrUninitialisedProducer = errors.New("Producer is not initialised")
ErrUninitialisedProducer is used when a caller tries to send a message to the output channel with an uninitialised producer.
Functions ¶
func SetMaxMessageSize ¶
func SetMaxMessageSize(maxSize int32)
Types ¶
type AsyncProducer ¶
type AsyncProducer = sarama.AsyncProducer
AsyncProducer is a type alias for sarama.AsyncProducer.
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup is a Kafka consumer group instance.
func NewConsumerGroup ¶
func NewConsumerGroup( ctx context.Context, brokerAddrs []string, topic string, group string, offset int64, sync bool, channels *ConsumerGroupChannels) (*ConsumerGroup, error)
NewConsumerGroup returns a new consumer group using default configuration and provided channels
func NewConsumerWithClusterClient ¶
func NewConsumerWithClusterClient( ctx context.Context, brokerAddrs []string, topic string, group string, offset int64, syncConsumer bool, channels *ConsumerGroupChannels, cli SaramaCluster) (cg *ConsumerGroup, err error)
NewConsumerWithClusterClient returns a new consumer group with the provided sarama cluster client
func (*ConsumerGroup) Channels ¶ added in v1.1.0
func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels
Channels returns the ConsumerGroup channels for this consumer group
func (*ConsumerGroup) Checker ¶
func (cg *ConsumerGroup) Checker(ctx context.Context, state *health.CheckState) error
Checker checks health of Kafka consumer-group and updates the provided CheckState accordingly
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close(ctx context.Context) (err error)
Close safely closes the consumer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context will provide no timeout, but this is not recommended.
func (*ConsumerGroup) CommitAndRelease ¶
func (cg *ConsumerGroup) CommitAndRelease(msg Message)
CommitAndRelease commits the consumed message and releases the consumer listener to read another message.
func (*ConsumerGroup) Initialise ¶ added in v1.1.0
func (cg *ConsumerGroup) Initialise(ctx context.Context) error
Initialise creates a new Sarama Consumer and the channel redirection, only if it was not already initialised.
func (*ConsumerGroup) IsInitialised ¶
func (cg *ConsumerGroup) IsInitialised() bool
IsInitialised returns true only if Sarama consumer has been correctly initialised.
func (*ConsumerGroup) Release ¶
func (cg *ConsumerGroup) Release()
Release signals that upstream has completed processing an incoming message, i.e. the consumer can move on to read the next message.
func (*ConsumerGroup) StopListeningToConsumer ¶
func (cg *ConsumerGroup) StopListeningToConsumer(ctx context.Context) (err error)
StopListeningToConsumer stops any more messages being consumed off the Kafka topic.
type ConsumerGroupChannels ¶
type ConsumerGroupChannels struct { Upstream chan Message Errors chan error Init chan struct{} Closer chan struct{} Closed chan struct{} UpstreamDone chan bool }
ConsumerGroupChannels represents the channels used by ConsumerGroup.
func CreateConsumerGroupChannels ¶
func CreateConsumerGroupChannels(sync bool) *ConsumerGroupChannels
CreateConsumerGroupChannels initialises a ConsumerGroupChannels with new channels according to sync
func (*ConsumerGroupChannels) LogErrors ¶ added in v1.1.0
func (consumerChannels *ConsumerGroupChannels) LogErrors(ctx context.Context, errMsg string)
LogErrors creates a goroutine that waits on the Errors channel and logs any error received. It exits on a Closer channel event. The provided context and errMsg will be used in the log event.
func (*ConsumerGroupChannels) Validate ¶
func (consumerChannels *ConsumerGroupChannels) Validate() error
Validate returns ErrNoChannel if any consumer channel is nil
type ErrBrokersNotReachable ¶
type ErrBrokersNotReachable struct {
Addrs []string
}
ErrBrokersNotReachable is an Error type for 'Broker Not reachable' with a list of unreachable addresses
func (*ErrBrokersNotReachable) Error ¶
func (e *ErrBrokersNotReachable) Error() string
Error returns the error message with a list of unreachable addresses
type ErrInvalidBrokers ¶
type ErrInvalidBrokers struct {
Addrs []string
}
ErrInvalidBrokers is an Error type for 'Invalid topic info' with a list of invalid broker addresses
func (*ErrInvalidBrokers) Error ¶
func (e *ErrInvalidBrokers) Error() string
Error returns the error message with a list of broker addresses that returned unexpected responses
type ErrNoChannel ¶
type ErrNoChannel struct {
ChannelNames []string
}
ErrNoChannel is an Error type generated when a kafka producer or consumer is created with a missing channel
func (*ErrNoChannel) Error ¶
func (e *ErrNoChannel) Error() string
Error returns the error message with a list of missing channels
type IConsumerGroup ¶ added in v1.1.3
type IConsumerGroup interface { Channels() *ConsumerGroupChannels IsInitialised() bool Initialise(ctx context.Context) error Release() CommitAndRelease(msg Message) StopListeningToConsumer(ctx context.Context) (err error) Checker(ctx context.Context, state *health.CheckState) error Close(ctx context.Context) (err error) }
IConsumerGroup is an interface representing a Kafka Consumer Group.
type IProducer ¶ added in v1.1.3
type IProducer interface { Channels() *ProducerChannels IsInitialised() bool Initialise(ctx context.Context) error Checker(ctx context.Context, state *health.CheckState) error Close(ctx context.Context) (err error) }
IProducer is an interface representing a Kafka Producer
type Message ¶
type Message interface { // GetData returns the message contents. GetData() []byte // Commit the message's offset. Commit() // Offset returns the message offset Offset() int64 }
Message represents a single kafka message.
type MessageConsumer ¶
MessageConsumer provides a generic interface for consuming []byte messages
type MessageProducer ¶
MessageProducer provides a generic interface for producing []byte messages
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a producer of Kafka messages
func NewProducer ¶
func NewProducer( ctx context.Context, brokerAddrs []string, topic string, envMax int, channels *ProducerChannels) (*Producer, error)
NewProducer returns a new producer instance using the provided config and channels. The rest of the config is set to defaults. If any channel parameter is nil, an error will be returned.
func NewProducerWithSaramaClient ¶
func NewProducerWithSaramaClient( ctx context.Context, brokerAddrs []string, topic string, envMax int, channels *ProducerChannels, cli Sarama) (producer *Producer, err error)
NewProducerWithSaramaClient returns a new producer with a provided Sarama client
func (*Producer) Channels ¶ added in v1.1.0
func (p *Producer) Channels() *ProducerChannels
Channels returns the Producer channels for this producer
func (*Producer) Checker ¶
Checker checks health of Kafka producer and updates the provided CheckState accordingly
func (*Producer) Close ¶
Close safely closes the producer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context will provide no timeout, and this is not recommended.
func (*Producer) Initialise ¶ added in v1.1.0
Initialise creates a new Sarama AsyncProducer and the channel redirection, only if it was not already initialised.
func (*Producer) IsInitialised ¶
IsInitialised returns true only if Sarama producer has been correctly initialised.
type ProducerChannels ¶
type ProducerChannels struct { Output chan []byte Errors chan error Init chan struct{} Closer chan struct{} Closed chan struct{} }
ProducerChannels represents the channels used by Producer.
func CreateProducerChannels ¶
func CreateProducerChannels() *ProducerChannels
CreateProducerChannels initialises a ProducerChannels with new channels.
func (*ProducerChannels) LogErrors ¶ added in v1.1.0
func (producerChannels *ProducerChannels) LogErrors(ctx context.Context, errMsg string)
LogErrors creates a goroutine that waits on the Errors channel and logs any error received. It exits on a Closer channel event. The provided context and errMsg will be used in the log event.
func (*ProducerChannels) Validate ¶
func (producerChannels *ProducerChannels) Validate() error
Validate returns ErrNoChannel if any producer channel is nil
type Sarama ¶
type Sarama interface {
NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error)
}
Sarama is an interface representing the Sarama library.
type SaramaClient ¶
type SaramaClient struct{}
SaramaClient implements Sarama interface and wraps the real calls to Sarama library.
func (*SaramaClient) NewAsyncProducer ¶
func (s *SaramaClient) NewAsyncProducer(addrs []string, conf *sarama.Config) (AsyncProducer, error)
NewAsyncProducer creates a new sarama.AsyncProducer using the given broker addresses and configuration.
type SaramaCluster ¶
type SaramaCluster interface {
NewConsumer(addrs []string, groupID string, topics []string, config *cluster.Config) (SaramaClusterConsumer, error)
}
SaramaCluster is an interface representing the bsm sarama-cluster library.
type SaramaClusterClient ¶
type SaramaClusterClient struct{}
SaramaClusterClient implements SaramaCluster interface and wraps the real calls to bsm sarama-cluster library.
func (*SaramaClusterClient) NewConsumer ¶
func (c *SaramaClusterClient) NewConsumer(addrs []string, groupID string, topics []string, config *cluster.Config) (SaramaClusterConsumer, error)
NewConsumer creates a new sarama cluster consumer.
type SaramaClusterConsumer ¶
type SaramaClusterConsumer interface { Close() (err error) Messages() <-chan *sarama.ConsumerMessage CommitOffsets() error Errors() <-chan error Notifications() <-chan *cluster.Notification MarkOffset(msg *sarama.ConsumerMessage, metadata string) }
SaramaClusterConsumer is an interface representing the bsm sarama-cluster Consumer struct
type SaramaMessage ¶
type SaramaMessage struct {
// contains filtered or unexported fields
}
SaramaMessage represents a Sarama specific Kafka message
func (SaramaMessage) GetData ¶
func (M SaramaMessage) GetData() []byte
GetData returns the message contents.
func (SaramaMessage) Offset ¶
func (M SaramaMessage) Offset() int64
Offset returns the message offset