drivers

package
v0.5.33 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2022 License: MIT Imports: 42 Imported by: 0

Documentation

Overview

Package drivers abstracts different object storages, such as local and S3.

Index

Constants

View Source
const (
	// S3_POLICY_EXPIRE_IN_HOURS how long access rights given to other node will be valid
	S3_POLICY_EXPIRE_IN_HOURS = 24
)

Variables

View Source
var ErrNoNextPage = fmt.Errorf("no next page")

ErrNoNextPage indicates that there is no next page in ListFiles

View Source
var FailSaveBucketName string

FailSaveBucketName is the name of the bucket to save FV failed segments to

View Source
var TestMemoryStorages map[string]*MemoryOS

TestMemoryStorages used for testing purposes

View Source
var Testing bool

Testing indicates that test is running

Functions

func FailSaveEnabled added in v0.5.33

func FailSaveEnabled() bool

FailSaveEnabled reports whether segments that failed FV should be saved to GS

func GetSegmentData

func GetSegmentData(ctx context.Context, uri string) ([]byte, error)

func PrepareOSURL added in v0.5.13

func PrepareOSURL(input string) (string, error)

PrepareOSURL used for resolving files when necessary and turning into a URL. Don't use this when the URL comes from untrusted sources e.g. AuthWebhookUrl.

func Save2GS added in v0.5.33

func Save2GS(fileName string, data []byte) (string, error)

Save2GS saves data to Google Cloud Storage

func SaveFile2GS added in v0.5.33

func SaveFile2GS(inpFileName, targetFileName string) (string, error)

SaveFile2GS saves file to Google Cloud Storage

func SavePairData2GS added in v0.5.33

func SavePairData2GS(trusturi string, data1 []byte, untrusturi string, data2 []byte, suffix string, src []byte) error

func SaveRetried added in v0.5.13

func SaveRetried(ctx context.Context, sess OSSession, name string, data []byte, meta map[string]string, retryCount int) (string, error)

SaveRetried tries SaveData up to the specified number of times

func SetCreds added in v0.5.33

func SetCreds(bucket, creds string)

SetCreds ...

Types

type FileInfo added in v0.5.13

type FileInfo struct {
	Name         string
	ETag         string
	LastModified time.Time
	Size         *int64
}

type FileInfoReader added in v0.5.13

type FileInfoReader struct {
	FileInfo
	Metadata map[string]string
	Body     io.ReadCloser
}

func ParallelReadFiles added in v0.5.13

func ParallelReadFiles(ctx context.Context, sess OSSession, filesNames []string, workers int) ([]*FileInfoReader, [][]byte, error)

ParallelReadFiles reads files in parallel, using specified number of jobs

type MemoryOS

type MemoryOS struct {
	// contains filtered or unexported fields
}

func NewMemoryDriver

func NewMemoryDriver(baseURI *url.URL) *MemoryOS

func (*MemoryOS) GetSession

func (ostore *MemoryOS) GetSession(path string) *MemorySession

func (*MemoryOS) NewSession

func (ostore *MemoryOS) NewSession(path string) OSSession

type MemorySession

type MemorySession struct {
	// contains filtered or unexported fields
}

func (*MemorySession) EndSession

func (ostore *MemorySession) EndSession()

EndSession clears memory cache

func (*MemorySession) GetData

func (ostore *MemorySession) GetData(name string) []byte

GetData returns the cached data for a name.

A name can be an absolute or relative URI.

An absolute URI has the following format:
  - ostore.os.baseURI + /stream/ + ostore.path + path + file

The following are valid relative URIs:
  - /stream/ + ostore.path + path + file (if ostore.os.baseURI is empty)
  - ostore.path + path + file

func (*MemorySession) GetInfo

func (ostore *MemorySession) GetInfo() *net.OSInfo

func (*MemorySession) IsExternal

func (ostore *MemorySession) IsExternal() bool

func (*MemorySession) IsOwn added in v0.5.13

func (ostore *MemorySession) IsOwn(url string) bool

func (*MemorySession) ListFiles added in v0.5.13

func (ostore *MemorySession) ListFiles(ctx context.Context, prefix, delim string) (PageInfo, error)

func (*MemorySession) OS added in v0.5.13

func (ostore *MemorySession) OS() OSDriver

func (*MemorySession) ReadData added in v0.5.13

func (ostore *MemorySession) ReadData(ctx context.Context, name string) (*FileInfoReader, error)

func (*MemorySession) SaveData

func (ostore *MemorySession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error)

type MockOSSession added in v0.5.13

type MockOSSession struct {
	mock.Mock
	// contains filtered or unexported fields
}

func NewMockOSSession added in v0.5.19

func NewMockOSSession() *MockOSSession

func (*MockOSSession) EndSession added in v0.5.13

func (s *MockOSSession) EndSession()

func (*MockOSSession) GetInfo added in v0.5.13

func (s *MockOSSession) GetInfo() *net.OSInfo

func (*MockOSSession) IsExternal added in v0.5.13

func (s *MockOSSession) IsExternal() bool

func (*MockOSSession) IsOwn added in v0.5.13

func (s *MockOSSession) IsOwn(url string) bool

func (*MockOSSession) ListFiles added in v0.5.13

func (s *MockOSSession) ListFiles(ctx context.Context, prefix, delim string) (PageInfo, error)

func (*MockOSSession) OS added in v0.5.13

func (s *MockOSSession) OS() OSDriver

func (*MockOSSession) ReadData added in v0.5.13

func (s *MockOSSession) ReadData(ctx context.Context, name string) (*FileInfoReader, error)

func (*MockOSSession) SaveData added in v0.5.13

func (s *MockOSSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error)

type OSDriver

type OSDriver interface {
	NewSession(path string) OSSession
}

OSDriver common interface for Object Storage

var NodeStorage OSDriver

NodeStorage is current node's primary driver

var RecordStorage OSDriver

RecordStorage is current node's "stream recording" driver

func NewCustomS3Driver added in v0.5.13

func NewCustomS3Driver(host, bucket, region, accessKey, accessKeySecret string, useFullAPI bool) (OSDriver, error)

NewCustomS3Driver for creating S3-compatible stores other than S3 itself

func NewGoogleDriver

func NewGoogleDriver(bucket, keyData string, useFullAPI bool) (OSDriver, error)

func NewS3Driver

func NewS3Driver(region, bucket, accessKey, accessKeySecret string, useFullAPI bool) (OSDriver, error)

func ParseOSURL added in v0.5.13

func ParseOSURL(input string, useFullAPI bool) (OSDriver, error)

ParseOSURL returns the correct OS for a given OS url

type OSSession

type OSSession interface {
	OS() OSDriver

	SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error)
	EndSession()

	// Info in order to have this session used via RPC
	GetInfo() *net.OSInfo

	// Indicates whether data may be external to this node
	IsExternal() bool

	// Indicates whether this is the correct OS for a given URL
	IsOwn(url string) bool

	// ListFiles return list of files
	ListFiles(ctx context.Context, prefix, delim string) (PageInfo, error)

	ReadData(ctx context.Context, name string) (*FileInfoReader, error)
}

func NewSession

func NewSession(info *net.OSInfo) OSSession

NewSession returns new session based on OSInfo received from the network

type OverwriteQueue added in v0.5.19

type OverwriteQueue struct {
	// contains filtered or unexported fields
}

func NewOverwriteQueue added in v0.5.19

func NewOverwriteQueue(session OSSession, name, desc string, maxRetries int, initialTimeout, maxTimeout time.Duration) *OverwriteQueue

func (*OverwriteQueue) Save added in v0.5.19

func (oq *OverwriteQueue) Save(data []byte)

Save queues data to be saved

func (*OverwriteQueue) StopAfter added in v0.5.19

func (oq *OverwriteQueue) StopAfter(pause time.Duration)

StopAfter stops reading loop after some time

type PageInfo added in v0.5.13

type PageInfo interface {
	Files() []FileInfo
	Directories() []string
	HasNextPage() bool
	NextPage() (PageInfo, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL