Documentation ¶
Index ¶
- Constants
- Variables
- type CephStorage
- func (cluster *CephStorage) Append(poolname string, oid string, data io.Reader, offset uint64, isExist bool) (size int64, err error)
- func (cluster *CephStorage) GetUniqUploadName() string
- func (cluster *CephStorage) GetUsedSpacePercent() (pct int, err error)
- func (cluster *CephStorage) Put(poolname string, oid string, data io.Reader) (size int64, err error)
- func (cluster *CephStorage) Remove(poolname string, oid string) error
- func (c *CephStorage) Shutdown()
- type DataCache
- type RadosDownloader
- type RadosSmallDownloader
- type ReadCloser
- type YigStorage
- func (yig *YigStorage) AbortMultipartUpload(credential common.Credential, bucketName, objectName, uploadId string) error
- func (yig *YigStorage) AppendObject(bucketName string, objectName string, credential common.Credential, ...) (result datatype.AppendObjectResult, err error)
- func (yig *YigStorage) CompleteMultipartUpload(credential common.Credential, bucketName, objectName, uploadId string, ...) (result datatype.CompleteMultipartResult, err error)
- func (yig *YigStorage) CopyObject(targetObject *meta.Object, source io.Reader, credential common.Credential, ...) (result datatype.PutObjectResult, err error)
- func (yig *YigStorage) CopyObjectPart(bucketName, objectName, uploadId string, partId int, size int64, ...) (result datatype.PutObjectResult, err error)
- func (yig *YigStorage) DelBucketLc(bucketName string, credential common.Credential) error
- func (yig *YigStorage) DeleteBucket(bucketName string, credential common.Credential) (err error)
- func (yig *YigStorage) DeleteBucketCors(bucketName string, credential common.Credential) error
- func (yig *YigStorage) DeleteBucketPolicy(credential common.Credential, bucketName string) error
- func (yig *YigStorage) DeleteObject(bucketName string, objectName string, version string, ...) (result datatype.DeleteObjectResult, err error)
- func (yig *YigStorage) GetBucket(bucketName string) (meta.Bucket, error)
- func (yig *YigStorage) GetBucketAcl(bucketName string, credential common.Credential) (policy datatype.AccessControlPolicyResponse, err error)
- func (yig *YigStorage) GetBucketCors(bucketName string, credential common.Credential) (cors datatype.Cors, err error)
- func (yig *YigStorage) GetBucketInfo(bucketName string, credential common.Credential) (bucket meta.Bucket, err error)
- func (yig *YigStorage) GetBucketLc(bucketName string, credential common.Credential) (lc datatype.Lc, err error)
- func (yig *YigStorage) GetBucketPolicy(credential common.Credential, bucketName string) (bucketPolicy policy.Policy, err error)
- func (yig *YigStorage) GetBucketVersioning(bucketName string, credential common.Credential) (versioning datatype.Versioning, err error)
- func (yig *YigStorage) GetClusterByFsName(fsName string) (cluster *CephStorage, err error)
- func (yig *YigStorage) GetObject(object *meta.Object, startOffset int64, length int64, writer io.Writer, ...) (err error)
- func (yig *YigStorage) GetObjectAcl(bucketName string, objectName string, version string, ...) (policy datatype.AccessControlPolicyResponse, err error)
- func (yig *YigStorage) GetObjectInfo(bucketName string, objectName string, version string, ...) (object *meta.Object, err error)
- func (yig *YigStorage) ListBuckets(credential common.Credential) (buckets []meta.Bucket, err error)
- func (yig *YigStorage) ListMultipartUploads(credential common.Credential, bucketName string, ...) (result datatype.ListMultipartUploadsResponse, err error)
- func (yig *YigStorage) ListObjectParts(credential common.Credential, bucketName, objectName string, ...) (result datatype.ListPartsResponse, err error)
- func (yig *YigStorage) ListObjects(credential common.Credential, bucketName string, ...) (result meta.ListObjectsInfo, err error)
- func (yig *YigStorage) ListObjectsInternal(bucketName string, request datatype.ListObjectsRequest) (retObjects []*meta.Object, prefixes []string, truncated bool, ...)
- func (yig *YigStorage) ListVersionedObjects(credential common.Credential, bucketName string, ...) (result meta.VersionedListObjectsInfo, err error)
- func (yig *YigStorage) MakeBucket(bucketName string, acl datatype.Acl, credential common.Credential) error
- func (yig *YigStorage) NewMultipartUpload(credential common.Credential, bucketName, objectName string, ...) (uploadId string, err error)
- func (yig *YigStorage) PickOneClusterAndPool(bucket string, object string, size int64, isAppend bool) (cluster *CephStorage, poolName string)
- func (y *YigStorage) PingCache(interval time.Duration)
- func (yig *YigStorage) PutObject(bucketName string, objectName string, credential common.Credential, size int64, ...) (result datatype.PutObjectResult, err error)
- func (yig *YigStorage) PutObjectPart(bucketName, objectName string, credential common.Credential, uploadId string, ...) (result datatype.PutObjectPartResult, err error)
- func (yig *YigStorage) SetBucketAcl(bucketName string, policy datatype.AccessControlPolicy, acl datatype.Acl, ...) error
- func (yig *YigStorage) SetBucketCors(bucketName string, cors datatype.Cors, credential common.Credential) error
- func (yig *YigStorage) SetBucketLc(bucketName string, lc datatype.Lc, credential common.Credential) error
- func (yig *YigStorage) SetBucketPolicy(credential common.Credential, bucketName string, bucketPolicy policy.Policy) (err error)
- func (yig *YigStorage) SetBucketVersioning(bucketName string, versioning datatype.Versioning, ...) error
- func (yig *YigStorage) SetObjectAcl(bucketName string, objectName string, version string, ...) error
- func (y *YigStorage) Stop()
- func (yig *YigStorage) UpdateObjectAttrs(targetObject *meta.Object, credential common.Credential) (result datatype.PutObjectResult, err error)
Constants ¶
const ( MON_TIMEOUT = "10" OSD_TIMEOUT = "10" STRIPE_UNIT = 512 << 10 /* 512K */ STRIPE_COUNT = 2 OBJECT_SIZE = 8 << 20 /* 8M */ BUFFER_SIZE = 1 << 20 /* 1M */ MIN_CHUNK_SIZE = 512 << 10 /* 512K */ MAX_CHUNK_SIZE = 8 * BUFFER_SIZE /* 8M */ SMALL_FILE_POOLNAME = "rabbit" BIG_FILE_POOLNAME = "tiger" BIG_FILE_THRESHOLD = 128 << 10 /* 128K */ AIO_CONCURRENT = 4 )
const ( MAX_PART_SIZE = 5 << 30 // 5GB MAX_PART_NUMBER = 10000 )
const ( RECYCLE_QUEUE_SIZE = 100 MAX_TRY_TIMES = 3 )
const ( AES_BLOCK_SIZE = 16 ENCRYPTION_KEY_LENGTH = 32 // key size for AES-"256" INITIALIZATION_VECTOR_LENGTH = 16 // block size of AES DEFAULT_CEPHCONFIG_PATTERN = "conf/*.conf" )
const CLUSTER_MAX_USED_SPACE_PERCENT = 85
const ( // only objects smaller than threshold are cached FILE_CACHE_THRESHOLD_SIZE = 4 << 20 // 4M )
Variables ¶
var RecycleQueue chan objectToRecycle
var (
RootContext = context.Background()
)
Functions ¶
This section is empty.
Types ¶
type CephStorage ¶ added in v1.1.0
type CephStorage struct { Name string Conn *rados.Conn InstanceId uint64 Logger *log.Logger CountMutex *sync.Mutex Counter uint64 }
func NewCephStorage ¶ added in v1.1.0
func NewCephStorage(configFile string, logger *log.Logger) *CephStorage
func (*CephStorage) GetUniqUploadName ¶ added in v1.1.0
func (cluster *CephStorage) GetUniqUploadName() string
func (*CephStorage) GetUsedSpacePercent ¶ added in v1.1.0
func (cluster *CephStorage) GetUsedSpacePercent() (pct int, err error)
func (*CephStorage) Remove ¶ added in v1.1.0
func (cluster *CephStorage) Remove(poolname string, oid string) error
func (*CephStorage) Shutdown ¶ added in v1.1.0
func (c *CephStorage) Shutdown()
type DataCache ¶
type DataCache interface { WriteFromCache(object *meta.Object, startOffset int64, length int64, out io.Writer, writeThrough func(io.Writer) error, onCacheMiss func(io.Writer) error) error GetAlignedReader(object *meta.Object, startOffset int64, length int64, readThrough func() (io.ReadCloser, error), onCacheMiss func(io.Writer) error) (io.ReadCloser, error) Remove(key string) }
type RadosDownloader ¶ added in v1.1.0
type RadosDownloader struct {
// contains filtered or unexported fields
}
func (*RadosDownloader) Close ¶ added in v1.1.0
func (rd *RadosDownloader) Close() error
type RadosSmallDownloader ¶ added in v1.1.0
type RadosSmallDownloader struct {
// contains filtered or unexported fields
}
func (*RadosSmallDownloader) Close ¶ added in v1.1.0
func (rd *RadosSmallDownloader) Close() error
type ReadCloser ¶
type ReadCloser struct {
// contains filtered or unexported fields
}
func (*ReadCloser) Close ¶
func (r *ReadCloser) Close() error
type YigStorage ¶
type YigStorage struct { DataStorage map[string]*CephStorage DataCache DataCache MetaStorage *meta.Meta KMS crypto.KMS Logger *log.Logger Stopping bool WaitGroup *sync.WaitGroup }
*YigStorage implements api.ObjectLayer
func (*YigStorage) AbortMultipartUpload ¶
func (yig *YigStorage) AbortMultipartUpload(credential common.Credential, bucketName, objectName, uploadId string) error
func (*YigStorage) AppendObject ¶
func (yig *YigStorage) AppendObject(bucketName string, objectName string, credential common.Credential, offset uint64, size int64, data io.Reader, metadata map[string]string, acl datatype.Acl, sseRequest datatype.SseRequest, objInfo *meta.Object) (result datatype.AppendObjectResult, err error)
TODO: add encryption support for append uploads.
func (*YigStorage) CompleteMultipartUpload ¶
func (yig *YigStorage) CompleteMultipartUpload(credential common.Credential, bucketName, objectName, uploadId string, uploadedParts []meta.CompletePart) (result datatype.CompleteMultipartResult, err error)
func (*YigStorage) CopyObject ¶
func (yig *YigStorage) CopyObject(targetObject *meta.Object, source io.Reader, credential common.Credential, sseRequest datatype.SseRequest) (result datatype.PutObjectResult, err error)
func (*YigStorage) CopyObjectPart ¶
func (yig *YigStorage) CopyObjectPart(bucketName, objectName, uploadId string, partId int, size int64, data io.Reader, credential common.Credential, sseRequest datatype.SseRequest) (result datatype.PutObjectResult, err error)
func (*YigStorage) DelBucketLc ¶ added in v1.1.0
func (yig *YigStorage) DelBucketLc(bucketName string, credential common.Credential) error
func (*YigStorage) DeleteBucket ¶
func (yig *YigStorage) DeleteBucket(bucketName string, credential common.Credential) (err error)
func (*YigStorage) DeleteBucketCors ¶
func (yig *YigStorage) DeleteBucketCors(bucketName string, credential common.Credential) error
func (*YigStorage) DeleteBucketPolicy ¶
func (yig *YigStorage) DeleteBucketPolicy(credential common.Credential, bucketName string) error
func (*YigStorage) DeleteObject ¶
func (yig *YigStorage) DeleteObject(bucketName string, objectName string, version string, credential common.Credential) (result datatype.DeleteObjectResult, err error)
When bucket versioning is Disabled/Enabled/Suspended, and request versionId is set/unset:
|           | with versionId               | without versionId                                                            |
|-----------|------------------------------|------------------------------------------------------------------------------|
| Disabled  | error                        | remove object                                                                |
| Enabled   | remove corresponding version | add a delete marker                                                          |
| Suspended | remove corresponding version | remove null version object (if exists) and add a null version delete marker |
See http://docs.aws.amazon.com/AmazonS3/latest/dev/Versioning.html
func (*YigStorage) GetBucket ¶
func (yig *YigStorage) GetBucket(bucketName string) (meta.Bucket, error)
For INTERNAL USE ONLY
func (*YigStorage) GetBucketAcl ¶
func (yig *YigStorage) GetBucketAcl(bucketName string, credential common.Credential) ( policy datatype.AccessControlPolicyResponse, err error)
func (*YigStorage) GetBucketCors ¶
func (yig *YigStorage) GetBucketCors(bucketName string, credential common.Credential) (cors datatype.Cors, err error)
func (*YigStorage) GetBucketInfo ¶
func (yig *YigStorage) GetBucketInfo(bucketName string, credential common.Credential) (bucket meta.Bucket, err error)
func (*YigStorage) GetBucketLc ¶ added in v1.1.0
func (yig *YigStorage) GetBucketLc(bucketName string, credential common.Credential) (lc datatype.Lc, err error)
func (*YigStorage) GetBucketPolicy ¶
func (yig *YigStorage) GetBucketPolicy(credential common.Credential, bucketName string) (bucketPolicy policy.Policy, err error)
func (*YigStorage) GetBucketVersioning ¶
func (yig *YigStorage) GetBucketVersioning(bucketName string, credential common.Credential) ( versioning datatype.Versioning, err error)
func (*YigStorage) GetClusterByFsName ¶
func (yig *YigStorage) GetClusterByFsName(fsName string) (cluster *CephStorage, err error)
func (*YigStorage) GetObject ¶
func (yig *YigStorage) GetObject(object *meta.Object, startOffset int64, length int64, writer io.Writer, sseRequest datatype.SseRequest) (err error)
func (*YigStorage) GetObjectAcl ¶
func (yig *YigStorage) GetObjectAcl(bucketName string, objectName string, version string, credential common.Credential) (policy datatype.AccessControlPolicyResponse, err error)
func (*YigStorage) GetObjectInfo ¶
func (yig *YigStorage) GetObjectInfo(bucketName string, objectName string, version string, credential common.Credential) (object *meta.Object, err error)
func (*YigStorage) ListBuckets ¶
func (yig *YigStorage) ListBuckets(credential common.Credential) (buckets []meta.Bucket, err error)
func (*YigStorage) ListMultipartUploads ¶
func (yig *YigStorage) ListMultipartUploads(credential common.Credential, bucketName string, request datatype.ListUploadsRequest) (result datatype.ListMultipartUploadsResponse, err error)
func (*YigStorage) ListObjectParts ¶
func (yig *YigStorage) ListObjectParts(credential common.Credential, bucketName, objectName string, request datatype.ListPartsRequest) (result datatype.ListPartsResponse, err error)
func (*YigStorage) ListObjects ¶
func (yig *YigStorage) ListObjects(credential common.Credential, bucketName string, request datatype.ListObjectsRequest) (result meta.ListObjectsInfo, err error)
func (*YigStorage) ListObjectsInternal ¶
func (yig *YigStorage) ListObjectsInternal(bucketName string, request datatype.ListObjectsRequest) (retObjects []*meta.Object, prefixes []string, truncated bool, nextMarker, nextVerIdMarker string, err error)
func (*YigStorage) ListVersionedObjects ¶
func (yig *YigStorage) ListVersionedObjects(credential common.Credential, bucketName string, request datatype.ListObjectsRequest) (result meta.VersionedListObjectsInfo, err error)
TODO: refactor, similar to ListObjects or not?
func (*YigStorage) MakeBucket ¶
func (yig *YigStorage) MakeBucket(bucketName string, acl datatype.Acl, credential common.Credential) error
func (*YigStorage) NewMultipartUpload ¶
func (yig *YigStorage) NewMultipartUpload(credential common.Credential, bucketName, objectName string, metadata map[string]string, acl datatype.Acl, sseRequest datatype.SseRequest) (uploadId string, err error)
func (*YigStorage) PickOneClusterAndPool ¶ added in v1.1.0
func (yig *YigStorage) PickOneClusterAndPool(bucket string, object string, size int64, isAppend bool) (cluster *CephStorage, poolName string)
func (*YigStorage) PingCache ¶
func (y *YigStorage) PingCache(interval time.Duration)
Checks cache health once per second if the cache is enabled.
func (*YigStorage) PutObject ¶
func (yig *YigStorage) PutObject(bucketName string, objectName string, credential common.Credential, size int64, data io.Reader, metadata map[string]string, acl datatype.Acl, sseRequest datatype.SseRequest) (result datatype.PutObjectResult, err error)
Write path:
                                          +-----------+
PUT object/part                           |           |   Ceph
        +---------+------------+----------+ Encryptor +----->
                  |            |          |           |
                  |            |          +-----------+
                  v            v
                SHA256      MD5(ETag)
SHA256 is calculated only for v4-signed authentication. The Encryptor is enabled when the user sets SSE headers.
func (*YigStorage) PutObjectPart ¶
func (yig *YigStorage) PutObjectPart(bucketName, objectName string, credential common.Credential, uploadId string, partId int, size int64, data io.Reader, md5Hex string, sseRequest datatype.SseRequest) (result datatype.PutObjectPartResult, err error)
func (*YigStorage) SetBucketAcl ¶
func (yig *YigStorage) SetBucketAcl(bucketName string, policy datatype.AccessControlPolicy, acl datatype.Acl, credential common.Credential) error
func (*YigStorage) SetBucketCors ¶
func (yig *YigStorage) SetBucketCors(bucketName string, cors datatype.Cors, credential common.Credential) error
func (*YigStorage) SetBucketLc ¶ added in v1.1.0
func (yig *YigStorage) SetBucketLc(bucketName string, lc datatype.Lc, credential common.Credential) error
func (*YigStorage) SetBucketPolicy ¶
func (yig *YigStorage) SetBucketPolicy(credential common.Credential, bucketName string, bucketPolicy policy.Policy) (err error)
func (*YigStorage) SetBucketVersioning ¶
func (yig *YigStorage) SetBucketVersioning(bucketName string, versioning datatype.Versioning, credential common.Credential) error
func (*YigStorage) SetObjectAcl ¶
func (yig *YigStorage) SetObjectAcl(bucketName string, objectName string, version string, policy datatype.AccessControlPolicy, acl datatype.Acl, credential common.Credential) error
func (*YigStorage) Stop ¶
func (y *YigStorage) Stop()
func (*YigStorage) UpdateObjectAttrs ¶ added in v1.1.8
func (yig *YigStorage) UpdateObjectAttrs(targetObject *meta.Object, credential common.Credential) (result datatype.PutObjectResult, err error)