Documentation
¶
Overview ¶
Package kafka messaging broker @author Daud Valentino
Package kafka broker message @author Daud Valentino
Package kafka ¶
Package kafka messaging @author Daud Valentino
Package kafka messaging @author Daud Valentino
Package kafka messaging broker @author Daud Valentino
Package kafka
Index ¶
- Variables
- func NewConsumerHandler(msgProcessor MessageProcessorFunc, autoCommit bool) sarama.ConsumerGroupHandler
- type Config
- type Consumer
- type ConsumerConfig
- type ConsumerContext
- type MessageContext
- type MessageDecoder
- type MessageEncoder
- type MessageProcessor
- type MessageProcessorFunc
- type Producer
- type ProducerConfig
- type SASL
- type TLS
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
Functions ¶
func NewConsumerHandler ¶
func NewConsumerHandler(msgProcessor MessageProcessorFunc, autoCommit bool) sarama.ConsumerGroupHandler
NewConsumerHandler return consumer handler
Types ¶
type Config ¶
type Config struct { // Brokers list of brokers connection hostname or ip address Brokers []string `json:"brokers" yaml:"brokers"` SASL SASL `json:"sasl" yaml:"sasl"` // kafka broker Version Version string `json:"version" yaml:"version"` ClientID string `json:"client_id" yaml:"client_id"` Producer ProducerConfig `json:"producer" yaml:"producer"` Consumer ConsumerConfig `json:"consumer" yaml:"consumer"` TLS TLS `json:"tls" yaml:"tls"` // The number of events to buffer in internal and external channels. This // permits the producer and consumer to continue processing some messages // in the background while user code is working, greatly improving throughput. // Defaults to 256. ChannelBufferSize int `json:"channel_buffer_size" yaml:"channel_buffer_size"` }
Config entity of kafka broker
type Consumer ¶
type Consumer interface {
Subscribe(*ConsumerContext)
}
Consumer represents a Sarama consumer consumer interface
func NewConsumerGroup ¶
NewConsumer return consumer message broker
type ConsumerConfig ¶
type ConsumerConfig struct { // Minimum is 10s SessionTimeoutSecond int `json:"session_timeout_second" yaml:"session_timeout_second"` OffsetInitial int64 `json:"offset_initial" yaml:"offset_initial"` HeartbeatInterval int `json:"heartbeat_interval" yaml:"heartbeat_interval"` RebalanceStrategy string `json:"rebalance_strategy" yaml:"rebalance_strategy"` AutoCommit bool `json:"auto_commit" yaml:"auto_commit"` IsolationLevel int8 `json:"isolation_level" yaml:"isolation_level"` }
type ConsumerContext ¶
type ConsumerContext struct { Handler MessageProcessorFunc Topics []string GroupID string Context context.Context }
type MessageContext ¶
type MessageDecoder ¶
type MessageDecoder struct { Body []byte Key []byte Topic string Partition int32 TimeStamp time.Time Offset int64 Commit func(*MessageDecoder) }
MessageDecoder decoder message data on topic
func (*MessageDecoder) DecodeJSON ¶
func (decoder *MessageDecoder) DecodeJSON(out interface{}) error
DecodeJSON decode kafka message byte to struct
type MessageEncoder ¶
MessageEncoder message encoder publish message to kafka
type MessageProcessor ¶
type MessageProcessor interface {
Processor(decoder *MessageDecoder) error
}
MessageProcessor contract message consumer processor
type MessageProcessorFunc ¶
type MessageProcessorFunc func(*MessageDecoder)
type Producer ¶
type Producer interface {
Publish(ctx context.Context, msg *MessageContext) error
}
Producer represents kafka publisher message topic
type ProducerConfig ¶
type ProducerConfig struct { // The maximum duration the broker will wait the receipt of the number of // RequiredAcks (defaults to 10 seconds). This is only relevant when // RequiredAcks is set to WaitForAll or a number > 1. Only supports // millisecond resolution, nanoseconds will be truncated. Equivalent to // the JVM producer's `request.timeout.ms` setting. TimeoutSecond int `json:"timeout_second" yaml:"timeout_second"` // RequireACK // 0 = NoResponse doesn't send any response, the TCP ACK is all you get. // 1 = WaitForLocal waits for only the local commit to succeed before responding. // - 1 = WaitForAll // WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via // the `min.insync.replicas` configuration key. RequireACK int16 `json:"require_ack" yaml:"require_ack"` // If enabled, the producer will ensure that exactly one copy of each message is // written. IdemPotent bool `json:"idem_potent" yaml:"idem_potent"` // Generates partitioners for choosing the partition to send messages to // (defaults to hashing the message key). Similar to the `partitioner.class` // setting for the JVM producer. PartitionStrategy string `json:"partition_strategy" yaml:"partition_strategy"` }
type SASL ¶
type SASL struct { // Whether or not to use SASL authentication when connecting to the broker // (defaults to false). Enable bool `json:"enable" yaml:"enable"` // SASLMechanism is the name of the enabled SASL mechanism. // Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN). Mechanism string `json:"mechanism" yaml:"mechanism"` // Version is the SASL Protocol Version to use // Kafka > 1.x should use V1, except on Azure EventHub which use V0 Version int16 `json:"version" yaml:"version"` // Whether or not to send the Kafka SASL handshake first if enabled // (defaults to true). You should only set this to false if you're using // a non-Kafka SASL proxy. Handshake bool `json:"handshake" yaml:"handshake"` // User is the authentication identity (authcid) to present for // SASL/PLAIN or SASL/SCRAM authentication User string `json:"user" yaml:"user"` // Password for SASL/PLAIN authentication Password string `json:"password" yaml:"password"` }
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool