mq

package
v0.1.47 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BROKER_INMEMORY     = "inmemory"
	BROKER_RABBITMQ     = "rabbitmq"
	TOPIC_JOB           = "job.*"
	TOPIC_JOB_COMPLETED = "job.completed"
	TOPIC_JOB_FAILED    = "job.failed"
)
View Source
const (
	// The queue used by the API to insert new tasks into
	QUEUE_PENDING = "pending"
	// The queue used by workers to notify the coordinator
	// that a task has began processing
	QUEUE_STARTED = "started"
	// The queue used by workers to send tasks to when
	// a task completes successfully
	QUEUE_COMPLETED = "completed"
	// The queue used by workers to send tasks to when an error
	// occurs in processing
	QUEUE_ERROR = "error"
	// The default queue for tasks
	QUEUE_DEFAULT = "default"
	// The queue used by workers to periodically
	// notify the coordinator about their aliveness
	QUEUE_HEARTBEAT = "heartbeat"
	// The queue used by for job creation
	// and job-related state changes (e.g. cancellation)
	QUEUE_JOBS = "jobs"
	// The prefix used for queues that
	// are exclusive
	QUEUE_EXCLUSIVE_PREFIX = "x-"
)
View Source
const (
	RABBITMQ_DEFAULT_CONSUMER_TIMEOUT = time.Minute * 30
)

Variables

This section is empty.

Functions

func IsCoordinatorQueue

func IsCoordinatorQueue(qname string) bool

func IsWorkerQueue

func IsWorkerQueue(qname string) bool

Types

type Broker

type Broker interface {
	Queues(ctx context.Context) ([]QueueInfo, error)
	PublishTask(ctx context.Context, qname string, t *tork.Task) error
	SubscribeForTasks(qname string, handler func(t *tork.Task) error) error
	PublishHeartbeat(ctx context.Context, n *tork.Node) error
	SubscribeForHeartbeats(handler func(n *tork.Node) error) error
	PublishJob(ctx context.Context, j *tork.Job) error
	SubscribeForJobs(handler func(j *tork.Job) error) error
	Shutdown(ctx context.Context) error
	PublishEvent(ctx context.Context, topic string, event any) error
	SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error
	HealthCheck(ctx context.Context) error
}

Broker is the message-queue, pub/sub mechanism used for delivering tasks.

type InMemoryBroker

type InMemoryBroker struct {
	// contains filtered or unexported fields
}

InMemoryBroker a very simple implementation of the Broker interface which uses in-memory channels to exchange messages. Meant for local development, tests etc.

func NewInMemoryBroker

func NewInMemoryBroker() *InMemoryBroker

func (*InMemoryBroker) HealthCheck added in v0.1.13

func (b *InMemoryBroker) HealthCheck(ctx context.Context) error

func (*InMemoryBroker) PublishEvent added in v0.1.1

func (b *InMemoryBroker) PublishEvent(ctx context.Context, topicName string, event any) error

func (*InMemoryBroker) PublishHeartbeat

func (b *InMemoryBroker) PublishHeartbeat(_ context.Context, n *tork.Node) error

func (*InMemoryBroker) PublishJob

func (b *InMemoryBroker) PublishJob(ctx context.Context, j *tork.Job) error

func (*InMemoryBroker) PublishTask

func (b *InMemoryBroker) PublishTask(ctx context.Context, qname string, t *tork.Task) error

func (*InMemoryBroker) Queues

func (b *InMemoryBroker) Queues(ctx context.Context) ([]QueueInfo, error)

func (*InMemoryBroker) Shutdown

func (b *InMemoryBroker) Shutdown(ctx context.Context) error

func (*InMemoryBroker) SubscribeForEvents added in v0.1.1

func (b *InMemoryBroker) SubscribeForEvents(ctx context.Context, topic string, handler func(event any)) error

func (*InMemoryBroker) SubscribeForHeartbeats

func (b *InMemoryBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error

func (*InMemoryBroker) SubscribeForJobs

func (b *InMemoryBroker) SubscribeForJobs(handler func(j *tork.Job) error) error

func (*InMemoryBroker) SubscribeForTasks

func (b *InMemoryBroker) SubscribeForTasks(qname string, handler func(t *tork.Task) error) error

type Option added in v0.1.13

type Option = func(b *RabbitMQBroker)

func WithConsumerTimeoutMS added in v0.1.19

func WithConsumerTimeoutMS(consumerTimeout time.Duration) Option

WithConsumerTimeout sets the maximum amount of time in RabbitMQ will wait for a message ack before from a consumer before deeming the message undelivered and shuting down the connection. Default: 30 minutes

func WithHeartbeatTTL added in v0.1.13

func WithHeartbeatTTL(ttl int) Option

WithHeartbeatTTL sets the TTL for a heartbeat pending in the queue. The value of the TTL argument or policy must be a non-negative integer (0 <= n), describing the TTL period in milliseconds.

func WithManagementURL added in v0.1.23

func WithManagementURL(url string) Option

type Provider added in v0.1.1

type Provider func() (Broker, error)

type QueueInfo

type QueueInfo struct {
	Name        string `json:"name"`
	Size        int    `json:"size"`
	Subscribers int    `json:"subscribers"`
	Unacked     int    `json:"unacked"`
}

type RabbitMQBroker

type RabbitMQBroker struct {
	// contains filtered or unexported fields
}

func NewRabbitMQBroker

func NewRabbitMQBroker(url string, opts ...Option) (*RabbitMQBroker, error)

func (*RabbitMQBroker) HealthCheck added in v0.1.13

func (b *RabbitMQBroker) HealthCheck(ctx context.Context) error

func (*RabbitMQBroker) PublishEvent added in v0.1.1

func (b *RabbitMQBroker) PublishEvent(ctx context.Context, topic string, event any) error

func (*RabbitMQBroker) PublishHeartbeat

func (b *RabbitMQBroker) PublishHeartbeat(ctx context.Context, n *tork.Node) error

func (*RabbitMQBroker) PublishJob

func (b *RabbitMQBroker) PublishJob(ctx context.Context, j *tork.Job) error

func (*RabbitMQBroker) PublishTask

func (b *RabbitMQBroker) PublishTask(ctx context.Context, qname string, t *tork.Task) error

func (*RabbitMQBroker) Queues

func (b *RabbitMQBroker) Queues(ctx context.Context) ([]QueueInfo, error)

func (*RabbitMQBroker) Shutdown

func (b *RabbitMQBroker) Shutdown(ctx context.Context) error

func (*RabbitMQBroker) SubscribeForEvents added in v0.1.1

func (b *RabbitMQBroker) SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error

func (*RabbitMQBroker) SubscribeForHeartbeats

func (b *RabbitMQBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error

func (*RabbitMQBroker) SubscribeForJobs

func (b *RabbitMQBroker) SubscribeForJobs(handler func(j *tork.Job) error) error

func (*RabbitMQBroker) SubscribeForTasks

func (b *RabbitMQBroker) SubscribeForTasks(qname string, handler func(t *tork.Task) error) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL