Documentation ¶
Index ¶
- Constants
- Variables
- func GetExtentHandlerID() uint64
- type AppendExtentKeyFunc
- type DirtyExtentList
- type EvictIcacheFunc
- type EvictRequest
- type ExtentCache
- func (cache *ExtentCache) Append(ek *proto.ExtentKey, sync bool) (discardExtents []proto.ExtentKey)
- func (cache *ExtentCache) Get(offset uint64) (ret *proto.ExtentKey)
- func (cache *ExtentCache) List() []*proto.ExtentKey
- func (cache *ExtentCache) Max() *proto.ExtentKey
- func (cache *ExtentCache) PrepareReadRequests(offset, size int, data []byte) []*ExtentRequest
- func (cache *ExtentCache) PrepareWriteRequests(offset, size int, data []byte) []*ExtentRequest
- func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error
- func (cache *ExtentCache) RemoveDiscard(discardExtents []proto.ExtentKey)
- func (cache *ExtentCache) SetSize(size uint64, sync bool)
- func (cache *ExtentCache) Size() (size int, gen uint64)
- type ExtentClient
- func (client *ExtentClient) Close() error
- func (client *ExtentClient) CloseStream(inode uint64) error
- func (client *ExtentClient) EvictStream(inode uint64) error
- func (client *ExtentClient) FileSize(inode uint64) (size int, gen uint64, valid bool)
- func (client *ExtentClient) Flush(inode uint64) error
- func (client *ExtentClient) GetRate() string
- func (client *ExtentClient) GetStreamer(inode uint64) *Streamer
- func (client *ExtentClient) OpenStream(inode uint64) error
- func (client *ExtentClient) Read(inode uint64, data []byte, offset int, size int) (read int, err error)
- func (client *ExtentClient) RefreshExtentsCache(inode uint64) error
- func (client *ExtentClient) SetFileSize(inode uint64, size int)
- func (client *ExtentClient) SetReadRate(val int) string
- func (client *ExtentClient) SetWriteRate(val int) string
- func (client *ExtentClient) Truncate(inode uint64, size int) error
- func (client *ExtentClient) Write(inode uint64, offset int, data []byte, flags int) (write int, err error)
- type ExtentConfig
- type ExtentHandler
- type ExtentReader
- type ExtentRequest
- type FlushRequest
- type GetExtentsFunc
- type GetReplyFunc
- type OpenRequest
- type Packet
- func NewCreateExtentPacket(dp *wrapper.DataPartition, inode uint64) *Packet
- func NewOverwritePacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int, inode uint64, ...) *Packet
- func NewReadPacket(key *proto.ExtentKey, extentOffset, size int, inode uint64, fileOffset int, ...) *Packet
- func NewReply(reqID int64, partitionID uint64, extentID uint64) *Packet
- func NewWritePacket(inode uint64, fileOffset, storeMode int) *Packet
- type ReleaseRequest
- type StreamConn
- type Streamer
- func (s *Streamer) GetExtentReader(ek *proto.ExtentKey) (*ExtentReader, error)
- func (s *Streamer) GetExtents() error
- func (s *Streamer) IssueEvictRequest() error
- func (s *Streamer) IssueFlushRequest() error
- func (s *Streamer) IssueOpenRequest() error
- func (s *Streamer) IssueReleaseRequest() error
- func (s *Streamer) IssueTruncRequest(size int) error
- func (s *Streamer) IssueWriteRequest(offset int, data []byte, flags int) (write int, err error)
- func (s *Streamer) String() string
- type TruncRequest
- type TruncateFunc
- type WriteRequest
Constants ¶
const ( MaxMountRetryLimit = 5 MountRetryInterval = time.Second * 5 )
const ( ExtentStatusOpen int32 = iota ExtentStatusClosed ExtentStatusRecovery ExtentStatusError )
State machines
const ( StreamSendMaxRetry = 200 StreamSendSleepInterval = 100 * time.Millisecond )
const ( MaxSelectDataPartitionForWrite = 32 MaxNewHandlerRetry = 3 MaxPacketErrorCount = 32 MaxDirtyListLen = 0 )
const ( StreamerNormal int32 = iota StreamerError )
Variables ¶
var (
StreamConnPool = util.NewConnectPool()
)
var (
TryOtherAddrError = errors.New("TryOtherAddrError")
)
Functions ¶
func GetExtentHandlerID ¶
func GetExtentHandlerID() uint64
GetExtentHandlerID returns the extent handler ID.
Types ¶
type AppendExtentKeyFunc ¶
type DirtyExtentList ¶
DirtyExtentList defines the struct of the dirty extent list.
func NewDirtyExtentList ¶
func NewDirtyExtentList() *DirtyExtentList
NewDirtyExtentList returns a new DirtyExtentList instance.
func (*DirtyExtentList) Get ¶
func (dl *DirtyExtentList) Get() *list.Element
Get gets the next element in the dirty extent list.
func (*DirtyExtentList) Len ¶
func (dl *DirtyExtentList) Len() int
Len returns the size of the dirty extent list.
func (*DirtyExtentList) Put ¶
func (dl *DirtyExtentList) Put(eh *ExtentHandler)
Put puts a new extent handler into the dirty extent list.
func (*DirtyExtentList) Remove ¶
func (dl *DirtyExtentList) Remove(e *list.Element)
Remove removes the element from the dirty extent list.
type EvictIcacheFunc ¶
type EvictIcacheFunc func(inode uint64)
type EvictRequest ¶
type EvictRequest struct {
// contains filtered or unexported fields
}
EvictRequest defines an evict request.
type ExtentCache ¶
ExtentCache defines the struct of the extent cache.
func NewExtentCache ¶
func NewExtentCache(inode uint64) *ExtentCache
NewExtentCache returns a new extent cache.
func (*ExtentCache) Get ¶
func (cache *ExtentCache) Get(offset uint64) (ret *proto.ExtentKey)
Get returns the extent key based on the given offset.
func (*ExtentCache) List ¶
func (cache *ExtentCache) List() []*proto.ExtentKey
List returns a list of the extents in the cache.
func (*ExtentCache) Max ¶
func (cache *ExtentCache) Max() *proto.ExtentKey
Max returns the max extent key in the cache.
func (*ExtentCache) PrepareReadRequests ¶
func (cache *ExtentCache) PrepareReadRequests(offset, size int, data []byte) []*ExtentRequest
PrepareReadRequests classifies the incoming request.
func (*ExtentCache) PrepareWriteRequests ¶
func (cache *ExtentCache) PrepareWriteRequests(offset, size int, data []byte) []*ExtentRequest
PrepareWriteRequests TODO explain
func (*ExtentCache) Refresh ¶
func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error
Refresh refreshes the extent cache.
func (*ExtentCache) RemoveDiscard ¶
func (cache *ExtentCache) RemoveDiscard(discardExtents []proto.ExtentKey)
func (*ExtentCache) SetSize ¶
func (cache *ExtentCache) SetSize(size uint64, sync bool)
SetSize set the size of the cache.
func (*ExtentCache) Size ¶
func (cache *ExtentCache) Size() (size int, gen uint64)
Size returns the size of the cache.
type ExtentClient ¶
type ExtentClient struct {
// contains filtered or unexported fields
}
ExtentClient defines the struct of the extent client.
func NewExtentClient ¶
func NewExtentClient(config *ExtentConfig) (client *ExtentClient, err error)
NewExtentClient returns a new extent client.
func (*ExtentClient) Close ¶ added in v1.5.0
func (client *ExtentClient) Close() error
func (*ExtentClient) CloseStream ¶
func (client *ExtentClient) CloseStream(inode uint64) error
Release request shall grab the lock until request is sent to the request channel
func (*ExtentClient) EvictStream ¶
func (client *ExtentClient) EvictStream(inode uint64) error
Evict request shall grab the lock until request is sent to the request channel
func (*ExtentClient) FileSize ¶
func (client *ExtentClient) FileSize(inode uint64) (size int, gen uint64, valid bool)
FileSize returns the file size.
func (*ExtentClient) Flush ¶
func (client *ExtentClient) Flush(inode uint64) error
func (*ExtentClient) GetRate ¶ added in v1.4.0
func (client *ExtentClient) GetRate() string
func (*ExtentClient) GetStreamer ¶
func (client *ExtentClient) GetStreamer(inode uint64) *Streamer
GetStreamer returns the streamer.
func (*ExtentClient) OpenStream ¶
func (client *ExtentClient) OpenStream(inode uint64) error
Open request shall grab the lock until request is sent to the request channel
func (*ExtentClient) RefreshExtentsCache ¶
func (client *ExtentClient) RefreshExtentsCache(inode uint64) error
RefreshExtentsCache refreshes the extent cache.
func (*ExtentClient) SetFileSize ¶
func (client *ExtentClient) SetFileSize(inode uint64, size int)
SetFileSize set the file size.
func (*ExtentClient) SetReadRate ¶ added in v1.4.0
func (client *ExtentClient) SetReadRate(val int) string
func (*ExtentClient) SetWriteRate ¶ added in v1.4.0
func (client *ExtentClient) SetWriteRate(val int) string
type ExtentConfig ¶
type ExtentConfig struct { Volume string Masters []string FollowerRead bool NearRead bool ReadRate int64 WriteRate int64 OnAppendExtentKey AppendExtentKeyFunc OnGetExtents GetExtentsFunc OnTruncate TruncateFunc OnEvictIcache EvictIcacheFunc }
type ExtentHandler ¶
type ExtentHandler struct {
// contains filtered or unexported fields
}
ExtentHandler defines the struct of the extent handler.
func NewExtentHandler ¶
func NewExtentHandler(stream *Streamer, offset int, storeMode int) *ExtentHandler
NewExtentHandler returns a new extent handler.
func (*ExtentHandler) String ¶
func (eh *ExtentHandler) String() string
String returns the string format of the extent handler.
type ExtentReader ¶
type ExtentReader struct {
// contains filtered or unexported fields
}
ExtentReader defines the struct of the extent reader.
func NewExtentReader ¶
func NewExtentReader(inode uint64, key *proto.ExtentKey, dp *wrapper.DataPartition, followerRead bool) *ExtentReader
NewExtentReader returns a new extent reader.
func (*ExtentReader) Read ¶
func (reader *ExtentReader) Read(req *ExtentRequest) (readBytes int, err error)
Read reads the extent request.
func (*ExtentReader) String ¶
func (reader *ExtentReader) String() (m string)
String returns the string format of the extent reader.
type ExtentRequest ¶
ExtentRequest defines the struct for the request of read or write an extent.
func NewExtentRequest ¶
func NewExtentRequest(offset, size int, data []byte, ek *proto.ExtentKey) *ExtentRequest
NewExtentRequest returns a new extent request.
func (*ExtentRequest) String ¶
func (er *ExtentRequest) String() string
String returns the string format of the extent request.
type FlushRequest ¶
type FlushRequest struct {
// contains filtered or unexported fields
}
FlushRequest defines a flush request.
type GetExtentsFunc ¶
type OpenRequest ¶
type OpenRequest struct {
// contains filtered or unexported fields
}
OpenRequest defines an open request.
type Packet ¶
Packet defines a wrapper of the packet in proto.
func NewCreateExtentPacket ¶
func NewCreateExtentPacket(dp *wrapper.DataPartition, inode uint64) *Packet
NewCreateExtentPacket returns a new packet to create extent.
func NewOverwritePacket ¶
func NewOverwritePacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int, inode uint64, fileOffset int) *Packet
NewOverwritePacket returns a new overwrite packet.
func NewReadPacket ¶
func NewReadPacket(key *proto.ExtentKey, extentOffset, size int, inode uint64, fileOffset int, followerRead bool) *Packet
NewReadPacket returns a new read packet.
func NewWritePacket ¶
NewWritePacket returns a new write packet.
type ReleaseRequest ¶
type ReleaseRequest struct {
// contains filtered or unexported fields
}
ReleaseRequest defines a release request.
type StreamConn ¶
type StreamConn struct {
// contains filtered or unexported fields
}
StreamConn defines the struct of the stream connection.
func NewStreamConn ¶
func NewStreamConn(dp *wrapper.DataPartition, follower bool) *StreamConn
NewStreamConn returns a new stream connection.
func (*StreamConn) Send ¶
func (sc *StreamConn) Send(req *Packet, getReply GetReplyFunc) (err error)
Send send the given packet over the network through the stream connection until success or the maximum number of retries is reached.
func (*StreamConn) String ¶
func (sc *StreamConn) String() string
String returns the string format of the stream connection.
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
One inode corresponds to one streamer. All the requests to the same inode will be queued. TODO rename streamer here is not a good name as it also handles overwrites, not just stream write.
func NewStreamer ¶
func NewStreamer(client *ExtentClient, inode uint64) *Streamer
NewStreamer returns a new streamer.
func (*Streamer) GetExtentReader ¶
func (s *Streamer) GetExtentReader(ek *proto.ExtentKey) (*ExtentReader, error)
GetExtentReader returns the extent reader. TODO: use memory pool
func (*Streamer) GetExtents ¶
TODO should we call it RefreshExtents instead?
func (*Streamer) IssueEvictRequest ¶
func (*Streamer) IssueFlushRequest ¶
func (*Streamer) IssueOpenRequest ¶
Open request shall grab the lock until request is sent to the request channel
func (*Streamer) IssueReleaseRequest ¶
func (*Streamer) IssueTruncRequest ¶
func (*Streamer) IssueWriteRequest ¶
type TruncRequest ¶
type TruncRequest struct {
// contains filtered or unexported fields
}
TruncRequest defines a truncate request.
type TruncateFunc ¶
type WriteRequest ¶
type WriteRequest struct {
// contains filtered or unexported fields
}
WriteRequest defines a write request.