tasks

package
v0.0.0-...-58c0a64 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2024 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// LeaderKey is the path on which nodes contend for the leader lock
	LeaderKey = "leader/election"
)

Variables

View Source
var TopicCacheData map[string]bool = make(map[string]bool)

TopicCacheData is declared as a package-level global to keep it instance-agnostic.

Functions

func CheckIfTopicExists

func CheckIfTopicExists(ctx context.Context, topic string) bool

CheckIfTopicExists checks whether the topic exists in the cache.

func UpdateTopicCache

func UpdateTopicCache(ctx context.Context, topicMap map[string]bool, specificTopicBool bool)

UpdateTopicCache updates the topic data cache.

Types

type Config

type Config struct {
	NodeBindingRefreshIntervalMins int
}

Config is the scheduler config.

type ITask

type ITask interface {
	// Run the worker with the given context
	Run(ctx context.Context) error
}

ITask defines the worker interface.

func NewLeaderTask

func NewLeaderTask(
	id string,
	registry registry.IRegistry,
	nodeCore node.ICore,
	task ITask,
	options ...Option,
) (ITask, error)

NewLeaderTask creates a LeaderTask instance.

func NewPublisherTask

func NewPublisherTask(
	id string,
	registry registry.IRegistry,
	topicCore topic.ICore,
	options ...Option,
) (ITask, error)

NewPublisherTask creates a PublisherTask instance.

func NewSchedulerTask

func NewSchedulerTask(
	id string,
	registry registry.IRegistry,
	brokerStore brokerstore.IBrokerStore,
	nodeCore node.ICore,
	topicCore topic.ICore,
	nodeBindingCore nodebinding.ICore,
	subscriptionCore subscription.ICore,
	scheduler scheduler.IScheduler,
	options ...Option,
) (ITask, error)

NewSchedulerTask creates a SchedulerTask instance.

func NewSubscriptionTask

func NewSubscriptionTask(
	id string,
	reg registry.IRegistry,
	brokerStore brokerstore.IBrokerStore,
	subscriptionCore subscription.ICore,
	nodebindingCore nodebinding.ICore,
	subscriberCore subscriber.ICore,
	options ...Option,
) (ITask, error)

NewSubscriptionTask creates a SubscriptionTask instance.

type LeaderTask

type LeaderTask struct {
	// contains filtered or unexported fields
}

LeaderTask runs the leader election process and runs the associated task.

func (*LeaderTask) Run

func (sm *LeaderTask) Run(ctx context.Context) error

Run the task

type Option

type Option func(task ITask)

An Option is an option for tasks.

func WithHTTPConfig

func WithHTTPConfig(config *httpclient.Config) Option

WithHTTPConfig defines the httpClient config for the webhooks HTTP client.

func WithName

func WithName(name string) Option

WithName defines the name used for registry session creation.

func WithSchedulerConfig

func WithSchedulerConfig(config *Config) Option

WithSchedulerConfig defines the scheduler config for automatic node binding refresh

func WithTTL

func WithTTL(ttl time.Duration) Option

WithTTL defines the TTL for the registry session

type PublisherTask

type PublisherTask struct {
	// contains filtered or unexported fields
}

PublisherTask implements the Watcher and maintains a pre-warmup.

func (*PublisherTask) Run

func (pu *PublisherTask) Run(ctx context.Context) error

Run the task

type SchedulerTask

type SchedulerTask struct {
	// contains filtered or unexported fields
}

SchedulerTask implements the scheduling of subscriptions over nodes. Only the leader node, elected through the leader election process, performs scheduling.

func (*SchedulerTask) Run

func (sm *SchedulerTask) Run(ctx context.Context) error

Run the task

type SubscriptionTask

type SubscriptionTask struct {
	// contains filtered or unexported fields
}

SubscriptionTask runs the subscriptions assigned to the node.

func (*SubscriptionTask) Run

func (sm *SubscriptionTask) Run(ctx context.Context) error

Run the SubscriptionTask process

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL