Documentation ¶
Overview ¶
Copyright 2019 The OpenSDS Authors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2019 The OpenSDS Authors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- type CephStorage
- func (cluster *CephStorage) Append(poolname string, oid string, data io.Reader, offset uint64, isExist bool) (size int64, err error)
- func (cluster *CephStorage) GetUniqUploadName() string
- func (cluster *CephStorage) GetUsedSpacePercent() (pct int, err error)
- func (cluster *CephStorage) Put(poolname string, oid string, data io.Reader) (size int64, err error)
- func (cluster *CephStorage) Remove(poolname string, oid string) error
- func (c *CephStorage) Shutdown()
- type GcMgr
- func (gm *GcMgr) CreateGcObjectRecordCleanStream(in ...<-chan *GcObjectResult) <-chan *GcObjectResult
- func (gm *GcMgr) CreateObjectDeleteStream(in <-chan *types.GcObject) <-chan *GcObjectResult
- func (gm *GcMgr) QueryGcObjectStream() <-chan *types.GcObject
- func (gm *GcMgr) Start()
- func (gm *GcMgr) Stop()
- type GcObjectResult
- type MultipartReader
- type ObjectMetaInfo
- type RadosDownloader
- type RadosSmallDownloader
- type YigStorage
- func (yig *YigStorage) AbortMultipartUpload(ctx context.Context, multipartUpload *pb.MultipartUpload) error
- func (yig *YigStorage) BackendCheck(ctx context.Context, backendDetail *pb.BackendDetailS3) error
- func (yig *YigStorage) BucketCreate(ctx context.Context, input *pb.Bucket) error
- func (yig *YigStorage) BucketDelete(ctx context.Context, in *pb.Bucket) error
- func (yig *YigStorage) ChangeStorageClass(ctx context.Context, object *pb.Object, newClass *string) error
- func (y *YigStorage) Close() error
- func (yig *YigStorage) CompleteMultipartUpload(ctx context.Context, multipartUpload *pb.MultipartUpload, ...) (*model.CompleteMultipartUploadResult, error)
- func (yig *YigStorage) Copy(ctx context.Context, stream io.Reader, target *pb.Object) (result dscommon.PutResult, err error)
- func (yig *YigStorage) Delete(ctx context.Context, object *pb.DeleteObjectInput) error
- func (yig *YigStorage) Get(ctx context.Context, object *pb.Object, start int64, end int64) (io.ReadCloser, error)
- func (yig *YigStorage) GetClusterByFsName(fsName string) (cluster *CephStorage, err error)
- func (yig *YigStorage) InitMultipartUpload(ctx context.Context, object *pb.Object) (*pb.MultipartUpload, error)
- func (yig *YigStorage) ListParts(ctx context.Context, multipartUpload *pb.ListParts) (*model.ListPartsOutput, error)
- func (yig *YigStorage) PickOneClusterAndPool(bucket string, object string, size int64, isAppend bool) (cluster *CephStorage, poolName string)
- func (yig *YigStorage) Put(ctx context.Context, stream io.Reader, obj *pb.Object) (result dscommon.PutResult, err error)
- func (yig *YigStorage) Restore(ctx context.Context, target *pb.Restore) error
- func (yig *YigStorage) UploadPart(ctx context.Context, stream io.Reader, multipartUpload *pb.MultipartUpload, ...) (*model.UploadPartResult, error)
Constants ¶
const ( MON_TIMEOUT = "10" OSD_TIMEOUT = "10" STRIPE_UNIT = 512 << 10 /* 512K */ STRIPE_COUNT = 2 OBJECT_SIZE = 8 << 20 /* 8M */ BUFFER_SIZE = 1 << 20 /* 1M */ MIN_CHUNK_SIZE = 512 << 10 /* 512K */ MAX_CHUNK_SIZE = 8 * BUFFER_SIZE /* 8M */ SMALL_FILE_POOLNAME = "rabbit" BIG_FILE_POOLNAME = "tiger" BIG_FILE_THRESHOLD = 128 << 10 /* 128K */ AIO_CONCURRENT = 4 )
const ( GC_OBJECT_LIMIT_NUM = 10000 // max interval time of gc in seconds. GC_MAX_INTERVAL_TIME = 3600 CEPH_OBJ_NON_EXIST_ERR = "rados: ret=-2" )
const ( MAX_PART_SIZE = 5 << 30 // 5GB MIN_PART_SIZE = 5 << 20 // 5MB MIN_PART_NUMBER = 1 MAX_PART_NUMBER = 10000 )
const ( AES_BLOCK_SIZE = 16 ENCRYPTION_KEY_LENGTH = 32 // key size for AES-"256" INITIALIZATION_VECTOR_LENGTH = 16 // block size of AES DEFAULT_CEPHCONFIG_PATTERN = "conf/*.conf" )
const CLUSTER_MAX_USED_SPACE_PERCENT = 85
Variables ¶
var (
RootContext = context.Background()
)
Functions ¶
This section is empty.
Types ¶
type CephStorage ¶
type CephStorage struct { Name string Conn *rados.Conn InstanceId uint64 CountMutex *sync.Mutex Counter uint64 BufPool *sync.Pool BigBufPool *sync.Pool }
func NewCephStorage ¶
func NewCephStorage(configFile string) *CephStorage
func (*CephStorage) GetUniqUploadName ¶
func (cluster *CephStorage) GetUniqUploadName() string
func (*CephStorage) GetUsedSpacePercent ¶
func (cluster *CephStorage) GetUsedSpacePercent() (pct int, err error)
func (*CephStorage) Shutdown ¶
func (c *CephStorage) Shutdown()
type GcMgr ¶
type GcMgr struct {
// contains filtered or unexported fields
}
func (*GcMgr) CreateGcObjectRecordCleanStream ¶
func (gm *GcMgr) CreateGcObjectRecordCleanStream(in ...<-chan *GcObjectResult) <-chan *GcObjectResult
func (*GcMgr) CreateObjectDeleteStream ¶
func (gm *GcMgr) CreateObjectDeleteStream(in <-chan *types.GcObject) <-chan *GcObjectResult
func (*GcMgr) QueryGcObjectStream ¶
type GcObjectResult ¶
type MultipartReader ¶
type MultipartReader struct {
// contains filtered or unexported fields
}
func NewMultipartReader ¶
func NewMultipartReader(yig *YigStorage, uploadIdStr string, start int64, end int64) (*MultipartReader, error)
func (*MultipartReader) Close ¶
func (mr *MultipartReader) Close() error
type ObjectMetaInfo ¶
func ParseObjectMeta ¶
func ParseObjectMeta(meta string) (ObjectMetaInfo, error)
type RadosDownloader ¶
type RadosDownloader struct {
// contains filtered or unexported fields
}
func (*RadosDownloader) Close ¶
func (rd *RadosDownloader) Close() error
type RadosSmallDownloader ¶
type RadosSmallDownloader struct {
// contains filtered or unexported fields
}
func (*RadosSmallDownloader) Close ¶
func (rd *RadosSmallDownloader) Close() error
type YigStorage ¶
type YigStorage struct { DataStorage map[string]*CephStorage MetaStorage *meta.Meta KMS crypto.KMS Stopping bool // contains filtered or unexported fields }
YigStorage implements StorageDriver
func (*YigStorage) AbortMultipartUpload ¶
func (yig *YigStorage) AbortMultipartUpload(ctx context.Context, multipartUpload *pb.MultipartUpload) error
func (*YigStorage) BackendCheck ¶ added in v1.2.0
func (yig *YigStorage) BackendCheck(ctx context.Context, backendDetail *pb.BackendDetailS3) error
func (*YigStorage) BucketCreate ¶ added in v1.2.2
func (*YigStorage) BucketDelete ¶ added in v1.2.2
func (*YigStorage) ChangeStorageClass ¶
func (*YigStorage) Close ¶
func (y *YigStorage) Close() error
func (*YigStorage) CompleteMultipartUpload ¶
func (yig *YigStorage) CompleteMultipartUpload(ctx context.Context, multipartUpload *pb.MultipartUpload, completeUpload *model.CompleteMultipartUpload) (*model.CompleteMultipartUploadResult, error)
func (*YigStorage) Delete ¶
func (yig *YigStorage) Delete(ctx context.Context, object *pb.DeleteObjectInput) error
* @objectId: input object id which will be deleted. * Below is the process logic: * 1. check whether objectId is multipart uploaded, if so * retrieve all the object ids from multiparts and put them into gc. * or else, put it into gc. *
func (*YigStorage) Get ¶
func (yig *YigStorage) Get(ctx context.Context, object *pb.Object, start int64, end int64) (io.ReadCloser, error)
func (*YigStorage) GetClusterByFsName ¶
func (yig *YigStorage) GetClusterByFsName(fsName string) (cluster *CephStorage, err error)
func (*YigStorage) InitMultipartUpload ¶
func (yig *YigStorage) InitMultipartUpload(ctx context.Context, object *pb.Object) (*pb.MultipartUpload, error)
func (*YigStorage) ListParts ¶
func (yig *YigStorage) ListParts(ctx context.Context, multipartUpload *pb.ListParts) (*model.ListPartsOutput, error)
func (*YigStorage) PickOneClusterAndPool ¶
func (yig *YigStorage) PickOneClusterAndPool(bucket string, object string, size int64, isAppend bool) (cluster *CephStorage, poolName string)
func (*YigStorage) Put ¶
func (yig *YigStorage) Put(ctx context.Context, stream io.Reader, obj *pb.Object) (result dscommon.PutResult, err error)
ctx should contain below elements: * size: object size. * encryptionKey: * md5: the md5 put by user for the uploading object.
func (*YigStorage) UploadPart ¶
func (yig *YigStorage) UploadPart(ctx context.Context, stream io.Reader, multipartUpload *pb.MultipartUpload, partNumber int64, upBytes int64) (*model.UploadPartResult, error)