Documentation ¶
Index ¶
- Constants
- func DefaultProcessor(ctx context.Context, dependencies ProcessorDependencies, msg ConsumerMessage) error
- type Config
- type ConsumerMessage
- type EncoderDecoder
- type KafkaClient
- type KafkaGoClient
- type KafkaGoMessage
- func (m KafkaGoMessage) InfoEvent(event string) string
- func (m KafkaGoMessage) Key() string
- func (m KafkaGoMessage) Offset() int64
- func (m KafkaGoMessage) Partition() int32
- func (m KafkaGoMessage) Topic() string
- func (m KafkaGoMessage) Unmarshall(ctx context.Context, native interface{}) (e error)
- func (m KafkaGoMessage) Value() []byte
- type ProcessorDependencies
- type RetryTopicMessage
- type SaramaClient
- type SaramaMessage
- type TopicConfig
Constants ¶
const ( // ConsumerTypeGroup configures the consumer as part of a consumer group ConsumerTypeGroup consumerType = "CONSUMER_GROUP" // MessageFormatAvro specifies that messages in a topic are stored in avro format MessageFormatAvro messageFormat = "MESSAGE_AVRO" // MessageFormatJSON specifies that messages in a topic are stored in JSON format MessageFormatJSON messageFormat = "MESSAGE_JSON" // MessageFormatString specifies that messages in a topic are stored in string format MessageFormatString messageFormat = "MESSAGE_STRING" // ProducerTypeAsync configures a producer with an asynchronous response mechanism ProducerTypeAsync producerType = "PRODUCER_ASYNC" // ProducerTypeSync configures a producer with synchronous feedback ProducerTypeSync producerType = "PRODUCER_SYNC" )
const ( // BaseSarama can be used in kafkaclient.New to specify that // the underlying library used will be Shopify's sarama (https://github.com/Shopify/sarama/) BaseSarama baseLibrary = "SARAMA" // BaseKafkaGO can be used in kafkaclient.New to specify that // the underlying library used will be kafkago (https://github.com/segmentio/kafka-go) BaseKafkaGO baseLibrary = "KAFKAGO" )
Variables ¶
This section is empty.
Functions ¶
func DefaultProcessor ¶
func DefaultProcessor(ctx context.Context, dependencies ProcessorDependencies, msg ConsumerMessage) error
Types ¶
type Config ¶
type Config struct { KafkaVersion string Brokers []string Topics []TopicConfig SchemaRegURL string ConsumerType consumerType ConsumerGroupID string ProcDependencies ProcessorDependencies // injectable dependencies for message processors ProducerType producerType ReadFromOldest bool TLS *tls.Config Debug bool }
Config holds specifics used to configure different parts of the kafka client
func NewConfig ¶
func NewConfig( ctx context.Context, version string, brokers []string, topics []TopicConfig, procDependencies ProcessorDependencies, schemaRegURL string, consType consumerType, groupID string, prodType producerType, readFromOldest bool, tls *tls.Config, debug bool) (c Config, e error)
NewConfig constructs and returns a Config struct
func (Config) ReadTopicNames ¶
ReadTopicNames constructs and returns a slice of all topic names
func (Config) TopicMap ¶
func (c Config) TopicMap() (m map[string]TopicConfig)
TopicMap constructs and returns a map of topic configuration, using each topic name as the map key
func (Config) WriteTopicNames ¶
WriteTopicNames constructs and returns a slice of all topic names
type ConsumerMessage ¶
type ConsumerMessage interface { Unmarshall(ctx context.Context, native interface{}) (e error) Topic() string Key() string Offset() int64 Partition() int32 Value() []byte }
ConsumerMessage is an interface implemented by kafka consumer message types
type EncoderDecoder ¶
type EncoderDecoder interface { // Encode encodes native golang as binary. // // topic: name of topic the message will be sent to // native: the golang data structure to be encoded Encode(ctx context.Context, topic string, native interface{}) (b []byte, e error) // Decode decodes binary into native golang. // // topic: name of topic the message was received from // b: the binary to be decoded, // target: pointer to data structure the binary data will be decoded into Decode(ctx context.Context, topic string, b []byte, target interface{}) error // GetSchemaID returns the topic schema ID, if applicable GetSchemaID(ctx context.Context, topic string) (int, error) }
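EncoderDecoder is an interface for encoding native golang data into binary for a topic, decoding binary from a topic back into native golang data, and looking up the topic schema ID where applicable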
type KafkaClient ¶
type KafkaClient interface { // StartConsume starts the consumption of messages from the configured Kafka topics StartConsume(ctx context.Context) error // CancelConsume cancels the consumption of messages from configured topics CancelConsume() error // ProduceMessage adds messages to a specified topic ProduceMessage(ctx context.Context, topic string, key string, msg interface{}) error // contains filtered or unexported methods }
KafkaClient is an interface describing the primary uses of this library
func New ¶
func New(base baseLibrary, config Config) (KafkaClient, error)
New constructs and returns a new KafkaClient implementation
type KafkaGoClient ¶
type KafkaGoClient struct {
// contains filtered or unexported fields
}
KafkaGoClient implements the KafkaClient interface
func (*KafkaGoClient) CancelConsume ¶
func (c *KafkaGoClient) CancelConsume() (e error)
CancelConsume calls the context's context.CancelFunc in order to stop the process of message consumption
func (*KafkaGoClient) ProduceMessage ¶
func (c *KafkaGoClient) ProduceMessage( ctx context.Context, topic string, key string, msg interface{}) (e error)
ProduceMessage creates/encodes a message and sends it to the specified topic
func (*KafkaGoClient) StartConsume ¶
func (c *KafkaGoClient) StartConsume(ctx context.Context) (e error)
StartConsume starts consuming configured kafka topic messages
type KafkaGoMessage ¶
type KafkaGoMessage struct {
// contains filtered or unexported fields
}
KafkaGoMessage holds kafka-go message contents as well as an EncoderDecoder used to unmarshall message data
func (KafkaGoMessage) InfoEvent ¶
func (m KafkaGoMessage) InfoEvent(event string) string
InfoEvent constructs and returns a loggable event relating to the message
func (KafkaGoMessage) Offset ¶
func (m KafkaGoMessage) Offset() int64
Offset returns the message offset
func (KafkaGoMessage) Partition ¶
func (m KafkaGoMessage) Partition() int32
Partition returns the message partition
func (KafkaGoMessage) Topic ¶
func (m KafkaGoMessage) Topic() string
Topic returns the message topic
func (KafkaGoMessage) Unmarshall ¶
func (m KafkaGoMessage) Unmarshall(ctx context.Context, native interface{}) (e error)
Unmarshall unmarshalls the message contents into the provided struct
func (KafkaGoMessage) Value ¶
func (m KafkaGoMessage) Value() []byte
Value returns the message byte value
type ProcessorDependencies ¶ added in v1.0.2
type ProcessorDependencies interface{}
type RetryTopicMessage ¶
type RetryTopicMessage struct { OriginalTopic string `json:"original_topic" avro:"original_topic"` OriginalPartition int32 `json:"original_partition" avro:"original_partition"` OriginalOffset int64 `json:"original_offset" avro:"original_offset"` OriginalMessage []byte `json:"original_message" avro:"original_message"` Error string `json:"error" avro:"error"` }
RetryTopicMessage is a native go representation of a message on a retry topic
func NewRetryTopicMessage ¶
func NewRetryTopicMessage( origTopic string, origPart int32, origOffset int64, origMsg []byte, e error) RetryTopicMessage
NewRetryTopicMessage constructs and returns a new RetryTopicMessage to be added to a retry topic
type SaramaClient ¶
type SaramaClient struct {
// contains filtered or unexported fields
}
SaramaClient implements the KafkaClient interface
func (*SaramaClient) CancelConsume ¶
func (c *SaramaClient) CancelConsume() (e error)
CancelConsume calls the context's context.CancelFunc in order to stop the process of message consumption
func (*SaramaClient) ProduceMessage ¶
func (c *SaramaClient) ProduceMessage( ctx context.Context, topic string, key string, msg interface{}) (e error)
ProduceMessage creates/encodes a message and sends it to the specified topic
func (*SaramaClient) StartConsume ¶
func (c *SaramaClient) StartConsume(ctx context.Context) (e error)
StartConsume starts consuming configured kafka topic messages
type SaramaMessage ¶
type SaramaMessage struct {
// contains filtered or unexported fields
}
SaramaMessage holds sarama message contents as well as an EncoderDecoder used to unmarshall message data
func (SaramaMessage) Offset ¶
func (m SaramaMessage) Offset() int64
Offset returns the message offset
func (SaramaMessage) Partition ¶
func (m SaramaMessage) Partition() int32
Partition returns the message partition
func (SaramaMessage) Unmarshall ¶
func (m SaramaMessage) Unmarshall(ctx context.Context, native interface{}) (e error)
Unmarshall unmarshalls the message contents into the provided struct
func (SaramaMessage) Value ¶
func (m SaramaMessage) Value() []byte
Value returns the message byte value
type TopicConfig ¶
type TopicConfig struct { Name string MessageFormat messageFormat // Set DoConsume to true if this topic should be consumed from DoConsume bool // Set DoProduce to true if you will need to produce messages to this topic DoProduce bool DelayProcessingMins time.Duration // FailedProcessingTopic is the retry topic to which a message // should be handed off in the case of a failure to process the message FailedProcessingTopic string // Schema is an optional string representation of the topic schema Schema string SchemaVersion int MessageProcessor func(context.Context, ProcessorDependencies, ConsumerMessage) error // contains filtered or unexported fields }
TopicConfig is a struct that holds data regarding an existing Kafka topic that can be consumed from or written to
Source Files ¶
- config.go
- encoder_decoder.go
- encoder_decoder_avro.go
- encoder_decoder_json.go
- encoder_decoder_string.go
- kafkaclient.go
- kafkago_client.go
- kafkago_consumer.go
- kafkago_message.go
- kafkago_producer.go
- log.go
- message.go
- processor.go
- sarama_client.go
- sarama_consumer.go
- sarama_encoder.go
- sarama_message.go
- sarama_producer.go
- schema_reg.go
- schema_reg_mock.go