Documentation
¶
Index ¶
- Constants
- type ChunkService
- func (x *ChunkService) DeleteChunk(chunk *models.Chunk) (*models.Chunk, error)
- func (x *ChunkService) FreezeChunk(chunk *models.Chunk) (*models.Chunk, error)
- func (x *ChunkService) GetMergableChunks(schema string, now time.Time) ([]*models.Chunk, error)
- func (x *ChunkService) GetWritableChunks(schema, partition string, objSize int64) ([]*models.Chunk, error)
- func (x *ChunkService) IsMergableChunk(chunk *models.Chunk, ts time.Time) bool
- func (x *ChunkService) PutChunk(recordID string, size int64, schema, partition string, now time.Time) error
- func (x *ChunkService) UpdateChunk(chunk *models.Chunk, recordID string, objSize int64) error
- type ChunkServiceArguments
- type MetaService
- func (x *MetaService) GetObjectID(s3Bucket, s3Key string) (int64, error)
- func (x *MetaService) GetObjects(targetRecordIDs []string, schema models.ParquetSchemaName) ([]*models.S3Object, error)
- func (x *MetaService) HeadPartition(partitionKey string) (bool, error)
- func (x *MetaService) PutObjects(items []*repository.MetaRecordObject) error
- func (x *MetaService) PutPartition(partitionKey string) error
- type RecordService
- type S3Service
- func (x *S3Service) AsyncDownload(src models.S3Object) (io.ReadCloser, error)
- func (x *S3Service) AsyncUpload(body io.Reader, dst models.S3Object, encoding string) error
- func (x *S3Service) DeleteS3Objects(objects []*models.S3Object) error
- func (x *S3Service) DownloadS3Object(obj models.S3Object) (*string, error)
- func (x *S3Service) HeadObject(obj models.S3Object) (bool, error)
- func (x *S3Service) UploadFileToS3(filePath string, dst models.S3Object) error
- type SQSService
Constants ¶
const ( DefaultChunkKeyPrefix = "chunk/" DefaultChunkFreezedAfter = time.Minute * 3 DefaultChunkChunkMaxSize = 1280 * 1024 * 1024 DefaultChunkChunkMinSize = 1000 * 1024 * 1024 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChunkService ¶
type ChunkService struct {
// contains filtered or unexported fields
}
func NewChunkService ¶
func NewChunkService(repo repository.ChunkRepository, args *ChunkServiceArguments) *ChunkService
func (*ChunkService) DeleteChunk ¶
func (*ChunkService) FreezeChunk ¶
func (*ChunkService) GetMergableChunks ¶
func (*ChunkService) GetWritableChunks ¶
func (*ChunkService) IsMergableChunk ¶
func (*ChunkService) UpdateChunk ¶
type ChunkServiceArguments ¶
type MetaService ¶
type MetaService struct {
// contains filtered or unexported fields
}
MetaService is accessor of MetaRepository
func NewMetaService ¶
func NewMetaService(repo repository.MetaRepository, newTimer util.RetryTimerFactory) *MetaService
NewMetaService is constructor of MetaService
func (*MetaService) GetObjectID ¶
func (x *MetaService) GetObjectID(s3Bucket, s3Key string) (int64, error)
GetObjectID provides objectID that is unique ID for S3 object
func (*MetaService) GetObjects ¶
func (x *MetaService) GetObjects(targetRecordIDs []string, schema models.ParquetSchemaName) ([]*models.S3Object, error)
GetObjects retrieves set of MetaRecordObject and converts them to []*models.S3Object
func (*MetaService) HeadPartition ¶
func (x *MetaService) HeadPartition(partitionKey string) (bool, error)
HeadPartition checks an existance of partition and cache the result.
func (*MetaService) PutObjects ¶
func (x *MetaService) PutObjects(items []*repository.MetaRecordObject) error
PutObjects puts set of MetaRecordObject
func (*MetaService) PutPartition ¶
func (x *MetaService) PutPartition(partitionKey string) error
PutPartition register an existance of partition and cache the result.
type RecordService ¶
type RecordService struct { ObjectSizeLimit int64 // contains filtered or unexported fields }
func NewRecordService ¶
func NewRecordService(newS3 adaptor.S3ClientFactory, newEncoder adaptor.EncoderFactory, newDecoder adaptor.DecoderFactory) *RecordService
func (*RecordService) Close ¶
func (x *RecordService) Close() error
func (*RecordService) Load ¶
func (x *RecordService) Load(src *models.S3Object, schema models.ParquetSchemaName, ch chan *models.RecordQueue) error
func (*RecordService) RawObjects ¶
func (x *RecordService) RawObjects() []*models.RawObject
type S3Service ¶
type S3Service struct {
// contains filtered or unexported fields
}
S3Service is accessor to S3
func NewS3Service ¶
func NewS3Service(newS3 adaptor.S3ClientFactory) *S3Service
NewS3Service is constructor of
func (*S3Service) AsyncDownload ¶
AsyncDownload is for downloading data via io.ReadCloser
func (*S3Service) AsyncUpload ¶
AsyncUpload is for uploading object by io.Reader.
func (*S3Service) DeleteS3Objects ¶
DeleteS3Objects is warpper of s3.DeleteObjects
func (*S3Service) DownloadS3Object ¶
DownloadS3Object downloads a specified remote object from S3
func (*S3Service) HeadObject ¶
HeadObject checks object existance. If HeadObject got awserr, return false, nil anyway.
type SQSService ¶
type SQSService struct {
// contains filtered or unexported fields
}
SQSService is accessor to SQS
func NewSQSService ¶
func NewSQSService(newSQS adaptor.SQSClientFactory) *SQSService
NewSQSService is constructor of
func (*SQSService) DeleteMessage ¶
func (x *SQSService) DeleteMessage(url string, receipt string) error
DeleteMessage is wrapper of sqs:DeleteMessage
func (*SQSService) ReceiveMessage ¶
func (x *SQSService) ReceiveMessage(url string, timeout int64, msg interface{}) (*string, error)
ReceiveMessage is wrapper of sqs:ReceiveMessage
func (*SQSService) SendSQS ¶
func (x *SQSService) SendSQS(msg interface{}, url string) error
SendSQS is wrapper of sqs:SendMessage of AWS