queue

package
v0.0.0-...-92398f1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: AGPL-3.0 Imports: 16 Imported by: 13

Documentation

Index

Constants

View Source
const BucketWhoOwnsThisTopic = "who_owns_this_topic" //queue_group: node_id/timestamp
View Source
const ConsumerBucket = "queue_consumers"

Variables

This section is empty.

Functions

func Close

func Close(k *QueueConfig) error

func CommitOffset

func CommitOffset(k *QueueConfig, consumer *ConsumerConfig, offset Offset) (bool, error)

func ConsumerHasLag

func ConsumerHasLag(k *QueueConfig, c *ConsumerConfig) bool

func ConvertOffset

func ConvertOffset(offsetStr string) (int64, int64)

func DeleteOffset

func DeleteOffset(k *QueueConfig, consumer *ConsumerConfig) error

func Depth

func Depth(k *QueueConfig) int64

func Destroy

func Destroy(k *QueueConfig) error

func ExtendConvertOffset

func ExtendConvertOffset(offsetStr string) (int64, int64, int64)

func GetAllConfigBytes

func GetAllConfigBytes() []byte

func GetAllConfigs

func GetAllConfigs() map[string]*QueueConfig

func GetConsumerConfigsByQueueID

func GetConsumerConfigsByQueueID(queueID string) (map[string]*ConsumerConfig, bool)

func GetEarlierOffsetByQueueID

func GetEarlierOffsetByQueueID(queueID string) (consumerSize int, segment int64, pos, ver int64)

func GetLatestOffsetByQueueID

func GetLatestOffsetByQueueID(queueID string) (consumerSize int, segment int64, pos int64)

func GetQueues

func GetQueues() map[string][]string

func GetStorageSize

func GetStorageSize(k string) uint64

func HasLag

func HasLag(k *QueueConfig) bool

func IniQueue

func IniQueue(k *QueueConfig)

func IsConfigExists

func IsConfigExists(key string) bool

func Itoa64

func Itoa64(i int64) string

func Pop

func Pop(k *QueueConfig) ([]byte, error)

func PopTimeout

func PopTimeout(k *QueueConfig, timeoutInSeconds time.Duration) (data []byte, timeout bool, err error)

func Push

func Push(k *QueueConfig, v []byte) error

func Register

func Register(name string, h QueueAPI)

func RegisterConfig

func RegisterConfig(cfg *QueueConfig) (preExists bool, err error)

func RegisterConsumer

func RegisterConsumer(queueID string, consumer *ConsumerConfig) (bool, error)

func RegisterConsumerConfigChangeListener

func RegisterConsumerConfigChangeListener(l func(id string, configs map[string]*ConsumerConfig))

func RegisterDefaultHandler

func RegisterDefaultHandler(h QueueAPI)

func RegisterQueueConfigChangeListener

func RegisterQueueConfigChangeListener(l func(cfg *QueueConfig))

func ReleaseConsumer

func ReleaseConsumer(k *QueueConfig, c *ConsumerConfig, consumer ConsumerAPI) error

func RemoveAllConsumers

func RemoveAllConsumers(qConfig *QueueConfig) (bool, error)

func RemoveConfig

func RemoveConfig(cfg *QueueConfig) bool

func RemoveConsumer

func RemoveConsumer(queueID string, consumerKey string) (bool, error)

func ReturnQueueConfig

func ReturnQueueConfig(cfg *QueueConfig)

func TriggerChangeEvent

func TriggerChangeEvent(queueID string, cfgs map[string]*ConsumerConfig, async bool)

Types

type AdvancedQueueAPI

type AdvancedQueueAPI interface {
	QueueAPI

	LatestOffset(k *QueueConfig) Offset
	GetOffset(k *QueueConfig, consumer *ConsumerConfig) (Offset, error)
	DeleteOffset(k *QueueConfig, consumer *ConsumerConfig) error
	CommitOffset(k *QueueConfig, consumer *ConsumerConfig, offset Offset) (bool, error)

	AcquireConsumer(k *QueueConfig, consumer *ConsumerConfig) (ConsumerAPI, error)
	ReleaseConsumer(k *QueueConfig, c *ConsumerConfig, consumer ConsumerAPI) error

	AcquireProducer(cfg *QueueConfig) (ProducerAPI, error)
	ReleaseProducer(k *QueueConfig, producer ProducerAPI) error
}

type ConsumerAPI

type ConsumerAPI interface {
	Close() error
	ResetOffset(segment, readPos int64) (err error)
	FetchMessages(ctx *Context, numOfMessages int) (messages []Message, isTimeout bool, err error)
	CommitOffset(offset Offset) error
}

func AcquireConsumer

func AcquireConsumer(k *QueueConfig, consumer *ConsumerConfig, clientID string) (ConsumerAPI, error)

type ConsumerConfig

type ConsumerConfig struct {
	orm.ORMObjectBase

	Source           string `config:"source" json:"source,omitempty"`
	Group            string `config:"group" json:"group,omitempty"`
	Name             string `config:"name" json:"name,omitempty"`
	AutoResetOffset  string `config:"auto_reset_offset" json:"auto_reset_offset,omitempty"`
	AutoCommitOffset bool   `config:"auto_commit_offset" json:"auto_commit_offset,omitempty"`

	//don't add queue id to generated sliced consumer group
	SimpleSlicedGroup bool `config:"simple_sliced_group" json:"simple_sliced_group,omitempty"`

	FetchMinBytes           int   `config:"fetch_min_bytes" json:"fetch_min_bytes,omitempty"`
	FetchMaxBytes           int   `config:"fetch_max_bytes" json:"fetch_max_bytes,omitempty"`
	FetchMaxMessages        int   `config:"fetch_max_messages" json:"fetch_max_messages,omitempty"`
	FetchMaxWaitMs          int64 `config:"fetch_max_wait_ms" json:"fetch_max_wait_ms,omitempty"`
	ConsumeTimeoutInSeconds int   `config:"consume_timeout" json:"consume_timeout,omitempty"`
	EOFMaxRetryTimes        int   `config:"eof_max_retry_times" json:"eof_max_retry_times,omitempty"`
	EOFRetryDelayInMs       int64 `config:"eof_retry_delay_in_ms" json:"eof_retry_delay_in_ms,omitempty"`

	ClientExpiredInSeconds int64 `config:"client_expired_in_seconds" json:"client_expired_in_seconds,omitempty"` //client acquires lock for this long

	CommitLocker sync.Mutex
	// contains filtered or unexported fields
}

func GetConsumerConfig

func GetConsumerConfig(queueID, group, name string) (*ConsumerConfig, bool)

func GetConsumerConfigID

func GetConsumerConfigID(queueID, consumerID string) (*ConsumerConfig, bool)

func GetOrInitConsumerConfig

func GetOrInitConsumerConfig(queueID, group, name string) *ConsumerConfig

func NewConsumerConfig

func NewConsumerConfig(queueID, group, name string) *ConsumerConfig

func (*ConsumerConfig) GetFetchMaxWaitMs

func (cfg *ConsumerConfig) GetFetchMaxWaitMs() time.Duration

func (*ConsumerConfig) GetLastActiveTime

func (cfg *ConsumerConfig) GetLastActiveTime() *time.Time

func (*ConsumerConfig) KeepActive

func (cfg *ConsumerConfig) KeepActive()

func (*ConsumerConfig) Key

func (cfg *ConsumerConfig) Key() string

func (*ConsumerConfig) String

func (cfg *ConsumerConfig) String() string

type ConsumerInstanceInfo

type ConsumerInstanceInfo struct {
	ID        string    `config:"id" json:"id,omitempty"`
	Timestamp time.Time `config:"timestamp" json:"timestamp,omitempty"`
}

type Context

type Context struct {
	MessageCount int    `config:"message_count" json:"message_count"`
	NextOffset   Offset `config:"next_offset" json:"next_offset"`
	InitOffset   Offset `config:"init_offset" json:"init_offset"`
}

func (*Context) String

func (c *Context) String() string

func (*Context) UpdateInitOffset

func (c *Context) UpdateInitOffset(seg, pos, ver int64)

func (*Context) UpdateNextOffset

func (c *Context) UpdateNextOffset(seg, pos int64)

func (*Context) Valid

func (c *Context) Valid() bool

type Message

type Message struct {
	Timestamp  int64  `config:"timestamp" json:"timestamp" parquet:"timestamp"`
	Offset     Offset `config:"offset" json:"offset"  parquet:"offset"`                //current offset
	NextOffset Offset `config:"next_offset" json:"next_offset"  parquet:"next_offset"` //offset for next message
	Size       int    `config:"size" json:"size"  parquet:"size"`
	Data       []byte `config:"data" json:"data"  parquet:"data,zstd"`
}

func (*Message) String

func (m *Message) String() string

type Offset

type Offset struct {
	Segment  int64 `json:"segment"`
	Position int64 `json:"position"`
	Version  int64 `json:"version"`
}

func AcquireOffset

func AcquireOffset(seg, pos int64) Offset

func DecodeFromString

func DecodeFromString(offsetStr string) Offset

func GetEarlierOffsetStrByQueueID

func GetEarlierOffsetStrByQueueID(queueID string) Offset

func GetOffset

func GetOffset(k *QueueConfig, consumer *ConsumerConfig) (Offset, error)

func LatestOffset

func LatestOffset(k *QueueConfig) Offset

func NewOffset

func NewOffset(seg, pos int64) Offset

func NewOffsetWithVersion

func NewOffsetWithVersion(seg, pos, ver int64) Offset

func (*Offset) EncodeToString

func (c *Offset) EncodeToString() string

func (*Offset) Equals

func (c *Offset) Equals(v Offset) bool

func (*Offset) LatestThan

func (c *Offset) LatestThan(v Offset) bool

func (*Offset) String

func (c *Offset) String() string

type ProduceRequest

type ProduceRequest struct {
	Topic string `config:"topic" json:"topic"` //queue_id
	Key   []byte `config:"key" json:"key"`
	Data  []byte `config:"data" json:"data"`
}

type ProduceResponse

type ProduceResponse struct {
	Topic     string `config:"topic" json:"topic"`
	Partition int64  `config:"partition" json:"partition"`
	Offset    Offset `config:"offset" json:"offset"`
	Timestamp int64  `config:"timestamp" json:"timestamp"`
}

type ProducerAPI

type ProducerAPI interface {
	Produce(*[]ProduceRequest) (*[]ProduceResponse, error)
	Close() error
}

func AcquireProducer

func AcquireProducer(cfg *QueueConfig) (ProducerAPI, error)

type QueueAPI

type QueueAPI interface {
	Name() string
	Init(string) error
	Close(string) error
	GetStorageSize(k string) uint64
	Destroy(string) error
	GetQueues() []string

	Push(string, []byte) error
}

func GetHandlerByType

func GetHandlerByType(t string) QueueAPI

type QueueConfig

type QueueConfig struct {
	ID      string      `config:"id" json:"id,omitempty"`     //uuid for each queue
	Name    string      `config:"name" json:"name,omitempty"` //unique name of each queue
	Source  string      `config:"source" json:"source,omitempty"`
	Codec   string      `config:"codec" json:"codec,omitempty"`
	Type    string      `config:"type" json:"type,omitempty"`
	Created string      `config:"created" json:"created,omitempty"`
	Labels  util.MapStr `config:"label" json:"label,omitempty"`
	sync.RWMutex
}

func AcquireQueueConfig

func AcquireQueueConfig() *QueueConfig

func AdvancedGetOrInitConfig

func AdvancedGetOrInitConfig(queueType, key string, labels map[string]interface{}) *QueueConfig

func GetConfigByKey

func GetConfigByKey(key string) (*QueueConfig, bool)

func GetConfigByLabels

func GetConfigByLabels(labels map[string]interface{}) []*QueueConfig

func GetConfigBySelector

func GetConfigBySelector(selector *QueueSelector) []*QueueConfig

func GetConfigByUUID

func GetConfigByUUID(id string) (*QueueConfig, bool)

func GetOrInitConfig

func GetOrInitConfig(key string) *QueueConfig

func SmartGetConfig

func SmartGetConfig(keyOrID string) (*QueueConfig, bool)

func SmartGetOrInitConfig

func SmartGetOrInitConfig(cfg *QueueConfig) *QueueConfig

func (*QueueConfig) ReplaceLabels

func (q *QueueConfig) ReplaceLabels(labels util.MapStr)

func (*QueueConfig) UpdateLabel

func (q *QueueConfig) UpdateLabel(key string, val interface{})

type QueueSelector

type QueueSelector struct {
	Labels map[string]interface{} `config:"labels,omitempty"`
	Ids    []string               `config:"ids,omitempty"`
	Keys   []string               `config:"keys,omitempty"`
}

func (*QueueSelector) ToString

func (s *QueueSelector) ToString() string

type SimpleQueueAPI

type SimpleQueueAPI interface {
	QueueAPI
	Pop(string, time.Duration) (data []byte, timeout bool)
	Depth(string) int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL