Versions in this module Expand all Collapse all v1 v1.0.0 Mar 17, 2020 Changes in this version + const Mb + const SQL92 + const TAG + var ErrBrokerNotFound = errors.New("broker can not found") + var ErrCreated = errors.New("consumer group has been created") + func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func AllocateByConfig(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func AllocateByConsistentHash(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func AllocateByMachineRoom(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue + func IsTagType(exp string) bool + func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) + func NewPushConsumer(opts ...Option) (*pushConsumer, error) + func ShutDownStatis() + type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue + type ConsumeFromWhere int + const ConsumeFromFirstOffset + const ConsumeFromLastOffset + const ConsumeFromTimestamp + type ConsumeResult int + const Commit + const ConsumeRetryLater + const ConsumeSuccess + const Rollback + const SuspendCurrentQueueAMoment + type ConsumeResultHolder struct + type ConsumeStatus struct + ConsumeFailedMsgs int64 + ConsumeFailedTPS float64 + ConsumeOKTPS float64 + ConsumeRT float64 + PullRT float64 + PullTPS float64 + func GetConsumeStatus(group, topic string) ConsumeStatus + type ConsumeType string + type ConsumerReturn int + const ExceptionReturn + const FailedReturn + const NullReturn + const SuccessReturn + const TimeoutReturn + type ExpressionType string + type MessageModel int + const BroadCasting + const Clustering + func (mode MessageModel) String() string + type MessageSelector struct + Expression string + Type ExpressionType + type OffsetStore interface + func NewLocalFileOffsetStore(clientID, group string) OffsetStore + func NewRemoteOffsetStore(group string, client internal.RMQClient) OffsetStore + type Option func(*consumerOptions) + func WithConsumeFromWhere(w ConsumeFromWhere) Option + func WithConsumerModel(m MessageModel) Option + func WithConsumerOrder(order bool) Option + func WithCredentials(c primitive.Credentials) Option + func WithGroupName(group string) Option + func WithInterceptor(fs ...primitive.Interceptor) Option + func WithMaxReconsumeTimes(times int32) Option + func WithNameServer(nameServers []string) Option + func WithRetry(retries int) Option + func WithVIPChannel(enable bool) Option + type PullConsumer interface + CurrentOffset func(queue *primitive.MessageQueue) (int64, error) + PersistOffset func(ctx context.Context) error + Pull func(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) + PullFrom func(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) + Shutdown func() + Start func() + UpdateOffset func(queue *primitive.MessageQueue, offset int64) error + type PullRequest struct + func (pr *PullRequest) String() string + type QueueLock struct