mqueue

package
v2.32.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2023 License: MIT Imports: 17 Imported by: 0

README

Simple message queue based on postgres, this is for more realiably sending messages with retry on failure, accepting long failture durations such as discord being down.

Documentation

Index

Constants

View Source
const DBSchema = `` /* 266-byte string literal not displayed */

Variables

This section is empty.

Functions

func QueueMessage

func QueueMessage(elem *QueuedElement) error

QueueMessage queues a message in the message queue

func RegisterPlugin

func RegisterPlugin()

RegisterPlugin registers the mqueue plugin into the plugin system and also initializes it

func RegisterSource

func RegisterSource(name string, source PluginWithSourceDisabler)

RegisterSource registers a mqueue source, used for error handling

Types

type DiscordProcessor

type DiscordProcessor struct {
}

func (*DiscordProcessor) ProcessItem

func (d *DiscordProcessor) ProcessItem(resp chan *workResult, wi *workItem)

type ItemProcessor

type ItemProcessor interface {
	ProcessItem(resp chan *workResult, wi *workItem)
}

type MqueueServer

type MqueueServer struct {
	PushWork chan *workItem

	Stop chan *sync.WaitGroup
	// contains filtered or unexported fields
}

MqueueServer is a worker that processes mqueue items for the current shards on the process It uses primarily pubsub but it initializes the list by checking the sorted list

func NewServer

func NewServer(backend Storage, processor ItemProcessor) *MqueueServer

func (*MqueueServer) Run

func (m *MqueueServer) Run()

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

Plugin represents the mqueue plugin

func (*Plugin) BotInit

func (p *Plugin) BotInit()

func (*Plugin) LateBotInit

func (p *Plugin) LateBotInit()

LateBotInit implements bot.LateBotInitHandler

func (*Plugin) PluginInfo

func (p *Plugin) PluginInfo() *common.PluginInfo

PluginInfo implements common.Plugin

func (*Plugin) RunBackgroundWorker

func (p *Plugin) RunBackgroundWorker()

RunBackgroundWorker implements backgroundworkers.BackgroundWorkerPlugin

func (*Plugin) StopBackgroundWorker

func (p *Plugin) StopBackgroundWorker(wg *sync.WaitGroup)

StopBackgroundWorker implements backgroundworkers.BackgroundWorkerPlugin

func (*Plugin) StopBot

func (p *Plugin) StopBot(wg *sync.WaitGroup)

StopBot implements bot.BotStopperHandler

type PluginWithSourceDisabler

type PluginWithSourceDisabler interface {
	DisableFeed(elem *QueuedElement, err error)
}

PluginWithSourceDisabler

type PluginWithWebhookAvatar

type PluginWithWebhookAvatar interface {
	WebhookAvatar() string
}

PluginWithWebhookAvatar can be implemented by plugins for custom avatars

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) QueueMessage

func (p *Producer) QueueMessage(elem *QueuedElement) error

QueueMessage queues a message in the message queue

type QueuedElement

type QueuedElement struct {
	// The channel to send the message in
	ChannelID int64 `json:"Channel"`
	GuildID   int64 `json:"Guild"`

	ID int64

	// Where this feed originated from, responsible for handling discord specific errors
	Source string
	// Could be stuff like reddit feed element id, youtube feed element id and so on
	SourceItemID string `json:"SourceID"`

	// The actual message as a simple string
	MessageStr string `json:",omitempty"`

	// The actual message as an embed
	MessageEmbed *discordgo.MessageEmbed `json:",omitempty"`

	UseWebhook      bool
	WebhookUsername string

	AllowedMentions discordgo.AllowedMentions `json:"allowed_mentions"`

	// Publish the message if the channel is an announcement channel
	PublishAnnouncement bool

	// When the queue grows, the feeds with the highest priority gets sent first
	Priority int

	CreatedAt time.Time
}

QueuedElement represents a queued message

type RedisBackend

type RedisBackend struct {
	// contains filtered or unexported fields
}

func NewRedisBackend

func NewRedisBackend(pool *radix.Pool) *RedisBackend

func (*RedisBackend) AppendItem

func (rb *RedisBackend) AppendItem(elem *QueuedElement) error

func (*RedisBackend) DelItem

func (rb *RedisBackend) DelItem(item *workItem) error

func (*RedisBackend) GetFullQueue

func (rb *RedisBackend) GetFullQueue() ([]*workItem, error)

func (*RedisBackend) NextID

func (rb *RedisBackend) NextID() (next int64, err error)

type RedisPushServer

type RedisPushServer struct {
	// contains filtered or unexported fields
}

type Storage

type Storage interface {
	GetFullQueue() ([]*workItem, error)
	AppendItem(elem *QueuedElement) error
	DelItem(elem *workItem) error
	NextID() (int64, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL