consumer

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2020 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	/**
	 * <ul>
	 * Keywords:
	 * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
	 * </ul>
	 * <p/>
	 * <ul>
	 * Data type:
	 * <li>Boolean, like: TRUE, FALSE</li>
	 * <li>String, like: 'abc'</li>
	 * <li>Decimal, like: 123</li>
	 * <li>Float number, like: 3.1415</li>
	 * </ul>
	 * <p/>
	 * <ul>
	 * Grammar:
	 * <li>{@code AND, OR}</li>
	 * <li>{@code >, >=, <, <=, =}</li>
	 * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
	 * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
	 * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
	 * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
	 * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
	 * </ul>
	 * <p/>
	 * <p>
	 * Example:
	 * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
	 * </p>
	 */
	SQL92 = ExpressionType("SQL92")

	/**
	 * Only support or operation such as
	 * "tag1 || tag2 || tag3", <br>
	 * If null or * expression, meaning subscribe all.
	 */
	TAG = ExpressionType("TAG")
)
View Source
const (
	Mb = 1024 * 1024
)

Variables

View Source
var (
	ErrCreated        = errors.New("consumer group has been created")
	ErrBrokerNotFound = errors.New("broker can not found")
)

Functions

func AllocateByAveragely

func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByAveragelyCircle

func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByConfig

func AllocateByConfig(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByConsistentHash

func AllocateByConsistentHash(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByMachineNearby

func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

TODO

func AllocateByMachineRoom

func AllocateByMachineRoom(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func IsTagType

func IsTagType(exp string) bool

func NewPullConsumer

func NewPullConsumer(options ...Option) (*defaultPullConsumer, error)

func NewPushConsumer

func NewPushConsumer(opts ...Option) (*pushConsumer, error)

func ShutDownStatis

func ShutDownStatis()

Types

type AllocateStrategy

type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue

type ConsumeFromWhere

type ConsumeFromWhere int

Consuming point on consumer booting. </p>

There are three consuming points: <ul> <li> <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously. If it were a newly booting up consumer client, according aging of the consumer group, there are two cases: <ol> <li> if the consumer group is created so recently that the earliest message being subscribed has yet expired, which means the consumer group represents a lately launched business, consuming will start from the very beginning; </li> <li> if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp would be ignored. </li> </ol> </li> <li> <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available. </li> <li> <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means messages born prior to {@link #consumeTimestamp} will be ignored </li> </ul>

const (
	ConsumeFromLastOffset ConsumeFromWhere = iota
	ConsumeFromFirstOffset
	ConsumeFromTimestamp
)

type ConsumeResult

type ConsumeResult int
const (
	ConsumeSuccess ConsumeResult = iota
	ConsumeRetryLater
	Commit
	Rollback
	SuspendCurrentQueueAMoment
)

type ConsumeResultHolder

type ConsumeResultHolder struct {
	ConsumeResult
}

type ConsumeStatus

type ConsumeStatus struct {
	PullRT            float64
	PullTPS           float64
	ConsumeRT         float64
	ConsumeOKTPS      float64
	ConsumeFailedTPS  float64
	ConsumeFailedMsgs int64
}

func GetConsumeStatus

func GetConsumeStatus(group, topic string) ConsumeStatus

type ConsumeType

type ConsumeType string

type ConsumerReturn

type ConsumerReturn int
const (
	SuccessReturn ConsumerReturn = iota
	ExceptionReturn
	NullReturn
	TimeoutReturn
	FailedReturn
)

type ExpressionType

type ExpressionType string

type MessageModel

type MessageModel int

Message model defines the way how messages are delivered to each consumer clients. </p>

RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with the same {@link #ConsumerGroup} would only consume shards of the messages subscribed, which achieves load balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages separately. </p>

This field defaults to clustering.

const (
	BroadCasting MessageModel = iota
	Clustering
)

func (MessageModel) String

func (mode MessageModel) String() string

type MessageSelector

type MessageSelector struct {
	Type       ExpressionType
	Expression string
}

type OffsetStore

type OffsetStore interface {
	// contains filtered or unexported methods
}

func NewLocalFileOffsetStore

func NewLocalFileOffsetStore(clientID, group string) OffsetStore

func NewRemoteOffsetStore

func NewRemoteOffsetStore(group string, client internal.RMQClient) OffsetStore

type Option

type Option func(*consumerOptions)

func WithConsumeFromWhere

func WithConsumeFromWhere(w ConsumeFromWhere) Option

func WithConsumerModel

func WithConsumerModel(m MessageModel) Option

func WithConsumerOrder

func WithConsumerOrder(order bool) Option

func WithCredentials

func WithCredentials(c primitive.Credentials) Option

func WithGroupName

func WithGroupName(group string) Option

WithGroupName set group name address

func WithInterceptor

func WithInterceptor(fs ...primitive.Interceptor) Option

WithChainConsumerInterceptor returns a ConsumerOption that specifies the chained interceptor for consumer. The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper around the real call.

func WithMaxReconsumeTimes

func WithMaxReconsumeTimes(times int32) Option

WithMaxReconsumeTimes set MaxReconsumeTimes of options, if message reconsume greater than MaxReconsumeTimes, it will be sent to retry or dlq topic. more info reference by examples/consumer/retry.

func WithNameServer

func WithNameServer(nameServers []string) Option

WithNameServer set NameServer address, only support one NameServer cluster in alpha2

func WithRetry

func WithRetry(retries int) Option

WithRetry return a Option that specifies the retry times when send failed. TODO: use retry middleware instead

func WithVIPChannel

func WithVIPChannel(enable bool) Option

type PullConsumer

type PullConsumer interface {
	// Start
	Start()

	// Shutdown refuse all new pull operation, finish all submitted.
	Shutdown()

	// Pull pull message of topic,  selector indicate which queue to pull.
	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)

	// PullFrom pull messages of queue from the offset to offset + numbers
	PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

	// updateOffset update offset of queue in mem
	UpdateOffset(queue *primitive.MessageQueue, offset int64) error

	// PersistOffset persist all offset in mem.
	PersistOffset(ctx context.Context) error

	// CurrentOffset return the current offset of queue in mem.
	CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}

type PullRequest

type PullRequest struct {
	// contains filtered or unexported fields
}

func (*PullRequest) String

func (pr *PullRequest) String() string

type QueueLock

type QueueLock struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL