mq

package
v1.0.0-rc3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Action codes for the message queue. Numbering starts at 1, so the integer
// zero value does not correspond to any action.
const (
	// MQActionPublish is the code for the "publish" action (value 1).
	MQActionPublish = iota + 1
	// MQActionSubscribe is the code for the "subscribe" action (value 2).
	MQActionSubscribe
	// MQActionUnSubscribe is the code for the "unsubscribe" action (value 3).
	MQActionUnSubscribe
)

Variables

View Source
// MQActionIntToStr maps each MQAction code to its lowercase string name.
// It holds the same three pairs as MQActionStrToInt, in the opposite direction.
var MQActionIntToStr = map[MQAction]string{
	MQActionPublish:     "publish",
	MQActionSubscribe:   "subscribe",
	MQActionUnSubscribe: "unsubscribe",
}
View Source
// MQActionStrToInt maps a lowercase action name to its MQAction code.
// It holds the same three pairs as MQActionIntToStr, in the opposite direction.
// Looking up a name that is not listed yields the zero MQAction, which matches
// none of the declared action codes (they start at 1).
var MQActionStrToInt = map[string]MQAction{
	"publish":     MQActionPublish,
	"subscribe":   MQActionSubscribe,
	"unsubscribe": MQActionUnSubscribe,
}

Functions

func GetConsumerManagerKey

func GetConsumerManagerKey(topic []string, consumerGroup string) string

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewMQClient

func NewMQClient(config Config) (*Client, error)

func NewSingletonMQClient

func NewSingletonMQClient(config Config) *Client

func (Client) Apply

func (c Client) Apply() error

func (Client) Call

func (c Client) Call(req *client.Request) (res interface{}, err error)

func (Client) Close

func (c Client) Close() error

func (Client) MapParams

func (c Client) MapParams(req *client.Request) (reqData interface{}, err error)

type Config

// Config is the configuration accepted by NewMQClient and
// NewSingletonMQClient. Every field carries matching yaml and json keys.
type Config struct {
	ClientID            string              `yaml:"client_id" json:"client_id"`
	// Endpoints is a single string.
	// NOTE(review): the expected format (one address or a delimited list) is not visible here — confirm.
	Endpoints           string              `yaml:"endpoints" json:"endpoints"`
	// MqType is read from and written to the key "type".
	MqType              MQType              `yaml:"type" json:"type"`
	// Retry declares default:"5" in its tag.
	// NOTE(review): the default only applies if the config loader honors the tag — confirm.
	Retry               int                 `yaml:"retry" json:"retry" default:"5"`
	// Timeout declares default:"2s" in its tag; the same caveat as Retry applies.
	Timeout             time.Duration       `yaml:"timeout" json:"timeout" default:"2s"`
	// Kafka-specific consumer and producer settings.
	KafkaConsumerConfig KafkaConsumerConfig `yaml:"kafka_consumer_config" json:"kafka_consumer_config"`
	KafkaProducerConfig KafkaProducerConfig `yaml:"kafka_producer_config" json:"kafka_producer_config"`
}

type ConsumerFacade

// ConsumerFacade is the consumer-side interface of the package.
// *KafkaConsumerFacade declares all three methods with matching signatures.
type ConsumerFacade interface {
	// Subscribe subscribes to messages on the specified broker and topic, then
	// passes each message to a handler that forwards it to the real consumers.
	Subscribe(ctx context.Context, option ...Option) error
	// UnSubscribe is configured through the same Option type as Subscribe and
	// reports failure through the returned error.
	UnSubscribe(opts ...Option) error
	// Stop takes no arguments and returns nothing.
	// NOTE(review): presumably shuts the consumer down — confirm against the implementation.
	Stop()
}

type KafkaConsumerConfig

// KafkaConsumerConfig is the configuration accepted by NewKafkaConsumerFacade
// and is also the KafkaConsumerConfig field of Config. Every field carries
// matching yaml and json keys.
type KafkaConsumerConfig struct {
	// Brokers is a list of broker strings.
	// NOTE(review): presumably host:port addresses — confirm.
	Brokers         []string `yaml:"brokers" json:"brokers"`
	// ProtocolVersion is kept as a string.
	// NOTE(review): the accepted version format is not visible here — confirm.
	ProtocolVersion string   `yaml:"protocol_version" json:"protocol_version"`
	ClientID        string   `yaml:"client_id" json:"client_id"`
	Metadata        Metadata `yaml:"metadata" json:"metadata"`
}

type KafkaConsumerFacade

type KafkaConsumerFacade struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumerFacade

func NewKafkaConsumerFacade(config KafkaConsumerConfig, consumerGroup string) (*KafkaConsumerFacade, error)

func (*KafkaConsumerFacade) Stop

func (f *KafkaConsumerFacade) Stop()

func (*KafkaConsumerFacade) Subscribe

func (f *KafkaConsumerFacade) Subscribe(ctx context.Context, opts ...Option) error

func (*KafkaConsumerFacade) UnSubscribe

func (f *KafkaConsumerFacade) UnSubscribe(opts ...Option) error

type KafkaProducerConfig

// KafkaProducerConfig is the configuration accepted by NewKafkaProviderFacade
// and is also the KafkaProducerConfig field of Config. Every field carries
// matching yaml and json keys.
type KafkaProducerConfig struct {
	// Brokers is a list of broker strings.
	// NOTE(review): presumably host:port addresses — confirm.
	Brokers         []string      `yaml:"brokers" json:"brokers"`
	// ProtocolVersion is kept as a string.
	// NOTE(review): the accepted version format is not visible here — confirm.
	ProtocolVersion string        `yaml:"protocol_version" json:"protocol_version"`
	Metadata        Metadata      `yaml:"metadata" json:"metadata"`
	Producer        Producer      `yaml:"producer" json:"producer"`
	// Timeout has no default tag here, unlike Config.Timeout.
	Timeout         time.Duration `yaml:"timeout" json:"timeout"`
}

type KafkaProducerFacade

type KafkaProducerFacade struct {
	// contains filtered or unexported fields
}

func NewKafkaProviderFacade

func NewKafkaProviderFacade(config KafkaProducerConfig) (*KafkaProducerFacade, error)

func (*KafkaProducerFacade) Send

func (k *KafkaProducerFacade) Send(msgs []string, opts ...Option) error

type MQAction

// MQAction is an integer-backed action code. It is the key type of
// MQActionIntToStr and the value type of MQActionStrToInt.
type MQAction int

type MQMsgPush

// MQMsgPush carries a list of message strings under the JSON key "msg".
// NOTE(review): where this payload is produced or consumed is not visible here — confirm.
type MQMsgPush struct {
	Msg []string `json:"msg"`
}

type MQOptions

// MQOptions holds the consumer options that Option functions receive.
// Its four fields have the same names and types as the fields of
// MQSubscribeRequest.
// TODO: Add rocketmq params
type MQOptions struct {
	TopicList     []string
	ConsumeUrl    string
	CheckUrl      string
	ConsumerGroup string
}

MQOptions holds the consumer options. TODO: add RocketMQ params.

func DefaultOptions

func DefaultOptions() *MQOptions

func (*MQOptions) ApplyOpts

func (o *MQOptions) ApplyOpts(opts ...Option)

type MQProduceRequest

// MQProduceRequest holds the JSON-tagged fields of a produce request:
// a single topic and a list of message strings.
type MQProduceRequest struct {
	Topic string   `json:"topic"`
	Msg   []string `json:"msg"`
}

type MQSubscribeRequest

// MQSubscribeRequest holds the JSON-tagged fields of a subscribe request.
type MQSubscribeRequest struct {
	TopicList     []string `json:"topic_list"`
	ConsumerGroup string   `json:"consumer_group"`
	ConsumeUrl    string   `json:"consume_url"` // must be non-empty when subscribing, e.g. http://10.0.0.1:11451/consume
	CheckUrl      string   `json:"check_url"`   // must be non-empty when subscribing, e.g. http://10.0.0.1:11451/health
}

MQSubscribeRequest URL format: http://domain/publish/broker/topic

type MQType

// MQType is a string-backed type. It is the type of Config.MqType, which is
// read from the yaml/json key "type".
type MQType string

type MQUnSubscribeRequest

// MQUnSubscribeRequest holds the single JSON-tagged field of an unsubscribe
// request: the consumer group name.
type MQUnSubscribeRequest struct {
	ConsumerGroup string `json:"consumer_group"`
}

MQUnSubscribeRequest URL format: http://domain/publish/broker/topic

type Metadata

// Metadata is the metadata section shared by KafkaConsumerConfig and
// KafkaProducerConfig.
// NOTE(review): the fields look like Kafka client metadata settings — confirm how they are applied.
type Metadata struct {
	Full  bool          `yaml:"full" json:"full"`
	Retry MetadataRetry `yaml:"retry" json:"retry"`
}

type MetadataRetry

// MetadataRetry is the type of Metadata.Retry. It pairs an integer maximum
// with a backoff duration.
// NOTE(review): presumably Max is the retry count and Backoff the wait between attempts — confirm.
type MetadataRetry struct {
	Max     int           `yaml:"max" json:"max"`
	Backoff time.Duration `yaml:"backoff" json:"backoff"`
}

type Option

// Option is a function that receives a pointer to an MQOptions. The With*
// constructors in this package return Option values, and MQOptions.ApplyOpts
// and the facade methods accept a variadic list of them.
type Option func(o *MQOptions)

func WithCheckUrl

func WithCheckUrl(ck string) Option

func WithConsumeUrl

func WithConsumeUrl(ch string) Option

func WithConsumerGroup

func WithConsumerGroup(cp string) Option

func WithTopic

func WithTopic(t string) Option

func WithTopics

func WithTopics(t []string) Option

type Producer

// Producer is the type of KafkaProducerConfig.Producer. Its only field is an
// integer stored under the yaml/json key "max_message_bytes".
type Producer struct {
	MaxMessageBytes int `yaml:"max_message_bytes" json:"max_message_bytes"`
}

type ProducerFacade

// ProducerFacade is the producer-side interface of the package.
// *KafkaProducerFacade declares a Send method with the same signature.
type ProducerFacade interface {
	// Send sends msgs to the specified broker and topic; settings are
	// supplied through opts, and failure is reported through the returned error.
	Send(msgs []string, opts ...Option) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL