Documentation ¶
Index ¶
- Constants
- func IsCoordinatorQueue(qname string) bool
- func IsTaskQueue(qname string) bool
- func IsWorkerQueue(qname string) bool
- type Broker
- type InMemoryBroker
- func (b *InMemoryBroker) HealthCheck(ctx context.Context) error
- func (b *InMemoryBroker) PublishEvent(ctx context.Context, topicName string, event any) error
- func (b *InMemoryBroker) PublishHeartbeat(_ context.Context, n *tork.Node) error
- func (b *InMemoryBroker) PublishJob(ctx context.Context, j *tork.Job) error
- func (b *InMemoryBroker) PublishTask(ctx context.Context, qname string, t *tork.Task) error
- func (b *InMemoryBroker) PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error
- func (b *InMemoryBroker) PublishTaskProgress(ctx context.Context, tp *tork.Task) error
- func (b *InMemoryBroker) Queues(ctx context.Context) ([]QueueInfo, error)
- func (b *InMemoryBroker) Shutdown(ctx context.Context) error
- func (b *InMemoryBroker) SubscribeForEvents(ctx context.Context, topic string, handler func(event any)) error
- func (b *InMemoryBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error
- func (b *InMemoryBroker) SubscribeForJobs(handler func(j *tork.Job) error) error
- func (b *InMemoryBroker) SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error
- func (b *InMemoryBroker) SubscribeForTaskProgress(handler func(tp *tork.Task) error) error
- func (b *InMemoryBroker) SubscribeForTasks(qname string, handler func(t *tork.Task) error) error
- type LogShipper
- type Option
- type Provider
- type QueueInfo
- type RabbitMQBroker
- func (b *RabbitMQBroker) HealthCheck(ctx context.Context) error
- func (b *RabbitMQBroker) PublishEvent(ctx context.Context, topic string, event any) error
- func (b *RabbitMQBroker) PublishHeartbeat(ctx context.Context, n *tork.Node) error
- func (b *RabbitMQBroker) PublishJob(ctx context.Context, j *tork.Job) error
- func (b *RabbitMQBroker) PublishTask(ctx context.Context, qname string, t *tork.Task) error
- func (b *RabbitMQBroker) PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error
- func (b *RabbitMQBroker) PublishTaskProgress(ctx context.Context, tp *tork.Task) error
- func (b *RabbitMQBroker) Queues(ctx context.Context) ([]QueueInfo, error)
- func (b *RabbitMQBroker) Shutdown(ctx context.Context) error
- func (b *RabbitMQBroker) SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error
- func (b *RabbitMQBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error
- func (b *RabbitMQBroker) SubscribeForJobs(handler func(j *tork.Job) error) error
- func (b *RabbitMQBroker) SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error
- func (b *RabbitMQBroker) SubscribeForTaskProgress(handler func(t *tork.Task) error) error
- func (b *RabbitMQBroker) SubscribeForTasks(qname string, handler func(t *tork.Task) error) error
Constants ¶
const (
	BROKER_INMEMORY     = "inmemory"
	BROKER_RABBITMQ     = "rabbitmq"
	TOPIC_JOB           = "job.*"
	TOPIC_JOB_COMPLETED = "job.completed"
	TOPIC_JOB_FAILED    = "job.failed"
	TOPIC_JOB_SCHEDULED = "job.scheduled"
)
const (
	// The queue used by the API to insert new tasks into
	QUEUE_PENDING = "pending"
	// The queue used by workers to notify the coordinator
	// that a task has begun processing
	QUEUE_STARTED = "started"
	// The queue used by workers to send tasks to when
	// a task completes successfully
	QUEUE_COMPLETED = "completed"
	// The queue used by workers to send tasks to when an error
	// occurs in processing
	QUEUE_ERROR = "error"
	// The default queue for tasks
	QUEUE_DEFAULT = "default"
	// The queue used by workers to periodically
	// notify the coordinator about their aliveness
	QUEUE_HEARTBEAT = "heartbeat"
	// The queue used by the Coordinator for job creation
	// and job-related state changes (e.g. cancellation)
	QUEUE_JOBS = "jobs"
	// The queue used by workers to send task
	// logs to the Coordinator
	QUEUE_LOGS = "logs"
	// The queue used by workers to send task
	// progress to the Coordinator
	QUEUE_PROGRESS = "progress"
	// The prefix used for queues that
	// are exclusive
	QUEUE_EXCLUSIVE_PREFIX = "x-"
)
const (
RABBITMQ_DEFAULT_CONSUMER_TIMEOUT = time.Minute * 30
)
Variables ¶
This section is empty.
Functions ¶
func IsCoordinatorQueue ¶
func IsTaskQueue ¶
func IsWorkerQueue ¶
Types ¶
type Broker ¶
type Broker interface {
	PublishTask(ctx context.Context, qname string, t *tork.Task) error
	SubscribeForTasks(qname string, handler func(t *tork.Task) error) error
	PublishTaskProgress(ctx context.Context, t *tork.Task) error
	SubscribeForTaskProgress(handler func(t *tork.Task) error) error
	PublishHeartbeat(ctx context.Context, n *tork.Node) error
	SubscribeForHeartbeats(handler func(n *tork.Node) error) error
	PublishJob(ctx context.Context, j *tork.Job) error
	SubscribeForJobs(handler func(j *tork.Job) error) error
	PublishEvent(ctx context.Context, topic string, event any) error
	SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error
	PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error
	SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error
	Queues(ctx context.Context) ([]QueueInfo, error)
	HealthCheck(ctx context.Context) error
	Shutdown(ctx context.Context) error
}
Broker is the message-queue, pub/sub mechanism used for delivering tasks.
type InMemoryBroker ¶
type InMemoryBroker struct {
// contains filtered or unexported fields
}
InMemoryBroker is a very simple implementation of the Broker interface which uses in-memory channels to exchange messages. Meant for local development, tests, etc.
func NewInMemoryBroker ¶
func NewInMemoryBroker() *InMemoryBroker
func (*InMemoryBroker) HealthCheck ¶
func (b *InMemoryBroker) HealthCheck(ctx context.Context) error
func (*InMemoryBroker) PublishEvent ¶
func (*InMemoryBroker) PublishHeartbeat ¶
func (*InMemoryBroker) PublishJob ¶
func (*InMemoryBroker) PublishTask ¶
func (*InMemoryBroker) PublishTaskLogPart ¶
func (b *InMemoryBroker) PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error
func (*InMemoryBroker) PublishTaskProgress ¶
func (*InMemoryBroker) Queues ¶
func (b *InMemoryBroker) Queues(ctx context.Context) ([]QueueInfo, error)
func (*InMemoryBroker) SubscribeForEvents ¶
func (*InMemoryBroker) SubscribeForHeartbeats ¶
func (b *InMemoryBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error
func (*InMemoryBroker) SubscribeForJobs ¶
func (b *InMemoryBroker) SubscribeForJobs(handler func(j *tork.Job) error) error
func (*InMemoryBroker) SubscribeForTaskLogPart ¶
func (b *InMemoryBroker) SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error
func (*InMemoryBroker) SubscribeForTaskProgress ¶
func (b *InMemoryBroker) SubscribeForTaskProgress(handler func(tp *tork.Task) error) error
func (*InMemoryBroker) SubscribeForTasks ¶
type LogShipper ¶
func NewLogShipper ¶
func NewLogShipper(broker Broker, taskID string) *LogShipper
type Option ¶
type Option = func(b *RabbitMQBroker)
func WithConsumerTimeoutMS ¶
WithConsumerTimeoutMS sets the maximum amount of time RabbitMQ will wait for a message ack from a consumer before deeming the message undelivered and shutting down the connection. Default: 30 minutes.
func WithDurableQueues ¶
WithDurableQueues sets the durable flag upon queue creation. Durable queues can survive broker restarts.
func WithHeartbeatTTL ¶
WithHeartbeatTTL sets the TTL for a heartbeat pending in the queue. The value of the TTL argument or policy must be a non-negative integer (0 <= n), describing the TTL period in milliseconds.
func WithManagementURL ¶
type RabbitMQBroker ¶
type RabbitMQBroker struct {
// contains filtered or unexported fields
}
func NewRabbitMQBroker ¶
func NewRabbitMQBroker(url string, opts ...Option) (*RabbitMQBroker, error)
func (*RabbitMQBroker) HealthCheck ¶
func (b *RabbitMQBroker) HealthCheck(ctx context.Context) error
func (*RabbitMQBroker) PublishEvent ¶
func (*RabbitMQBroker) PublishHeartbeat ¶
func (*RabbitMQBroker) PublishJob ¶
func (*RabbitMQBroker) PublishTask ¶
func (*RabbitMQBroker) PublishTaskLogPart ¶
func (b *RabbitMQBroker) PublishTaskLogPart(ctx context.Context, p *tork.TaskLogPart) error
func (*RabbitMQBroker) PublishTaskProgress ¶
func (*RabbitMQBroker) Queues ¶
func (b *RabbitMQBroker) Queues(ctx context.Context) ([]QueueInfo, error)
func (*RabbitMQBroker) SubscribeForEvents ¶
func (*RabbitMQBroker) SubscribeForHeartbeats ¶
func (b *RabbitMQBroker) SubscribeForHeartbeats(handler func(n *tork.Node) error) error
func (*RabbitMQBroker) SubscribeForJobs ¶
func (b *RabbitMQBroker) SubscribeForJobs(handler func(j *tork.Job) error) error
func (*RabbitMQBroker) SubscribeForTaskLogPart ¶
func (b *RabbitMQBroker) SubscribeForTaskLogPart(handler func(p *tork.TaskLogPart)) error
func (*RabbitMQBroker) SubscribeForTaskProgress ¶
func (b *RabbitMQBroker) SubscribeForTaskProgress(handler func(t *tork.Task) error) error