v1

package
v0.55.0-alpha.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TASK_PROCESSING_QUEUE staticQueue = "task_processing_queue_v2"
	OLAP_QUEUE            staticQueue = "olap_queue_v2"
)
View Source
const PUB_BUFFER_SIZE = 1000
View Source
const PUB_FLUSH_INTERVAL = 10 * time.Millisecond
View Source
const PUB_MAX_CONCURRENCY = 1
View Source
const PUB_TIMEOUT = 10 * time.Second
View Source
const SUB_BUFFER_SIZE = 1000
View Source
const SUB_FLUSH_INTERVAL = 10 * time.Millisecond
View Source
const SUB_MAX_CONCURRENCY = 10
View Source
const (
	Scheduler = "scheduler"
)

Variables

This section is empty.

Functions

func DecodeAndValidateSingleton

func DecodeAndValidateSingleton(dv datautils.DataDecoderValidator, payloads [][]byte, target interface{}) error

func GetTenantExchangeName

func GetTenantExchangeName(t string) string

func JSONConvert

func JSONConvert[T any](payloads [][]byte) []*T

func NewRandomStaticQueue

func NewRandomStaticQueue() staticQueue

func NoOpHook

func NoOpHook(task *Message) error

func QueueTypeFromDispatcherID

func QueueTypeFromDispatcherID(d string) consumerQueue

func QueueTypeFromPartitionIDAndController

func QueueTypeFromPartitionIDAndController(p, controller string) consumerQueue

func QueueTypeFromTickerID

func QueueTypeFromTickerID(t string) consumerQueue

func TenantEventConsumerQueue

func TenantEventConsumerQueue(t string) fanoutQueue

Types

type AckHook

type AckHook func(task *Message) error

type DstFunc

type DstFunc func(tenantId, msgId string, payloads [][]byte) error

type MQPubBuffer

type MQPubBuffer struct {
	// contains filtered or unexported fields
}

MQPubBuffer buffers messages coming out of the task queue, groups them by tenantId and msgId, and then flushes them to the task handler as necessary.

func NewMQPubBuffer

func NewMQPubBuffer(mq MessageQueue) *MQPubBuffer

func (*MQPubBuffer) Pub

func (m *MQPubBuffer) Pub(ctx context.Context, queue Queue, msg *Message, wait bool) error

type MQSubBuffer

type MQSubBuffer struct {
	// contains filtered or unexported fields
}

MQSubBuffer buffers messages coming out of the task queue, groups them by tenantId and msgId, and then flushes them to the task handler as necessary.

func NewMQSubBuffer

func NewMQSubBuffer(queue Queue, mq MessageQueue, dst DstFunc) *MQSubBuffer

func (*MQSubBuffer) Start

func (m *MQSubBuffer) Start() (func() error, error)

type Message

type Message struct {
	// ID is the ID of the task.
	ID string `json:"id"`

	// Payloads is the list of payloads.
	Payloads [][]byte `json:"messages"`

	// TenantID is the tenant ID.
	TenantID string `json:"tenant_id"`

	// Whether the message should immediately expire if it reaches the queue without an active consumer.
	ImmediatelyExpire bool `json:"immediately_expire"`

	// Whether the message should be persisted to disk
	Persistent bool `json:"persistent"`

	// OtelCarrier is the OpenTelemetry carrier for the task.
	OtelCarrier map[string]string `json:"otel_carrier"`

	// Retries is the number of retries for the task.
	Retries int `json:"retries"`
}

func NewTenantMessage

func NewTenantMessage[T any](tenantId, id string, immediatelyExpire, persistent bool, payloads ...T) (*Message, error)

func (*Message) Serialize

func (t *Message) Serialize() ([]byte, error)

func (*Message) SetOtelCarrier

func (t *Message) SetOtelCarrier(otelCarrier map[string]string)

type MessageQueue

type MessageQueue interface {
	// Clone copies the message queue with a new instance.
	Clone() (func() error, MessageQueue)

	// SetQOS sets the quality of service for the message queue.
	SetQOS(prefetchCount int)

	// SendMessage sends a message to the message queue.
	SendMessage(ctx context.Context, queue Queue, msg *Message) error

	// Subscribe subscribes to the task queue. It returns a cleanup function that should be called when the
	// subscription is no longer needed.
	Subscribe(queue Queue, preAck AckHook, postAck AckHook) (func() error, error)

	// RegisterTenant registers a new pub/sub mechanism for a tenant. This should be called when a
	// new tenant is created. If this is not called, implementors should ensure that there's a check
	// on the first message to a tenant to ensure that the tenant is registered, and store the tenant
	// in an LRU cache which lives in-memory.
	RegisterTenant(ctx context.Context, tenantId string) error

	// IsReady returns true if the task queue is ready to accept tasks.
	IsReady() bool
}

type PubFunc

type PubFunc func(m *Message) error

type Queue

type Queue interface {
	// Name returns the name of the queue.
	Name() string

	// Durable returns true if this queue should survive task queue restarts.
	Durable() bool

	// AutoDeleted returns true if this queue should be deleted when the last consumer unsubscribes.
	AutoDeleted() bool

	// Exclusive returns true if this queue should only be accessed by the current connection.
	Exclusive() bool

	// FanoutExchangeKey returns which exchange the queue should be subscribed to. This is only currently relevant
	// to tenant pub/sub queues.
	//
	// In RabbitMQ terminology, the existence of a subscriber key means that the queue is bound to a fanout
	// exchange, and a new random queue is generated for each connection when connections are retried.
	FanoutExchangeKey() string

	// DLQ returns the queue's dead letter queue, if it exists.
	DLQ() Queue

	// IsDLQ returns true if the queue is a dead letter queue.
	IsDLQ() bool
}

type SharedTenantReader

type SharedTenantReader struct {
	// contains filtered or unexported fields
}

func NewSharedTenantReader

func NewSharedTenantReader(mq MessageQueue) *SharedTenantReader

func (*SharedTenantReader) Subscribe

func (s *SharedTenantReader) Subscribe(tenantId string, postAck AckHook) (func() error, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL