Documentation ¶
Index ¶
- Constants
- Variables
- type APIClient
- type ApplyRequest
- type ApplyStage
- type ApplyStageInput
- type BuildRequest
- type BuildResult
- type BuildStage
- type BuildStageInput
- type BuildStageOutput
- type ChannelConsumerProducer
- type DefaultDownloadRateModifier
- type DownloadCache
- func (s *DownloadCache) Clear()
- func (s *DownloadCache) Count() (int, int)
- func (s *DownloadCache) DeleteAttachments(id ...string)
- func (s *DownloadCache) DeleteMessages(id ...string)
- func (s *DownloadCache) GetAttachment(id string) ([]byte, bool)
- func (s *DownloadCache) GetMessage(id string) (proton.Message, bool)
- func (s *DownloadCache) StoreAttachment(id string, data []byte)
- func (s *DownloadCache) StoreMessage(message proton.Message)
- type DownloadRateModifier
- type DownloadRequest
- type DownloadStage
- type DownloadStageInput
- type DownloadStageOutput
- type Handler
- type Job
- type JobWaiterMessage
- type LabelMap
- type MessageBuilder
- type MetadataStage
- type MetadataStageInput
- type MetadataStageOutput
- type Regulator
- type Reporter
- type Service
- type StageInputConsumer
- type StageOutputProducer
- type StateProvider
- type Status
- type UpdateApplier
Constants ¶
const DefaultRetryCoolDown = 20 * time.Second
const Gigabyte = 1024 * Megabyte
const Kilobyte = uint64(1024)
const Megabyte = 1024 * Kilobyte
const MetadataMaxMessages = 64
const MetadataPageSize = 128
const NumSyncStages = 4
Variables ¶
var ErrNoMoreInput = errors.New("no more input")
Functions ¶
This section is empty.
Types ¶
type APIClient ¶
type APIClient interface { GetGroupedMessageCount(ctx context.Context) ([]proton.MessageGroupCount, error) GetLabels(ctx context.Context, labelTypes ...proton.LabelType) ([]proton.Label, error) GetMessage(ctx context.Context, messageID string) (proton.Message, error) GetMessageMetadataPage(ctx context.Context, page, pageSize int, filter proton.MessageFilter) ([]proton.MessageMetadata, error) GetFullMessage(ctx context.Context, messageID string, scheduler proton.Scheduler, storageProvider proton.AttachmentAllocator) (proton.FullMessage, error) GetAttachmentInto(ctx context.Context, attachmentID string, reader io.ReaderFrom) error GetAttachment(ctx context.Context, attachmentID string) ([]byte, error) }
type ApplyRequest ¶
type ApplyRequest struct {
// contains filtered or unexported fields
}
type ApplyStage ¶
type ApplyStage struct {
// contains filtered or unexported fields
}
ApplyStage applies the sync updates and waits for their completion before proceeding with the next batch. This is the final stage in the sync pipeline.
func NewApplyStage ¶
func NewApplyStage(input ApplyStageInput) *ApplyStage
func (*ApplyStage) Run ¶
func (a *ApplyStage) Run(group *async.Group)
type ApplyStageInput ¶
type ApplyStageInput = StageInputConsumer[ApplyRequest]
type BuildRequest ¶
type BuildRequest struct {
// contains filtered or unexported fields
}
type BuildResult ¶
type BuildResult struct { AddressID string MessageID string Update *imap.MessageCreated }
type BuildStage ¶
type BuildStage struct {
// contains filtered or unexported fields
}
BuildStage is in charge of decrypting and converting the downloaded messages from the previous stage into RFC822 compliant messages which can then be sent to the IMAP server.
func NewBuildStage ¶
func NewBuildStage( input BuildStageInput, output BuildStageOutput, maxBuildMem uint64, panicHandler async.PanicHandler, observabilitySender observability.Sender, ) *BuildStage
func (*BuildStage) Run ¶
func (b *BuildStage) Run(group *async.Group)
type BuildStageInput ¶
type BuildStageInput = StageInputConsumer[BuildRequest]
type BuildStageOutput ¶
type BuildStageOutput = StageOutputProducer[ApplyRequest]
type ChannelConsumerProducer ¶
type ChannelConsumerProducer[T any] struct {
// contains filtered or unexported fields
}
func NewChannelConsumerProducer ¶
func NewChannelConsumerProducer[T any]() *ChannelConsumerProducer[T]
func (ChannelConsumerProducer[T]) Close ¶
func (c ChannelConsumerProducer[T]) Close()
type DefaultDownloadRateModifier ¶
type DefaultDownloadRateModifier struct{}
type DownloadCache ¶
type DownloadCache struct {
// contains filtered or unexported fields
}
func (*DownloadCache) Clear ¶
func (s *DownloadCache) Clear()
func (*DownloadCache) Count ¶
func (s *DownloadCache) Count() (int, int)
func (*DownloadCache) DeleteAttachments ¶
func (s *DownloadCache) DeleteAttachments(id ...string)
func (*DownloadCache) DeleteMessages ¶
func (s *DownloadCache) DeleteMessages(id ...string)
func (*DownloadCache) GetAttachment ¶
func (s *DownloadCache) GetAttachment(id string) ([]byte, bool)
func (*DownloadCache) GetMessage ¶
func (s *DownloadCache) GetMessage(id string) (proton.Message, bool)
func (*DownloadCache) StoreAttachment ¶
func (s *DownloadCache) StoreAttachment(id string, data []byte)
func (*DownloadCache) StoreMessage ¶
func (s *DownloadCache) StoreMessage(message proton.Message)
type DownloadRateModifier ¶
type DownloadRequest ¶
type DownloadRequest struct {
// contains filtered or unexported fields
}
type DownloadStage ¶
type DownloadStage struct {
// contains filtered or unexported fields
}
DownloadStage downloads the messages and attachments. It auto-throttles the download of the messages based on whether we run into 429|5xx codes.
func NewDownloadStage ¶
func NewDownloadStage( input DownloadStageInput, output DownloadStageOutput, maxParallelDownloads int, panicHandler async.PanicHandler, ) *DownloadStage
func (*DownloadStage) Run ¶
func (d *DownloadStage) Run(group *async.Group)
type DownloadStageInput ¶
type DownloadStageInput = StageInputConsumer[DownloadRequest]
type DownloadStageOutput ¶
type DownloadStageOutput = StageOutputProducer[BuildRequest]
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler is the interface from which we control the syncing of the IMAP data. One instance should be created for each user and used for every subsequent sync request.
func NewHandler ¶
func NewHandler( regulator Regulator, client APIClient, userID string, state StateProvider, log *logrus.Entry, panicHandler async.PanicHandler, ) *Handler
func (*Handler) CancelAndWait ¶
func (t *Handler) CancelAndWait()
func (*Handler) Execute ¶
func (t *Handler) Execute( syncReporter Reporter, labels LabelMap, updateApplier UpdateApplier, messageBuilder MessageBuilder, coolDown time.Duration, )
func (*Handler) OnSyncFinishedCH ¶
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
Job represents a unit of work that will travel down the sync pipeline. The job will be split up into child jobs for each batch. The parent job (this) will then wait until all the children have finished executing. Execution can terminate by either:
  - Completing the pipeline successfully
  - Context cancellation
  - Errors
On error or context cancellation, all child jobs are cancelled.
func NewJob ¶
func NewJob(ctx context.Context, client APIClient, userID string, labels LabelMap, messageBuilder MessageBuilder, updateApplier UpdateApplier, syncReporter Reporter, state StateProvider, panicHandler async.PanicHandler, cache *DownloadCache, log *logrus.Entry, ) *Job
type JobWaiterMessage ¶ added in v3.8.0
type JobWaiterMessage int
const ( JobWaiterMessageCreated JobWaiterMessage = iota JobWaiterMessageFinished )
type MessageBuilder ¶
type MetadataStage ¶
type MetadataStage struct {
// contains filtered or unexported fields
}
MetadataStage is responsible for throttling the sync pipeline by only allowing up to `MetadataMaxMessages` messages, or as many messages as fit within the maximum allowed memory usage, to go through the pipeline. It is also responsible for interleaving different sync jobs so that all jobs can progress and finish.
func NewMetadataStage ¶
func NewMetadataStage( input MetadataStageInput, output MetadataStageOutput, maxDownloadMem uint64, panicHandler async.PanicHandler, ) *MetadataStage
func (*MetadataStage) Run ¶
func (m *MetadataStage) Run(group *async.Group)
type MetadataStageInput ¶
type MetadataStageInput = StageInputConsumer[*Job]
type MetadataStageOutput ¶
type MetadataStageOutput = StageOutputProducer[DownloadRequest]
type Regulator ¶
Regulator is an abstraction for the sync service, since it regulates the number of concurrent sync activities.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service which mediates IMAP syncing in Bridge. IMPORTANT: Be sure to cancel all ongoing sync Handlers before cancelling this service's Group.
func NewService ¶
func NewService( panicHandler async.PanicHandler, observabilitySender observability.Sender, ) *Service
type StageInputConsumer ¶
type StageOutputProducer ¶
type StateProvider ¶
type StateProvider interface { AddFailedMessageID(context.Context, ...string) error RemFailedMessageID(context.Context, ...string) error GetSyncStatus(context.Context) (Status, error) ClearSyncStatus(context.Context) error SetHasLabels(context.Context, bool) error SetHasMessages(context.Context, bool) error SetLastMessageID(context.Context, string, int64) error SetMessageCount(context.Context, int64) error }
type Status ¶
type Status struct { HasLabels bool HasMessages bool HasMessageCount bool FailedMessages xmaps.Set[string] LastSyncedMessageID string NumSyncedMessages int64 TotalMessageCount int64 }
func DefaultStatus ¶
func DefaultStatus() Status