operation

package
v0.0.0-...-45e1a9a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2024 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// when the remote server does not allow range requests (Accept-Ranges was not set)
	ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")
	// ErrInvalidRange is returned by Read when trying to read past the end of the file
	ErrInvalidRange = errors.New("Invalid range")
)
View Source
var ErrorNotFound = errors.New("not found")

Functions

func DeleteFiles

func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error)

DeleteFiles batch deletes a list of fileIds

func DeleteFilesAtOneVolumeServer

func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error)

DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc

func DeleteFilesWithLookupVolumeId

func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) ([]*volume_server_pb.DeleteResult, error)

func GetBuffer

func GetBuffer() *bytebufferpool.ByteBuffer

func GetVolumeSyncStatus

func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error)

func LookupFileId

func LookupFileId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, fileId string) (fullUrl string, jwt string, err error)

func LookupJwt

func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt)

func LookupVolumeIds

func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]*LookupResult, error)

LookupVolumeIds find volume locations by cache and actual lookup

func ParseFileId

func ParseFileId(fid string) (vid string, key_cookie string, err error)

func PutBuffer

func PutBuffer(buf *bytebufferpool.ByteBuffer)

func TailVolume

func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error

func TailVolumeFromSource

func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error

func WithMasterServerClient

func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error

func WithVolumeServerClient

func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error

Types

type AssignProxy

type AssignProxy struct {
	// contains filtered or unexported fields
}

This is a proxy to the master server, only for assigning volume ids. It runs via grpc to the master server in streaming mode. The connection to the master would only be re-established when the last connection has error.

func NewAssignProxy

func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error)

func (*AssignProxy) Assign

func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error)

type AssignResult

type AssignResult struct {
	Fid       string              `json:"fid,omitempty"`
	Url       string              `json:"url,omitempty"`
	PublicUrl string              `json:"publicUrl,omitempty"`
	GrpcPort  int                 `json:"grpcPort,omitempty"`
	Count     uint64              `json:"count,omitempty"`
	Error     string              `json:"error,omitempty"`
	Auth      security.EncodedJwt `json:"auth,omitempty"`
	Replicas  []Location          `json:"replicas,omitempty"`
}

func Assign

func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error)

type ChunkInfo

type ChunkInfo struct {
	Fid    string `json:"fid"`
	Offset int64  `json:"offset"`
	Size   int64  `json:"size"`
}

type ChunkList

type ChunkList []*ChunkInfo

func (ChunkList) Len

func (s ChunkList) Len() int

func (ChunkList) Less

func (s ChunkList) Less(i, j int) bool

func (ChunkList) Swap

func (s ChunkList) Swap(i, j int)

type ChunkManifest

type ChunkManifest struct {
	Name   string    `json:"name,omitempty"`
	Mime   string    `json:"mime,omitempty"`
	Size   int64     `json:"size,omitempty"`
	Chunks ChunkList `json:"chunks,omitempty"`
}

func LoadChunkManifest

func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error)

func (*ChunkManifest) DeleteChunks

func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error

func (*ChunkManifest) Marshal

func (cm *ChunkManifest) Marshal() ([]byte, error)

type ChunkedFileReader

type ChunkedFileReader struct {
	// contains filtered or unexported fields
}

seekable chunked file reader

func NewChunkedFileReader

func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader

func (*ChunkedFileReader) Close

func (cf *ChunkedFileReader) Close() (e error)

func (*ChunkedFileReader) Read

func (cf *ChunkedFileReader) Read(p []byte) (int, error)

func (*ChunkedFileReader) ReadAt

func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error)

func (*ChunkedFileReader) Seek

func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error)

func (*ChunkedFileReader) WriteTo

func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error)

type DeleteResult

type DeleteResult struct {
	Fid    string `json:"fid"`
	Size   int    `json:"size"`
	Status int    `json:"status"`
	Error  string `json:"error,omitempty"`
}

type FilePart

type FilePart struct {
	Reader      io.Reader
	FileName    string
	FileSize    int64
	MimeType    string
	ModTime     int64 //in seconds
	Replication string
	Collection  string
	DataCenter  string
	Ttl         string
	DiskType    string
	Server      string //this comes from assign result
	Fid         string //this comes from assign result, but customizable
	Fsync       bool
}

func NewFileParts

func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error)

func (FilePart) Upload

func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error)

type GetMasterFn

type GetMasterFn func(ctx context.Context) pb.ServerAddress

type HTTPClient

type HTTPClient interface {
	Do(req *http.Request) (*http.Response, error)
}

HTTPClient interface for testing

var (
	HttpClient HTTPClient
)

type JoinResult

type JoinResult struct {
	VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
	Error           string `json:"error,omitempty"`
}

type Location

type Location struct {
	Url        string `json:"url,omitempty"`
	PublicUrl  string `json:"publicUrl,omitempty"`
	DataCenter string `json:"dataCenter,omitempty"`
	GrpcPort   int    `json:"grpcPort,omitempty"`
}

func (*Location) ServerAddress

func (l *Location) ServerAddress() pb.ServerAddress

type LookupResult

type LookupResult struct {
	VolumeOrFileId string     `json:"volumeOrFileId,omitempty"`
	Locations      []Location `json:"locations,omitempty"`
	Jwt            string     `json:"jwt,omitempty"`
	Error          string     `json:"error,omitempty"`
}

func LookupVolumeId

func LookupVolumeId(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid string) (*LookupResult, error)

func (*LookupResult) String

func (lr *LookupResult) String() string

type StorageOption

type StorageOption struct {
	Replication       string
	DiskType          string
	Collection        string
	DataCenter        string
	Rack              string
	DataNode          string
	TtlSeconds        int32
	VolumeGrowthCount uint32
	MaxFileNameLength uint32
	Fsync             bool
	SaveInside        bool
}

func (*StorageOption) ToAssignRequests

func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest)

func (*StorageOption) TtlString

func (so *StorageOption) TtlString() string

type SubmitResult

type SubmitResult struct {
	FileName string `json:"fileName,omitempty"`
	FileUrl  string `json:"url,omitempty"`
	Fid      string `json:"fid,omitempty"`
	Size     uint32 `json:"size,omitempty"`
	Error    string `json:"error,omitempty"`
}

func SubmitFiles

func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error)

type UploadOption

type UploadOption struct {
	UploadUrl         string
	Filename          string
	Cipher            bool
	IsInputCompressed bool
	MimeType          string
	PairMap           map[string]string
	Jwt               security.EncodedJwt
	RetryForever      bool
	Md5               string
	BytesBuffer       *bytes.Buffer
}

type UploadResult

type UploadResult struct {
	Name       string `json:"name,omitempty"`
	Size       uint32 `json:"size,omitempty"`
	Error      string `json:"error,omitempty"`
	ETag       string `json:"eTag,omitempty"`
	CipherKey  []byte `json:"cipherKey,omitempty"`
	Mime       string `json:"mime,omitempty"`
	Gzip       uint32 `json:"gzip,omitempty"`
	ContentMd5 string `json:"contentMd5,omitempty"`
	RetryCount int    `json:"-"`
}

func Upload

func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte)

Upload sends a POST request to a volume server to upload the content with fast compression

func UploadData

func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error)

Upload sends a POST request to a volume server to upload the content with adjustable compression level

func UploadWithRetry

func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte)

UploadWithRetry will retry both assigning volume request and uploading content The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.

func (*UploadResult) ToPbFileChunk

func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsNs int64) *filer_pb.FileChunk

type VidCache

type VidCache struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*VidCache) Get

func (vc *VidCache) Get(vid string) ([]Location, error)

func (*VidCache) Set

func (vc *VidCache) Set(vid string, locations []Location, duration time.Duration)

type VidInfo

type VidInfo struct {
	Locations       []Location
	NextRefreshTime time.Time
}

type VolumeAssignRequest

type VolumeAssignRequest struct {
	Count               uint64
	Replication         string
	Collection          string
	Ttl                 string
	DiskType            string
	DataCenter          string
	Rack                string
	DataNode            string
	WritableVolumeCount uint32
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL