syncservice

package
v3.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2024 License: GPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultRetryCoolDown = 20 * time.Second
View Source
const Gigabyte = 1024 * Megabyte
View Source
const Kilobyte = uint64(1024)
View Source
const Megabyte = 1024 * Kilobyte
View Source
const MetadataMaxMessages = 64
View Source
const MetadataPageSize = 128
View Source
const NumSyncStages = 4

Variables

View Source
var ErrNoMoreInput = errors.New("no more input")

Functions

This section is empty.

Types

type APIClient

type APIClient interface {
	GetGroupedMessageCount(ctx context.Context) ([]proton.MessageGroupCount, error)
	GetLabels(ctx context.Context, labelTypes ...proton.LabelType) ([]proton.Label, error)
	GetMessage(ctx context.Context, messageID string) (proton.Message, error)
	GetMessageMetadataPage(ctx context.Context, page, pageSize int, filter proton.MessageFilter) ([]proton.MessageMetadata, error)
	GetFullMessage(ctx context.Context, messageID string, scheduler proton.Scheduler, storageProvider proton.AttachmentAllocator) (proton.FullMessage, error)
	GetAttachmentInto(ctx context.Context, attachmentID string, reader io.ReaderFrom) error
	GetAttachment(ctx context.Context, attachmentID string) ([]byte, error)
}

type ApplyRequest

type ApplyRequest struct {
	// contains filtered or unexported fields
}

type ApplyStage

type ApplyStage struct {
	// contains filtered or unexported fields
}

ApplyStage applies the sync updates and waits for their completion before proceeding with the next batch. This is the final stage in the sync pipeline.

func NewApplyStage

func NewApplyStage(input ApplyStageInput) *ApplyStage

func (*ApplyStage) Run

func (a *ApplyStage) Run(group *async.Group)

type ApplyStageInput

type ApplyStageInput = StageInputConsumer[ApplyRequest]

type BuildRequest

type BuildRequest struct {
	// contains filtered or unexported fields
}

type BuildResult

type BuildResult struct {
	AddressID string
	MessageID string
	Update    *imap.MessageCreated
}

type BuildStage

type BuildStage struct {
	// contains filtered or unexported fields
}

BuildStage is in charge of decrypting and converting the downloaded messages from the previous stage into RFC822 compliant messages which can then be sent to the IMAP server.

func NewBuildStage

func NewBuildStage(
	input BuildStageInput,
	output BuildStageOutput,
	maxBuildMem uint64,
	panicHandler async.PanicHandler,
	observabilitySender observability.Sender,
) *BuildStage

func (*BuildStage) Run

func (b *BuildStage) Run(group *async.Group)

type BuildStageInput

type BuildStageInput = StageInputConsumer[BuildRequest]

type BuildStageOutput

type BuildStageOutput = StageOutputProducer[ApplyRequest]

type ChannelConsumerProducer

type ChannelConsumerProducer[T any] struct {
	// contains filtered or unexported fields
}

func NewChannelConsumerProducer

func NewChannelConsumerProducer[T any]() *ChannelConsumerProducer[T]

func (ChannelConsumerProducer[T]) Close

func (c ChannelConsumerProducer[T]) Close()

func (ChannelConsumerProducer[T]) Consume

func (c ChannelConsumerProducer[T]) Consume(ctx context.Context) (T, error)

func (ChannelConsumerProducer[T]) Produce

func (c ChannelConsumerProducer[T]) Produce(ctx context.Context, value T) error

type DefaultDownloadRateModifier

type DefaultDownloadRateModifier struct{}

func (DefaultDownloadRateModifier) Apply

func (d DefaultDownloadRateModifier) Apply(wasSuccess bool, current int, max int) int

type DownloadCache

type DownloadCache struct {
	// contains filtered or unexported fields
}

func (*DownloadCache) Clear

func (s *DownloadCache) Clear()

func (*DownloadCache) Count

func (s *DownloadCache) Count() (int, int)

func (*DownloadCache) DeleteAttachments

func (s *DownloadCache) DeleteAttachments(id ...string)

func (*DownloadCache) DeleteMessages

func (s *DownloadCache) DeleteMessages(id ...string)

func (*DownloadCache) GetAttachment

func (s *DownloadCache) GetAttachment(id string) ([]byte, bool)

func (*DownloadCache) GetMessage

func (s *DownloadCache) GetMessage(id string) (proton.Message, bool)

func (*DownloadCache) StoreAttachment

func (s *DownloadCache) StoreAttachment(id string, data []byte)

func (*DownloadCache) StoreMessage

func (s *DownloadCache) StoreMessage(message proton.Message)

type DownloadRateModifier

type DownloadRateModifier interface {
	Apply(wasSuccess bool, current int, max int) int
}

type DownloadRequest

type DownloadRequest struct {
	// contains filtered or unexported fields
}

type DownloadStage

type DownloadStage struct {
	// contains filtered or unexported fields
}

DownloadStage downloads the messages and attachments. It auto-throttles the download of the messages based on whether we run into 429|5xx codes.

func NewDownloadStage

func NewDownloadStage(
	input DownloadStageInput,
	output DownloadStageOutput,
	maxParallelDownloads int,
	panicHandler async.PanicHandler,
) *DownloadStage

func (*DownloadStage) Run

func (d *DownloadStage) Run(group *async.Group)

type DownloadStageInput

type DownloadStageInput = StageInputConsumer[DownloadRequest]

type DownloadStageOutput

type DownloadStageOutput = StageOutputProducer[BuildRequest]

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is the interface from which we control the syncing of the IMAP data. One instance should be created for each user and used for every subsequent sync request.

func NewHandler

func NewHandler(
	regulator Regulator,
	client APIClient,
	userID string,
	state StateProvider,
	log *logrus.Entry,
	panicHandler async.PanicHandler,
) *Handler

func (*Handler) Cancel

func (t *Handler) Cancel()

func (*Handler) CancelAndWait

func (t *Handler) CancelAndWait()

func (*Handler) Close

func (t *Handler) Close()

func (*Handler) Execute

func (t *Handler) Execute(
	syncReporter Reporter,
	labels LabelMap,
	updateApplier UpdateApplier,
	messageBuilder MessageBuilder,
	coolDown time.Duration,
)

func (*Handler) OnSyncFinishedCH

func (t *Handler) OnSyncFinishedCH() <-chan error

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job represents a unit of work that will travel down the sync pipeline. The job will be split up into child jobs for each batch. The parent job (this) will then wait until all the children have finished executing. Execution can terminate by either: * Completing the pipeline successfully * Context Cancellation * Errors On error, or context cancellation all child jobs are cancelled.

func NewJob

func NewJob(ctx context.Context,
	client APIClient,
	userID string,
	labels LabelMap,
	messageBuilder MessageBuilder,
	updateApplier UpdateApplier,
	syncReporter Reporter,
	state StateProvider,
	panicHandler async.PanicHandler,
	cache *DownloadCache,
	log *logrus.Entry,
) *Job

type JobWaiterMessage added in v3.8.0

type JobWaiterMessage int
const (
	JobWaiterMessageCreated JobWaiterMessage = iota
	JobWaiterMessageFinished
)

type LabelMap

type LabelMap = map[string]proton.Label

type MessageBuilder

type MessageBuilder interface {
	WithKeys(f func(*crypto.KeyRing, map[string]*crypto.KeyRing) error) error
	BuildMessage(apiLabels map[string]proton.Label, full proton.FullMessage, addrKR *crypto.KeyRing, buffer *bytes.Buffer) (BuildResult, error)
}

type MetadataStage

type MetadataStage struct {
	// contains filtered or unexported fields
}

MetadataStage is responsible for the throttling the sync pipeline by only allowing `MetadataMaxMessages` or up to maximum allowed memory usage messages to go through the pipeline. It is also responsible for interleaving different sync jobs so all jobs can progress and finish.

func NewMetadataStage

func NewMetadataStage(
	input MetadataStageInput,
	output MetadataStageOutput,
	maxDownloadMem uint64,
	panicHandler async.PanicHandler,
) *MetadataStage

func (*MetadataStage) Run

func (m *MetadataStage) Run(group *async.Group)

type MetadataStageInput

type MetadataStageInput = StageInputConsumer[*Job]

type MetadataStageOutput

type MetadataStageOutput = StageOutputProducer[DownloadRequest]

type Regulator

type Regulator interface {
	Sync(ctx context.Context, stage *Job) error
}

Regulator is an abstraction for the sync service, since it regulates the number of concurrent sync activities.

type Reporter

type Reporter interface {
	OnStart(ctx context.Context)
	OnFinished(ctx context.Context)
	OnError(ctx context.Context, err error)
	OnProgress(ctx context.Context, delta int64)
	InitializeProgressCounter(ctx context.Context, current int64, total int64)
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service which mediates IMAP syncing in Bridge. IMPORTANT: Be sure to cancel all ongoing sync Handlers before cancelling this service's Group.

func NewService

func NewService(
	panicHandler async.PanicHandler,
	observabilitySender observability.Sender,
) *Service

func (*Service) Close added in v3.8.0

func (s *Service) Close()

func (*Service) Run

func (s *Service) Run()

func (*Service) Sync

func (s *Service) Sync(ctx context.Context, stage *Job) error

type StageInputConsumer

type StageInputConsumer[T any] interface {
	Consume(ctx context.Context) (T, error)
}

type StageOutputProducer

type StageOutputProducer[T any] interface {
	Produce(ctx context.Context, value T) error
	Close()
}

type StateProvider

type StateProvider interface {
	AddFailedMessageID(context.Context, ...string) error
	RemFailedMessageID(context.Context, ...string) error
	GetSyncStatus(context.Context) (Status, error)
	ClearSyncStatus(context.Context) error
	SetHasLabels(context.Context, bool) error
	SetHasMessages(context.Context, bool) error
	SetLastMessageID(context.Context, string, int64) error
	SetMessageCount(context.Context, int64) error
}

type Status

type Status struct {
	HasLabels           bool
	HasMessages         bool
	HasMessageCount     bool
	FailedMessages      xmaps.Set[string]
	LastSyncedMessageID string
	NumSyncedMessages   int64
	TotalMessageCount   int64
}

func DefaultStatus

func DefaultStatus() Status

func (Status) InProgress added in v3.7.1

func (s Status) InProgress() bool

func (Status) IsComplete

func (s Status) IsComplete() bool

type UpdateApplier

type UpdateApplier interface {
	ApplySyncUpdates(ctx context.Context, updates []BuildResult) error
	SyncSystemLabelsOnly(ctx context.Context, labels map[string]proton.Label) error
	SyncLabels(ctx context.Context, labels map[string]proton.Label) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL