Documentation ¶
Index ¶
- Constants
- Variables
- func GetClusterConfig(config *KafkaConfig) *cluster.Config
- func GetSaramConfig(config *KafkaConfig) *sarama.Config
- func WriteAsyncMsg(ctx context.Context, key string, value []byte, properties map[string]string) error
- type Client
- type Consumer
- type KafkaClient
- type KafkaConfig
- func (conf *KafkaConfig) GetBrokerAddr() []string
- func (conf *KafkaConfig) GetLog() MqLog
- func (conf *KafkaConfig) GetMqTopic() string
- func (conf *KafkaConfig) GetMqType() string
- func (conf *KafkaConfig) GetPassword() string
- func (conf *KafkaConfig) GetUser() string
- func (conf *KafkaConfig) SetBrokerAddr(addrs []string)
- func (conf *KafkaConfig) SetLog(logger MqLog)
- func (conf *KafkaConfig) SetMqTopic(topic string)
- func (conf *KafkaConfig) SetMqType(mqTyp string)
- func (conf *KafkaConfig) SetPassword(password string)
- func (conf *KafkaConfig) SetUser(user string)
- type KafkaConsumer
- func (c *KafkaConsumer) Close() error
- func (c *KafkaConsumer) FetchMsg(ctx context.Context, value interface{}) (context.Context, Message, error)
- func (c *KafkaConsumer) FetchPayloadMsg(ctx context.Context) (context.Context, []byte, Message, error)
- func (c *KafkaConsumer) InitConsumerReturn(ctx context.Context)
- func (c *KafkaConsumer) ReadMsg(ctx context.Context, value interface{}) (context.Context, error)
- func (c *KafkaConsumer) ReadPayloadMsg(ctx context.Context) (context.Context, []byte, error)
- type KafkaMssage
- type KafkaProducer
- type Message
- type MqConfig
- type MqLog
- type Producer
Constants ¶
View Source
const (
KAFKA_MQ_TYP = "kafka"
)
Variables ¶
Functions ¶
func GetClusterConfig ¶
func GetClusterConfig(config *KafkaConfig) *cluster.Config
func GetSaramConfig ¶
func GetSaramConfig(config *KafkaConfig) *sarama.Config
Types ¶
type Client ¶
type Consumer ¶
type Consumer interface { // Read mq msg. You need to pass an object for serialization to the function. ReadMsg(ctx context.Context, value interface{}) (context.Context, error) // Fetch mq msg.You need to pass an object for serialization to the function. // It returns a handler for Ack msg. FetchMsg(ctx context.Context, value interface{}) (context.Context, Message, error) // read payload mq msg. Function will return original message. ReadPayloadMsg(ctx context.Context) (context.Context, []byte, error) FetchPayloadMsg(ctx context.Context) (context.Context, []byte, Message, error) // This method should be called when the Consumer's life cycle is ended. Close() error }
type KafkaClient ¶
type KafkaClient struct {
// contains filtered or unexported fields
}
func NewKafkaClient ¶
func NewKafkaClient(ctx context.Context, config *KafkaConfig) (KafkaClient, error)
func (*KafkaClient) NewConsumer ¶
func (*KafkaClient) NewProducer ¶
func (client *KafkaClient) NewProducer(ctx context.Context) (Producer, error)
type KafkaConfig ¶
type KafkaConfig struct { TopicType string Topic string BrokerAddr []string User string Password string L MqLog }
func NewKafkaConfig ¶
func NewKafkaConfig() KafkaConfig
func (*KafkaConfig) GetBrokerAddr ¶
func (conf *KafkaConfig) GetBrokerAddr() []string
func (*KafkaConfig) GetLog ¶
func (conf *KafkaConfig) GetLog() MqLog
func (*KafkaConfig) GetMqTopic ¶
func (conf *KafkaConfig) GetMqTopic() string
func (*KafkaConfig) GetMqType ¶
func (conf *KafkaConfig) GetMqType() string
func (*KafkaConfig) GetPassword ¶
func (conf *KafkaConfig) GetPassword() string
func (*KafkaConfig) GetUser ¶
func (conf *KafkaConfig) GetUser() string
func (*KafkaConfig) SetBrokerAddr ¶
func (conf *KafkaConfig) SetBrokerAddr(addrs []string)
func (*KafkaConfig) SetLog ¶
func (conf *KafkaConfig) SetLog(logger MqLog)
func (*KafkaConfig) SetMqTopic ¶
func (conf *KafkaConfig) SetMqTopic(topic string)
func (*KafkaConfig) SetMqType ¶
func (conf *KafkaConfig) SetMqType(mqTyp string)
func (*KafkaConfig) SetPassword ¶
func (conf *KafkaConfig) SetPassword(password string)
func (*KafkaConfig) SetUser ¶
func (conf *KafkaConfig) SetUser(user string)
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func (*KafkaConsumer) Close ¶
func (c *KafkaConsumer) Close() error
func (*KafkaConsumer) FetchPayloadMsg ¶
func (*KafkaConsumer) InitConsumerReturn ¶
func (c *KafkaConsumer) InitConsumerReturn(ctx context.Context)
func (*KafkaConsumer) ReadPayloadMsg ¶
type KafkaMssage ¶
type KafkaMssage struct {
// contains filtered or unexported fields
}
func (*KafkaMssage) Commit ¶
func (m *KafkaMssage) Commit(metadata string)
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
func (*KafkaProducer) Close ¶
func (p *KafkaProducer) Close() error
func (*KafkaProducer) InitProducerReturn ¶
func (p *KafkaProducer) InitProducerReturn(ctx context.Context)
type Producer ¶
type Producer interface { // write mq msg. You need to pass a "key" that can break up the data,a value,and an extra message to the function. WriteMsg(ctx context.Context, key string, value []byte, properties map[string]string) (partition int32, msgId string, err error) // This method should be called when the life cycle of the Producer is ended, // and you need to ensure that there are no write requests before calling. Close() error }
Click to show internal directories.
Click to hide internal directories.