Documentation ¶
Index ¶
- Constants
- Variables
- func GetPrincipal(app, subnet, domain string) string
- func GetRetryTime(attempt int, retryTime time.Duration) time.Duration
- func Healthcheck(ctx context.Context, brokers []SaramaBroker, topic string, cfg *sarama.Config) error
- func NewAdmin(brokerAddrs []string, adminCfg *AdminConfig) (sarama.ClusterAdmin, error)
- func SetMaxMessageSize(maxSize int32)
- func SetMaxRetryInterval(maxPause time.Duration)
- func UnwrapLogData(err error) log.Data
- func WaitWithTimeout(ctx context.Context, wg *sync.WaitGroup) bool
- type Acls
- type AdminConfig
- type Batch
- type BatchHandler
- type Commiter
- type ConsumerGroup
- func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels
- func (cg *ConsumerGroup) Checker(ctx context.Context, state *healthcheck.CheckState) error
- func (cg *ConsumerGroup) Close(ctx context.Context) (err error)
- func (cg *ConsumerGroup) Initialise(ctx context.Context) error
- func (cg *ConsumerGroup) IsInitialised() bool
- func (cg *ConsumerGroup) LogErrors(ctx context.Context)
- func (cg *ConsumerGroup) OnHealthUpdate(status string)
- func (cg *ConsumerGroup) RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error
- func (cg *ConsumerGroup) RegisterHandler(ctx context.Context, h Handler) error
- func (cg *ConsumerGroup) Start() error
- func (cg *ConsumerGroup) State() string
- func (cg *ConsumerGroup) Stop()
- func (cg *ConsumerGroup) StopAndWait()
- type ConsumerGroupChannels
- type ConsumerGroupConfig
- type ErrBrokersNotReachable
- type ErrInvalidBrokers
- type Error
- type Handler
- type IConsumerGroup
- type IProducer
- type Message
- type Producer
- func (p *Producer) Channels() *ProducerChannels
- func (p *Producer) Checker(ctx context.Context, state *healthcheck.CheckState) error
- func (p *Producer) Close(ctx context.Context) (err error)
- func (p *Producer) Initialise(ctx context.Context) error
- func (p *Producer) IsInitialised() bool
- func (p *Producer) LogErrors(ctx context.Context)
- func (p *Producer) Send(schema *avro.Schema, event interface{}) error
- type ProducerChannels
- type ProducerConfig
- type SaramaAsyncProducer
- type SaramaBroker
- type SaramaConsumerGroup
- type SaramaConsumerGroupClaim
- type SaramaConsumerGroupSession
- type SaramaMessage
- type SecurityConfig
- type State
- type StateMachine
- type TopicAuth
- type TopicAuthList
Constants ¶
const ( Errors = "Errors" Initialised = "Initialised" Consume = "Consume" Closer = "Closer" Closed = "Closed" Upstream = "Upstream" UpstreamDone = "UpstreamDone" Output = "Output" )
channel names
const ( OffsetNewest = sarama.OffsetNewest OffsetOldest = sarama.OffsetOldest )
Common constants
const ( MsgHealthyProducer = "kafka producer is healthy" MsgHealthyConsumerGroup = "kafka consumer group is healthy" )
const ServiceName = "Kafka"
ServiceName is the name of this service: Kafka.
Variables ¶
var ConsumeErrRetryPeriod = 250 * time.Millisecond
ConsumeErrRetryPeriod is the initial time period between consumer retries on error (for consumer groups)
var ErrTLSCannotLoadCACerts = errors.New("cannot load CA Certs")
ErrTLSCannotLoadCACerts is returned when the certs file cannot be loaded
var InitRetryPeriod = 250 * time.Millisecond
InitRetryPeriod is the initial time period between initialisation retries (for producers and consumer gropus)
var MaxRetryInterval = 31 * time.Second
MaxRetryInterval is the maximum time between retries (plus or minus a random amount)
Functions ¶
func GetPrincipal ¶
func GetRetryTime ¶
GetRetryTime will return a duration based on the attempt and initial retry time. It uses the algorithm `2^n` where `n` is the attempt number (double the previous) plus a randomization factor of ±25% of the initial retry time (so that the server isn't being hit at the same time by many clients)
func Healthcheck ¶
func Healthcheck(ctx context.Context, brokers []SaramaBroker, topic string, cfg *sarama.Config) error
Healthcheck implements the common healthcheck logic for kafka producers and consumers, by contacting the provided brokers and asking for topic metadata. Possible errors: - ErrBrokersNotReachable if a broker cannot be contacted. - ErrInvalidBrokers if topic metadata is not returned by a broker.
func NewAdmin ¶
func NewAdmin(brokerAddrs []string, adminCfg *AdminConfig) (sarama.ClusterAdmin, error)
NewAdmin creates an admin-based client
func SetMaxMessageSize ¶
func SetMaxMessageSize(maxSize int32)
SetMaxMessageSize sets the Sarama MaxRequestSize and MaxResponseSize values to the provided maxSize
func SetMaxRetryInterval ¶
SetMaxRetryInterval sets MaxRetryInterval to its duration argument
func UnwrapLogData ¶
UnwrapLogData recursively unwraps logData from an error
Types ¶
type Acls ¶
type Acls []*sarama.AclCreation
type AdminConfig ¶
type AdminConfig struct { KafkaVersion *string KeepAlive *time.Duration RetryBackoff *time.Duration RetryMax *int SecurityConfig *SecurityConfig }
AdminConfig exposes the optional configurable parameters for an admin client to overwrite default Sarama config values. Any value that is not provied will use the default Sarama config value.
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch handles adding raw messages to a batch of ObservationExtracted events.
func (*Batch) Commit ¶
func (batch *Batch) Commit()
Commit is called when the batch has been processed. The last message has been released already, so at this point we just need to commit
type BatchHandler ¶
BatchHandler represents a handler for processing a batch of kafka messages. This method will be called by only one go-routine at a time.
type Commiter ¶
type Commiter interface {
Commit() bool
}
Commiter represents an error type that defines a bool method to enable or disable a message/batch to be committed.
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup is a Kafka consumer group instance.
func NewConsumerGroup ¶
func NewConsumerGroup(ctx context.Context, cgConfig *ConsumerGroupConfig) (*ConsumerGroup, error)
NewConsumerGroup creates a new consumer group with the provided parameters
func (*ConsumerGroup) Channels ¶
func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels
Channels returns the ConsumerGroup channels for this consumer group
func (*ConsumerGroup) Checker ¶
func (cg *ConsumerGroup) Checker(ctx context.Context, state *healthcheck.CheckState) error
Checker checks health of Kafka consumer-group and updates the provided CheckState accordingly
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close(ctx context.Context) (err error)
Close safely closes the consumer and releases all resources. pass in a context with a timeout or deadline. Passing a nil context will provide no timeout but is not recommended
func (*ConsumerGroup) Initialise ¶
func (cg *ConsumerGroup) Initialise(ctx context.Context) error
Initialise creates a new Sarama ConsumerGroup and the consumer/error loops, only if it was not already initialised.
func (*ConsumerGroup) IsInitialised ¶
func (cg *ConsumerGroup) IsInitialised() bool
IsInitialised returns true only if Sarama ConsumerGroup has been correctly initialised.
func (*ConsumerGroup) LogErrors ¶
func (cg *ConsumerGroup) LogErrors(ctx context.Context)
LogErrors creates a go-routine that waits on Errors channel and logs any error received. It exits on Closer channel closed.
func (*ConsumerGroup) OnHealthUpdate ¶
func (cg *ConsumerGroup) OnHealthUpdate(status string)
OnHealthUpdate implements the healthcheck Subscriber interface so that the kafka consumer can be notified of state changes. This method is intended to be used for managed start/stop's only, and should not be called manually. WARNING: Having more than one notifier calling this method will result in unexpected behavior. - On Health status OK: start consuming - On Warning or Critical: stop consuming
func (*ConsumerGroup) RegisterBatchHandler ¶
func (cg *ConsumerGroup) RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error
func (*ConsumerGroup) RegisterHandler ¶
func (cg *ConsumerGroup) RegisterHandler(ctx context.Context, h Handler) error
func (*ConsumerGroup) Start ¶
func (cg *ConsumerGroup) Start() error
Start has different effects depending on the state: - Initialising: the consumer will try to start consuming straight away once it's initialised - Starting/Consumer: no change will happen - Stopping/Stopped: the consumer will start consuming - Closing: an error will be returned
func (*ConsumerGroup) State ¶
func (cg *ConsumerGroup) State() string
State returns the state of the consumer group
func (*ConsumerGroup) Stop ¶
func (cg *ConsumerGroup) Stop()
Stop has different effects depending on the state: - Initialising: the consumer will remain in the Stopped state once initialised, without consuming messages - Starting/Consumer: no change will happen - Stopping/Stopped: the consumer will start start consuming - Closing: an error will be returned This method does not wait until the consumerGroup reaches the stopped state, it only triggers the stopping action.
func (*ConsumerGroup) StopAndWait ¶
func (cg *ConsumerGroup) StopAndWait()
StopAndWait has different effects depending on the state: - Initialising: the consumer will remain in the Stopped state once initialised, without consuming messages - Starting/Consumer: no change will happen - Stopping/Stopped: the consumer will start start consuming - Closing: an error will be returned This method waits until the consumerGroup reaches the stopped state if it was starting/consuming.
type ConsumerGroupChannels ¶
type ConsumerGroupChannels struct { Upstream chan Message Errors chan error Initialised chan struct{} Consume chan bool Closer chan struct{} Closed chan struct{} }
ConsumerGroupChannels represents the channels used by ConsumerGroup.
func CreateConsumerGroupChannels ¶
func CreateConsumerGroupChannels(bufferSize int) *ConsumerGroupChannels
CreateConsumerGroupChannels initialises a ConsumerGroupChannels with new channels. You can provide the buffer size to determine the number of messages that will be buffered in the upstream channel (to receive messages)
func (*ConsumerGroupChannels) Validate ¶
func (consumerChannels *ConsumerGroupChannels) Validate() error
Validate returns an Error with a list of missing channels if any consumer channel is nil
type ConsumerGroupConfig ¶
type ConsumerGroupConfig struct { // Sarama config overrides KafkaVersion *string KeepAlive *time.Duration RetryBackoff *time.Duration RetryBackoffFunc *func(retries int) time.Duration Offset *int64 SecurityConfig *SecurityConfig MessageConsumeTimeout *time.Duration // dp-kafka specific config overrides NumWorkers *int BatchSize *int BatchWaitTime *time.Duration Topic string GroupName string BrokerAddrs []string }
ConsumerGroupConfig exposes the configurable parameters for a consumer group to overwrite default config values and any other defult config values set by dp-kafka. Any value that is not provied will use the default Sarama config value, or the default dp-kafka value. The only 3 compulsory values are: - Topic - GroupName - BrokerAddrs
func (*ConsumerGroupConfig) Get ¶
func (c *ConsumerGroupConfig) Get() (*sarama.Config, error)
Get creates a default sarama config for a consumer-group and overwrites any values provided in cgConfig. If any required value is not provided or any override is invalid, an error will be returned
func (*ConsumerGroupConfig) Validate ¶
func (c *ConsumerGroupConfig) Validate() error
Validate that compulsory values are provided in config
type ErrBrokersNotReachable ¶
type ErrBrokersNotReachable struct {
Addrs []string
}
ErrBrokersNotReachable is an Error type for 'Broker Not reachable' with a list of unreachable addresses
func (*ErrBrokersNotReachable) Error ¶
func (e *ErrBrokersNotReachable) Error() string
Error returns the error message with a list of unreachable addresses
type ErrInvalidBrokers ¶
type ErrInvalidBrokers struct {
Addrs []string
}
ErrInvalidBrokers is an Error type for 'Invalid topic info' with a list of invalid broker addresses
func (*ErrInvalidBrokers) Error ¶
func (e *ErrInvalidBrokers) Error() string
Error returns the error message with a list of broker addresses that returned unexpected responses
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error is the handler package's error type. Is not meant to be compared as a a type, but information should be extracted via the interfaces it implements with callback functions. Is not guaranteed to remain exported so shouldn't be treated as such.
type Handler ¶
Handler represents a handler for processing a single kafka message. IMPORTANT: if maxConsumers > 1 then this method needs to be thread safe.
type IConsumerGroup ¶
type IConsumerGroup interface { Channels() *ConsumerGroupChannels State() string RegisterHandler(ctx context.Context, h Handler) error RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error Checker(ctx context.Context, state *healthcheck.CheckState) error IsInitialised() bool Initialise(ctx context.Context) error OnHealthUpdate(status string) Start() error Stop() StopAndWait() LogErrors(ctx context.Context) Close(ctx context.Context) error }
IConsumerGroup is an interface representing a Kafka Consumer Group, as implemented in dp-kafka/consumer
type IProducer ¶
type IProducer interface { Channels() *ProducerChannels Checker(ctx context.Context, state *healthcheck.CheckState) error LogErrors(ctx context.Context) IsInitialised() bool Initialise(ctx context.Context) error Send(schema *avro.Schema, event interface{}) error Close(ctx context.Context) (err error) }
IProducer is an interface representing a Kafka Producer, as implemented in dp-kafka/producer
type Message ¶
type Message interface { // GetData returns the message contents. GetData() []byte // Mark marks the message as consumed, but doesn't commit the offset to the backend Mark() // Commit marks the message as consumed and commits its offset to the backend Commit() // Release closes the UpstreamDone channel for this message Release() // CommitAndRelease marks a message as consumed, commits it and closes the UpstreamDone channel CommitAndRelease() // Offset returns the message offset Offset() int64 // UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed UpstreamDone() chan struct{} }
Message represents a single kafka message.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a producer of Kafka messages
func NewProducer ¶
func NewProducer(ctx context.Context, pConfig *ProducerConfig) (producer *Producer, err error)
New returns a new producer instance using the provided config and channels. The rest of the config is set to defaults. If any channel parameter is nil, an error will be returned.
func (*Producer) Channels ¶
func (p *Producer) Channels() *ProducerChannels
Channels returns the Producer channels for this producer
func (*Producer) Checker ¶
func (p *Producer) Checker(ctx context.Context, state *healthcheck.CheckState) error
Checker checks health of Kafka producer and updates the provided CheckState accordingly
func (*Producer) Close ¶
Close safely closes the producer and releases all resources. pass in a context with a timeout or deadline. Passing a nil context will provide no timeout and this is not recommended
func (*Producer) Initialise ¶
Initialise creates a new Sarama AsyncProducer and the channel redirection, only if it was not already initialised.
func (*Producer) IsInitialised ¶
IsInitialised returns true only if Sarama producer has been correctly initialised.
type ProducerChannels ¶
type ProducerChannels struct { Output chan []byte Errors chan error Initialised chan struct{} Closer chan struct{} Closed chan struct{} }
ProducerChannels represents the channels used by Producer.
func CreateProducerChannels ¶
func CreateProducerChannels() *ProducerChannels
CreateProducerChannels initialises a ProducerChannels with new channels.
func (*ProducerChannels) Validate ¶
func (producerChannels *ProducerChannels) Validate() error
Validate returns an error with a list of missing channels if any producer channel is nil
type ProducerConfig ¶
type ProducerConfig struct { // Sarama config overrides KafkaVersion *string MaxMessageBytes *int RetryMax *int KeepAlive *time.Duration RetryBackoff *time.Duration RetryBackoffFunc *func(retries, maxRetries int) time.Duration SecurityConfig *SecurityConfig // dp-kafka specific config Topic string BrokerAddrs []string }
ProducerConfig exposes the optional configurable parameters for a producer to overwrite default Sarama config values. Any value that is not provied will use the default Sarama config value.
func (*ProducerConfig) Get ¶
func (p *ProducerConfig) Get() (*sarama.Config, error)
Get creates a default sarama config and overwrites any values provided in pConfig
func (*ProducerConfig) Validate ¶
func (p *ProducerConfig) Validate() error
Validate that compulsory values are provided in config
type SaramaAsyncProducer ¶
type SaramaAsyncProducer = sarama.AsyncProducer
SaramaAsyncProducer is a wrapper around sarama.AsyncProducer
type SaramaBroker ¶
type SaramaConsumerGroup ¶
type SaramaConsumerGroup = sarama.ConsumerGroup
SaramaConsumerGroup is a wrapper around sarama.ConsumerGroup
type SaramaConsumerGroupClaim ¶
type SaramaConsumerGroupClaim = sarama.ConsumerGroupClaim
SaramaConsumerGroupClaim is a wrapper around sarama.ConsumerGroupClaim
type SaramaConsumerGroupSession ¶
type SaramaConsumerGroupSession = sarama.ConsumerGroupSession
SaramaConsumerGroupSession is a wrapper around sarama.ConsumerGroupSession
type SaramaMessage ¶
type SaramaMessage struct {
// contains filtered or unexported fields
}
SaramaMessage represents a Sarama specific Kafka message
func NewSaramaMessage ¶
func NewSaramaMessage(m *sarama.ConsumerMessage, s sarama.ConsumerGroupSession, ud chan struct{}) *SaramaMessage
func (SaramaMessage) Commit ¶
func (M SaramaMessage) Commit()
Commit marks the message as consumed, and then commits the offset to the backend
func (SaramaMessage) CommitAndRelease ¶
func (M SaramaMessage) CommitAndRelease()
CommitAndRelease marks the message as consumed, commits the offset to the backend and releases the UpstreamDone channel
func (SaramaMessage) GetData ¶
func (M SaramaMessage) GetData() []byte
GetData returns the message contents.
func (SaramaMessage) Mark ¶
func (M SaramaMessage) Mark()
Mark marks the message as consumed, but doesn't commit the offset to the backend
func (SaramaMessage) Offset ¶
func (M SaramaMessage) Offset() int64
Offset returns the message offset
func (SaramaMessage) Release ¶
func (M SaramaMessage) Release()
Release closes the UpstreamDone channel, but doesn't mark the message or commit the offset
func (SaramaMessage) UpstreamDone ¶
func (M SaramaMessage) UpstreamDone() chan struct{}
UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed (same effect as calling Release)
type SecurityConfig ¶
type SecurityConfig struct { RootCACerts string ClientCert string ClientKey string InsecureSkipVerify bool }
SecurityConfig is common to producers and consumer configs, above
func GetSecurityConfig ¶
func GetSecurityConfig(caCerts, clientCert, clientKey string, skipVerify bool) *SecurityConfig
type StateMachine ¶
type StateMachine struct {
// contains filtered or unexported fields
}
func NewConsumerStateMachine ¶
func NewConsumerStateMachine(st State) *StateMachine
func (*StateMachine) Set ¶
func (sm *StateMachine) Set(newState State)
Set sets the state machine to the provided state value
func (*StateMachine) SetIf ¶
func (sm *StateMachine) SetIf(allowed []State, newState State) error
SetIf sets the state machine to the provided state value only if the current state is one of the values provided in the list
func (*StateMachine) String ¶
func (sm *StateMachine) String() string
String returns the string representation of the current state
type TopicAuth ¶
type TopicAuthList ¶
func (TopicAuthList) Apply ¶
func (t TopicAuthList) Apply(adm sarama.ClusterAdmin) error
Source Files ¶
- admin.go
- channels.go
- config_admin.go
- config_consumer.go
- config_producer.go
- config_security.go
- consumer_batch.go
- consumer_group.go
- consumer_handler.go
- consumer_sarama_handler.go
- consumer_sarama_interface.go
- consumer_state.go
- error.go
- error_interface.go
- global.go
- health.go
- health_errors.go
- health_interface.go
- message.go
- producer.go
- producer_interface.go
Directories ¶
Path | Synopsis |
---|---|
Package avro provides a user functionality to return the avro encoding of s.
|
Package avro provides a user functionality to return the avro encoding of s. |
examples
|
|