emq

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2022 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
// KAFKA_MQ_TYP is the message-queue type identifier for Kafka.
// NOTE(review): presumably the value MqConfig.GetMqType is expected to
// return for a Kafka-backed client — confirm against NewClient.
const KAFKA_MQ_TYP = "kafka"

Variables

View Source
// ErrKafkaConfigIllegal indicates an illegal Kafka configuration.
var ErrKafkaConfigIllegal = errors.New("kafka config illegal")

// ErrKafkaLoggerNil indicates that the Kafka logger is nil.
var ErrKafkaLoggerNil = errors.New("kafka logger nil")

// ErrConsumerClosed indicates that the consumer has been closed.
var ErrConsumerClosed = errors.New("consumer closed")

// ErrIllegalMqType indicates an illegal message-queue type.
var ErrIllegalMqType = errors.New("illegal mq type")

Functions

func GetClusterConfig

func GetClusterConfig(config *KafkaConfig) *cluster.Config

func GetSaramConfig

func GetSaramConfig(config *KafkaConfig) *sarama.Config

func WriteAsyncMsg

func WriteAsyncMsg(ctx context.Context, key string, value []byte, properties map[string]string) error

Types

type Client

// Client creates the producers and consumers of a message queue.
type Client interface {
	// NewProducer creates a Producer.
	NewProducer(ctx context.Context) (Producer, error)

	// NewConsumer creates a Consumer named consumerName.
	// NOTE(review): how consumerName is used (e.g. as a consumer group id)
	// is not visible here — confirm against the implementation.
	NewConsumer(ctx context.Context, consumerName string) (Consumer, error)
}

func NewClient

func NewClient(ctx context.Context, m MqConfig) (Client, error)

type Consumer

// Consumer reads messages from a message queue.
type Consumer interface {
	// ReadMsg reads a message. The caller passes in value, the object used
	// for (de)serialization of the message.
	// NOTE(review): presumably value must be a pointer that the message is
	// decoded into — confirm against the implementation.
	ReadMsg(ctx context.Context, value interface{}) (context.Context, error)

	// FetchMsg fetches a message. The caller passes in value, the object
	// used for (de)serialization of the message.
	// It returns a Message handle that is used to ack the message.
	FetchMsg(ctx context.Context, value interface{}) (context.Context, Message, error)

	// ReadPayloadMsg reads a message and returns its original payload.
	ReadPayloadMsg(ctx context.Context) (context.Context, []byte, error)

	// FetchPayloadMsg returns a payload together with a Message handle.
	// NOTE(review): presumably the raw-payload counterpart of FetchMsg, with
	// the Message used to ack — confirm against the implementation.
	FetchPayloadMsg(ctx context.Context) (context.Context, []byte, Message, error)

	// Close should be called when the Consumer's life cycle has ended.
	Close() error
}

type KafkaClient

type KafkaClient struct {
	// contains filtered or unexported fields
}

func NewKafkaClient

func NewKafkaClient(ctx context.Context, config *KafkaConfig) (KafkaClient, error)

func (*KafkaClient) NewConsumer

func (client *KafkaClient) NewConsumer(ctx context.Context, consumerName string) (Consumer, error)

func (*KafkaClient) NewProducer

func (client *KafkaClient) NewProducer(ctx context.Context) (Producer, error)

type KafkaConfig

// KafkaConfig holds the settings of a Kafka message queue.
type KafkaConfig struct {
	// NOTE(review): presumably the value behind GetMqType/SetMqType — confirm.
	TopicType  string
	// Topic is the topic name.
	Topic      string
	// BrokerAddr lists the broker addresses.
	BrokerAddr []string
	// User and Password are the credentials.
	// NOTE(review): the authentication mechanism is not visible here.
	User       string
	Password   string
	// L is the logger.
	L          MqLog
}

func NewKafkaConfig

func NewKafkaConfig() KafkaConfig

func (*KafkaConfig) GetBrokerAddr

func (conf *KafkaConfig) GetBrokerAddr() []string

func (*KafkaConfig) GetLog

func (conf *KafkaConfig) GetLog() MqLog

func (*KafkaConfig) GetMqTopic

func (conf *KafkaConfig) GetMqTopic() string

func (*KafkaConfig) GetMqType

func (conf *KafkaConfig) GetMqType() string

func (*KafkaConfig) GetPassword

func (conf *KafkaConfig) GetPassword() string

func (*KafkaConfig) GetUser

func (conf *KafkaConfig) GetUser() string

func (*KafkaConfig) SetBrokerAddr

func (conf *KafkaConfig) SetBrokerAddr(addrs []string)

func (*KafkaConfig) SetLog

func (conf *KafkaConfig) SetLog(logger MqLog)

func (*KafkaConfig) SetMqTopic

func (conf *KafkaConfig) SetMqTopic(topic string)

func (*KafkaConfig) SetMqType

func (conf *KafkaConfig) SetMqType(mqTyp string)

func (*KafkaConfig) SetPassword

func (conf *KafkaConfig) SetPassword(password string)

func (*KafkaConfig) SetUser

func (conf *KafkaConfig) SetUser(user string)

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func (*KafkaConsumer) Close

func (c *KafkaConsumer) Close() error

func (*KafkaConsumer) FetchMsg

func (c *KafkaConsumer) FetchMsg(ctx context.Context, value interface{}) (context.Context, Message, error)

func (*KafkaConsumer) FetchPayloadMsg

func (c *KafkaConsumer) FetchPayloadMsg(ctx context.Context) (context.Context, []byte, Message, error)

func (*KafkaConsumer) InitConsumerReturn

func (c *KafkaConsumer) InitConsumerReturn(ctx context.Context)

func (*KafkaConsumer) ReadMsg

func (c *KafkaConsumer) ReadMsg(ctx context.Context, value interface{}) (context.Context, error)

func (*KafkaConsumer) ReadPayloadMsg

func (c *KafkaConsumer) ReadPayloadMsg(ctx context.Context) (context.Context, []byte, error)

type KafkaMssage

type KafkaMssage struct {
	// contains filtered or unexported fields
}

func (*KafkaMssage) Commit

func (m *KafkaMssage) Commit(metadata string)

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func (*KafkaProducer) Close

func (p *KafkaProducer) Close() error

func (*KafkaProducer) InitProducerReturn

func (p *KafkaProducer) InitProducerReturn(ctx context.Context)

func (*KafkaProducer) WriteMsg

func (p *KafkaProducer) WriteMsg(ctx context.Context, key string, value []byte, properties map[string]string) (partition int32, msgId string, err error)

type Message

// Message is the handle returned by Consumer.FetchMsg and
// Consumer.FetchPayloadMsg for acknowledging a fetched message.
type Message interface {
	// Commit acknowledges the message.
	// NOTE(review): presumably metadata is stored alongside the committed
	// offset — confirm against the implementation.
	Commit(metadata string)
}

type MqConfig

// MqConfig is the message-queue configuration accepted by NewClient.
// It exposes a setter and a getter for each of: the queue type, the topic,
// the broker addresses, the user and the password.
type MqConfig interface {
	// Setters.
	SetMqType(mqTyp string)
	SetMqTopic(topic string)
	SetBrokerAddr(addrs []string)
	SetUser(user string)
	SetPassword(password string)

	// Getters, mirroring the setters above.
	GetMqType() string
	GetMqTopic() string
	GetBrokerAddr() []string
	GetUser() string
	GetPassword() string
}

type MqLog

// MqLog is the leveled logger interface used by this package; it is the
// type of KafkaConfig.L. Each method takes a format string f and its
// arguments args.
// NOTE(review): presumably fmt.Printf-style formatting — confirm.
type MqLog interface {
	// Debugf logs at debug level.
	Debugf(f string, args ...interface{})
	// Infof logs at info level.
	Infof(f string, args ...interface{})
	// Warnf logs at warning level.
	Warnf(f string, args ...interface{})
	// Errorf logs at error level.
	Errorf(f string, args ...interface{})
}

type Producer

// Producer writes messages to a message queue.
type Producer interface {
	// WriteMsg writes a message. The caller passes a key that can break up
	// the data, a value, and extra information in properties.
	// It returns the partition and the message id, or an error.
	// NOTE(review): presumably key selects the partition and properties are
	// sent as message headers — confirm against the implementation.
	WriteMsg(ctx context.Context, key string, value []byte, properties map[string]string) (partition int32, msgId string, err error)

	// Close should be called when the Producer's life cycle has ended.
	// The caller must ensure that no write requests are in flight before
	// calling it.
	Close() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL