fxfer

package module
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2025 License: MIT Imports: 18 Imported by: 0

README

fxfer

Go Reference Go Report Card GitHub release License Codecov


Table of Contents

Motivation

Imagine managing files and objects stored across various systems, whether it's a NAS, your local machine, or a cloud storage service like S3, GCS, or Azure Blob Storage. We refer to these as "source storage." Now, suppose you need to seamlessly transfer these files to another storage system, which we call "destination storage", regardless of the unreliability of the network or the large size of the files. How would you go about doing this?

This library is designed to make such transfers effortless, resumable, and reliable. Built with extensibility in mind, it allows you to easily add support for new source or destination storage, empowering you to handle diverse storage solutions with ease.

Installation

To install fxfer, use go get:

go get github.com/derektruong/fxfer

Next include fxfer in your application.

import "github.com/derektruong/fxfer"

Concepts

Storage

Place where files are processed and transferred. There are two types of storage:

  • Source Storage: The storage where your files are stored, so you SHOULD have read access to this storage.
  • Destination Storage: The storage where you want to transfer your files to, so you SHOULD have both read and write access to this storage.
Client

Between the source storage and the destination storage, the protocol is used to transfer the files. The protocol could be FTP, SFTP, S3, ...

Transfer file

The file that persists the information about the file you want to transfer. It contains the file's name, size, and other metadata, additional information. This file SHOULD be put on Destination Storage when starting a transfer process.

Features

  • 📂 Transfer files seamlessly between source storage and destination storage.
  • 🌐 Supports a variety of storages: local filesystem, (S)FTP server, and cloud platforms (e.g., S3, ...).
  • 🛠️ Easily extensible to include new storage types.
  • 🚀 Optimized for performance and reliability.
  • 📶 All connections are employed connection pooling to reduce latency.
  • ⏭️ Supports resuming interrupted transfers (ideal for large files or unreliable connections).
  • 🖲️ Support tracking the transfer progress.

Roadmap

Support storages
  • Support local filesystem storage.
  • Support S3 storage, S3-compatible storage (e.g., MinIO, Ceph, Storj, ...).
  • Support FTP, SFTP server storage.
  • Support Azure Blob Storage.
  • Support Google Cloud Storage.
Resilience and performance
  • Support resuming interrupted transfers.
  • Support transfer progress tracking.
  • Support exponential backoff for retrying failed transfers automatically.
  • Support checksum verification.
  • Support compression during transfer.

Usage

Real-world examples

Please refer to the examples directory for detailed usage instructions.

Quick Start
func main() {
	// setup clients
	srcClient := s3protoc.NewClient(s3Endpoint, s3Bucket, s3Region, s3AccessKey, s3SecretKey)
	destClient := ftpprotoc.NewClient(ftpHost, ftpPort, ftpUser, ftpPassword)

	// setup storages
	srcStorage := s3.NewSource(logger)
	destStorage := ftp.NewDestination(logger)

	transfer := fxfer.NewTransfer(logger, fxfer.WithMaxFileSize(5<<40)) // 5TB
	err = transfer.Transfer(
		ctx,
		fxfer.SourceConfig{
			FilePath: "path/to/source/file",
			Storage:  srcStorage,
			Client:   srcClient,
		},
		fxfer.DestinationConfig{
			FilePath: "path/to/destination/file",
			Storage:  destStorage,
			Client:   destClient,
		},
		examples.HandleProgressUpdated(logger),
	)
	if err != nil {
		log.Fatal(err)
	}
}
Taskfile

Note: Put .env file in the examples with corresponding credentials for each storage type.

If you have Taskfile installed, you can run the examples very easily. See ./Taskfile.yaml for more details.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrMaxFileSizeExceeded = func(required, got int64) error {
		return fmt.Errorf("file size exceeds the maximum allowed size: %d > %d bytes", got, required)
	}
	ErrMinFileSizeNotMet = func(required, got int64) error {
		return fmt.Errorf("file size does not meet the minimum required size: %d < %d bytes", got, required)
	}
	ErrExtensionNotAllowed = func(ext string) error {
		return fmt.Errorf("file extension is not allowed: %s", ext)
	}
	ErrExtensionBlocked = func(ext string) error {
		return fmt.Errorf("file extension is blocked: %s", ext)
	}
	ErrModifiedBefore = func(t time.Time) error {
		return fmt.Errorf("file was modified before the required time: %s", t.Format(time.RFC3339))
	}
	ErrModifiedAfter = func(t time.Time) error {
		return fmt.Errorf("file was modified after the required time: %s", t.Format(time.RFC3339))
	}
	ErrFileNamePatternMismatch = func(pattern string) error {
		return fmt.Errorf("file name does not match the required pattern: %s", pattern)
	}
)

Functions

This section is empty.

Types

type ChecksumAlgorithm

type ChecksumAlgorithm int

ChecksumAlgorithm defines the supported checksum algorithms for file transfer validation.

const (
	// NoneChecksumAlgorithm is the default checksum algorithm.
	NoneChecksumAlgorithm ChecksumAlgorithm = iota
	// ChecksumAlgorithmCRC32 is the cyclic redundancy check (CRC32) checksum algorithm.
	// It is the default checksum algorithm, suitable for large files (> 1GB, fast, less secure).
	ChecksumAlgorithmCRC32
	// ChecksumAlgorithmMD5 is the message-digest algorithm 5 (MD5) checksum algorithm.
	// It is suitable for general files (< 1GB, moderate speed, more secure)
	ChecksumAlgorithmMD5
	// ChecksumAlgorithmSHA256 is the secure hash algorithm 256 (SHA-256) checksum algorithm.
	// It is suitable for sensitive files (slow, most secure)
	ChecksumAlgorithmSHA256
)

type DestinationConfig added in v0.6.0

type DestinationConfig struct {
	// FilePath is the path of the destination file
	FilePath string `json:"filePath" yaml:"filePath" validate:"required"`
	// Storage: see storage.Destination
	Storage storage.Destination `json:"storage" yaml:"storage" validate:"required"`
	// Client: see protoc.Client
	Client protoc.Client `json:"client" yaml:"client" validate:"required"`
}

DestinationConfig represents the destination file to transfer. It contains the file path, storage, and client.

func (DestinationConfig) Validate added in v0.6.0

func (dest DestinationConfig) Validate(ctx context.Context) error

type Progress

type Progress struct {
	// Status is the status of the progress
	Status ProgressStatus

	// TotalSize is the total number of bytes that need to be transferred
	TotalSize int64

	// TransferredSize is the number of bytes that have been transferred
	TransferredSize int64

	// Percentage is the percentage of the transfer that has been completed
	Percentage int

	// Speed is the speed of the transfer in bytes per second
	Speed int64

	// Duration is the duration of the transfer
	Duration time.Duration

	// Error is the error that occurred during the transfer (when Status is ProgressStatusInError)
	Error error

	// StartAt is the time when the transfer started
	StartAt time.Time

	// FinishAt is the time when the transfer finished
	FinishAt time.Time
}

Progress is a struct that contains information about the progress

type ProgressStatus

type ProgressStatus int

ProgressStatus is an enum that represents the status of the progress

const (
	// ProgressStatusInProgress is the status of the progress when the transfer is in progress
	ProgressStatusInProgress ProgressStatus = iota
	// ProgressStatusFinalizing is the status of the progress when the transfer is finalizing
	ProgressStatusFinalizing
	// ProgressStatusFinished is the status of the progress when the transfer is finished
	ProgressStatusFinished
	// ProgressStatusInError is the status of the progress when the transfer is in error
	ProgressStatusInError
)

type ProgressUpdatedCallback

type ProgressUpdatedCallback func(progress Progress)

ProgressUpdatedCallback is a function that is called when the progress of a transfer is updated.

type RetryConfig

type RetryConfig struct {
	// MaxRetryAttempts is the maximum number of retry attempts, default = 5.
	MaxRetryAttempts int
	// InitialDelay is the initial delay before the first retry, default = 1 second.
	InitialDelay time.Duration
	// MaxDelay is the maximum delay between retries, default = 30 seconds.
	MaxDelay time.Duration
}

RetryConfig defines the retry configuration for the transfer.

type SourceConfig added in v0.6.0

type SourceConfig struct {
	// FilePath is the path of the source file
	FilePath string `json:"filePath" yaml:"filePath" validate:"required"`
	// Storage: see storage.Source
	Storage storage.Source `json:"storage" yaml:"storage" validate:"required"`
	// Client: see protoc.Client
	Client protoc.Client `json:"client" yaml:"client" validate:"required"`
}

SourceConfig represents the source file to transfer. It contains the file path, storage, and client.

func (SourceConfig) Validate added in v0.6.0

func (src SourceConfig) Validate(ctx context.Context) error

type Transfer added in v0.6.0

type Transfer interface {
	// Transfer handles the transfer of a file from a source to a destination,
	// ensuring validation, sanitization, and resumption of incomplete transfers.
	// It supports progress updates via a callback function.
	//
	// Parameters:
	//   - ctx: the context for managing the transfer lifecycle.
	//   - src: see SourceConfig for more details.
	//   - dest: see DestinationConfig for more details.
	//   - cb: the callback function to handle progress updates (see ProgressUpdatedCallback).
	//
	// Returns:
	//   - err: if any step in the transfer process fails, nil otherwise
	Transfer(ctx context.Context, src SourceConfig, dest DestinationConfig, cb ProgressUpdatedCallback) (err error)
}

Transfer is the interface for handling file transfers.

func NewTransfer added in v0.6.0

func NewTransfer(
	logger logr.Logger,
	options ...TransferOption,
) (t Transfer)

NewTransfer creates a new transfer with the optional TransferOption(s).

type TransferOption

type TransferOption func(*transfer)

func WithChecksumAlgorithm

func WithChecksumAlgorithm(algorithm ChecksumAlgorithm) TransferOption

WithChecksumAlgorithm sets the checksum algorithm for the transfer. It is recommended to use the default checksum algorithm (CRC32) unless there is a specific requirement for a different algorithm.

func WithDisabledRetry

func WithDisabledRetry() TransferOption

WithDisabledRetry disables the retry mechanism for the transfer. Default is false (enabled). If disabled, the transfer will not retry failed transfers, regardless of setting WithRetryConfig option.

func WithExtensionBlacklist

func WithExtensionBlacklist(extensions ...string) TransferOption

WithExtensionBlacklist sets the list of blocked file extensions for transfer. Default is empty (no restriction).

func WithExtensionWhitelist

func WithExtensionWhitelist(extensions ...string) TransferOption

WithExtensionWhitelist sets the list of allowed file extensions for transfer. Default is empty (no restriction).

func WithFileNamePattern

func WithFileNamePattern(pattern *regexp.Regexp) TransferOption

WithFileNamePattern sets the regular expression pattern for file names. Default is nil (no restriction).

func WithMaxFileSize

func WithMaxFileSize(size int64) TransferOption

WithMaxFileSize sets the maximum file size allowed for transfer. Default is 0 (no limit).

func WithMinFileSize

func WithMinFileSize(size int64) TransferOption

WithMinFileSize sets the minimum file size required for transfer. Default is 0 (no limit).

func WithModifiedAfter

func WithModifiedAfter(modTime time.Time) TransferOption

WithModifiedAfter sets the minimum modified time required for transfer. Default is zero (no restriction).

func WithModifiedBefore

func WithModifiedBefore(modTime time.Time) TransferOption

WithModifiedBefore sets the maximum modified time required for transfer. Default is zero (no restriction).

func WithProgressRefreshInterval

func WithProgressRefreshInterval(interval time.Duration) TransferOption

WithProgressRefreshInterval sets the interval for refreshing the progress update. Default is 1 second.

func WithRetryConfig

func WithRetryConfig(config RetryConfig) TransferOption

WithRetryConfig sets the retry configuration for the transfer. Support partial configuration, default values will be used if not set.

Directories

Path Synopsis
internal
iometer/mock
Package mock_iometer is a generated GoMock package.
Package mock_iometer is a generated GoMock package.
mock
Package mock_protoc is a generated GoMock package.
Package mock_protoc is a generated GoMock package.
s3
mock
Package mock_storage is a generated GoMock package.
Package mock_storage is a generated GoMock package.
s3
Package s3 provides a storage backend using AWS S3 or compatible servers.
Package s3 provides a storage backend using AWS S3 or compatible servers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL