queue

package
v0.0.13
Warning

This package is not in the latest version of its module.

Published: Nov 6, 2024 License: MIT Imports: 45 Imported by: 6

Documentation

Overview

Package queue is in charge of outgoing messages, queueing them when submitted, attempting a first delivery over SMTP, retrying with backoff and sending DSNs for delayed or failed deliveries.


Constants

const FutureReleaseIntervalMax = 60 * 24 * time.Hour

Allow requesting delivery starting from up to this interval from time of submission.
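A minimal sketch of how a caller might check a requested delay against this limit before submitting. The constant is mirrored locally so the example runs on its own, and allowedDelay is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors queue.FutureReleaseIntervalMax so the sketch runs without importing mox.
const futureReleaseIntervalMax = 60 * 24 * time.Hour

// allowedDelay is a hypothetical helper: it reports whether a requested delay
// is non-negative and within the maximum interval from the time of submission.
func allowedDelay(d time.Duration) bool {
	return d >= 0 && d <= futureReleaseIntervalMax
}

func main() {
	fmt.Println(allowedDelay(24 * time.Hour))      // true
	fmt.Println(allowedDelay(90 * 24 * time.Hour)) // false
}
```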

Variables

var DB *bstore.DB // Exported for making backups.
var DBTypes = []any{Msg{}, HoldRule{}, MsgRetired{}, webapi.Suppression{}, Hook{}, HookRetired{}} // Types stored in DB.
var ErrFromID = errors.New("fromid not unique")

ErrFromID indicates a fromid was present when adding a message to the queue, but it wasn't unique.

var Localserve bool

Set for mox localserve, to prevent queueing.

Functions

func Add

func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error

Add one or more new messages to the queue. If the sender paths and MsgPrefix are identical, they'll get the same BaseID, so they can be delivered in a single SMTP transaction, with a single DATA command, but may be split into multiple transactions if errors/limits are encountered. The queue is kicked immediately to start a first delivery attempt.

The ID of each message must be 0 and will be set after inserting it into the queue.

Add sets derived fields like SenderDomainStr and RecipientDomainStr, and fields related to queueing, such as Queued, NextAttempt.
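The BaseID rule described above can be illustrated with a small in-memory sketch. The msg type and assignIDs function are hypothetical stand-ins for the real database insert, and leaving BaseID at 0 when senders or prefixes differ is an assumption of this sketch rather than documented behavior.

```go
package main

import (
	"bytes"
	"fmt"
)

// msg holds only the fields relevant to this sketch; it is not queue.Msg.
type msg struct {
	ID, BaseID int64
	Sender     string
	MsgPrefix  []byte
}

// assignIDs hands out sequential IDs starting at firstID. If there are
// multiple messages and they all share sender and prefix, each gets the first
// ID as BaseID. Otherwise BaseID stays 0 (an assumption of this sketch).
func assignIDs(firstID int64, msgs []msg) {
	shared := len(msgs) > 1
	for i := range msgs {
		msgs[i].ID = firstID + int64(i)
		if msgs[i].Sender != msgs[0].Sender || !bytes.Equal(msgs[i].MsgPrefix, msgs[0].MsgPrefix) {
			shared = false
		}
	}
	if !shared {
		return
	}
	for i := range msgs {
		msgs[i].BaseID = msgs[0].ID
	}
}

func main() {
	prefix := []byte("DKIM-Signature: example\r\n")
	msgs := []msg{
		{Sender: "a@example.org", MsgPrefix: prefix},
		{Sender: "a@example.org", MsgPrefix: prefix},
	}
	assignIDs(10, msgs)
	for _, m := range msgs {
		fmt.Println(m.ID, m.BaseID)
	}
}
```

With the inputs in main, both messages end up with BaseID 10, the ID of the first message queued.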

func ConnectionCounter added in v0.0.10

func ConnectionCounter() int64

func Count

func Count(ctx context.Context) (int, error)

Count returns the number of messages in the delivery queue.

func Drop

func Drop(ctx context.Context, log mlog.Log, f Filter) (affected int, err error)

Drop removes matching messages from the queue. Removed messages are added as retired messages, and webhooks with the "canceled" event are queued.

Returns number of messages removed, which can be non-zero even in case of an error.

func Fail added in v0.0.11

func Fail(ctx context.Context, log mlog.Log, f Filter) (affected int, err error)

Fail marks matching messages as failed for delivery, delivers a DSN to the sender, and sends a webhook.

Returns number of messages removed, which can be non-zero even in case of an error.

func HoldRuleRemove added in v0.0.11

func HoldRuleRemove(ctx context.Context, log mlog.Log, holdRuleID int64) error

HoldRuleRemove removes a hold rule. The Hold field of existing messages is not changed.

func HoldSet added in v0.0.11

func HoldSet(ctx context.Context, filter Filter, hold bool) (affected int, err error)

HoldSet sets Hold for all matching messages and kicks the queue.

func HookCancel added in v0.0.11

func HookCancel(ctx context.Context, log mlog.Log, filter HookFilter) (affected int, err error)

HookCancel prevents more delivery attempts of the hook, moving it to the retired list if configured.

func HookNextAttemptAdd added in v0.0.11

func HookNextAttemptAdd(ctx context.Context, filter HookFilter, d time.Duration) (affected int, err error)

HookNextAttemptAdd adds a duration to the NextAttempt for all matching hooks, and kicks the queue.

func HookNextAttemptSet added in v0.0.11

func HookNextAttemptSet(ctx context.Context, filter HookFilter, t time.Time) (affected int, err error)

HookNextAttemptSet sets NextAttempt for all matching hooks to a new absolute time and kicks the queue.

func HookPost added in v0.0.11

func HookPost(ctx context.Context, log mlog.Log, hookID int64, attempt int, url, authz string, payload string) (code int, response string, err error)

func HookQueueSize added in v0.0.11

func HookQueueSize(ctx context.Context) (int, error)

HookQueueSize returns the number of webhooks in the queue.

func Incoming added in v0.0.11

func Incoming(ctx context.Context, log mlog.Log, acc *store.Account, messageID string, m store.Message, part message.Part, mailboxName string) error

Incoming processes a message delivered over SMTP for webhooks. If the message is a DSN, a webhook for outgoing deliveries may be scheduled (if configured). Otherwise, a webhook for incoming deliveries may be scheduled.

func Init

func Init() error

Init opens the queue database without starting delivery.

func NextAttemptAdd added in v0.0.11

func NextAttemptAdd(ctx context.Context, filter Filter, d time.Duration) (affected int, err error)

NextAttemptAdd adds a duration to the NextAttempt for all matching messages, and kicks the queue.

func NextAttemptSet added in v0.0.11

func NextAttemptSet(ctx context.Context, filter Filter, t time.Time) (affected int, err error)

NextAttemptSet sets NextAttempt for all matching messages to a new time, and kicks the queue.

func RequireTLSSet added in v0.0.11

func RequireTLSSet(ctx context.Context, filter Filter, requireTLS *bool) (affected int, err error)

RequireTLSSet updates the RequireTLS field of matching messages.
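The requireTLS parameter is a *bool, so it carries three states: nil, true and false. The sketch below names those states following the RequireTLS field comments on Msg. tlsMode is a hypothetical helper and its return strings are only labels for illustration.

```go
package main

import "fmt"

// tlsMode is a hypothetical helper that names the three states of a *bool
// such as Msg.RequireTLS.
func tlsMode(requireTLS *bool) string {
	switch {
	case requireTLS == nil:
		// Recipient domain policy (MTA-STS and/or DANE), else opportunistic STARTTLS.
		return "follow recipient domain policy"
	case *requireTLS:
		// MTA-STS or DANE required, plus REQUIRETLS support at the next hop.
		return "require verified TLS"
	default:
		// Policy is ignored if it does not lead to a successful TLS connection.
		return "ignore policy on TLS failure"
	}
}

func main() {
	yes, no := true, false
	fmt.Println(tlsMode(nil))
	fmt.Println(tlsMode(&yes))
	fmt.Println(tlsMode(&no))
}
```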

func Shutdown

func Shutdown()

Shutdown closes the queue database. The delivery process isn't stopped. For tests only.

func Start

func Start(resolver dns.Resolver, done chan struct{}) error

Start opens the database by calling Init, then starts the delivery and cleanup processes.

func SuppressionAdd added in v0.0.11

func SuppressionAdd(ctx context.Context, originalAddress smtp.Path, sup *webapi.Suppression) error

SuppressionAdd adds a suppression for an address for an account, setting BaseAddress based on OriginalAddress.

If the base address of original address is already present, an error is returned (such as from bstore).

SuppressionAdd does not check if an account exists.

func SuppressionList added in v0.0.11

func SuppressionList(ctx context.Context, account string) ([]webapi.Suppression, error)

SuppressionList returns suppressions. If account is not empty, only suppressions for that account are returned.

SuppressionList does not check if an account exists.

func SuppressionLookup added in v0.0.11

func SuppressionLookup(ctx context.Context, account string, address smtp.Path) (*webapi.Suppression, error)

SuppressionLookup looks up a suppression for an address for an account. Returns a nil suppression if not found.

SuppressionLookup does not check if an account exists.

func SuppressionRemove added in v0.0.11

func SuppressionRemove(ctx context.Context, account string, address smtp.Path) error

SuppressionRemove removes a suppression. The base address for the given address is removed.

SuppressionRemove does not check if an account exists.

func TransportSet added in v0.0.11

func TransportSet(ctx context.Context, filter Filter, transport string) (affected int, err error)

TransportSet changes the transport to use for the matching messages.

Types

type Filter added in v0.0.11

type Filter struct {
	Max         int
	IDs         []int64
	Account     string
	From        string
	To          string
	Hold        *bool
	Submitted   string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	NextAttempt string // ">$duration" or "<$duration", also with "now" for duration.
	Transport   *string
}

Filter filters messages to list or operate on. Used by admin web interface and cli.

Only non-empty/non-zero values are applied to the filter. Leaving all fields empty/zero matches all messages.
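The Submitted and NextAttempt fields take strings such as ">1h", "<30m" or ">now". The sketch below shows one way such a value could be parsed. parseRelative is a hypothetical function, and treating the duration as an offset added to the current time is an assumption, not something the field comments state.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// parseRelative is a hypothetical parser for ">$duration" and "<$duration"
// values, with "now" accepted as a zero duration. It reports whether the
// comparison is "before" ("<") and the offset relative to the current time.
func parseRelative(s string) (before bool, offset time.Duration, err error) {
	switch {
	case strings.HasPrefix(s, "<"):
		before = true
	case strings.HasPrefix(s, ">"):
		before = false
	default:
		return false, 0, fmt.Errorf("%q must start with < or >", s)
	}
	rest := strings.TrimSpace(s[1:])
	if rest == "now" {
		return before, 0, nil
	}
	offset, err = time.ParseDuration(rest)
	if err != nil {
		return false, 0, fmt.Errorf("parsing duration in %q: %w", s, err)
	}
	return before, offset, nil
}

func main() {
	before, offset, err := parseRelative(">1h")
	fmt.Println(before, offset, err) // false 1h0m0s <nil>

	// A caller would compare the field against the current time plus the offset.
	cutoff := time.Now().Add(offset)
	fmt.Println(cutoff.After(time.Now()))
}
```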

type HoldRule added in v0.0.11

type HoldRule struct {
	ID                 int64
	Account            string
	SenderDomain       dns.Domain
	RecipientDomain    dns.Domain
	SenderDomainStr    string // Unicode.
	RecipientDomainStr string // Unicode.
}

HoldRule is a set of conditions that cause a matching message to be marked as on hold when it is queued. All-empty conditions match all messages, effectively pausing the entire queue.
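A minimal sketch of the matching idea, where an empty condition acts as a wildcard. The holdRule type here uses plain strings instead of dns.Domain, and exact string comparison is an assumption made for the example.

```go
package main

import "fmt"

// holdRule mirrors the conditions of a hold rule with plain strings; it is
// not queue.HoldRule.
type holdRule struct {
	Account, SenderDomain, RecipientDomain string
}

// all reports whether the rule has no conditions and so matches every message.
func (hr holdRule) all() bool {
	return hr == holdRule{}
}

// matches treats each empty condition as a wildcard.
func (hr holdRule) matches(account, senderDomain, recipientDomain string) bool {
	return (hr.Account == "" || hr.Account == account) &&
		(hr.SenderDomain == "" || hr.SenderDomain == senderDomain) &&
		(hr.RecipientDomain == "" || hr.RecipientDomain == recipientDomain)
}

func main() {
	pause := holdRule{}
	byRecipient := holdRule{RecipientDomain: "example.com"}

	fmt.Println(pause.all(), pause.matches("acct", "a.example", "b.example"))
	fmt.Println(byRecipient.all(), byRecipient.matches("acct", "a.example", "example.com"))
	fmt.Println(byRecipient.matches("acct", "a.example", "other.example"))
}
```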

func HoldRuleAdd added in v0.0.11

func HoldRuleAdd(ctx context.Context, log mlog.Log, hr HoldRule) (HoldRule, error)

HoldRuleAdd adds a new hold rule causing newly submitted messages to be marked as "on hold", and existing matching messages too.

func HoldRuleList added in v0.0.11

func HoldRuleList(ctx context.Context) ([]HoldRule, error)

HoldRuleList returns all hold rules.

func (HoldRule) All added in v0.0.11

func (pr HoldRule) All() bool

type Hook added in v0.0.11

type Hook struct {
	ID         int64
	QueueMsgID int64             `bstore:"index"` // Original queue Msg/MsgRetired ID. Zero for hooks for incoming messages.
	FromID     string            // As generated by us and returned in webapi call. Can be empty, for incoming messages to our base address.
	MessageID  string            // Of outgoing or incoming messages. Includes <>.
	Subject    string            // Subject of original outgoing message, or of incoming message.
	Extra      map[string]string // From submitted message.

	Account       string `bstore:"nonzero"`
	URL           string `bstore:"nonzero"` // Taken from config when webhook is scheduled.
	Authorization string // Optional value for authorization header to include in HTTP request.
	IsIncoming    bool
	OutgoingEvent string // Empty string if not outgoing.
	Payload       string // JSON data to be submitted.

	Submitted   time.Time `bstore:"default now,index"`
	Attempts    int
	NextAttempt time.Time `bstore:"nonzero,index"` // Index for fast scheduling.
	Results     []HookResult
}

Hook is a webhook call about a delivery. We'll try delivering with backoff until we succeed or fail.

func HookList added in v0.0.11

func HookList(ctx context.Context, filter HookFilter, sort HookSort) ([]Hook, error)

HookList returns webhooks according to filter and sort.

func (Hook) LastResult added in v0.0.11

func (h Hook) LastResult() HookResult

LastResult returns the last result entry, or an empty result.

func (Hook) Retired added in v0.0.11

func (h Hook) Retired(success bool, lastActivity, keepUntil time.Time) HookRetired

Retired returns a HookRetired for a Hook, for insertion into the database.

type HookFilter added in v0.0.11

type HookFilter struct {
	Max         int
	IDs         []int64
	Account     string
	Submitted   string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	NextAttempt string // ">$duration" or "<$duration", also with "now" for duration.
	Event       string // Including "incoming".
}

HookFilter filters hooks to list or operate on. Used by admin web interface and cli.

Only non-empty/non-zero values are applied to the filter. Leaving all fields empty/zero matches all hooks.

type HookResult added in v0.0.11

type HookResult struct {
	Start    time.Time
	Duration time.Duration
	URL      string
	Success  bool
	Code     int // eg 200, 404, 500. 2xx implies success.
	Error    string
	Response string // Max 512 bytes of HTTP response body.
}

HookResult is the result of a single attempt to deliver a webhook.
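The field comments mention two rules: a 2xx status code implies success, and at most 512 bytes of the response body are kept. A sketch of recording a result under those rules follows. The hookResult type, newResult and lastResult are hypothetical, and treating every non-2xx code as a failure is an assumption.

```go
package main

import "fmt"

const maxResponse = 512 // From the Response field comment.

// hookResult keeps only the fields used in this sketch; it is not queue.HookResult.
type hookResult struct {
	Success  bool
	Code     int
	Response string
}

// newResult marks 2xx codes as success and cuts the body to maxResponse bytes.
// Cutting by bytes may split a multi-byte UTF-8 character.
func newResult(code int, body string) hookResult {
	if len(body) > maxResponse {
		body = body[:maxResponse]
	}
	return hookResult{Success: code >= 200 && code < 300, Code: code, Response: body}
}

// lastResult returns the final entry, or a zero value when there are none.
func lastResult(results []hookResult) hookResult {
	if len(results) == 0 {
		return hookResult{}
	}
	return results[len(results)-1]
}

func main() {
	results := []hookResult{newResult(500, "try again"), newResult(200, "ok")}
	last := lastResult(results)
	fmt.Println(last.Code, last.Success) // 200 true
	fmt.Println(lastResult(nil).Code)    // 0
}
```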

type HookRetired added in v0.0.11

type HookRetired struct {
	ID         int64             // Same as original Hook.ID.
	QueueMsgID int64             // Original queue Msg or MsgRetired ID. Zero for hooks for incoming messages.
	FromID     string            // As generated by us and returned in webapi call. Can be empty, for incoming messages to our base address.
	MessageID  string            // Of outgoing or incoming messages. Includes <>.
	Subject    string            // Subject of original outgoing message, or of incoming message.
	Extra      map[string]string // From submitted message.

	Account       string `bstore:"nonzero,index Account+LastActivity"`
	URL           string `bstore:"nonzero"` // Taken from config at start of each attempt.
	Authorization bool   // Whether request had authorization without keeping it around.
	IsIncoming    bool
	OutgoingEvent string
	Payload       string // JSON data submitted.

	Submitted      time.Time
	SupersededByID int64 // If not 0, a Hook.ID that superseded this one and Done will be true.
	Attempts       int
	Results        []HookResult

	Success      bool
	LastActivity time.Time `bstore:"index"`
	KeepUntil    time.Time `bstore:"index"`
}

HookRetired is a Hook that was delivered/failed/canceled and kept according to the configuration.

func HookRetiredList added in v0.0.11

func HookRetiredList(ctx context.Context, filter HookRetiredFilter, sort HookRetiredSort) ([]HookRetired, error)

HookRetiredList returns retired webhooks according to filter and sort.

func (HookRetired) LastResult added in v0.0.11

func (h HookRetired) LastResult() HookResult

LastResult returns the last result entry, or an empty result.

type HookRetiredFilter added in v0.0.11

type HookRetiredFilter struct {
	Max          int
	IDs          []int64
	Account      string
	Submitted    string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	LastActivity string // ">$duration" or "<$duration", also with "now" for duration.
	Event        string // Including "incoming".
}

HookRetiredFilter filters retired hooks to list or operate on. Used by admin web interface and cli.

Only non-empty/non-zero values are applied to the filter. Leaving all fields empty/zero matches all hooks.

type HookRetiredSort added in v0.0.11

type HookRetiredSort struct {
	Field  string // "Queued" or "LastActivity"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object. Must be set iff LastID is set.
	Asc    bool   // Ascending, or descending.
}

type HookSort added in v0.0.11

type HookSort struct {
	Field  string // "Queued" or "NextAttempt"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object. Must be set iff LastID is set.
	Asc    bool   // Ascending, or descending.
}

type Msg

type Msg struct {
	ID int64

	// A message for multiple recipients will get a BaseID that is identical to the
	// first Msg.ID queued. The message contents will be identical for each recipient,
	// including MsgPrefix. If other properties are identical too, including recipient
	// domain, multiple Msgs may be delivered in a single SMTP transaction. For
	// messages with a single recipient, this field will be 0.
	BaseID int64 `bstore:"index"`

	Queued             time.Time      `bstore:"default now"`
	Hold               bool           // If set, delivery won't be attempted.
	SenderAccount      string         // Failures are delivered back to this local account. Also used for routing.
	SenderLocalpart    smtp.Localpart // Should be a local user and domain.
	SenderDomain       dns.IPDomain
	SenderDomainStr    string         // For filtering, unicode.
	FromID             string         // For transactional messages, used to match later DSNs.
	RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
	RecipientDomain    dns.IPDomain
	RecipientDomainStr string              // For filtering, unicode domain. Can also contain ip enclosed in [].
	Attempts           int                 // Next attempt is based on last attempt and exponential back off based on attempts.
	MaxAttempts        int                 // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
	DialedIPs          map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
	NextAttempt        time.Time           // For scheduling.
	LastAttempt        *time.Time
	Results            []MsgResult

	Has8bit       bool   // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
	SMTPUTF8      bool   // Whether message requires use of SMTPUTF8.
	IsDMARCReport bool   // Delivery failures for DMARC reports are handled differently.
	IsTLSReport   bool   // Delivery failures for TLS reports are handled differently.
	Size          int64  // Full size of message, combined MsgPrefix with contents of message file.
	MessageID     string // Message-ID header, including <>. Used when composing a DSN, in its References header.
	MsgPrefix     []byte // Data to send before the contents from the file, typically with headers like DKIM-Signature.
	Subject       string // For context about delivery.

	// If set, this message is a DSN and this is a version using utf-8, for the case
	// the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
	// relevant.
	DSNUTF8 []byte

	// If non-empty, the transport to use for this message. Can be set through cli or
	// admin interface. If empty (the default for a submitted message), regular routing
	// rules apply.
	Transport string

	// RequireTLS influences TLS verification during delivery.
	//
	// If nil, the recipient domain policy is followed (MTA-STS and/or DANE), falling
	// back to optional opportunistic non-verified STARTTLS.
	//
	// If RequireTLS is true (through SMTP REQUIRETLS extension or webmail submit),
	// MTA-STS or DANE is required, as well as REQUIRETLS support by the next hop
	// server.
	//
	// If RequireTLS is false (through message header "TLS-Required: No"), the recipient
	// domain's policy is ignored if it does not lead to a successful TLS connection,
	// i.e. falling back to SMTP delivery with unverified STARTTLS or plain text.
	RequireTLS *bool

	// For DSNs, where the original FUTURERELEASE value must be included as per-message
	// field. This field should be of the form "for;" plus interval, or "until;" plus
	// utc date-time.
	FutureReleaseRequest string

	Extra map[string]string // Extra information, for transactional email.
}

Msg is a message in the queue.

Use MakeMsg to make a message with fields that Add needs. Add will further set queueing related fields.
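The Attempts and MaxAttempts comments describe exponential backoff with a default of 8 attempts when MaxAttempts is 0. The sketch below illustrates that idea. The base interval is left as a parameter because the documentation does not state it, and both helper functions are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

const defaultMaxAttempts = 8 // From the MaxAttempts field comment.

// maxAttempts returns the effective limit: 0 means use the default.
func maxAttempts(configured int) int {
	if configured == 0 {
		return defaultMaxAttempts
	}
	return configured
}

// backoff doubles the base interval for each attempt already made.
func backoff(base time.Duration, attempts int) time.Duration {
	if attempts < 0 {
		attempts = 0
	}
	return base << attempts
}

func main() {
	base := 10 * time.Minute // Hypothetical base interval for illustration.
	last := time.Date(2024, 11, 6, 12, 0, 0, 0, time.UTC)
	for attempts := 0; attempts < 3; attempts++ {
		next := last.Add(backoff(base, attempts))
		fmt.Println(attempts, backoff(base, attempts), next.Format(time.RFC3339))
	}
	fmt.Println(maxAttempts(0), maxAttempts(3)) // 8 3
}
```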

func List

func List(ctx context.Context, filter Filter, sort Sort) ([]Msg, error)

List returns max 100 messages matching filter in the delivery queue. By default, orders by next delivery attempt.

func MakeMsg added in v0.0.8

func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool, next time.Time, subject string) Msg

MakeMsg is a convenience function that sets the commonly used fields for a Msg. messageID should include <>.

func (*Msg) LastResult added in v0.0.11

func (m *Msg) LastResult() MsgResult

LastResult returns the last result entry, or an empty result.

func (Msg) MessagePath

func (m Msg) MessagePath() string

MessagePath returns the path where the message is stored.

func (Msg) Recipient

func (m Msg) Recipient() smtp.Path

Recipient of message as used in RCPT TO.

func (Msg) Retired added in v0.0.11

func (m Msg) Retired(success bool, t, keepUntil time.Time) MsgRetired

Retired returns a MsgRetired for the message, for history of deliveries.

func (Msg) Sender

func (m Msg) Sender() smtp.Path

Sender of message as used in MAIL FROM.

type MsgResult added in v0.0.11

type MsgResult struct {
	Start    time.Time
	Duration time.Duration
	Success  bool
	Code     int
	Secode   string
	Error    string
}

MsgResult is the result (or work in progress) of a delivery attempt.

type MsgRetired added in v0.0.11

type MsgRetired struct {
	ID int64 // Same ID as it was as Msg.ID.

	BaseID             int64
	Queued             time.Time
	SenderAccount      string         // Failures are delivered back to this local account. Also used for routing.
	SenderLocalpart    smtp.Localpart // Should be a local user and domain.
	SenderDomainStr    string         // For filtering, unicode.
	FromID             string         `bstore:"index"` // Used to match DSNs.
	RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
	RecipientDomain    dns.IPDomain
	RecipientDomainStr string              // For filtering, unicode.
	Attempts           int                 // Next attempt is based on last attempt and exponential back off based on attempts.
	MaxAttempts        int                 // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
	DialedIPs          map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
	LastAttempt        *time.Time
	Results            []MsgResult

	Has8bit       bool   // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
	SMTPUTF8      bool   // Whether message requires use of SMTPUTF8.
	IsDMARCReport bool   // Delivery failures for DMARC reports are handled differently.
	IsTLSReport   bool   // Delivery failures for TLS reports are handled differently.
	Size          int64  // Full size of message, combined MsgPrefix with contents of message file.
	MessageID     string // Used when composing a DSN, in its References header.
	Subject       string // For context about delivery.

	Transport            string
	RequireTLS           *bool
	FutureReleaseRequest string

	Extra map[string]string // Extra information, for transactional email.

	LastActivity     time.Time `bstore:"index"`
	RecipientAddress string    `bstore:"index RecipientAddress+LastActivity"`
	Success          bool      // Whether delivery to next hop succeeded.
	KeepUntil        time.Time `bstore:"index"`
}

MsgRetired is a message for which delivery completed, either successfully or by failing or being canceled. Retired messages are only stored if so configured, and will be cleaned up after the configured period.

func RetiredList added in v0.0.11

func RetiredList(ctx context.Context, filter RetiredFilter, sort RetiredSort) ([]MsgRetired, error)

RetiredList returns retired messages.

func (MsgRetired) LastResult added in v0.0.11

func (m MsgRetired) LastResult() MsgResult

LastResult returns the last result entry, or an empty result.

func (MsgRetired) Recipient added in v0.0.11

func (m MsgRetired) Recipient() smtp.Path

Recipient of message as used in RCPT TO.

func (MsgRetired) Sender added in v0.0.11

func (m MsgRetired) Sender() (path smtp.Path, err error)

Sender of message as used in MAIL FROM.

type ReadReaderAtCloser added in v0.0.3

type ReadReaderAtCloser interface {
	io.ReadCloser
	io.ReaderAt
}

func OpenMessage

func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error)

OpenMessage opens a message present in the queue.
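The returned value combines sequential reads, random access and Close. A sketch of consuming such a value is shown below, using an in-memory stand-in instead of a queued message. The lowercase interface mirrors ReadReaderAtCloser, while memMsg, newMemMsg and peekThenReadAll are hypothetical names.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// readReaderAtCloser mirrors the ReadReaderAtCloser interface.
type readReaderAtCloser interface {
	io.ReadCloser
	io.ReaderAt
}

// *os.File satisfies the interface; this is checked at compile time.
var _ readReaderAtCloser = (*os.File)(nil)

// memMsg adapts an in-memory reader for the sketch; Close is a no-op.
type memMsg struct{ *strings.Reader }

func (memMsg) Close() error { return nil }

func newMemMsg(s string) memMsg { return memMsg{strings.NewReader(s)} }

// peekThenReadAll uses ReadAt to inspect the first n bytes without moving the
// read offset, then reads the whole message sequentially and closes it.
func peekThenReadAll(r readReaderAtCloser, n int) (head, all string, err error) {
	defer r.Close()
	buf := make([]byte, n)
	k, err := r.ReadAt(buf, 0)
	if err != nil && err != io.EOF {
		return "", "", err
	}
	data, err := io.ReadAll(r)
	if err != nil {
		return "", "", err
	}
	return string(buf[:k]), string(data), nil
}

func main() {
	head, all, err := peekThenReadAll(newMemMsg("Subject: hi\r\n\r\nbody"), 8)
	fmt.Printf("%q %d %v\n", head, len(all), err)
}
```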

type RetiredFilter added in v0.0.11

type RetiredFilter struct {
	Max          int
	IDs          []int64
	Account      string
	From         string
	To           string
	Submitted    string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	LastActivity string // ">$duration" or "<$duration", also with "now" for duration.
	Transport    *string
	Success      *bool
}

RetiredFilter filters messages to list or operate on. Used by admin web interface and cli.

Only non-empty/non-zero values are applied to the filter. Leaving all fields empty/zero matches all messages.

type RetiredSort added in v0.0.11

type RetiredSort struct {
	Field  string // "Queued" or "LastActivity"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object. Must be set iff LastID is set.
	Asc    bool   // Ascending, or descending.
}

type Sort added in v0.0.11

type Sort struct {
	Field  string // "Queued" or "NextAttempt"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object. Must be set iff LastID is set.
	Asc    bool   // Ascending, or descending.
}
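The LastID and Last fields suggest cursor-style paging: a caller passes the ID and sort-field value of the last object seen to get the objects beyond it. The in-memory sketch below illustrates that pattern. The item type and page function are hypothetical, and ordering by the field value with ID as tie-breaker is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// item stands in for a queue entry; Key plays the role of the sort field,
// for example a timestamp in Unix seconds.
type item struct {
	ID  int64
	Key int64
}

// page orders items by (Key, ID) and returns up to limit items strictly
// beyond the cursor (last, lastID). A lastID of 0 starts from the beginning.
func page(items []item, lastID, last int64, asc bool, limit int) []item {
	sorted := append([]item(nil), items...)
	less := func(a, b item) bool {
		if a.Key != b.Key {
			return a.Key < b.Key
		}
		return a.ID < b.ID
	}
	sort.Slice(sorted, func(i, j int) bool {
		if asc {
			return less(sorted[i], sorted[j])
		}
		return less(sorted[j], sorted[i])
	})
	cursor := item{ID: lastID, Key: last}
	var out []item
	for _, it := range sorted {
		if lastID > 0 {
			beyond := less(cursor, it)
			if !asc {
				beyond = less(it, cursor)
			}
			if !beyond {
				continue
			}
		}
		if len(out) == limit {
			break
		}
		out = append(out, it)
	}
	return out
}

func main() {
	items := []item{{ID: 1, Key: 100}, {ID: 2, Key: 100}, {ID: 3, Key: 50}}
	first := page(items, 0, 0, true, 2)
	fmt.Println(first)
	lastSeen := first[len(first)-1]
	fmt.Println(page(items, lastSeen.ID, lastSeen.Key, true, 2))
}
```

Using both the field value and the ID in the cursor keeps paging stable when several objects share the same field value.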
