mq

package
v0.1.73 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BROKER_INMEMORY     = "inmemory"
	BROKER_RABBITMQ     = "rabbitmq"
	TOPIC_JOB           = "job.*"
	TOPIC_JOB_COMPLETED = "job.completed"
	TOPIC_JOB_FAILED    = "job.failed"
)
View Source
const (
	// The queue used by the API to insert new tasks into
	QUEUE_PENDING = "pending"
	// The queue used by workers to notify the coordinator
	// that a task has began processing
	QUEUE_STARTED = "started"
	// The queue used by workers to send tasks to when
	// a task completes successfully
	QUEUE_COMPLETED = "completed"
	// The queue used by workers to send tasks to when an error
	// occurs in processing
	QUEUE_ERROR = "error"
	// The default queue for tasks
	QUEUE_DEFAULT = "default"
	// The queue used by workers to periodically
	// notify the coordinator about their aliveness
	QUEUE_HEARTBEAT = "heartbeat"
	// The queue used by for job creation
	// and job-related state changes (e.g. cancellation)
	QUEUE_JOBS = "jobs"
	// The queue used by workers to send task
	// logs to the Coordinator
	QUEUE_LOGS = "logs"
	// The prefix used for queues that
	// are exclusive
	QUEUE_EXCLUSIVE_PREFIX = "x-"
)
View Source
const (
	RABBITMQ_DEFAULT_CONSUMER_TIMEOUT = time.Minute * 30
)

Variables

This section is empty.

Functions

func IsCoordinatorQueue

func IsCoordinatorQueue(qname string) bool

func IsWorkerQueue

func IsWorkerQueue(qname string) bool

Types

type Broker

type Broker interface {
	PublishTask(ctx context.Context, qname string, t *tork.Task) error
	SubscribeForTasks(qname string, handler func(t *tork.Task) error) error

	PublishHeartbeat(ctx context.Context, n *tork.Node) error
	SubscribeForHeartbeats(handler func(n *tork.Node) error) error

	PublishJob(ctx context.Context, j *tork.Job) error
	SubscribeForJobs(handler func(j *tork.Job) error) error

	PublishEvent(ctx context.Context, topic string, event any) error
	SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error

	PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error
	SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error

	Queues(ctx context.Context) ([]QueueInfo, error)
	HealthCheck(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

Broker is the message-queue, pub/sub mechanism used for delivering tasks.

type InMemoryBroker

type InMemoryBroker struct {
	// contains filtered or unexported fields
}

InMemoryBroker a very simple implementation of the Broker interface which uses in-memory channels to exchange messages. Meant for local development, tests etc.

func NewInMemoryBroker

func NewInMemoryBroker() *InMemoryBroker

func (*InMemoryBroker) HealthCheck added in v0.1.13

func (b *InMemoryBroker) HealthCheck(ctx context.Context) error

func (*InMemoryBroker) PublishEvent added in v0.1.1

func (b *InMemoryBroker) PublishEvent(ctx context.Context, topicName string, event any) error

func (*InMemoryBroker) PublishHeartbeat

func (b *InMemoryBroker) PublishHeartbeat(_ context.Context, n *tork.Node) error

func (*InMemoryBroker) PublishJob

func (b *InMemoryBroker) PublishJob(ctx context.Context, j *tork.Job) error

func (*InMemoryBroker) PublishTask

func (b *InMemoryBroker) PublishTask(ctx context.Context, qname string, t *tork.Task) error

func (*InMemoryBroker) PublishTaskLogPart added in v0.1.63

func (b *InMemoryBroker) PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error

func (*InMemoryBroker) Queues

func (b *InMemoryBroker) Queues(ctx context.Context) ([]QueueInfo, error)

func (*InMemoryBroker) Shutdown

func (b *InMemoryBroker) Shutdown(ctx context.Context) error

func (*InMemoryBroker) SubscribeForEvents added in v0.1.1

func (b *InMemoryBroker) SubscribeForEvents(ctx context.Context, topic string, handler func(event any)) error

func (*InMemoryBroker) SubscribeForHeartbeats

func (b *InMemoryBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error

func (*InMemoryBroker) SubscribeForJobs

func (b *InMemoryBroker) SubscribeForJobs(handler func(j *tork.Job) error) error

func (*InMemoryBroker) SubscribeForTaskLogPart added in v0.1.63

func (b *InMemoryBroker) SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error

func (*InMemoryBroker) SubscribeForTasks

func (b *InMemoryBroker) SubscribeForTasks(qname string, handler func(t *tork.Task) error) error

type Option added in v0.1.13

type Option = func(b *RabbitMQBroker)

func WithConsumerTimeoutMS added in v0.1.19

func WithConsumerTimeoutMS(consumerTimeout time.Duration) Option

WithConsumerTimeout sets the maximum amount of time in RabbitMQ will wait for a message ack before from a consumer before deeming the message undelivered and shuting down the connection. Default: 30 minutes

func WithDurableQueues added in v0.1.68

func WithDurableQueues(val bool) Option

WithDurableQueues sets the durable flag upon queue creation. Durable queues can survive broker restarts.

func WithHeartbeatTTL added in v0.1.13

func WithHeartbeatTTL(ttl int) Option

WithHeartbeatTTL sets the TTL for a heartbeat pending in the queue. The value of the TTL argument or policy must be a non-negative integer (0 <= n), describing the TTL period in milliseconds.

func WithManagementURL added in v0.1.23

func WithManagementURL(url string) Option

type Provider added in v0.1.1

type Provider func() (Broker, error)

type QueueInfo

type QueueInfo struct {
	Name        string `json:"name"`
	Size        int    `json:"size"`
	Subscribers int    `json:"subscribers"`
	Unacked     int    `json:"unacked"`
}

type RabbitMQBroker

type RabbitMQBroker struct {
	// contains filtered or unexported fields
}

func NewRabbitMQBroker

func NewRabbitMQBroker(url string, opts ...Option) (*RabbitMQBroker, error)

func (*RabbitMQBroker) HealthCheck added in v0.1.13

func (b *RabbitMQBroker) HealthCheck(ctx context.Context) error

func (*RabbitMQBroker) PublishEvent added in v0.1.1

func (b *RabbitMQBroker) PublishEvent(ctx context.Context, topic string, event any) error

func (*RabbitMQBroker) PublishHeartbeat

func (b *RabbitMQBroker) PublishHeartbeat(ctx context.Context, n *tork.Node) error

func (*RabbitMQBroker) PublishJob

func (b *RabbitMQBroker) PublishJob(ctx context.Context, j *tork.Job) error

func (*RabbitMQBroker) PublishTask

func (b *RabbitMQBroker) PublishTask(ctx context.Context, qname string, t *tork.Task) error

func (*RabbitMQBroker) PublishTaskLogPart added in v0.1.63

func (b *RabbitMQBroker) PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error

func (*RabbitMQBroker) Queues

func (b *RabbitMQBroker) Queues(ctx context.Context) ([]QueueInfo, error)

func (*RabbitMQBroker) Shutdown

func (b *RabbitMQBroker) Shutdown(ctx context.Context) error

func (*RabbitMQBroker) SubscribeForEvents added in v0.1.1

func (b *RabbitMQBroker) SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error

func (*RabbitMQBroker) SubscribeForHeartbeats

func (b *RabbitMQBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error

func (*RabbitMQBroker) SubscribeForJobs

func (b *RabbitMQBroker) SubscribeForJobs(handler func(j *tork.Job) error) error

func (*RabbitMQBroker) SubscribeForTaskLogPart added in v0.1.63

func (b *RabbitMQBroker) SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error

func (*RabbitMQBroker) SubscribeForTasks

func (b *RabbitMQBroker) SubscribeForTasks(qname string, handler func(t *tork.Task) error) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL