broker

package
v0.1.116 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BROKER_INMEMORY     = "inmemory"
	BROKER_RABBITMQ     = "rabbitmq"
	TOPIC_JOB           = "job.*"
	TOPIC_JOB_COMPLETED = "job.completed"
	TOPIC_JOB_FAILED    = "job.failed"
	TOPIC_JOB_SCHEDULED = "job.scheduled"
)
View Source
const (
	// The queue used by the API to insert new tasks into
	QUEUE_PENDING = "pending"
	// The queue used by workers to notify the coordinator
	// that a task has began processing
	QUEUE_STARTED = "started"
	// The queue used by workers to send tasks to when
	// a task completes successfully
	QUEUE_COMPLETED = "completed"
	// The queue used by workers to send tasks to when an error
	// occurs in processing
	QUEUE_ERROR = "error"
	// The default queue for tasks
	QUEUE_DEFAULT = "default"
	// The queue used by workers to periodically
	// notify the coordinator about their aliveness
	QUEUE_HEARTBEAT = "heartbeat"
	// The queue used by the Coordinator for job creation
	// and job-related state changes (e.g. cancellation)
	QUEUE_JOBS = "jobs"
	// The queue used by workers to send task
	// logs to the Coordinator
	QUEUE_LOGS = "logs"
	// The queue used by workers to send task
	// progress to the Coordinator
	QUEUE_PROGRESS = "progress"
	// The prefix used for queues that
	// are exclusive
	QUEUE_EXCLUSIVE_PREFIX = "x-"
)
View Source
const (
	RABBITMQ_DEFAULT_CONSUMER_TIMEOUT = time.Minute * 30
)

Variables

This section is empty.

Functions

func IsCoordinatorQueue

func IsCoordinatorQueue(qname string) bool

func IsTaskQueue

func IsTaskQueue(qname string) bool

func IsWorkerQueue

func IsWorkerQueue(qname string) bool

Types

type Broker

type Broker interface {
	PublishTask(ctx context.Context, qname string, t *tork.Task) error
	SubscribeForTasks(qname string, handler func(t *tork.Task) error) error

	PublishTaskProgress(ctx context.Context, t *tork.Task) error
	SubscribeForTaskProgress(handler func(t *tork.Task) error) error

	PublishHeartbeat(ctx context.Context, n *tork.Node) error
	SubscribeForHeartbeats(handler func(n *tork.Node) error) error

	PublishJob(ctx context.Context, j *tork.Job) error
	SubscribeForJobs(handler func(j *tork.Job) error) error

	PublishEvent(ctx context.Context, topic string, event any) error
	SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error

	PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error
	SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error

	Queues(ctx context.Context) ([]QueueInfo, error)
	HealthCheck(ctx context.Context) error
	Shutdown(ctx context.Context) error
}

Broker is the message-queue, pub/sub mechanism used for delivering tasks.

type InMemoryBroker

type InMemoryBroker struct {
	// contains filtered or unexported fields
}

InMemoryBroker a very simple implementation of the Broker interface which uses in-memory channels to exchange messages. Meant for local development, tests etc.

func NewInMemoryBroker

func NewInMemoryBroker() *InMemoryBroker

func (*InMemoryBroker) HealthCheck

func (b *InMemoryBroker) HealthCheck(ctx context.Context) error

func (*InMemoryBroker) PublishEvent

func (b *InMemoryBroker) PublishEvent(ctx context.Context, topicName string, event any) error

func (*InMemoryBroker) PublishHeartbeat

func (b *InMemoryBroker) PublishHeartbeat(_ context.Context, n *tork.Node) error

func (*InMemoryBroker) PublishJob

func (b *InMemoryBroker) PublishJob(ctx context.Context, j *tork.Job) error

func (*InMemoryBroker) PublishTask

func (b *InMemoryBroker) PublishTask(ctx context.Context, qname string, t *tork.Task) error

func (*InMemoryBroker) PublishTaskLogPart

func (b *InMemoryBroker) PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error

func (*InMemoryBroker) PublishTaskProgress

func (b *InMemoryBroker) PublishTaskProgress(ctx context.Context, tp *tork.Task) error

func (*InMemoryBroker) Queues

func (b *InMemoryBroker) Queues(ctx context.Context) ([]QueueInfo, error)

func (*InMemoryBroker) Shutdown

func (b *InMemoryBroker) Shutdown(ctx context.Context) error

func (*InMemoryBroker) SubscribeForEvents

func (b *InMemoryBroker) SubscribeForEvents(ctx context.Context, topic string, handler func(event any)) error

func (*InMemoryBroker) SubscribeForHeartbeats

func (b *InMemoryBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error

func (*InMemoryBroker) SubscribeForJobs

func (b *InMemoryBroker) SubscribeForJobs(handler func(j *tork.Job) error) error

func (*InMemoryBroker) SubscribeForTaskLogPart

func (b *InMemoryBroker) SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error

func (*InMemoryBroker) SubscribeForTaskProgress

func (b *InMemoryBroker) SubscribeForTaskProgress(handler func(tp *tork.Task) error) error

func (*InMemoryBroker) SubscribeForTasks

func (b *InMemoryBroker) SubscribeForTasks(qname string, handler func(t *tork.Task) error) error

type LogShipper

type LogShipper struct {
	Broker Broker
	TaskID string
	// contains filtered or unexported fields
}

func NewLogShipper

func NewLogShipper(broker Broker, taskID string) *LogShipper

func (*LogShipper) Write

func (r *LogShipper) Write(p []byte) (int, error)

type Option

type Option = func(b *RabbitMQBroker)

func WithConsumerTimeoutMS

func WithConsumerTimeoutMS(consumerTimeout time.Duration) Option

WithConsumerTimeout sets the maximum amount of time in RabbitMQ will wait for a message ack before from a consumer before deeming the message undelivered and shuting down the connection. Default: 30 minutes

func WithDurableQueues

func WithDurableQueues(val bool) Option

WithDurableQueues sets the durable flag upon queue creation. Durable queues can survive broker restarts.

func WithHeartbeatTTL

func WithHeartbeatTTL(ttl int) Option

WithHeartbeatTTL sets the TTL for a heartbeat pending in the queue. The value of the TTL argument or policy must be a non-negative integer (0 <= n), describing the TTL period in milliseconds.

func WithManagementURL

func WithManagementURL(url string) Option

type Provider

type Provider func() (Broker, error)

type QueueInfo

type QueueInfo struct {
	Name        string `json:"name"`
	Size        int    `json:"size"`
	Subscribers int    `json:"subscribers"`
	Unacked     int    `json:"unacked"`
}

type RabbitMQBroker

type RabbitMQBroker struct {
	// contains filtered or unexported fields
}

func NewRabbitMQBroker

func NewRabbitMQBroker(url string, opts ...Option) (*RabbitMQBroker, error)

func (*RabbitMQBroker) HealthCheck

func (b *RabbitMQBroker) HealthCheck(ctx context.Context) error

func (*RabbitMQBroker) PublishEvent

func (b *RabbitMQBroker) PublishEvent(ctx context.Context, topic string, event any) error

func (*RabbitMQBroker) PublishHeartbeat

func (b *RabbitMQBroker) PublishHeartbeat(ctx context.Context, n *tork.Node) error

func (*RabbitMQBroker) PublishJob

func (b *RabbitMQBroker) PublishJob(ctx context.Context, j *tork.Job) error

func (*RabbitMQBroker) PublishTask

func (b *RabbitMQBroker) PublishTask(ctx context.Context, qname string, t *tork.Task) error

func (*RabbitMQBroker) PublishTaskLogPart

func (b *RabbitMQBroker) PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error

func (*RabbitMQBroker) PublishTaskProgress

func (b *RabbitMQBroker) PublishTaskProgress(ctx context.Context, tp *tork.Task) error

func (*RabbitMQBroker) Queues

func (b *RabbitMQBroker) Queues(ctx context.Context) ([]QueueInfo, error)

func (*RabbitMQBroker) Shutdown

func (b *RabbitMQBroker) Shutdown(ctx context.Context) error

func (*RabbitMQBroker) SubscribeForEvents

func (b *RabbitMQBroker) SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error

func (*RabbitMQBroker) SubscribeForHeartbeats

func (b *RabbitMQBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error

func (*RabbitMQBroker) SubscribeForJobs

func (b *RabbitMQBroker) SubscribeForJobs(handler func(j *tork.Job) error) error

func (*RabbitMQBroker) SubscribeForTaskLogPart

func (b *RabbitMQBroker) SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error

func (*RabbitMQBroker) SubscribeForTaskProgress

func (b *RabbitMQBroker) SubscribeForTaskProgress(handler func(t *tork.Task) error) error

func (*RabbitMQBroker) SubscribeForTasks

func (b *RabbitMQBroker) SubscribeForTasks(qname string, handler func(t *tork.Task) error) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL