Documentation
¶
Index ¶
- type Config
- type MQFaultStrategy
- type MessageQueueSelector
- type Producer
- func (p *Producer) Group() string
- func (p *Producer) NeedUpdateTopicPublish(topic string) bool
- func (p *Producer) PublishTopics() []string
- func (p *Producer) SendBatchSync(batch *message.Batch) (sendResult *SendResult, err error)
- func (p *Producer) SendSync(m *message.Message) (sendResult *SendResult, err error)
- func (p *Producer) SendSyncWithSelector(m *message.Message, s MessageQueueSelector, arg interface{}) (sendResult *SendResult, err error)
- func (p *Producer) Unpublish(topic string) bool
- func (p *Producer) UpdateTopicPublish(topic string, router *route.TopicRouter)
- type SendResult
- type SendStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { rocketmq.Client SendMsgTimeout time.Duration CompressSizeThreshod int32 CompressLevel int32 RetryTimesWhenSendFailed int32 RetryTimesWhenSendAsyncFailed int32 RetryAnotherBrokerWhenNotStoreOK bool MaxMessageSize int32 CreateTopicKey string DefaultTopicQueueNums int32 }
Config is the configuration of the producer.
type MQFaultStrategy ¶
type MQFaultStrategy struct {
// contains filtered or unexported fields
}
MQFaultStrategy is the fault strategy.
func NewMQFaultStrategy ¶
func NewMQFaultStrategy(sendEnable bool) *MQFaultStrategy
NewMQFaultStrategy creates a fault strategy.
func (*MQFaultStrategy) Available ¶
func (s *MQFaultStrategy) Available(broker string) bool
Available returns true if the broker can serve, false otherwise.
func (*MQFaultStrategy) SelectOneQueue ¶
func (s *MQFaultStrategy) SelectOneQueue(tp topicRouter, exculdedBroker string) *message.Queue
SelectOneQueue selects one message queue to send the message to.
func (*MQFaultStrategy) String ¶
func (s *MQFaultStrategy) String() string
func (*MQFaultStrategy) UpdateFault ¶
func (s *MQFaultStrategy) UpdateFault(broker string, latency time.Duration, isolation bool)
UpdateFault updates the latency.
type MessageQueueSelector ¶
type MessageQueueSelector interface {
Select(mqs []*message.Queue, m *message.Message, arg interface{}) *message.Queue
}
MessageQueueSelector selects the message queue.
type Producer ¶
type Producer struct { Config rocketmq.Server // contains filtered or unexported fields }
Producer sends messages
func (*Producer) NeedUpdateTopicPublish ¶
NeedUpdateTopicPublish returns true if the published topic's consume queue is empty, otherwise false.
func (*Producer) PublishTopics ¶
PublishTopics returns the topics published by the producer
func (*Producer) SendBatchSync ¶
func (p *Producer) SendBatchSync(batch *message.Batch) (sendResult *SendResult, err error)
SendBatchSync sends the batch message synchronously.
func (*Producer) SendSync ¶
func (p *Producer) SendSync(m *message.Message) (sendResult *SendResult, err error)
SendSync sends the message. The message must not be nil.
func (*Producer) SendSyncWithSelector ¶
func (p *Producer) SendSyncWithSelector( m *message.Message, s MessageQueueSelector, arg interface{}, ) ( sendResult *SendResult, err error, )
SendSyncWithSelector sends the message synchronously using the message queue selector.
func (*Producer) UpdateTopicPublish ¶
func (p *Producer) UpdateTopicPublish(topic string, router *route.TopicRouter)
UpdateTopicPublish updates the published information. It always updates the publish data, even if no message has been sent under the topic so far. The router must not be nil.
type SendResult ¶
type SendResult struct { Status SendStatus UniqID string QueueOffset int64 Queue *message.Queue RegionID string OffsetID string TraceOn bool TransactionID string }
SendResult is the result of sending a message.
func (*SendResult) String ¶
func (s *SendResult) String() string
type SendStatus ¶
type SendStatus int8
const ( OK SendStatus = iota FlushDiskTimeout FlushSlaveTimeout SlaveNotAvailable )
func (SendStatus) String ¶
func (s SendStatus) String() string