fxfer

package module
v0.5.0 Latest
Published: Feb 24, 2025 License: MIT Imports: 18 Imported by: 0

README

fxfer

Motivation

Imagine managing files and objects stored across various systems—whether it's a NAS, your local machine, or a cloud storage service like S3, GCS, or Azure Blob Storage. We refer to these as "source storage." Now, suppose you need to seamlessly transfer these files to another storage system, which we call "destination storage", regardless of the unreliability of the network or the large size of the files. How would you go about doing this?

This library is designed to make such transfers effortless, resumable, and reliable. Built with extensibility in mind, it allows you to easily add support for new source or destination storage, empowering you to handle diverse storage solutions with ease.

Concepts

Storage

A storage is a place where files are kept and from or to which they are transferred. There are two types of storage:

  • Source Storage: The storage that holds your files. You SHOULD have read access to it.
  • Destination Storage: The storage you want to transfer your files to. You SHOULD have both read and write access to it.

Client

A client speaks the protocol used to transfer files between the source storage and the destination storage. The protocol could be FTP, SFTP, S3, ...

Transfer file

A file that persists information about the file you want to transfer: its name, size, and other metadata. This file SHOULD be placed on the destination storage when a transfer process starts.

Features

  • 📂 Transfer files seamlessly between source storage and destination storage.
  • 🌐 Supports a variety of storages: local filesystem, (S)FTP server, and cloud platforms (e.g., S3, ...).
  • 🛠️ Easily extensible to include new storage types.
  • 🚀 Optimized for performance and reliability.
  • 📶 All connections use connection pooling to reduce latency.
  • ⏭️ Supports resuming interrupted transfers (ideal for large files or unreliable connections).
  • 🖲️ Supports tracking transfer progress.

Roadmap

Support storages
  • Support local filesystem storage.
  • Support S3 storage, S3-compatible storage (e.g., MinIO, Ceph, Storj, ...).
  • Support FTP, SFTP server storage.
  • Support Azure Blob Storage.
  • Support Google Cloud Storage.

Resilience and performance
  • Support resuming interrupted transfers.
  • Support transfer progress tracking.
  • Support exponential backoff for retrying failed transfers automatically.
  • Support checksum verification.
  • Support compression during transfer.

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrMaxFileSizeExceeded = func(required, got int64) error {
		return fmt.Errorf("file size exceeds the maximum allowed size: %d > %d bytes", got, required)
	}
	ErrMinFileSizeNotMet = func(required, got int64) error {
		return fmt.Errorf("file size does not meet the minimum required size: %d < %d bytes", got, required)
	}
	ErrExtensionNotAllowed = func(ext string) error {
		return fmt.Errorf("file extension is not allowed: %s", ext)
	}
	ErrExtensionBlocked = func(ext string) error {
		return fmt.Errorf("file extension is blocked: %s", ext)
	}
	ErrModifiedBefore = func(t time.Time) error {
		return fmt.Errorf("file was modified before the required time: %s", t.Format(time.RFC3339))
	}
	ErrModifiedAfter = func(t time.Time) error {
		return fmt.Errorf("file was modified after the required time: %s", t.Format(time.RFC3339))
	}
	ErrFileNamePatternMismatch = func(pattern string) error {
		return fmt.Errorf("file name does not match the required pattern: %s", pattern)
	}
)

Functions

This section is empty.

Types

type ChecksumAlgorithm

type ChecksumAlgorithm int

ChecksumAlgorithm defines the supported checksum algorithms for file transfer validation.

const (
	// NoneChecksumAlgorithm is the zero value of ChecksumAlgorithm; it selects no checksum algorithm.
	NoneChecksumAlgorithm ChecksumAlgorithm = iota
	// ChecksumAlgorithmCRC32 is the cyclic redundancy check (CRC32) checksum algorithm.
	// It is the default checksum algorithm, suitable for large files (> 1GB; fast, less secure).
	ChecksumAlgorithmCRC32
	// ChecksumAlgorithmMD5 is the message-digest algorithm 5 (MD5) checksum algorithm.
	// It is suitable for general files (< 1GB; moderate speed, more secure).
	ChecksumAlgorithmMD5
	// ChecksumAlgorithmSHA256 is the secure hash algorithm 256 (SHA-256) checksum algorithm.
	// It is suitable for sensitive files (slow, most secure).
	ChecksumAlgorithmSHA256
)
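
As a rough illustration of what each algorithm produces, the sketch below maps a local copy of the enum onto Go's standard-library hashes and streams data through the selected one. The helpers newHash, checksumHex, and checksumOfString are hypothetical names introduced here; how fxfer computes and compares checksums internally is not documented on this page.

```go
package main

import (
	"crypto/md5"
	"crypto/sha256"
	"fmt"
	"hash"
	"hash/crc32"
	"io"
	"strings"
)

// ChecksumAlgorithm is a local copy of the documented enum, for illustration only.
type ChecksumAlgorithm int

const (
	NoneChecksumAlgorithm ChecksumAlgorithm = iota
	ChecksumAlgorithmCRC32
	ChecksumAlgorithmMD5
	ChecksumAlgorithmSHA256
)

// newHash (hypothetical) maps an algorithm to a standard-library hash.
// It returns nil when no checksum is selected.
func newHash(a ChecksumAlgorithm) hash.Hash {
	switch a {
	case ChecksumAlgorithmCRC32:
		return crc32.NewIEEE()
	case ChecksumAlgorithmMD5:
		return md5.New()
	case ChecksumAlgorithmSHA256:
		return sha256.New()
	default:
		return nil
	}
}

// checksumHex streams r through the selected hash and returns the hex digest.
// It returns an empty string when no checksum is selected.
func checksumHex(a ChecksumAlgorithm, r io.Reader) (string, error) {
	h := newHash(a)
	if h == nil {
		return "", nil
	}
	if _, err := io.Copy(h, r); err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", h.Sum(nil)), nil
}

// checksumOfString is a convenience wrapper for in-memory data.
func checksumOfString(a ChecksumAlgorithm, s string) string {
	sum, _ := checksumHex(a, strings.NewReader(s)) // a strings.Reader does not fail
	return sum
}

func main() {
	for _, a := range []ChecksumAlgorithm{ChecksumAlgorithmCRC32, ChecksumAlgorithmMD5, ChecksumAlgorithmSHA256} {
		fmt.Println(checksumOfString(a, "hello"))
	}
}
```

Streaming through io.Copy keeps memory use constant regardless of file size, which is why the same helper works for both small and very large inputs.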

type DestinationCommand

type DestinationCommand struct {
	// FilePath is the path of the destination file
	FilePath string `json:"filePath" yaml:"filePath" validate:"required"`
	// Storage: see storage.Destination
	Storage storage.Destination `json:"storage" yaml:"storage" validate:"required"`
	// Client: see protoc.Client
	Client protoc.Client `json:"client" yaml:"client" validate:"required"`
}

DestinationCommand represents the destination file to transfer. It contains the file path, storage, and client.

func (DestinationCommand) Validate

func (dest DestinationCommand) Validate(ctx context.Context) error

type Progress

type Progress struct {
	// Status is the status of the progress
	Status ProgressStatus

	// TotalSize is the total number of bytes that need to be transferred
	TotalSize int64

	// TransferredSize is the number of bytes that have been transferred
	TransferredSize int64

	// Percentage is the percentage of the transfer that has been completed
	Percentage int

	// Speed is the speed of the transfer in bytes per second
	Speed int64

	// Duration is the duration of the transfer
	Duration time.Duration

	// Error is the error that occurred during the transfer (when Status is ProgressStatusInError)
	Error error

	// StartAt is the time when the transfer started
	StartAt time.Time

	// FinishAt is the time when the transfer finished
	FinishAt time.Time
}

Progress contains information about the progress of a transfer.
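
The derived fields (Percentage and Speed) follow from the byte counts and the elapsed time. The sketch below shows one plausible way to compute them; the snapshot type and newSnapshot helper are hypothetical stand-ins, and the rounding fxfer actually applies is an assumption (integer truncation is used here).

```go
package main

import (
	"fmt"
	"time"
)

// snapshot is a hypothetical, trimmed-down stand-in for Progress.
type snapshot struct {
	TotalSize       int64
	TransferredSize int64
	Percentage      int
	Speed           int64 // bytes per second
	Duration        time.Duration
}

// newSnapshot derives Percentage and Speed from byte counts and elapsed time.
// Both derived fields stay at zero when their divisor would be zero.
func newSnapshot(total, transferred int64, elapsed time.Duration) snapshot {
	s := snapshot{TotalSize: total, TransferredSize: transferred, Duration: elapsed}
	if total > 0 {
		s.Percentage = int(transferred * 100 / total)
	}
	if secs := elapsed.Seconds(); secs > 0 {
		s.Speed = int64(float64(transferred) / secs)
	}
	return s
}

func main() {
	s := newSnapshot(1000, 250, 2*time.Second)
	fmt.Printf("%d%% at %d B/s\n", s.Percentage, s.Speed) // prints "25% at 125 B/s"
}
```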

type ProgressStatus

type ProgressStatus int

ProgressStatus enumerates the states a transfer can be in.

const (
	// ProgressStatusInProgress is the status of the progress when the transfer is in progress
	ProgressStatusInProgress ProgressStatus = iota
	// ProgressStatusFinalizing is the status of the progress when the transfer is finalizing
	ProgressStatusFinalizing
	// ProgressStatusFinished is the status of the progress when the transfer is finished
	ProgressStatusFinished
	// ProgressStatusInError is the status of the progress when the transfer is in error
	ProgressStatusInError
)

type ProgressUpdatedCallback

type ProgressUpdatedCallback func(progress Progress)

ProgressUpdatedCallback is a function that is called when the progress of a transfer is updated.
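
A callback typically branches on Status. The sketch below uses local copies of the documented types (with Progress trimmed to three fields) so that it runs on its own; the describe helper is a hypothetical name, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types, trimmed for illustration.
type ProgressStatus int

const (
	ProgressStatusInProgress ProgressStatus = iota
	ProgressStatusFinalizing
	ProgressStatusFinished
	ProgressStatusInError
)

type Progress struct {
	Status     ProgressStatus
	Percentage int
	Error      error
}

type ProgressUpdatedCallback func(progress Progress)

// describe (hypothetical) renders one progress update as a log line.
func describe(p Progress) string {
	switch p.Status {
	case ProgressStatusInProgress:
		return fmt.Sprintf("transferring: %d%%", p.Percentage)
	case ProgressStatusFinalizing:
		return "finalizing"
	case ProgressStatusFinished:
		return "finished"
	case ProgressStatusInError:
		return fmt.Sprintf("failed: %v", p.Error)
	default:
		return "unknown status"
	}
}

func main() {
	var cb ProgressUpdatedCallback = func(p Progress) { fmt.Println(describe(p)) }
	cb(Progress{Status: ProgressStatusInProgress, Percentage: 40})
	cb(Progress{Status: ProgressStatusInError, Error: errors.New("connection reset")})
	cb(Progress{Status: ProgressStatusFinished, Percentage: 100})
}
```

Keeping the formatting in a pure function such as describe makes the callback itself trivial and easy to test.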

type RetryConfig

type RetryConfig struct {
	// MaxRetryAttempts is the maximum number of retry attempts, default = 5.
	MaxRetryAttempts int
	// InitialDelay is the initial delay before the first retry, default = 1 second.
	InitialDelay time.Duration
	// MaxDelay is the maximum delay between retries, default = 30 seconds.
	MaxDelay time.Duration
}

RetryConfig defines the retry configuration for the transferer.
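
To make the three fields concrete, here is a minimal sketch of an exponential backoff schedule built from the documented defaults. It assumes the delay doubles on each attempt and is capped at MaxDelay; the exact formula fxfer uses (for example, whether it adds jitter) is not stated on this page. The names defaultRetryConfig and backoffDelay are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// RetryConfig is a local copy of the documented struct.
type RetryConfig struct {
	MaxRetryAttempts int
	InitialDelay     time.Duration
	MaxDelay         time.Duration
}

// defaultRetryConfig returns the defaults listed in the field comments.
func defaultRetryConfig() RetryConfig {
	return RetryConfig{MaxRetryAttempts: 5, InitialDelay: time.Second, MaxDelay: 30 * time.Second}
}

// backoffDelay returns the delay before the given retry attempt (1-based).
// Assumption: the delay doubles each attempt and never exceeds MaxDelay.
func backoffDelay(cfg RetryConfig, attempt int) time.Duration {
	d := cfg.InitialDelay
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= cfg.MaxDelay {
			return cfg.MaxDelay
		}
	}
	if d > cfg.MaxDelay {
		return cfg.MaxDelay
	}
	return d
}

func main() {
	cfg := defaultRetryConfig()
	for attempt := 1; attempt <= cfg.MaxRetryAttempts; attempt++ {
		fmt.Printf("attempt %d: wait %s\n", attempt, backoffDelay(cfg, attempt))
	}
}
```

With the defaults, the five attempts wait 1s, 2s, 4s, 8s, and 16s; the 30-second cap only takes effect if more attempts are configured.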

type SourceCommand

type SourceCommand struct {
	// FilePath is the path of the source file
	FilePath string `json:"filePath" yaml:"filePath" validate:"required"`
	// Storage: see storage.Source
	Storage storage.Source `json:"storage" yaml:"storage" validate:"required"`
	// Client: see protoc.Client
	Client protoc.Client `json:"client" yaml:"client" validate:"required"`
}

SourceCommand represents the source file to transfer. It contains the file path, storage, and client.

func (SourceCommand) Validate

func (src SourceCommand) Validate(ctx context.Context) error

type TransferOption

type TransferOption func(*transferer)
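
TransferOption follows Go's functional options pattern: each With... function returns a closure that mutates the unexported transferer before it is handed back to the caller. The sketch below reproduces the pattern with hypothetical local types (settings, option, withMaxFileSize, withRefreshInterval, newSettings); the fields of the real transferer are not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the unexported transferer.
type settings struct {
	maxFileSize     int64
	refreshInterval time.Duration
}

// option mirrors the shape of TransferOption: a function that mutates settings.
type option func(*settings)

// withMaxFileSize returns an option that sets the size limit.
func withMaxFileSize(size int64) option {
	return func(s *settings) { s.maxFileSize = size }
}

// withRefreshInterval returns an option that sets the progress refresh interval.
func withRefreshInterval(d time.Duration) option {
	return func(s *settings) { s.refreshInterval = d }
}

// newSettings starts from defaults and applies the options in order,
// so a later option overrides an earlier one that touches the same field.
func newSettings(opts ...option) *settings {
	s := &settings{refreshInterval: time.Second}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSettings(withMaxFileSize(1<<20), withRefreshInterval(500*time.Millisecond))
	fmt.Println(s.maxFileSize, s.refreshInterval)
}
```

The pattern lets a constructor grow new settings without changing its signature, which is why NewTransferer can accept any number of options.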

func WithChecksumAlgorithm

func WithChecksumAlgorithm(algorithm ChecksumAlgorithm) TransferOption

WithChecksumAlgorithm sets the checksum algorithm for the transferer. It is recommended to use the default checksum algorithm (CRC32) unless there is a specific requirement for a different algorithm.

func WithDisabledRetry

func WithDisabledRetry() TransferOption

WithDisabledRetry disables the retry mechanism for the transferer. Retry is enabled by default. When this option is set, the transferer does not retry failed transfers, regardless of any WithRetryConfig option.

func WithExtensionBlacklist

func WithExtensionBlacklist(extensions ...string) TransferOption

WithExtensionBlacklist sets the list of blocked file extensions for transfer. Default is empty (no restriction).

func WithExtensionWhitelist

func WithExtensionWhitelist(extensions ...string) TransferOption

WithExtensionWhitelist sets the list of allowed file extensions for transfer. Default is empty (no restriction).
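
How a whitelist and a blacklist might interact can be sketched as follows. Everything here is an assumption made for illustration: that extensions include the leading dot, that matching ignores case, and that the blacklist is checked first. The helpers containsExt and checkExtension are hypothetical; only the error messages come from the Variables section.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// containsExt reports whether ext is in list, ignoring case.
func containsExt(list []string, ext string) bool {
	for _, e := range list {
		if strings.EqualFold(e, ext) {
			return true
		}
	}
	return false
}

// checkExtension (hypothetical) validates a file name against both lists.
// An empty list means no restriction.
func checkExtension(name string, whitelist, blacklist []string) error {
	ext := filepath.Ext(name) // includes the leading dot, e.g. ".csv"
	if containsExt(blacklist, ext) {
		return fmt.Errorf("file extension is blocked: %s", ext)
	}
	if len(whitelist) > 0 && !containsExt(whitelist, ext) {
		return fmt.Errorf("file extension is not allowed: %s", ext)
	}
	return nil
}

func main() {
	fmt.Println(checkExtension("report.CSV", []string{".csv"}, nil))
	fmt.Println(checkExtension("setup.exe", nil, []string{".exe"}))
}
```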

func WithFileNamePattern

func WithFileNamePattern(pattern *regexp.Regexp) TransferOption

WithFileNamePattern sets the regular expression pattern for file names. Default is nil (no restriction).
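
A pattern check of this kind can be sketched with the standard regexp package. Whether fxfer matches the pattern against the base name or the full path is not stated here; the sketch assumes the base name. The checkFileName helper and the csvPattern variable are hypothetical.

```go
package main

import (
	"fmt"
	"path"
	"regexp"
)

// csvPattern is an example pattern: report-<4 digits>.csv
var csvPattern = regexp.MustCompile(`^report-\d{4}\.csv$`)

// checkFileName (hypothetical) validates the base name of filePath.
// A nil pattern means no restriction.
func checkFileName(filePath string, pattern *regexp.Regexp) error {
	if pattern == nil {
		return nil
	}
	if !pattern.MatchString(path.Base(filePath)) {
		return fmt.Errorf("file name does not match the required pattern: %s", pattern.String())
	}
	return nil
}

func main() {
	fmt.Println(checkFileName("/data/report-2025.csv", csvPattern))
	fmt.Println(checkFileName("/data/notes.txt", csvPattern))
}
```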

func WithMaxFileSize

func WithMaxFileSize(size int64) TransferOption

WithMaxFileSize sets the maximum file size allowed for transfer. Default is 0 (no limit).

func WithMinFileSize

func WithMinFileSize(size int64) TransferOption

WithMinFileSize sets the minimum file size required for transfer. Default is 0 (no limit).
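
The two size options share the convention that 0 means "no limit". A minimal sketch of such a check is shown below; checkSize is a hypothetical helper, and it assumes both bounds are inclusive. The error messages mirror the formats in the Variables section.

```go
package main

import "fmt"

// checkSize (hypothetical) validates a file size against optional bounds.
// A bound of 0 disables that check. Both bounds are treated as inclusive.
func checkSize(size, minSize, maxSize int64) error {
	if maxSize > 0 && size > maxSize {
		return fmt.Errorf("file size exceeds the maximum allowed size: %d > %d bytes", size, maxSize)
	}
	if minSize > 0 && size < minSize {
		return fmt.Errorf("file size does not meet the minimum required size: %d < %d bytes", size, minSize)
	}
	return nil
}

func main() {
	fmt.Println(checkSize(2048, 0, 1024)) // too large
	fmt.Println(checkSize(10, 100, 0))    // too small
	fmt.Println(checkSize(500, 0, 0))     // no limits configured
}
```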

func WithModifiedAfter

func WithModifiedAfter(modTime time.Time) TransferOption

WithModifiedAfter sets the minimum modified time required for transfer. Default is zero (no restriction).

func WithModifiedBefore

func WithModifiedBefore(modTime time.Time) TransferOption

WithModifiedBefore sets the maximum modified time allowed for transfer. Default is zero (no restriction).
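
Taken together, WithModifiedAfter and WithModifiedBefore describe a time window. The sketch below shows one way such a window could be checked, treating a zero time.Time as "no restriction". It assumes the bounds are inclusive and that each violation maps to the error message shown; neither detail is confirmed on this page. The helpers day and checkModified and the noLimit variable are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// noLimit is the zero time, meaning "no restriction" for a bound.
var noLimit time.Time

// day builds a UTC date for the examples.
func day(year int, month time.Month, d int) time.Time {
	return time.Date(year, month, d, 0, 0, 0, 0, time.UTC)
}

// checkModified (hypothetical) verifies that modTime lies within [after, before].
func checkModified(modTime, after, before time.Time) error {
	if !after.IsZero() && modTime.Before(after) {
		return fmt.Errorf("file was modified before the required time: %s", after.Format(time.RFC3339))
	}
	if !before.IsZero() && modTime.After(before) {
		return fmt.Errorf("file was modified after the required time: %s", before.Format(time.RFC3339))
	}
	return nil
}

func main() {
	after := day(2025, time.January, 1)
	fmt.Println(checkModified(day(2025, time.February, 1), after, noLimit))
	fmt.Println(checkModified(day(2024, time.December, 1), after, noLimit))
}
```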

func WithProgressRefreshInterval

func WithProgressRefreshInterval(interval time.Duration) TransferOption

WithProgressRefreshInterval sets the interval for refreshing the progress update. Default is 1 second.

func WithRetryConfig

func WithRetryConfig(config RetryConfig) TransferOption

WithRetryConfig sets the retry configuration for the transferer. Partial configuration is supported: default values are used for any field that is not set.
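
Partial configuration usually means that unset fields fall back to their defaults. The sketch below assumes that a zero or negative field counts as "not set"; how fxfer detects unset fields is not documented here, and withDefaults is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// RetryConfig is a local copy of the documented struct.
type RetryConfig struct {
	MaxRetryAttempts int
	InitialDelay     time.Duration
	MaxDelay         time.Duration
}

// withDefaults (hypothetical) fills unset fields with the documented defaults.
// Assumption: a zero or negative value means the field was not set.
func withDefaults(cfg RetryConfig) RetryConfig {
	if cfg.MaxRetryAttempts <= 0 {
		cfg.MaxRetryAttempts = 5
	}
	if cfg.InitialDelay <= 0 {
		cfg.InitialDelay = time.Second
	}
	if cfg.MaxDelay <= 0 {
		cfg.MaxDelay = 30 * time.Second
	}
	return cfg
}

func main() {
	// Only MaxRetryAttempts is provided; the delays fall back to defaults.
	cfg := withDefaults(RetryConfig{MaxRetryAttempts: 3})
	fmt.Println(cfg.MaxRetryAttempts, cfg.InitialDelay, cfg.MaxDelay) // prints "3 1s 30s"
}
```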

type Transferer

type Transferer interface {
	// Transfer handles the transfer of a file from a source to a destination,
	// ensuring validation, sanitization, and resumption of incomplete transfers.
	// It supports progress updates via a callback function.
	//
	// Parameters:
	//   - ctx: the context for managing the transfer lifecycle.
	//   - src: see SourceCommand for more details.
	//   - dest: see DestinationCommand for more details.
	//   - cb: the callback function to handle progress updates (see ProgressUpdatedCallback).
	//
	// Returns:
	//   - err: if any step in the transfer process fails, nil otherwise
	Transfer(ctx context.Context, src SourceCommand, dest DestinationCommand, cb ProgressUpdatedCallback) (err error)
}

Transferer is the interface for handling file transfers.

func NewTransferer

func NewTransferer(
	logger logr.Logger,
	options ...TransferOption,
) (t Transferer)

NewTransferer creates a new transferer with the optional TransferOption(s).

Directories

Path Synopsis
internal
iometer/mock
Package mock_iometer is a generated GoMock package.
mock
Package mock_protoc is a generated GoMock package.
s3
mock
Package mock_storage is a generated GoMock package.
s3
Package s3 provides a storage backend using AWS S3 or compatible servers.
