queue

package
v0.0.0-...-951447a Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
// SendMsg and ReceiveMsg are the two exported constants of this package.
// Their types and values are not shown in this listing.
// NOTE(review): presumably these are the values stored in Msg.RunType to
// distinguish sent from received messages — confirm against the source.
const (
	SendMsg
	ReceiveMsg
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config is the top-level settings struct of this package. It holds four
// JSON-tagged scalar options plus one nested settings struct per backend
// (RedisConf, RocketConf, KafkaConf, PulsarConf).
//
// The four backend fields carry no json tag, so encoding/json would use the
// Go field names ("Redis", "Rocket", "Kafka", "Pulsar") for them.
type Config struct {
	// Switch is decoded from the "switch" key.
	// NOTE(review): presumably turns the queue on or off — confirm.
	Switch    bool   `json:"switch"`
	// Driver is decoded from the "driver" key.
	// NOTE(review): presumably names which backend to use — confirm the
	// accepted values against the constructor code.
	Driver    string `json:"driver"`
	// Retry is decoded from the "retry" key.
	// NOTE(review): presumably the retry count handed to producers such as
	// RegisterRocketProducer — confirm.
	Retry     int    `json:"retry"`
	// GroupName is decoded from the "groupName" key.
	GroupName string `json:"groupName"`
	// Redis holds the Redis-specific settings.
	Redis     RedisConf
	// Rocket holds the RocketMQ-specific settings.
	Rocket    RocketConf
	// Kafka holds the Kafka-specific settings.
	Kafka     KafkaConf
	// Pulsar holds the Pulsar-specific settings.
	Pulsar    PulsarConf
}

type Consumer

// Consumer is the receiving side of the queue abstraction. The Kafka, Pulsar,
// RedisMq and RocketMq types in this package each declare a
// ListenReceiveMsgDo method with this signature.
type Consumer interface {
	// ListenReceiveMsgDo takes a topic name and a callback that receives a
	// Msg, and returns an error.
	// NOTE(review): presumably the callback is invoked once per message
	// received on topic — confirm delivery and blocking semantics per backend.
	// The callback parameter is named Msg, the same as its type; the
	// implementations use msg or mqMsg instead.
	ListenReceiveMsgDo(topic string, receiveDo func(Msg Msg)) (err error)
}

func InstanceConsumer

func InstanceConsumer() (mqClient Consumer, err error)

InstanceConsumer instantiates a consumer.

func NewConsumer

func NewConsumer(groupName string) (client Consumer, err error)

NewConsumer initializes a consumer instance.

func RegisterKafkaConsumer

func RegisterKafkaConsumer(connOpt KafkaConfig) (client Consumer, err error)

RegisterKafkaConsumer registers a consumer.

func RegisterPulsarConsumer

func RegisterPulsarConsumer(config PulsarConf) (client Consumer, err error)

RegisterPulsarConsumer creates a consumer for a specific topic and subscription.

func RegisterRedisMqConsumer

func RegisterRedisMqConsumer(connOpt RedisOption, groupName string) (client Consumer)

RegisterRedisMqConsumer registers a consumer.

func RegisterRocketConsumer

func RegisterRocketConsumer(endPoints []string, groupName string) (client Consumer, err error)

RegisterRocketConsumer registers a consumer.

type KaConsumer

// KaConsumer has no exported fields. Its exported methods are Setup, Cleanup
// and ConsumeClaim, each taking sarama consumer-group session/claim arguments.
// NOTE(review): this method set looks like an implementation of
// sarama.ConsumerGroupHandler — confirm against the source.
type KaConsumer struct {
	// contains filtered or unexported fields
}

func (*KaConsumer) Cleanup

func (consumer *KaConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*KaConsumer) ConsumeClaim

func (consumer *KaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*KaConsumer) Setup

func (consumer *KaConsumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim.

type Kafka

// Kafka is the Kafka-backed queue client. Its exported methods
// (ListenReceiveMsgDo, SendMsg, SendByteMsg, SendDelayMsg) have the same
// signatures as the methods of both the Consumer and Producer interfaces.
type Kafka struct {
	// Partitions is the only exported field.
	// NOTE(review): presumably the topic partition count, mirroring
	// KafkaConfig.Partitions — confirm how it is used.
	Partitions int32
	// contains filtered or unexported fields
}

func (*Kafka) ListenReceiveMsgDo

func (r *Kafka) ListenReceiveMsgDo(topic string, receiveDo func(msg Msg)) (err error)

ListenReceiveMsgDo consumes data.

func (*Kafka) SendByteMsg

func (r *Kafka) SendByteMsg(topic string, body []byte) (msg Msg, err error)

SendByteMsg produces data.

func (*Kafka) SendDelayMsg

func (r *Kafka) SendDelayMsg(topic string, body string, delaySecond int64) (msg Msg, err error)

func (*Kafka) SendMsg

func (r *Kafka) SendMsg(topic string, body string) (msg Msg, err error)

SendMsg produces data from a string body.

type KafkaConf

// KafkaConf is the Kafka section of Config. Every field is JSON-tagged.
type KafkaConf struct {
	// Address is a list of strings decoded from the "address" key.
	// NOTE(review): presumably broker addresses — confirm the expected format.
	Address       []string `json:"address"`
	// Version is decoded from the "version" key.
	// NOTE(review): presumably the Kafka protocol version string — confirm.
	Version       string   `json:"version"`
	// RandClient is decoded from the "randClient" key.
	// NOTE(review): meaning not visible here; possibly randomizes the client
	// ID — confirm.
	RandClient    bool     `json:"randClient"`
	// MultiConsumer is decoded from the "multiConsumer" key.
	// NOTE(review): meaning not visible here — confirm.
	MultiConsumer bool     `json:"multiConsumer"`
}

type KafkaConfig

// KafkaConfig is the connection option struct accepted by
// RegisterKafkaConsumer and RegisterKafkaProducer. Unlike KafkaConf it has
// no JSON tags.
// NOTE(review): the per-field notes below are inferred from field names only;
// verify against the code that consumes them.
type KafkaConfig struct {
	// ClientId is a string client identifier.
	ClientId    string
	// Brokers is a list of strings; presumably broker addresses.
	Brokers     []string
	// GroupID is a string; presumably the consumer group ID.
	GroupID     string
	// Partitions is an int32; presumably a topic partition count.
	Partitions  int32
	// Replication is an int16; presumably a topic replication factor.
	Replication int16
	// Version is a string; presumably the Kafka protocol version.
	Version     string
	// UserName and Password are strings; presumably authentication
	// credentials.
	UserName    string
	Password    string
}

type Msg

// Msg is the message value returned by the Producer send methods and passed
// to the Consumer callback. All fields are JSON-tagged with snake_case keys.
type Msg struct {
	// RunType is an int encoded as "run_type".
	// NOTE(review): presumably one of the SendMsg / ReceiveMsg constants —
	// confirm.
	RunType   int       `json:"run_type"`
	// Topic is encoded as "topic".
	Topic     string    `json:"topic"`
	// MsgId is encoded as "msg_id".
	MsgId     string    `json:"msg_id"`
	// Offset is encoded as "offset".
	// NOTE(review): Offset and Partition look Kafka-specific; confirm what
	// the other backends store here.
	Offset    int64     `json:"offset"`
	// Partition is encoded as "partition".
	Partition int32     `json:"partition"`
	// Timestamp is encoded as "timestamp".
	Timestamp time.Time `json:"timestamp"`
	// Body is the raw payload, encoded as "body". The BodyString method is
	// documented as returning the message body.
	Body      []byte    `json:"body"`
}

func (*Msg) BodyString

func (m *Msg) BodyString() string

BodyString returns the message body.

type Producer

// Producer is the sending side of the queue abstraction. The Kafka, Pulsar,
// RedisMq and RocketMq types in this package each declare these three
// methods with matching signatures.
type Producer interface {
	// SendMsg sends a string body to topic and returns a Msg and an error.
	SendMsg(topic string, body string) (msg Msg, err error)
	// SendByteMsg sends a byte-slice body to topic and returns a Msg and an
	// error.
	SendByteMsg(topic string, body []byte) (msg Msg, err error)
	// SendDelayMsg takes a topic, a string body and a delaySecond value.
	// NOTE(review): presumably delivery is deferred by delaySecond seconds;
	// confirm whether every backend actually supports delayed delivery.
	SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg Msg, err error)
}

func InstanceProducer

func InstanceProducer() (client Producer, err error)

InstanceProducer instantiates a producer.

func NewProducer

func NewProducer(groupName string) (client Producer, err error)

NewProducer initializes a producer instance.

func RegisterKafkaProducer

func RegisterKafkaProducer(connOpt KafkaConfig) (client Producer, err error)

RegisterKafkaProducer registers and starts the producer interface implementation.

func RegisterPulsarProducer

func RegisterPulsarProducer(config PulsarConf) (client Producer, err error)

RegisterPulsarProducer creates a producer for a specific topic.

func RegisterRedisMqProducer

func RegisterRedisMqProducer(connOpt RedisOption, groupName string) (client Producer)

func RegisterRocketProducer

func RegisterRocketProducer(endPoints []string, groupName string, retry int) (client Producer, err error)

RegisterRocketProducer registers and starts the producer interface implementation.

type Pulsar

// Pulsar is the Pulsar-backed queue client, created by NewPulsar from a
// service URL. It exposes the underlying pulsar client, producer and consumer
// handles as exported fields. Its exported send/listen methods have the same
// signatures as the Producer and Consumer interface methods.
type Pulsar struct {
	// Client is the underlying pulsar client. Close is documented as closing
	// the client and releasing all resources.
	Client   pulsar.Client
	// Producer is the underlying pulsar producer handle.
	// NOTE(review): presumably set only by RegisterPulsarProducer and nil
	// otherwise — confirm.
	Producer pulsar.Producer
	// Consumer is the underlying pulsar consumer handle.
	// NOTE(review): presumably set only by RegisterPulsarConsumer and nil
	// otherwise — confirm.
	Consumer pulsar.Consumer
}

func NewPulsar

func NewPulsar(serviceURL string) (*Pulsar, error)

NewPulsar creates a new client with the given service URL.

func (*Pulsar) Close

func (p *Pulsar) Close()

Close closes the client and releases all resources.

func (*Pulsar) ListenReceiveMsgDo

func (p *Pulsar) ListenReceiveMsgDo(topic string, receiveDo func(msg Msg)) (err error)

ListenReceiveMsgDo consumes data.

func (*Pulsar) SendByteMsg

func (p *Pulsar) SendByteMsg(topic string, body []byte) (msg Msg, err error)

SendByteMsg produces data.

func (*Pulsar) SendDelayMsg

func (p *Pulsar) SendDelayMsg(topic string, body string, delaySecond int64) (msg Msg, err error)

func (*Pulsar) SendMsg

func (p *Pulsar) SendMsg(topic string, body string) (msg Msg, err error)

SendMsg produces data from a string body.

type PulsarConf

// PulsarConf is the Pulsar section of Config and the argument type accepted
// by RegisterPulsarConsumer and RegisterPulsarProducer. Every field is
// JSON-tagged.
type PulsarConf struct {
	// Address is a list of strings decoded from the "address" key.
	// NOTE(review): how this relates to URL is not visible here — confirm.
	Address          []string `json:"address"`
	// LogLevel is decoded from the "logLevel" key.
	LogLevel         string   `json:"logLevel"`
	// Topic is decoded from the "topic" key.
	Topic            string   `json:"topic"`
	// URL is decoded from the "url" key.
	// NOTE(review): presumably the service URL handed to NewPulsar — confirm.
	URL              string   `json:"url"`
	// Type is an int decoded from the "type" key.
	// NOTE(review): meaning and valid values are not visible here — confirm.
	Type             int      `json:"type"`
	// SubscriptionName is decoded from the "subscriptionName" key.
	SubscriptionName string   `json:"subscriptionName"`
}

type Queue

// Queue is a two-method lifecycle interface.
// NOTE(review): no type shown in this listing declares Start or Stop;
// confirm where implementations live.
type Queue interface {
	// Start returns an error.
	Start() error

	// Stop returns nothing.
	Stop()
}

type RedisConf

// RedisConf is the Redis section of Config.
type RedisConf struct {
	// Timeout is an int64 decoded from the "timeout" key.
	// NOTE(review): the unit is not visible here — confirm.
	Timeout int64 `json:"timeout"`
}

type RedisMq

// RedisMq is the Redis-backed queue client returned by RegisterRedisMq. It
// has no exported fields. Its exported methods (ListenReceiveMsgDo, SendMsg,
// SendByteMsg, SendDelayMsg) have the same signatures as the Consumer and
// Producer interface methods.
type RedisMq struct {
	// contains filtered or unexported fields
}

func RegisterRedisMq

func RegisterRedisMq(connOpt RedisOption, groupName string) *RedisMq

RegisterRedisMq registers a Redis instance.

func (*RedisMq) ListenReceiveMsgDo

func (r *RedisMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg Msg)) (err error)

ListenReceiveMsgDo consumes data.

func (*RedisMq) SendByteMsg

func (r *RedisMq) SendByteMsg(topic string, body []byte) (mqMsg Msg, err error)

SendByteMsg produces data.

func (*RedisMq) SendDelayMsg

func (r *RedisMq) SendDelayMsg(topic string, body string, delaySecond int64) (msg Msg, err error)

func (*RedisMq) SendMsg

func (r *RedisMq) SendMsg(topic string, body string) (mqMsg Msg, err error)

SendMsg produces data from a string body.

type RedisOption

// RedisOption is the connection option struct accepted by RegisterRedisMq,
// RegisterRedisMqConsumer and RegisterRedisMqProducer.
type RedisOption struct {
	// Timeout is an int64.
	// NOTE(review): presumably copied from RedisConf.Timeout; the unit is not
	// visible here — confirm.
	Timeout int64
}

type RocketConf

// RocketConf is the RocketMQ section of Config.
type RocketConf struct {
	// Address is a list of strings decoded from the "address" key.
	// NOTE(review): presumably the endPoints passed to the RegisterRocket*
	// functions — confirm.
	Address  []string `json:"address"`
	// LogLevel is decoded from the "logLevel" key.
	LogLevel string   `json:"logLevel"`
}

type RocketMq

// RocketMq is the RocketMQ-backed queue client returned by
// RegisterRocketMqConsumer and RegisterRocketMqProducer. It has no exported
// fields. Its exported methods (ListenReceiveMsgDo, SendMsg, SendByteMsg,
// SendDelayMsg) have the same signatures as the Consumer and Producer
// interface methods.
type RocketMq struct {
	// contains filtered or unexported fields
}

func RegisterRocketMqConsumer

func RegisterRocketMqConsumer(endPoints []string, groupName string) (mqIns *RocketMq, err error)

RegisterRocketMqConsumer registers a RocketMQ consumer.

func RegisterRocketMqProducer

func RegisterRocketMqProducer(endPoints []string, groupName string, retry int) (mqIns *RocketMq, err error)

RegisterRocketMqProducer registers a RocketMQ producer.

func (*RocketMq) ListenReceiveMsgDo

func (r *RocketMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg Msg)) (err error)

ListenReceiveMsgDo consumes data.

func (*RocketMq) SendByteMsg

func (r *RocketMq) SendByteMsg(topic string, body []byte) (mqMsg Msg, err error)

SendByteMsg produces data.

func (*RocketMq) SendDelayMsg

func (r *RocketMq) SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg Msg, err error)

func (*RocketMq) SendMsg

func (r *RocketMq) SendMsg(topic string, body string) (mqMsg Msg, err error)

SendMsg produces data from a string body.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL