Documentation ¶
Index ¶
- Constants
- Variables
- func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByConfig(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByConsistentHash(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func AllocateByMachineRoom(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, ...) []*primitive.MessageQueue
- func IsTagType(exp string) bool
- func NewPullConsumer(options ...Option) (*defaultPullConsumer, error)
- func NewPushConsumer(opts ...Option) (*pushConsumer, error)
- func ShutDownStatis()
- type AllocateStrategy
- type ConsumeFromWhere
- type ConsumeResult
- type ConsumeResultHolder
- type ConsumeStatus
- type ConsumeType
- type ConsumerReturn
- type ExpressionType
- type MessageModel
- type MessageSelector
- type OffsetStore
- type Option
- func WithConsumeFromWhere(w ConsumeFromWhere) Option
- func WithConsumerModel(m MessageModel) Option
- func WithConsumerOrder(order bool) Option
- func WithCredentials(c primitive.Credentials) Option
- func WithGroupName(group string) Option
- func WithInterceptor(fs ...primitive.Interceptor) Option
- func WithMaxReconsumeTimes(times int32) Option
- func WithNameServer(nameServers []string) Option
- func WithRetry(retries int) Option
- func WithVIPChannel(enable bool) Option
- type PullConsumer
- type PullRequest
- type QueueLock
Constants ¶
const ( /** * <ul> * Keywords: * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li> * </ul> * <p/> * <ul> * Data type: * <li>Boolean, like: TRUE, FALSE</li> * <li>String, like: 'abc'</li> * <li>Decimal, like: 123</li> * <li>Float number, like: 3.1415</li> * </ul> * <p/> * <ul> * Grammar: * <li>{@code AND, OR}</li> * <li>{@code >, >=, <, <=, =}</li> * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li> * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li> * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li> * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li> * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li> * </ul> * <p/> * <p> * Example: * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE) * </p> */ SQL92 = ExpressionType("SQL92") /** * Only support or operation such as * "tag1 || tag2 || tag3", <br> * If null or * expression, meaning subscribe all. */ TAG = ExpressionType("TAG") )
const (
Mb = 1024 * 1024
)
Variables ¶
var ( ErrCreated = errors.New("consumer group has been created") ErrBrokerNotFound = errors.New("broker can not found") )
Functions ¶
func AllocateByAveragely ¶
func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func AllocateByAveragelyCircle ¶
func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func AllocateByConfig ¶
func AllocateByConfig(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func AllocateByConsistentHash ¶
func AllocateByConsistentHash(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func AllocateByMachineNearby ¶
func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
TODO
func AllocateByMachineRoom ¶
func AllocateByMachineRoom(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue
func NewPullConsumer ¶
func NewPushConsumer ¶
func ShutDownStatis ¶
func ShutDownStatis()
Types ¶
type AllocateStrategy ¶
type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
type ConsumeFromWhere ¶
type ConsumeFromWhere int
Consuming point on consumer booting. </p>
There are three consuming points: <ul> <li> <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously. If it were a newly booting up consumer client, according aging of the consumer group, there are two cases: <ol> <li> if the consumer group is created so recently that the earliest message being subscribed has yet expired, which means the consumer group represents a lately launched business, consuming will start from the very beginning; </li> <li> if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp would be ignored. </li> </ol> </li> <li> <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available. </li> <li> <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means messages born prior to {@link #consumeTimestamp} will be ignored </li> </ul>
const ( ConsumeFromLastOffset ConsumeFromWhere = iota ConsumeFromFirstOffset ConsumeFromTimestamp )
type ConsumeResult ¶
type ConsumeResult int
const ( ConsumeSuccess ConsumeResult = iota ConsumeRetryLater Commit Rollback SuspendCurrentQueueAMoment )
type ConsumeResultHolder ¶
type ConsumeResultHolder struct {
ConsumeResult
}
type ConsumeStatus ¶
type ConsumeStatus struct { PullRT float64 PullTPS float64 ConsumeRT float64 ConsumeOKTPS float64 ConsumeFailedTPS float64 ConsumeFailedMsgs int64 }
func GetConsumeStatus ¶
func GetConsumeStatus(group, topic string) ConsumeStatus
type ConsumeType ¶
type ConsumeType string
type ConsumerReturn ¶
type ConsumerReturn int
const ( SuccessReturn ConsumerReturn = iota ExceptionReturn NullReturn TimeoutReturn FailedReturn )
type ExpressionType ¶
type ExpressionType string
type MessageModel ¶
type MessageModel int
Message model defines the way how messages are delivered to each consumer clients. </p>
RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with the same {@link #ConsumerGroup} would only consume shards of the messages subscribed, which achieves load balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages separately. </p>
This field defaults to clustering.
const ( BroadCasting MessageModel = iota Clustering )
func (MessageModel) String ¶
func (mode MessageModel) String() string
type MessageSelector ¶
type MessageSelector struct { Type ExpressionType Expression string }
type OffsetStore ¶
type OffsetStore interface {
// contains filtered or unexported methods
}
func NewLocalFileOffsetStore ¶
func NewLocalFileOffsetStore(clientID, group string) OffsetStore
func NewRemoteOffsetStore ¶
func NewRemoteOffsetStore(group string, client internal.RMQClient) OffsetStore
type Option ¶
type Option func(*consumerOptions)
func WithConsumeFromWhere ¶
func WithConsumeFromWhere(w ConsumeFromWhere) Option
func WithConsumerModel ¶
func WithConsumerModel(m MessageModel) Option
func WithConsumerOrder ¶
func WithCredentials ¶
func WithCredentials(c primitive.Credentials) Option
func WithInterceptor ¶
func WithInterceptor(fs ...primitive.Interceptor) Option
WithChainConsumerInterceptor returns a ConsumerOption that specifies the chained interceptor for consumer. The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper around the real call.
func WithMaxReconsumeTimes ¶
WithMaxReconsumeTimes set MaxReconsumeTimes of options, if message reconsume greater than MaxReconsumeTimes, it will be sent to retry or dlq topic. more info reference by examples/consumer/retry.
func WithNameServer ¶
WithNameServer set NameServer address, only support one NameServer cluster in alpha2
func WithRetry ¶
WithRetry return a Option that specifies the retry times when send failed. TODO: use retry middleware instead
func WithVIPChannel ¶
type PullConsumer ¶
type PullConsumer interface { // Start Start() // Shutdown refuse all new pull operation, finish all submitted. Shutdown() // Pull pull message of topic, selector indicate which queue to pull. Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error) // PullFrom pull messages of queue from the offset to offset + numbers PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) // updateOffset update offset of queue in mem UpdateOffset(queue *primitive.MessageQueue, offset int64) error // PersistOffset persist all offset in mem. PersistOffset(ctx context.Context) error // CurrentOffset return the current offset of queue in mem. CurrentOffset(queue *primitive.MessageQueue) (int64, error) }
type PullRequest ¶
type PullRequest struct {
// contains filtered or unexported fields
}
func (*PullRequest) String ¶
func (pr *PullRequest) String() string