Documentation ¶
Index ¶
- Constants
- func Close(k *QueueConfig) error
- func CommitOffset(k *QueueConfig, consumer *ConsumerConfig, offset Offset) (bool, error)
- func ConsumerHasLag(k *QueueConfig, c *ConsumerConfig) bool
- func ConvertOffset(offsetStr string) (int64, int64)
- func DeleteOffset(k *QueueConfig, consumer *ConsumerConfig) error
- func Depth(k *QueueConfig) int64
- func Destroy(k *QueueConfig) error
- func ExtendConvertOffset(offsetStr string) (int64, int64, int64)
- func GetAllConfigBytes() []byte
- func GetAllConfigs() map[string]*QueueConfig
- func GetConsumerConfigsByQueueID(queueID string) (map[string]*ConsumerConfig, bool)
- func GetEarlierOffsetByQueueID(queueID string) (consumerSize int, segment int64, pos, ver int64)
- func GetLatestOffsetByQueueID(queueID string) (consumerSize int, segment int64, pos int64)
- func GetQueues() map[string][]string
- func GetStorageSize(k string) uint64
- func HasLag(k *QueueConfig) bool
- func IniQueue(k *QueueConfig)
- func IsConfigExists(key string) bool
- func Itoa64(i int64) string
- func Pop(k *QueueConfig) ([]byte, error)
- func PopTimeout(k *QueueConfig, timeoutInSeconds time.Duration) (data []byte, timeout bool, err error)
- func Push(k *QueueConfig, v []byte) error
- func Register(name string, h QueueAPI)
- func RegisterConfig(cfg *QueueConfig) (preExists bool, err error)
- func RegisterConsumer(queueID string, consumer *ConsumerConfig) (bool, error)
- func RegisterConsumerConfigChangeListener(l func(id string, configs map[string]*ConsumerConfig))
- func RegisterDefaultHandler(h QueueAPI)
- func RegisterQueueConfigChangeListener(l func(cfg *QueueConfig))
- func ReleaseConsumer(k *QueueConfig, c *ConsumerConfig, consumer ConsumerAPI) error
- func RemoveAllConsumers(qConfig *QueueConfig) (bool, error)
- func RemoveConfig(cfg *QueueConfig) bool
- func RemoveConsumer(queueID string, consumerKey string) (bool, error)
- func ReturnQueueConfig(cfg *QueueConfig)
- func TriggerChangeEvent(queueID string, cfgs map[string]*ConsumerConfig, async bool)
- type AdvancedQueueAPI
- type ConsumerAPI
- type ConsumerConfig
- type ConsumerInstanceInfo
- type Context
- type Message
- type Offset
- func AcquireOffset(seg, pos int64) Offset
- func DecodeFromString(offsetStr string) Offset
- func GetEarlierOffsetStrByQueueID(queueID string) Offset
- func GetOffset(k *QueueConfig, consumer *ConsumerConfig) (Offset, error)
- func LatestOffset(k *QueueConfig) Offset
- func NewOffset(seg, pos int64) Offset
- func NewOffsetWithVersion(seg, pos, ver int64) Offset
- type ProduceRequest
- type ProduceResponse
- type ProducerAPI
- type QueueAPI
- type QueueConfig
- func AcquireQueueConfig() *QueueConfig
- func AdvancedGetOrInitConfig(queueType, key string, labels map[string]interface{}) *QueueConfig
- func GetConfigByKey(key string) (*QueueConfig, bool)
- func GetConfigByLabels(labels map[string]interface{}) []*QueueConfig
- func GetConfigBySelector(selector *QueueSelector) []*QueueConfig
- func GetConfigByUUID(id string) (*QueueConfig, bool)
- func GetOrInitConfig(key string) *QueueConfig
- func SmartGetConfig(keyOrID string) (*QueueConfig, bool)
- func SmartGetOrInitConfig(cfg *QueueConfig) *QueueConfig
- type QueueSelector
- type SimpleQueueAPI
Constants ¶
View Source
const BucketWhoOwnsThisTopic = "who_owns_this_topic" //queue_group: node_id/timestamp
View Source
const ConsumerBucket = "queue_consumers"
Variables ¶
This section is empty.
Functions ¶
func Close ¶
func Close(k *QueueConfig) error
func CommitOffset ¶
func CommitOffset(k *QueueConfig, consumer *ConsumerConfig, offset Offset) (bool, error)
func ConsumerHasLag ¶
func ConsumerHasLag(k *QueueConfig, c *ConsumerConfig) bool
func ConvertOffset ¶
func DeleteOffset ¶
func DeleteOffset(k *QueueConfig, consumer *ConsumerConfig) error
func Depth ¶
func Depth(k *QueueConfig) int64
func Destroy ¶
func Destroy(k *QueueConfig) error
func GetAllConfigBytes ¶
func GetAllConfigBytes() []byte
func GetAllConfigs ¶
func GetAllConfigs() map[string]*QueueConfig
func GetConsumerConfigsByQueueID ¶
func GetConsumerConfigsByQueueID(queueID string) (map[string]*ConsumerConfig, bool)
func GetStorageSize ¶
func HasLag ¶
func HasLag(k *QueueConfig) bool
func IniQueue ¶
func IniQueue(k *QueueConfig)
func IsConfigExists ¶
func Pop ¶
func Pop(k *QueueConfig) ([]byte, error)
func PopTimeout ¶
func Push ¶
func Push(k *QueueConfig, v []byte) error
func RegisterConfig ¶
func RegisterConfig(cfg *QueueConfig) (preExists bool, err error)
func RegisterConsumer ¶
func RegisterConsumer(queueID string, consumer *ConsumerConfig) (bool, error)
func RegisterConsumerConfigChangeListener ¶
func RegisterConsumerConfigChangeListener(l func(id string, configs map[string]*ConsumerConfig))
func RegisterDefaultHandler ¶
func RegisterDefaultHandler(h QueueAPI)
func RegisterQueueConfigChangeListener ¶
func RegisterQueueConfigChangeListener(l func(cfg *QueueConfig))
func ReleaseConsumer ¶
func ReleaseConsumer(k *QueueConfig, c *ConsumerConfig, consumer ConsumerAPI) error
func RemoveAllConsumers ¶
func RemoveAllConsumers(qConfig *QueueConfig) (bool, error)
func RemoveConfig ¶
func RemoveConfig(cfg *QueueConfig) bool
func ReturnQueueConfig ¶
func ReturnQueueConfig(cfg *QueueConfig)
func TriggerChangeEvent ¶
func TriggerChangeEvent(queueID string, cfgs map[string]*ConsumerConfig, async bool)
Types ¶
type AdvancedQueueAPI ¶
type AdvancedQueueAPI interface { QueueAPI LatestOffset(k *QueueConfig) Offset GetOffset(k *QueueConfig, consumer *ConsumerConfig) (Offset, error) DeleteOffset(k *QueueConfig, consumer *ConsumerConfig) error CommitOffset(k *QueueConfig, consumer *ConsumerConfig, offset Offset) (bool, error) AcquireConsumer(k *QueueConfig, consumer *ConsumerConfig) (ConsumerAPI, error) ReleaseConsumer(k *QueueConfig, c *ConsumerConfig, consumer ConsumerAPI) error AcquireProducer(cfg *QueueConfig) (ProducerAPI, error) ReleaseProducer(k *QueueConfig, producer ProducerAPI) error }
type ConsumerAPI ¶
type ConsumerAPI interface { Close() error ResetOffset(segment, readPos int64) (err error) FetchMessages(ctx *Context, numOfMessages int) (messages []Message, isTimeout bool, err error) CommitOffset(offset Offset) error }
func AcquireConsumer ¶
func AcquireConsumer(k *QueueConfig, consumer *ConsumerConfig, clientID string) (ConsumerAPI, error)
type ConsumerConfig ¶
type ConsumerConfig struct { orm.ORMObjectBase Source string `config:"source" json:"source,omitempty"` Group string `config:"group" json:"group,omitempty"` Name string `config:"name" json:"name,omitempty"` AutoResetOffset string `config:"auto_reset_offset" json:"auto_reset_offset,omitempty"` AutoCommitOffset bool `config:"auto_commit_offset" json:"auto_commit_offset,omitempty"` //don't add queue id to generated sliced consumer group SimpleSlicedGroup bool `config:"simple_sliced_group" json:"simple_sliced_group,omitempty"` FetchMinBytes int `config:"fetch_min_bytes" json:"fetch_min_bytes,omitempty"` FetchMaxBytes int `config:"fetch_max_bytes" json:"fetch_max_bytes,omitempty"` FetchMaxMessages int `config:"fetch_max_messages" json:"fetch_max_messages,omitempty"` FetchMaxWaitMs int64 `config:"fetch_max_wait_ms" json:"fetch_max_wait_ms,omitempty"` ConsumeTimeoutInSeconds int `config:"consume_timeout" json:"consume_timeout,omitempty"` EOFMaxRetryTimes int `config:"eof_max_retry_times" json:"eof_max_retry_times,omitempty"` EOFRetryDelayInMs int64 `config:"eof_retry_delay_in_ms" json:"eof_retry_delay_in_ms,omitempty"` ClientExpiredInSeconds int64 `config:"client_expired_in_seconds" json:"client_expired_in_seconds,omitempty"` //client acquires lock for this long CommitLocker sync.Mutex // contains filtered or unexported fields }
func GetConsumerConfig ¶
func GetConsumerConfig(queueID, group, name string) (*ConsumerConfig, bool)
func GetConsumerConfigID ¶
func GetConsumerConfigID(queueID, consumerID string) (*ConsumerConfig, bool)
func GetOrInitConsumerConfig ¶
func GetOrInitConsumerConfig(queueID, group, name string) *ConsumerConfig
func NewConsumerConfig ¶
func NewConsumerConfig(queueID, group, name string) *ConsumerConfig
func (*ConsumerConfig) GetFetchMaxWaitMs ¶
func (cfg *ConsumerConfig) GetFetchMaxWaitMs() time.Duration
func (*ConsumerConfig) GetLastActiveTime ¶
func (cfg *ConsumerConfig) GetLastActiveTime() *time.Time
func (*ConsumerConfig) KeepActive ¶
func (cfg *ConsumerConfig) KeepActive()
func (*ConsumerConfig) Key ¶
func (cfg *ConsumerConfig) Key() string
func (*ConsumerConfig) String ¶
func (cfg *ConsumerConfig) String() string
type ConsumerInstanceInfo ¶
type Context ¶
type Context struct { MessageCount int `config:"message_count" json:"message_count"` NextOffset Offset `config:"next_offset" json:"next_offset"` InitOffset Offset `config:"init_offset" json:"init_offset"` }
func (*Context) UpdateInitOffset ¶
func (*Context) UpdateNextOffset ¶
type Message ¶
type Message struct { Timestamp int64 `config:"timestamp" json:"timestamp" parquet:"timestamp"` Offset Offset `config:"offset" json:"offset" parquet:"offset"` //current offset NextOffset Offset `config:"next_offset" json:"next_offset" parquet:"next_offset"` //offset for next message Size int `config:"size" json:"size" parquet:"size"` Data []byte `config:"data" json:"data" parquet:"data,zstd"` }
type Offset ¶
type Offset struct { Segment int64 `json:"segment"` Position int64 `json:"position"` Version int64 `json:"version"` }
func AcquireOffset ¶
func DecodeFromString ¶
func GetOffset ¶
func GetOffset(k *QueueConfig, consumer *ConsumerConfig) (Offset, error)
func LatestOffset ¶
func LatestOffset(k *QueueConfig) Offset
func NewOffsetWithVersion ¶
func (*Offset) EncodeToString ¶
func (*Offset) LatestThan ¶
type ProduceRequest ¶
type ProduceResponse ¶
type ProducerAPI ¶
type ProducerAPI interface { Produce(*[]ProduceRequest) (*[]ProduceResponse, error) Close() error }
func AcquireProducer ¶
func AcquireProducer(cfg *QueueConfig) (ProducerAPI, error)
type QueueAPI ¶
type QueueAPI interface { Name() string Init(string) error Close(string) error GetStorageSize(k string) uint64 Destroy(string) error GetQueues() []string Push(string, []byte) error }
func GetHandlerByType ¶
type QueueConfig ¶
type QueueConfig struct { ID string `config:"id" json:"id,omitempty"` //uuid for each queue Name string `config:"name" json:"name,omitempty"` //unique name of each queue Source string `config:"source" json:"source,omitempty"` Codec string `config:"codec" json:"codec,omitempty"` Type string `config:"type" json:"type,omitempty"` Created string `config:"created" json:"created,omitempty"` Labels util.MapStr `config:"label" json:"label,omitempty"` sync.RWMutex }
func AcquireQueueConfig ¶
func AcquireQueueConfig() *QueueConfig
func AdvancedGetOrInitConfig ¶
func AdvancedGetOrInitConfig(queueType, key string, labels map[string]interface{}) *QueueConfig
func GetConfigByKey ¶
func GetConfigByKey(key string) (*QueueConfig, bool)
func GetConfigByLabels ¶
func GetConfigByLabels(labels map[string]interface{}) []*QueueConfig
func GetConfigBySelector ¶
func GetConfigBySelector(selector *QueueSelector) []*QueueConfig
func GetConfigByUUID ¶
func GetConfigByUUID(id string) (*QueueConfig, bool)
func GetOrInitConfig ¶
func GetOrInitConfig(key string) *QueueConfig
func SmartGetConfig ¶
func SmartGetConfig(keyOrID string) (*QueueConfig, bool)
func SmartGetOrInitConfig ¶
func SmartGetOrInitConfig(cfg *QueueConfig) *QueueConfig
func (*QueueConfig) ReplaceLabels ¶
func (q *QueueConfig) ReplaceLabels(labels util.MapStr)
func (*QueueConfig) UpdateLabel ¶
func (q *QueueConfig) UpdateLabel(key string, val interface{})
type QueueSelector ¶
type QueueSelector struct { Labels map[string]interface{} `config:"labels,omitempty"` Ids []string `config:"ids,omitempty"` Keys []string `config:"keys,omitempty"` }
func (*QueueSelector) ToString ¶
func (s *QueueSelector) ToString() string
Click to show internal directories.
Click to hide internal directories.