Documentation ¶
Index ¶
- Constants
- Variables
- func GetConsumerManagerKey(topic []string, consumerGroup string) string
- type Client
- type Config
- type ConsumerFacade
- type KafkaConsumerConfig
- type KafkaConsumerFacade
- type KafkaProducerConfig
- type KafkaProducerFacade
- type MQAction
- type MQMsgPush
- type MQOptions
- type MQProduceRequest
- type MQSubscribeRequest
- type MQType
- type MQUnSubscribeRequest
- type Metadata
- type MetadataRetry
- type Option
- type Producer
- type ProducerFacade
Constants ¶
View Source
const ( MQActionPublish = 1 + iota MQActionSubscribe MQActionUnSubscribe )
Variables ¶
View Source
var MQActionIntToStr = map[MQAction]string{ MQActionPublish: "publish", MQActionSubscribe: "subscribe", MQActionUnSubscribe: "unsubscribe", }
View Source
var MQActionStrToInt = map[string]MQAction{ "publish": MQActionPublish, "subscribe": MQActionSubscribe, "unsubscribe": MQActionUnSubscribe, }
Functions ¶
func GetConsumerManagerKey ¶
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewMQClient ¶
func NewSingletonMQClient ¶
type Config ¶
type Config struct { ClientID string `yaml:"client_id" json:"client_id"` Endpoints string `yaml:"endpoints" json:"endpoints"` MqType MQType `yaml:"type" json:"type"` Retry int `yaml:"retry" json:"retry" default:"5"` Timeout time.Duration `yaml:"timeout" json:"timeout" default:"2s"` KafkaConsumerConfig KafkaConsumerConfig `yaml:"kafka_consumer_config" json:"kafka_consumer_config"` KafkaProducerConfig KafkaProducerConfig `yaml:"kafka_producer_config" json:"kafka_producer_config"` }
type ConsumerFacade ¶
type KafkaConsumerConfig ¶
type KafkaConsumerFacade ¶
type KafkaConsumerFacade struct {
// contains filtered or unexported fields
}
func NewKafkaConsumerFacade ¶
func NewKafkaConsumerFacade(config KafkaConsumerConfig, consumerGroup string) (*KafkaConsumerFacade, error)
func (*KafkaConsumerFacade) Stop ¶
func (f *KafkaConsumerFacade) Stop()
func (*KafkaConsumerFacade) Subscribe ¶
func (f *KafkaConsumerFacade) Subscribe(ctx context.Context, opts ...Option) error
func (*KafkaConsumerFacade) UnSubscribe ¶
func (f *KafkaConsumerFacade) UnSubscribe(opts ...Option) error
type KafkaProducerConfig ¶
type KafkaProducerConfig struct { Brokers []string `yaml:"brokers" json:"brokers"` ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"` Metadata Metadata `yaml:"metadata" json:"metadata"` Producer Producer `yaml:"producer" json:"producer"` Timeout time.Duration `yaml:"timeout" json:"timeout"` }
type KafkaProducerFacade ¶
type KafkaProducerFacade struct {
// contains filtered or unexported fields
}
func NewKafkaProviderFacade ¶
func NewKafkaProviderFacade(config KafkaProducerConfig) (*KafkaProducerFacade, error)
type MQOptions ¶
MQOptions Consumer options TODO: Add rocketmq params
func DefaultOptions ¶
func DefaultOptions() *MQOptions
type MQProduceRequest ¶
type MQSubscribeRequest ¶
type MQSubscribeRequest struct { TopicList []string `json:"topic_list"` ConsumerGroup string `json:"consumer_group"` ConsumeUrl string `json:"consume_url"` // not empty when subscribe msg, eg: http://10.0.0.1:11451/consume CheckUrl string `json:"check_url"` // not empty when subscribe msg, eg: http://10.0.0.1:11451/health }
MQSubscribeRequest url format http://domain/publish/broker/topic
type MQUnSubscribeRequest ¶
type MQUnSubscribeRequest struct {
ConsumerGroup string `json:"consumer_group"`
}
MQUnSubscribeRequest url format http://domain/publish/broker/topic
type Metadata ¶
type Metadata struct { Full bool `yaml:"full" json:"full"` Retry MetadataRetry `yaml:"retry" json:"retry"` }
type MetadataRetry ¶
type Option ¶
type Option func(o *MQOptions)
func WithCheckUrl ¶
func WithConsumeUrl ¶
func WithConsumerGroup ¶
func WithTopics ¶
type Producer ¶
type Producer struct {
MaxMessageBytes int `yaml:"max_message_bytes" json:"max_message_bytes"`
}
type ProducerFacade ¶
Click to show internal directories.
Click to hide internal directories.