storage

package
v0.55.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: BSD-3-Clause Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// Log field keys (log.Field values) exported by this package.
const (
	// FieldAPIRef is the log field named "item_apiref".
	FieldAPIRef = log.Field("item_apiref")
	// FieldSize is the log field named "item_size_num".
	FieldSize   = log.Field("item_size_num")
	// FielID is the log field named "item_id".
	// NOTE(review): the identifier looks like a typo for FieldID; it is
	// exported, so renaming it would break callers.
	FielID      = log.Field("item_id")
)

Variables

View Source
var (
	// KeyBackendSync holds the string "cdn:backend:sync".
	// NOTE(review): presumably the cache/queue key used for backend
	// synchronization — confirm against its users. It is a variable, not a
	// constant, so it can be reassigned at runtime.
	KeyBackendSync = "cdn:backend:sync"
)

Functions

func CountItemUnitsToDeleteByItemID

func CountItemUnitsToDeleteByItemID(db gorp.SqlExecutor, itemID string) (int64, error)

func CountItemsForUnit

func CountItemsForUnit(db gorp.SqlExecutor, unitID string) (int64, error)

func DeleteUnit

func DeleteUnit(m *gorpmapper.Mapper, db gorp.SqlExecutor, u *sdk.CDNUnit) error

func HasItemUnitsByUnitAndHashLocator

func HasItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLocator string, itemType sdk.CDNItemType) (bool, error)

func HashItemUnitByApiRefHash

func HashItemUnitByApiRefHash(db gorp.SqlExecutor, apiRefHash string, unitID string) (bool, error)

func InitDBMapping

func InitDBMapping(m *gorpmapper.Mapper)

func InsertUnit

InsertUnit inserts a unit in the database.

func LoadAllItemUnitsByItemIDs

func LoadAllItemUnitsByItemIDs(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemID string, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error)

func LoadAllItemUnitsIDsByItemID

func LoadAllItemUnitsIDsByItemID(db gorp.SqlExecutor, itemID string) ([]string, error)

func LoadAllItemUnitsIDsByItemIDsAndUnitID

func LoadAllItemUnitsIDsByItemIDsAndUnitID(db gorp.SqlExecutor, unitID string, itemID []string) ([]string, error)

func LoadAllItemUnitsIDsByUnitID

func LoadAllItemUnitsIDsByUnitID(db gorp.SqlExecutor, unitID string, offset, limit int64) ([]string, error)

func LoadAllItemUnitsToDeleteByUnit

func LoadAllItemUnitsToDeleteByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, limit int, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error)

func LoadAllSynchronizedItemIDs

func LoadAllSynchronizedItemIDs(db gorp.SqlExecutor, bufferUnitID string, maxStorageCount int64) ([]string, error)

func LoadAllUnits

func LoadAllUnits(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, opts ...LoadUnitOptionFunc) ([]sdk.CDNUnit, error)

LoadAllUnits loads all the units from the database

func LoadItemUnitByUnit

func LoadItemUnitByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, itemID string, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error)

func LoadItemUnitsByUnit

func LoadItemUnitsByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, size *int, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error)

func LoadLastItemUnitByJobUnitType

func LoadLastItemUnitByJobUnitType(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, jobRunID int64, cdnType sdk.CDNItemType, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error)

func LoadLastItemUnitByRunJobIDUnitType added in v0.53.1

func LoadLastItemUnitByRunJobIDUnitType(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, runJobID string, cdnType sdk.CDNItemType, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error)

func LoadUnitByID

func LoadUnitByID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, id string) (*sdk.CDNUnit, error)

LoadUnitByID returns a unit from database for given id.

func LoadUnitByName

func LoadUnitByName(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, name string) (*sdk.CDNUnit, error)

LoadUnitByName returns a unit from database for given name.

func MarkItemUnitToDelete

func MarkItemUnitToDelete(db gorpmapper.SqlExecutorWithTx, ids []string) (int, error)

func RegisterDriver

func RegisterDriver(typ string, i Interface)

Types

type AbstractUnit

// AbstractUnit is the receiver of the CanSync, ExistsInDatabase, ID, Name,
// New, Set, SyncBandwidth and SyncItemChannel methods declared on
// *AbstractUnit.
// NOTE(review): presumably meant to be embedded by concrete unit
// implementations — confirm.
type AbstractUnit struct {
	// GoRoutines is the only exported field of the struct.
	GoRoutines *sdk.GoRoutines
	// contains filtered or unexported fields
}

func (*AbstractUnit) CanSync

func (a *AbstractUnit) CanSync() bool

func (*AbstractUnit) ExistsInDatabase

func (a *AbstractUnit) ExistsInDatabase(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, id string) (*sdk.CDNItemUnit, error)

func (*AbstractUnit) ID

func (a *AbstractUnit) ID() string

func (*AbstractUnit) Name

func (a *AbstractUnit) Name() string

func (*AbstractUnit) New

func (a *AbstractUnit) New(gorts *sdk.GoRoutines, config AbstractUnitConfig)

func (*AbstractUnit) Set

func (a *AbstractUnit) Set(u sdk.CDNUnit)

func (*AbstractUnit) SyncBandwidth

func (a *AbstractUnit) SyncBandwidth() float64

func (*AbstractUnit) SyncItemChannel

func (a *AbstractUnit) SyncItemChannel() chan string

type AbstractUnitConfig

// AbstractUnitConfig is the configuration value accepted by the New method of
// AbstractUnit and of Interface. It has no exported fields.
type AbstractUnitConfig struct {
	// contains filtered or unexported fields
}

type BufferConfiguration

// BufferConfiguration is the value type of the Configuration.Buffers map.
// It carries optional pointers to backend-specific settings (Redis, Local,
// Nfs) and the kind of content the buffer receives.
// NOTE(review): presumably exactly one backend pointer is expected to be
// non-nil — confirm.
type BufferConfiguration struct {
	// Redis points to the Redis settings (sdk.RedisConf).
	Redis      *sdk.RedisConf            `toml:"redis" json:"redis" mapstructure:"redis"`
	// Local points to the local buffer settings.
	Local      *LocalBufferConfiguration `toml:"local" json:"local" mapstructure:"local"`
	// Nfs points to the NFS buffer settings; its JSON tag carries omitempty.
	Nfs        *NFSBufferConfiguration   `toml:"nfs" json:"nfs,omitempty" mapstructure:"nfs"`
	// BufferType is 'log' to receive logs or 'file' to receive artifacts
	// (see the comment tag). Unlike the other fields it has no mapstructure tag.
	BufferType CDNBufferType             `toml:"bufferType" json:"bufferType" comment:"it can be 'log' to receive logs or 'file' to receive artifacts"`
}

type BufferUnit

// BufferUnit is the interface of a buffer. It embeds Interface and Unit and
// adds initialization for a given CDNBufferType, a size lookup and an accessor
// for the buffer type.
type BufferUnit interface {
	Interface
	Unit
	// Init receives a context, an untyped configuration value and the buffer
	// type, and reports failure through the returned error.
	Init(ctx context.Context, cfg interface{}, bufferType CDNBufferType) error
	// Size returns an int64 for the given item unit.
	// NOTE(review): presumably the stored size in bytes — confirm.
	Size(i sdk.CDNItemUnit) (int64, error)
	// Read has the same signature as the Read method already provided by the
	// embedded Unit interface.
	Read(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
	// BufferType returns the CDNBufferType of the buffer.
	BufferType() CDNBufferType
}

type CDNBufferType

// CDNBufferType is a string-based type naming the kind of a buffer; it is used
// by BufferConfiguration.BufferType and BufferUnit.
type CDNBufferType string
// Known CDNBufferType values.
const (
	// CDNBufferTypeLog is the "log" buffer type.
	CDNBufferTypeLog  CDNBufferType = "log"
	// CDNBufferTypeFile is the "file" buffer type.
	CDNBufferTypeFile CDNBufferType = "file"
)

type Configuration

// Configuration is the top-level settings struct of the package: a hash
// locator salt, named buffers, named storages, and the synchronization and
// purge tuning values.
type Configuration struct {
	// HashLocatorSalt is read from the TOML key "hashLocatorSalt".
	// NOTE(review): presumably the salt used by RunningStorageUnits.HashLocator — confirm.
	HashLocatorSalt string                          `toml:"hashLocatorSalt" json:"hash_locator_salt" mapstructure:"hashLocatorSalt"`
	// Buffers maps a name to a buffer configuration.
	Buffers         map[string]BufferConfiguration  `toml:"buffers" json:"buffers" mapstructure:"buffers"`
	// Storages maps a name to a storage configuration.
	Storages        map[string]StorageConfiguration `toml:"storages" json:"storages" mapstructure:"storages"`
	// SyncSeconds: the struct tag is elided in this rendering.
	// NOTE(review): presumably the interval, in seconds, between synchronization loops — confirm.
	SyncSeconds     int                             `` /* 150-byte string literal not displayed */
	// SyncNbElements is the number of items to synchronize from the buffer
	// (tag default: 100).
	SyncNbElements  int64                           `toml:"syncNbElements" default:"100" json:"syncNbElements" comment:"nb items to synchronize from the buffer"`
	// PurgeSeconds: the struct tag is elided in this rendering.
	// NOTE(review): presumably the interval, in seconds, between purge loops — confirm.
	PurgeSeconds    int                             `` /* 163-byte string literal not displayed */
	// PurgeNbElements is the number of items to delete in each purge loop
	// (tag default: 1000).
	PurgeNbElements int                             `toml:"purgeNbElements" default:"1000" json:"purgeNbElements" comment:"nb items to delete in each purge loop"`
}

type FileBufferUnit

// FileBufferUnit is a BufferUnit that can also be written to: it adds a
// writer factory and a Write method. It is the type returned by
// RunningStorageUnits.FileBuffer.
type FileBufferUnit interface {
	BufferUnit
	// NewWriter returns an io.WriteCloser for the given item unit; the caller
	// owns the returned closer.
	NewWriter(ctx context.Context, i sdk.CDNItemUnit) (io.WriteCloser, error)
	// Write takes an item unit, a reader and a writer.
	// NOTE(review): presumably copies r into w for item i — confirm.
	Write(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
}

type Interface

// Interface is the base method set shared by buffers and storages (it is
// embedded by both BufferUnit and StorageUnit). It is also the type registered
// with RegisterDriver and returned by GetDriver.
type Interface interface {
	// Name returns the unit name.
	Name() string
	// ID returns the unit identifier.
	ID() string
	// New receives the goroutine manager and the abstract unit configuration.
	New(gorts *sdk.GoRoutines, config AbstractUnitConfig)
	// Set receives the sdk.CDNUnit to attach to the implementation.
	Set(u sdk.CDNUnit)
	// ItemExists reports whether the given item exists, or an error.
	// NOTE(review): presumably checks existence in this unit via the database — confirm.
	ItemExists(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, i sdk.CDNItem) (bool, error)
	// Status returns monitoring status lines.
	Status(ctx context.Context) []sdk.MonitoringStatusLine
	// SyncBandwidth returns the synchronization bandwidth as a float64.
	// NOTE(review): unit of measure is not visible here — confirm.
	SyncBandwidth() float64
	// Remove removes the given item unit and reports failure through the error.
	Remove(ctx context.Context, i sdk.CDNItemUnit) error
}

func GetDriver

func GetDriver(typ string) Interface

type ItemToSync

// ItemToSync pairs an item identifier with a creation time. It is the element
// type of the slice returned by LoadAllItemIDUnknownByUnit.
type ItemToSync struct {
	// ItemID is bound to the "id" column through its db tag.
	ItemID  string    `db:"id"`
	// Created is bound to the "created" column through its db tag.
	Created time.Time `db:"created"`
}

func LoadAllItemIDUnknownByUnit

func LoadAllItemIDUnknownByUnit(db gorp.SqlExecutor, unitID string, offset int64, limit int64) ([]ItemToSync, error)

type LoadUnitOptionFunc

// LoadUnitOptionFunc is the option type accepted by LoadAllUnits. An option
// receives a context, a SQL executor and any number of *unitDB values, and
// returns an error. Because unitDB is unexported, options can only be
// implemented inside this package.
type LoadUnitOptionFunc func(context.Context, gorp.SqlExecutor, ...*unitDB) error

type LocalBufferConfiguration

// LocalBufferConfiguration holds the settings of a local buffer, referenced
// from BufferConfiguration.Local.
type LocalBufferConfiguration struct {
	// Path is read from the TOML key "path".
	// NOTE(review): presumably a filesystem directory — confirm.
	Path       string                 `toml:"path" json:"path"`
	// Encryption lists key configurations; it is tagged json:"-" so
	// encoding/json never emits it.
	Encryption []*keyloader.KeyConfig `toml:"encryption" json:"-" mapstructure:"encryption"`
}

type LocalStorageConfiguration

// LocalStorageConfiguration holds the settings of a local storage, referenced
// from StorageConfiguration.Local.
type LocalStorageConfiguration struct {
	// Path is read from the TOML key "path".
	// NOTE(review): presumably a filesystem directory — confirm.
	Path       string                                  `toml:"path" json:"path"`
	// Encryption lists convergent encryption configurations; it is tagged
	// json:"-" so encoding/json never emits it.
	Encryption []convergent.ConvergentEncryptionConfig `toml:"encryption" json:"-" mapstructure:"encryption"`
}

type LogBufferUnit

// LogBufferUnit is a BufferUnit specialized for logs. It is the type returned
// by RunningStorageUnits.LogsBuffer.
type LogBufferUnit interface {
	BufferUnit
	// Add stores a string value for the item unit together with a score and a
	// "since" value.
	// NOTE(review): the meaning of score and since is not visible here — confirm.
	Add(i sdk.CDNItemUnit, score uint, since uint, value string) error
	// Card returns an int for the item unit.
	// NOTE(review): presumably the number of stored entries (cardinality) — confirm.
	Card(i sdk.CDNItemUnit) (int, error)
	// NewAdvancedReader returns an io.ReadCloser over the item unit, configured
	// by a reader format, a start offset, a size and a sort value; the caller
	// owns the returned closer.
	NewAdvancedReader(ctx context.Context, i sdk.CDNItemUnit, format sdk.CDNReaderFormat, from int64, size uint, sort int64) (io.ReadCloser, error)
	// Keys returns a list of strings.
	// NOTE(review): presumably the keys currently held by the buffer — confirm.
	Keys() ([]string, error)
	// Copy takes a source and a destination item identifier.
	Copy(ctx context.Context, srcItemID, destItemID string) error
}

type LogConfig

// LogConfig holds the limits applied to step logs.
type LogConfig struct {
	// Step logs
	// StepMaxSize is the max step logs size in bytes; the tag default is
	// 15728640 (15MB).
	StepMaxSize        int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"`
	// StepLinesRateLimit: the struct tag is elided in this rendering.
	// NOTE(review): presumably a rate limit on log lines per step — confirm.
	StepLinesRateLimit int64 `` /* 126-byte string literal not displayed */
}

type NFSBufferConfiguration

// NFSBufferConfiguration holds the settings of an NFS buffer, referenced from
// BufferConfiguration.Nfs.
type NFSBufferConfiguration struct {
	// Host is read from the TOML key "host".
	Host            string                 `toml:"host" json:"host"`
	// TargetPartition is read from the TOML key "targetPartition".
	TargetPartition string                 `toml:"targetPartition" json:"targetPartition"`
	// UserID is an unsigned 32-bit identifier.
	// NOTE(review): presumably the UID used for NFS access — confirm.
	UserID          uint32                 `toml:"userID" json:"userID"`
	// GroupID is an unsigned 32-bit identifier.
	// NOTE(review): presumably the GID used for NFS access — confirm.
	GroupID         uint32                 `toml:"groupID" json:"groupID"`
	// Encryption lists key configurations; it is tagged json:"-" so
	// encoding/json never emits it.
	Encryption      []*keyloader.KeyConfig `toml:"encryption" json:"-" mapstructure:"encryption"`
}

type RunningStorageUnits

// RunningStorageUnits groups the buffer units and the storage units and is
// the receiver of the synchronization, purge and lookup methods of the
// package.
// Its methods mix value and pointer receivers.
type RunningStorageUnits struct {
	// Buffers lists the buffer units.
	Buffers  []BufferUnit
	// Storages lists the storage units.
	Storages []StorageUnit
	// contains filtered or unexported fields
}

func Init

func (*RunningStorageUnits) CanSync

func (x *RunningStorageUnits) CanSync(unitID string) bool

func (RunningStorageUnits) FileBuffer

func (x RunningStorageUnits) FileBuffer() FileBufferUnit

func (*RunningStorageUnits) FillSyncItemChannel

func (x *RunningStorageUnits) FillSyncItemChannel(ctx context.Context, s StorageUnit, nbItem int64) error

func (*RunningStorageUnits) FillWithUnknownItems

func (x *RunningStorageUnits) FillWithUnknownItems(ctx context.Context, s StorageUnit, maxItemByLoop int64) error

func (*RunningStorageUnits) FilterItemUnitFromBuffer

func (x *RunningStorageUnits) FilterItemUnitFromBuffer(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit

func (*RunningStorageUnits) FilterItemUnitReaderByType

func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit

func (*RunningStorageUnits) FilterNotSyncBackend

func (x *RunningStorageUnits) FilterNotSyncBackend(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit

func (RunningStorageUnits) GetBuffer

func (x RunningStorageUnits) GetBuffer(bufferType sdk.CDNItemType) BufferUnit

func (RunningStorageUnits) GetItemUnitByLocatorByUnit

func (r RunningStorageUnits) GetItemUnitByLocatorByUnit(locator string, unitID string, itemType sdk.CDNItemType) (bool, error)

func (RunningStorageUnits) GetSource

func (r RunningStorageUnits) GetSource(ctx context.Context, i *sdk.CDNItem) (Source, error)

func (RunningStorageUnits) HashLocator

func (x RunningStorageUnits) HashLocator(loc string) string

func (*RunningStorageUnits) IsBuffer

func (x *RunningStorageUnits) IsBuffer(id string) bool

func (RunningStorageUnits) LogsBuffer

func (x RunningStorageUnits) LogsBuffer() LogBufferUnit

func (*RunningStorageUnits) NewItemUnit

func (RunningStorageUnits) NewSource

func (r RunningStorageUnits) NewSource(ctx context.Context, refItemUnit sdk.CDNItemUnit) (Source, error)

func (*RunningStorageUnits) Purge

func (*RunningStorageUnits) PushInSyncQueue

func (r *RunningStorageUnits) PushInSyncQueue(ctx context.Context, itemID string, created time.Time)

func (*RunningStorageUnits) RemoveFromRedisSyncQueue

func (r *RunningStorageUnits) RemoveFromRedisSyncQueue(ctx context.Context, s StorageUnit, id string)

func (*RunningStorageUnits) Start

func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)

func (RunningStorageUnits) Storage

func (r RunningStorageUnits) Storage(name string) StorageUnit

func (*RunningStorageUnits) SyncBuffer

func (r *RunningStorageUnits) SyncBuffer(ctx context.Context)

type S3StorageConfiguration

// S3StorageConfiguration holds the settings of an S3 storage, referenced from
// StorageConfiguration.S3. The secret fields (SecretAccessKey, SessionToken)
// and Encryption are tagged json:"-" so encoding/json never emits them.
type S3StorageConfiguration struct {
	// BucketName is the name of the S3 bucket to use when storing artifacts.
	BucketName          string                                  `toml:"bucketName" json:"bucketName" comment:"Name of the S3 bucket to use when storing artifacts"`
	// Region is the AWS region; the tag default is "us-east-1".
	Region              string                                  `toml:"region" json:"region" default:"us-east-1" comment:"The AWS region"`
	// Prefix: the struct tag is elided in this rendering.
	Prefix              string                                  `` /* 135-byte string literal not displayed */
	// AuthFromEnvironment: the struct tag is elided in this rendering.
	AuthFromEnvironment bool                                    `` /* 146-byte string literal not displayed */
	// SharedCredsFile is the path of the AWS credential file, used with Profile.
	SharedCredsFile     string                                  `toml:"sharedCredsFile" json:"sharedCredsFile" comment:"The path for the AWS credential file, used with profile"`
	// Profile is the profile within the AWS credentials file to use.
	Profile             string                                  `toml:"profile" json:"profile" comment:"The profile within the AWS credentials file to use"`
	// AccessKeyID is a static AWS key ID; it is included in JSON output.
	AccessKeyID         string                                  `toml:"accessKeyId" json:"accessKeyId" comment:"A static AWS Secret Key ID"`
	// SecretAccessKey is a static AWS secret access key; excluded from JSON.
	SecretAccessKey     string                                  `toml:"secretAccessKey" json:"-" comment:"A static AWS Secret Access Key"`
	// SessionToken is a static AWS session token; excluded from JSON.
	SessionToken        string                                  `toml:"sessionToken" json:"-" comment:"A static AWS session token"`
	// Endpoint, DisableSSL and ForcePathStyle are optional.
	Endpoint            string                                  `toml:"endpoint" json:"endpoint" comment:"S3 API Endpoint (optional)" commented:"true"` //optional
	DisableSSL          bool                                    `toml:"disableSSL" json:"disableSSL" commented:"true"`                                  //optional
	ForcePathStyle      bool                                    `toml:"forcePathStyle" json:"forcePathStyle" commented:"true"`                          //optional
	// Encryption lists convergent encryption configurations; excluded from JSON.
	Encryption          []convergent.ConvergentEncryptionConfig `toml:"encryption" json:"-" mapstructure:"encryption"`
}

type Source

// Source is a readable origin for an item. It is the type returned by
// RunningStorageUnits.GetSource and RunningStorageUnits.NewSource.
type Source interface {
	// NewReader returns an io.ReadCloser; the caller owns the returned closer.
	NewReader(context.Context) (io.ReadCloser, error)
	// Read takes a reader and a writer.
	// NOTE(review): presumably copies the reader's content into the writer — confirm.
	Read(io.Reader, io.Writer) error
	// Name returns the source name.
	Name() string
	// SyncBandwidth returns the synchronization bandwidth as a float64.
	// NOTE(review): unit of measure is not visible here — confirm.
	SyncBandwidth() float64
}

type Stat

// Stat is one row of a counting query: a storage name, a type and a number.
// It is the element type returned by CountItemUnitToDelete and
// CountItemsForUnitByType.
type Stat struct {
	// StorageName is bound to the "storage_name" column through its db tag.
	StorageName string `db:"storage_name"`
	// Type is bound to the "type" column through its db tag.
	Type        string `db:"type"`
	// Number is bound to the "number" column through its db tag.
	Number      int64  `db:"number"`
}

func CountItemUnitToDelete

func CountItemUnitToDelete(db gorp.SqlExecutor) (res []Stat, err error)

func CountItemsForUnitByType

func CountItemsForUnitByType(db gorp.SqlExecutor, unitID, stype string) (res []Stat, err error)

type StorageConfiguration

// StorageConfiguration is the value type of the Configuration.Storages map.
// It carries the synchronization settings of a storage and optional pointers
// to backend-specific settings (Local, Swift, Webdav, S3).
// NOTE(review): presumably exactly one backend pointer is expected to be
// non-nil — confirm.
type StorageConfiguration struct {
	// SyncParallel is the number of parallel sync processes.
	SyncParallel  int64                       `toml:"syncParallel" json:"sync_parallel" comment:"number of parallel sync processes"`
	// SyncBandwidth is the global bandwidth shared by the sync processes, in Mb
	// per its comment tag.
	SyncBandwidth int64                       `toml:"syncBandwidth" json:"sync_bandwidth" comment:"global bandwith shared by the sync processes (in Mb)"`
	// DisableSync is the flag that disables backend synchronization.
	DisableSync   bool                        `toml:"disableSync" json:"disable_sync" comment:"flag to disabled backend synchronization"`
	// Local points to the local storage settings.
	Local         *LocalStorageConfiguration  `toml:"local" json:"local,omitempty" mapstructure:"local"`
	// Swift points to the Swift storage settings.
	Swift         *SwiftStorageConfiguration  `toml:"swift" json:"swift,omitempty" mapstructure:"swift"`
	// Webdav points to the WebDAV storage settings.
	Webdav        *WebdavStorageConfiguration `toml:"webdav" json:"webdav,omitempty" mapstructure:"webdav"`
	// S3 points to the S3 storage settings.
	S3            *S3StorageConfiguration     `toml:"s3" json:"s3,omitempty" mapstructure:"s3"`
}

type StorageUnit

// StorageUnit is the interface of a storage backend. It embeds Interface and
// Unit and adds initialization, a sync channel, write support and a sync
// capability flag. It is the type returned by RunningStorageUnits.Storage.
type StorageUnit interface {
	Interface
	Unit
	// Init receives a context and an untyped configuration value, and reports
	// failure through the returned error.
	Init(ctx context.Context, cfg interface{}) error
	// SyncItemChannel returns a channel of strings.
	// NOTE(review): presumably carries the IDs of items to synchronize — confirm.
	SyncItemChannel() chan string
	// NewWriter returns an io.WriteCloser for the given item unit; the caller
	// owns the returned closer.
	NewWriter(ctx context.Context, i sdk.CDNItemUnit) (io.WriteCloser, error)
	// Write takes an item unit, a reader and a writer.
	// NOTE(review): presumably copies r into w for item i — confirm.
	Write(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
	// CanSync reports whether the storage can be synchronized.
	CanSync() bool
}

type StorageUnitWithLocator

// StorageUnitWithLocator is a StorageUnit that can additionally derive a
// locator string.
type StorageUnitWithLocator interface {
	StorageUnit
	// NewLocator maps the given string to a locator string, or returns an error.
	// NOTE(review): the derivation (hashing, encryption, ...) is not visible here — confirm.
	NewLocator(s string) (string, error)
}

type SwiftStorageConfiguration

// SwiftStorageConfiguration holds the settings of a Swift storage, referenced
// from StorageConfiguration.Swift. Password and Encryption are tagged
// json:"-" so encoding/json never emits them.
type SwiftStorageConfiguration struct {
	// Address is read from the TOML key "address".
	Address         string                                  `toml:"address" json:"address"`
	// Username is read from the TOML key "username".
	Username        string                                  `toml:"username" json:"username"`
	// Password is read from the TOML key "password"; excluded from JSON.
	Password        string                                  `toml:"password" json:"-"`
	// Tenant is read from the TOML key "tenant".
	Tenant          string                                  `toml:"tenant" json:"tenant"`
	// Domain is read from the TOML key "domain".
	Domain          string                                  `toml:"domain" json:"domain"`
	// Region is read from the TOML key "region".
	Region          string                                  `toml:"region" json:"region"`
	// ContainerPrefix is read from the TOML key "container_prefix" (the only
	// snake_case TOML key in this struct).
	ContainerPrefix string                                  `toml:"container_prefix" json:"container_prefix"`
	// Encryption lists convergent encryption configurations; excluded from JSON.
	Encryption      []convergent.ConvergentEncryptionConfig `toml:"encryption" json:"-" mapstructure:"encryption"`
}

type Unit

// Unit is the read-side method set shared by buffers and storages (it is
// embedded by both BufferUnit and StorageUnit).
type Unit interface {
	// Read takes an item unit, a reader and a writer.
	// NOTE(review): presumably copies r into w for item i — confirm.
	Read(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
	// NewReader returns an io.ReadCloser for the given item unit; the caller
	// owns the returned closer.
	NewReader(ctx context.Context, i sdk.CDNItemUnit) (io.ReadCloser, error)
	// GetDriverName returns the name of the driver.
	GetDriverName() string
	// ResyncWithDatabase takes a SQL executor, an item type and a dryRun flag;
	// it returns nothing, so failures are not reported to the caller.
	// NOTE(review): presumably dryRun means "report only, change nothing" — confirm.
	ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool)
}

type WebdavStorageConfiguration

// WebdavStorageConfiguration holds the settings of a WebDAV storage,
// referenced from StorageConfiguration.Webdav. Password and Encryption are
// tagged json:"-" so encoding/json never emits them.
type WebdavStorageConfiguration struct {
	// Address is read from the TOML key "address".
	Address    string                                  `toml:"address" json:"address"`
	// Username is read from the TOML key "username".
	Username   string                                  `toml:"username" json:"username"`
	// Password is read from the TOML key "password". It is a credential, so it
	// is excluded from JSON output, matching SwiftStorageConfiguration.Password
	// and the secret fields of S3StorageConfiguration.
	Password   string                                  `toml:"password" json:"-"`
	// Path is read from the TOML key "path".
	Path       string                                  `toml:"path" json:"path"`
	// Encryption lists convergent encryption configurations; excluded from JSON.
	Encryption []convergent.ConvergentEncryptionConfig `toml:"encryption" json:"-" mapstructure:"encryption"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL