task

package
v2.0.0-alpha3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2023 License: MIT Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
// Message strings for subscription-related outcomes.
// NOTE(review): the Tg prefix presumably stands for Telegram (the package also
// declares TgCmd* and Telegram* constants); where these texts are sent is not
// visible here — confirm against the handlers.
const (
	// Success messages.
	TgSuccessMsgSubscribed   = "Subscribed successfully."
	TgSuccessMsgUnsubscribed = "Unsubscribed successfully."

	// Error messages. TgErrMsgNotFoundSite contains a single %s verb, so it is
	// a format string that must be filled in before use (presumably with the
	// requested site/domain — confirm against the caller).
	TgErrMsgRequiredSite = "Site (domain) is required."
	TgErrMsgNotFoundSite = "Site `%s` not found."
)
View Source
// Operation labels of type errs.Op. Each value has the form
// "<component>: <action>", grouped below by component.
// NOTE(review): presumably attached to errors to identify the failing
// operation; the errs package is not visible here — confirm.
const (
	// Client operations.
	OpClientEnqueue errs.Op = "task.client: enqueue"

	// Metrics operations.
	OpMetricsRegister errs.Op = "task.metrics: register"
	OpMetricsClose    errs.Op = "task.metrics: close"

	// Server operations, including the per-link-type parse steps.
	OpServerStart        errs.Op = "task.server: start"
	OpServerProcessTask  errs.Op = "task.server: process task"
	OpServerParseFeed    errs.Op = "task.server: parse feed link"
	OpServerParseSitemap errs.Op = "task.server: parse sitemap link"
	OpServerParseArticle errs.Op = "task.server: parse article link"

	// Scheduler operations.
	OpSchedulerStart  errs.Op = "task.scheduler: start"
	OpSchedulerSync   errs.Op = "task.scheduler: sync"
	OpSchedulerAdd    errs.Op = "task.scheduler: add"
	OpSchedulerRemove errs.Op = "task.scheduler: remove"

	// Payload (de)serialization operations; these are not tied to a single
	// component (the label prefix is just "task").
	OpMarshal   errs.Op = "task: marshal payload"
	OpUnmarshal errs.Op = "task: unmarshal payload"
)
View Source
// Bare command names (no prefix). All of them except TgCmdStart are also
// appended to TelegramCmd to build the TelegramCmd* constants of this package.
// NOTE(review): presumably these are the Telegram bot commands (e.g. "/sub");
// the leading slash handling is not visible here — confirm.
const (
	TgCmdStart  = "start"
	TgCmdRumors = "rumors"
	TgCmdSites  = "sites"
	TgCmdSub    = "sub"
	TgCmdOn     = "on"
	TgCmdOff    = "off"
)
View Source
// Colon-separated names built by concatenation from TelegramPrefix.
// NOTE(review): presumably these are the asynq task type names / ServeMux
// patterns for Telegram-related tasks — confirm against the mux setup.
const (
	// TelegramPrefix is the common root of every name in this block.
	TelegramPrefix    = "telegram:"
	// TelegramCmd evaluates to "telegram:cmd:"; the constants below append a
	// TgCmd* name to it, e.g. TelegramCmdRumors == "telegram:cmd:rumors".
	// There is no counterpart for TgCmdStart in this block.
	TelegramCmd       = TelegramPrefix + "cmd:"
	TelegramCmdRumors = TelegramCmd + TgCmdRumors
	TelegramCmdSites  = TelegramCmd + TgCmdSites
	TelegramCmdSub    = TelegramCmd + TgCmdSub
	TelegramCmdOn     = TelegramCmd + TgCmdOn
	TelegramCmdOff    = TelegramCmd + TgCmdOff
	// TelegramChat evaluates to "telegram:chat:"; TelegramChatNew and
	// TelegramChatEdit evaluate to "telegram:chat:new" and "telegram:chat:edit".
	TelegramChat      = TelegramPrefix + "chat:"
	TelegramChatNew   = TelegramChat + "new"
	TelegramChatEdit  = TelegramChat + "edit"
)
View Source
// ConfigSchedulerKey is the string "task.scheduler".
// NOTE(review): presumably the configuration key under which SchedulerConfig
// is loaded — confirm against the config loader.
const ConfigSchedulerKey = "task.scheduler"
View Source
// ConfigServerKey is the string "task.server".
// NOTE(review): presumably the configuration key under which ServerConfig is
// loaded — confirm against the config loader.
const ConfigServerKey = "task.server"
View Source
// DefaultQueue is the queue name "default".
// NOTE(review): presumably the queue used when a task does not specify one —
// confirm against Client.Enqueue and ServerConfig.Queues.
const DefaultQueue = "default"

Variables

This section is empty.

Functions

func ClientActivator

func ClientActivator() *di.Activator

func GetServerMux

func GetServerMux(ctx context.Context, c ...di.Container) (*asynq.ServeMux, error)

func LoggingMiddleware

func LoggingMiddleware(log *slog.Logger) asynq.MiddlewareFunc

func MetricsActivator

func MetricsActivator() *di.Activator

func SchedulerActivator

func SchedulerActivator() *di.Activator

func ServerActivator

func ServerActivator() *di.Activator

func ServerMuxActivator

func ServerMuxActivator() *di.Activator

func TgCmdMiddleware

func TgCmdMiddleware(
	siteRepo repository.ReadRepository[*entity.Site],
	chatRepo repository.ReadWriteRepository[*entity.Chat],
	publisher *pubsub.Publisher,
	logger *slog.Logger,
) asynq.MiddlewareFunc

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func GetClient

func GetClient(ctx context.Context, c ...di.Container) (*Client, error)

func NewClient

func NewClient(rdbMaker *rdb.UniversalClientMaker) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, name string, data any, opts ...asynq.Option) error

func (*Client) EnqueueTgCmd

func (c *Client) EnqueueTgCmd(ctx context.Context, message *tgbotapi.Message, updateID int)

func (*Client) EnqueueTgMemberEdit

func (c *Client) EnqueueTgMemberEdit(ctx context.Context, member *tgbotapi.ChatMemberUpdated, updateID int)

func (*Client) EnqueueTgMemberNew

func (c *Client) EnqueueTgMemberNew(ctx context.Context, member *tgbotapi.Chat, updateID int)

type ClientKey

type ClientKey struct{}

type HandlerJobFeed

type HandlerJobFeed struct {
	// contains filtered or unexported fields
}

func (*HandlerJobFeed) ProcessTask

func (h *HandlerJobFeed) ProcessTask(ctx context.Context, task *asynq.Task) error

type HandlerJobSitemap

type HandlerJobSitemap struct {
	// contains filtered or unexported fields
}

func (*HandlerJobSitemap) ProcessTask

func (h *HandlerJobSitemap) ProcessTask(ctx context.Context, task *asynq.Task) error

type HandlerTgChat

type HandlerTgChat struct {
	// contains filtered or unexported fields
}

func (*HandlerTgChat) ProcessTask

func (h *HandlerTgChat) ProcessTask(ctx context.Context, task *asynq.Task) error

type HandlerTgCmdOff

type HandlerTgCmdOff struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdOff) ProcessTask

func (h *HandlerTgCmdOff) ProcessTask(ctx context.Context, _ *asynq.Task) error

type HandlerTgCmdOn

type HandlerTgCmdOn struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdOn) ProcessTask

func (h *HandlerTgCmdOn) ProcessTask(ctx context.Context, _ *asynq.Task) error

type HandlerTgCmdRumors

type HandlerTgCmdRumors struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdRumors) ProcessTask

func (h *HandlerTgCmdRumors) ProcessTask(ctx context.Context, _ *asynq.Task) error

type HandlerTgCmdSites

type HandlerTgCmdSites struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdSites) ProcessTask

func (h *HandlerTgCmdSites) ProcessTask(ctx context.Context, _ *asynq.Task) error

type HandlerTgCmdSub

type HandlerTgCmdSub struct {
	// contains filtered or unexported fields
}

func (*HandlerTgCmdSub) ProcessTask

func (h *HandlerTgCmdSub) ProcessTask(ctx context.Context, _ *asynq.Task) error

type Meta

type Meta struct {
	*opengraph.Meta
}

func MetaTag

func MetaTag(node *html.Node) *Meta

func (*Meta) Contribute

func (meta *Meta) Contribute(og *opengraph.OpenGraph) (err error)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func GetMetrics

func GetMetrics(ctx context.Context, c ...di.Container) (*Metrics, error)

func NewMetrics

func NewMetrics(rdbMaker *rdb.UniversalClientMaker) *Metrics

func (*Metrics) Close

func (m *Metrics) Close() error

func (*Metrics) Register

func (m *Metrics) Register() error

func (*Metrics) Unregister

func (m *Metrics) Unregister()

type MetricsKey

type MetricsKey struct{}

type PostEnqueueFunc

type PostEnqueueFunc func(info *asynq.TaskInfo, err error)

type PreEnqueueFunc

type PreEnqueueFunc func(task *asynq.Task, opts []asynq.Option)

type Scheduler

type Scheduler struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func GetScheduler

func GetScheduler(ctx context.Context, c ...di.Container) (*Scheduler, error)

func NewScheduler

func NewScheduler(repo repository.ReadRepository[*entity.Job], rdbMaker *rdb.UniversalClientMaker, options ...SchedulerOption) *Scheduler

func (*Scheduler) Add

func (s *Scheduler) Add(job *entity.Job) error

func (*Scheduler) Remove

func (s *Scheduler) Remove(id uuid.UUID) error

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

type SchedulerConfig

// SchedulerConfig holds the scheduler settings decoded via mapstructure tags.
type SchedulerConfig struct {
	// SyncInterval is decoded from the "sync_interval" key.
	// NOTE(review): presumably the period between scheduler syncs (see
	// OpSchedulerSync and WithInterval) — confirm in Scheduler.Run.
	SyncInterval time.Duration `mapstructure:"sync_interval"`
}

func (*SchedulerConfig) Init

func (cfg *SchedulerConfig) Init()

type SchedulerKey

type SchedulerKey struct{}

type SchedulerOption

type SchedulerOption func(*Scheduler)

func WithInterval

func WithInterval(interval time.Duration) SchedulerOption

func WithPostEnqueueFunc

func WithPostEnqueueFunc(fn PostEnqueueFunc) SchedulerOption

func WithPreEnqueueFunc

func WithPreEnqueueFunc(fn PreEnqueueFunc) SchedulerOption

type Server

type Server struct {
	// contains filtered or unexported fields
}

func GetServer

func GetServer(ctx context.Context, c ...di.Container) (*Server, error)

func NewServer

func NewServer(cfg *ServerConfig, rdbMaker *rdb.UniversalClientMaker, options ...ServerOption) *Server

func (*Server) Run

func (s *Server) Run(ctx context.Context, handler asynq.Handler) error

type ServerConfig

// ServerConfig holds the task server settings decoded via mapstructure tags;
// each field is read from the snake_case key named in its tag.
// NOTE(review): most field names match fields of asynq.Config (Concurrency,
// Queues, StrictPriority, HealthCheckInterval, DelayedTaskCheckInterval,
// GroupGracePeriod, GroupMaxDelay, GroupMaxSize), so the values are presumably
// passed through to asynq — confirm in NewServer.
type ServerConfig struct {
	Concurrency              int            `mapstructure:"concurrency"`
	// Queues maps a queue name to an int (presumably its priority weight, as
	// in asynq — TODO confirm).
	Queues                   map[string]int `mapstructure:"queues"`
	StrictPriority           bool           `mapstructure:"strict_priority"`
	HealthCheckInterval      time.Duration  `mapstructure:"health_check_interval"`
	DelayedTaskCheckInterval time.Duration  `mapstructure:"delayed_task_check_interval"`
	GroupGracePeriod         time.Duration  `mapstructure:"group_grace_period"`
	GroupMaxDelay            time.Duration  `mapstructure:"group_max_delay"`
	GroupMaxSize             int            `mapstructure:"group_max_size"`
	// GracefulTimeout has no same-named asynq.Config field; presumably the
	// shutdown timeout — TODO confirm how Server.Run uses it.
	GracefulTimeout          time.Duration  `mapstructure:"graceful_timeout"`
}

func (*ServerConfig) Init

func (cfg *ServerConfig) Init()

type ServerKey

type ServerKey struct{}

type ServerMuxKey

type ServerMuxKey struct{}

type ServerOption

type ServerOption func(s *Server)

func WithErrorHandler

func WithErrorHandler(errorHandler asynq.ErrorHandler) ServerOption

func WithGroupAggregator

func WithGroupAggregator(groupAggregator asynq.GroupAggregator) ServerOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL