Documentation
¶
Index ¶
- Constants
- func DecodeAndValidateSingleton(dv datautils.DataDecoderValidator, payloads [][]byte, target interface{}) error
- func GetTenantExchangeName(t string) string
- func JSONConvert[T any](payloads [][]byte) []*T
- func NewRandomStaticQueue() staticQueue
- func NoOpHook(task *Message) error
- func QueueTypeFromDispatcherID(d string) consumerQueue
- func QueueTypeFromPartitionIDAndController(p, controller string) consumerQueue
- func QueueTypeFromTickerID(t string) consumerQueue
- func TenantEventConsumerQueue(t string) fanoutQueue
- type AckHook
- type DstFunc
- type MQPubBuffer
- type MQSubBuffer
- type Message
- type MessageQueue
- type PubFunc
- type Queue
- type SharedTenantReader
Constants ¶
View Source
const ( TASK_PROCESSING_QUEUE staticQueue = "task_processing_queue_v2" OLAP_QUEUE staticQueue = "olap_queue_v2" )
View Source
const PUB_BUFFER_SIZE = 1000
View Source
const PUB_FLUSH_INTERVAL = 10 * time.Millisecond
View Source
const PUB_MAX_CONCURRENCY = 1
View Source
const PUB_TIMEOUT = 10 * time.Second
View Source
const SUB_BUFFER_SIZE = 1000
View Source
const SUB_FLUSH_INTERVAL = 10 * time.Millisecond
View Source
const SUB_MAX_CONCURRENCY = 10
View Source
const (
Scheduler = "scheduler"
)
Variables ¶
This section is empty.
Functions ¶
func DecodeAndValidateSingleton ¶
func DecodeAndValidateSingleton(dv datautils.DataDecoderValidator, payloads [][]byte, target interface{}) error
func GetTenantExchangeName ¶
func JSONConvert ¶
func NewRandomStaticQueue ¶
func NewRandomStaticQueue() staticQueue
func QueueTypeFromDispatcherID ¶
func QueueTypeFromDispatcherID(d string) consumerQueue
func QueueTypeFromPartitionIDAndController ¶
func QueueTypeFromPartitionIDAndController(p, controller string) consumerQueue
func QueueTypeFromTickerID ¶
func QueueTypeFromTickerID(t string) consumerQueue
func TenantEventConsumerQueue ¶
func TenantEventConsumerQueue(t string) fanoutQueue
Types ¶
type MQPubBuffer ¶
type MQPubBuffer struct {
// contains filtered or unexported fields
}
MQPubBuffer buffers messages coming out of the task queue, groups them by tenantId and msgId, and then flushes them to the task handler as necessary.
func NewMQPubBuffer ¶
func NewMQPubBuffer(mq MessageQueue) *MQPubBuffer
type MQSubBuffer ¶
type MQSubBuffer struct {
// contains filtered or unexported fields
}
MQSubBuffer buffers messages coming out of the task queue, groups them by tenantId and msgId, and then flushes them to the task handler as necessary.
func NewMQSubBuffer ¶
func NewMQSubBuffer(queue Queue, mq MessageQueue, dst DstFunc) *MQSubBuffer
func (*MQSubBuffer) Start ¶
func (m *MQSubBuffer) Start() (func() error, error)
type Message ¶
type Message struct { // ID is the ID of the task. ID string `json:"id"` // Payloads is the list of payloads. Payloads [][]byte `json:"messages"` // TenantID is the tenant ID. TenantID string `json:"tenant_id"` // Whether the message should immediately expire if it reaches the queue without an active consumer. ImmediatelyExpire bool `json:"immediately_expire"` // Whether the message should be persisted to disk Persistent bool `json:"persistent"` // OtelCarrier is the OpenTelemetry carrier for the task. OtelCarrier map[string]string `json:"otel_carrier"` // Retries is the number of retries for the task. Retries int `json:"retries"` }
func NewTenantMessage ¶
func (*Message) SetOtelCarrier ¶
type MessageQueue ¶
type MessageQueue interface { // Clone copies the message queue with a new instance. Clone() (func() error, MessageQueue) // SetQOS sets the quality of service for the message queue. SetQOS(prefetchCount int) // SendMessage sends a message to the message queue. SendMessage(ctx context.Context, queue Queue, msg *Message) error // Subscribe subscribes to the task queue. It returns a cleanup function that should be called when the // subscription is no longer needed. Subscribe(queue Queue, preAck AckHook, postAck AckHook) (func() error, error) // RegisterTenant registers a new pub/sub mechanism for a tenant. This should be called when a // new tenant is created. If this is not called, implementors should ensure that there's a check // on the first message to a tenant to ensure that the tenant is registered, and store the tenant // in an LRU cache which lives in-memory. RegisterTenant(ctx context.Context, tenantId string) error // IsReady returns true if the task queue is ready to accept tasks. IsReady() bool }
type Queue ¶
type Queue interface { // Name returns the name of the queue. Name() string // Durable returns true if this queue should survive task queue restarts. Durable() bool // AutoDeleted returns true if this queue should be deleted when the last consumer unsubscribes. AutoDeleted() bool // Exclusive returns true if this queue should only be accessed by the current connection. Exclusive() bool // FanoutExchangeKey returns which exchange the queue should be subscribed to. This is only currently relevant // to tenant pub/sub queues. // // In RabbitMQ terminology, the existence of a subscriber key means that the queue is bound to a fanout // exchange, and a new random queue is generated for each connection when connections are retried. FanoutExchangeKey() string // DLQ returns the queue's dead letter queue, if it exists. DLQ() Queue // IsDLQ returns true if the queue is a dead letter queue. IsDLQ() bool }
type SharedTenantReader ¶
type SharedTenantReader struct {
// contains filtered or unexported fields
}
func NewSharedTenantReader ¶
func NewSharedTenantReader(mq MessageQueue) *SharedTenantReader
Source Files
¶
Click to show internal directories.
Click to hide internal directories.