stream

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2019 License: Apache-2.0 Imports: 20 Imported by: 3

Documentation

Index

Constants

View Source
const (
	MaxMountRetryLimit = 5
	MountRetryInterval = time.Second * 5
)
View Source
const (
	ExtentStatusOpen int32 = iota
	ExtentStatusClosed
	ExtentStatusRecovery
	ExtentStatusError
)

State machines

View Source
const (
	StreamSendMaxRetry      = 200
	StreamSendSleepInterval = 100 * time.Millisecond
)
View Source
const (
	MaxSelectDataPartitionForWrite = 32
	MaxNewHandlerRetry             = 3
	MaxPacketErrorCount            = 32
	MaxDirtyListLen                = 0
)
View Source
const (
	StreamerNormal int32 = iota
	StreamerError
)

Variables

View Source
var (
	StreamConnPool = util.NewConnectPool()
)
View Source
var (
	TryOtherAddrError = errors.New("TryOtherAddrError")
)

Functions

func GetExtentHandlerID

func GetExtentHandlerID() uint64

GetExtentHandlerID returns the extent handler ID.

Types

type AppendExtentKeyFunc

type AppendExtentKeyFunc func(inode uint64, key proto.ExtentKey) error

type DirtyExtentList

type DirtyExtentList struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

DirtyExtentList defines the struct of the dirty extent list.

func NewDirtyExtentList

func NewDirtyExtentList() *DirtyExtentList

NewDirtyExtentList returns a new DirtyExtentList instance.

func (*DirtyExtentList) Get

func (dl *DirtyExtentList) Get() *list.Element

Get gets the next element in the dirty extent list.

func (*DirtyExtentList) Len

func (dl *DirtyExtentList) Len() int

Len returns the size of the dirty extent list.

func (*DirtyExtentList) Put

func (dl *DirtyExtentList) Put(eh *ExtentHandler)

Put puts a new extent handler into the dirty extent list.

func (*DirtyExtentList) Remove

func (dl *DirtyExtentList) Remove(e *list.Element)

Remove removes the element from the dirty extent list.

type EvictRequest

type EvictRequest struct {
	// contains filtered or unexported fields
}

EvictRequest defines an evict request.

type ExtentCache

type ExtentCache struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ExtentCache defines the struct of the extent cache.

func NewExtentCache

func NewExtentCache(inode uint64) *ExtentCache

NewExtentCache returns a new extent cache.

func (*ExtentCache) Append

func (cache *ExtentCache) Append(ek *proto.ExtentKey, sync bool)

Append appends an extent key.

func (*ExtentCache) Get

func (cache *ExtentCache) Get(offset uint64) (ret *proto.ExtentKey)

Get returns the extent key based on the given offset.

func (*ExtentCache) List

func (cache *ExtentCache) List() []*proto.ExtentKey

List returns a list of the extents in the cache.

func (*ExtentCache) Max

func (cache *ExtentCache) Max() *proto.ExtentKey

Max returns the max extent key in the cache.

func (*ExtentCache) PrepareReadRequests

func (cache *ExtentCache) PrepareReadRequests(offset, size int, data []byte) []*ExtentRequest

PrepareReadRequests classifies the incoming request.

func (*ExtentCache) PrepareWriteRequests

func (cache *ExtentCache) PrepareWriteRequests(offset, size int, data []byte) []*ExtentRequest

PrepareWriteRequests TODO explain

func (*ExtentCache) Refresh

func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error

Refresh refreshes the extent cache.

func (*ExtentCache) SetSize

func (cache *ExtentCache) SetSize(size uint64, sync bool)

SetSize set the size of the cache.

func (*ExtentCache) Size

func (cache *ExtentCache) Size() (size int, gen uint64)

Size returns the size of the cache.

type ExtentClient

type ExtentClient struct {
	// contains filtered or unexported fields
}

ExtentClient defines the struct of the extent client.

func NewExtentClient

func NewExtentClient(volname, master string, readRate, writeRate int64, appendExtentKey AppendExtentKeyFunc, getExtents GetExtentsFunc, truncate TruncateFunc) (client *ExtentClient, err error)

NewExtentClient returns a new extent client.

func (*ExtentClient) CloseStream

func (client *ExtentClient) CloseStream(inode uint64) error

Release request shall grab the lock until request is sent to the request channel

func (*ExtentClient) EvictStream

func (client *ExtentClient) EvictStream(inode uint64) error

Evict request shall grab the lock until request is sent to the request channel

func (*ExtentClient) FileSize

func (client *ExtentClient) FileSize(inode uint64) (size int, gen uint64, valid bool)

FileSize returns the file size.

func (*ExtentClient) Flush

func (client *ExtentClient) Flush(inode uint64) error

func (*ExtentClient) GetRate added in v1.4.0

func (client *ExtentClient) GetRate() string

func (*ExtentClient) GetStreamer

func (client *ExtentClient) GetStreamer(inode uint64) *Streamer

GetStreamer returns the streamer.

func (*ExtentClient) OpenStream

func (client *ExtentClient) OpenStream(inode uint64) error

Open request shall grab the lock until request is sent to the request channel

func (*ExtentClient) Read

func (client *ExtentClient) Read(inode uint64, data []byte, offset int, size int) (read int, err error)

func (*ExtentClient) RefreshExtentsCache

func (client *ExtentClient) RefreshExtentsCache(inode uint64) error

RefreshExtentsCache refreshes the extent cache.

func (*ExtentClient) SetFileSize

func (client *ExtentClient) SetFileSize(inode uint64, size int)

SetFileSize set the file size.

func (*ExtentClient) SetReadRate added in v1.4.0

func (client *ExtentClient) SetReadRate(val int) string

func (*ExtentClient) SetWriteRate added in v1.4.0

func (client *ExtentClient) SetWriteRate(val int) string

func (*ExtentClient) Truncate

func (client *ExtentClient) Truncate(inode uint64, size int) error

func (*ExtentClient) Write

func (client *ExtentClient) Write(inode uint64, offset int, data []byte, direct bool) (write int, err error)

Write writes the data.

type ExtentHandler

type ExtentHandler struct {
	// contains filtered or unexported fields
}

ExtentHandler defines the struct of the extent handler.

func NewExtentHandler

func NewExtentHandler(stream *Streamer, offset int, storeMode int) *ExtentHandler

NewExtentHandler returns a new extent handler.

func (*ExtentHandler) String

func (eh *ExtentHandler) String() string

String returns the string format of the extent handler.

type ExtentReader

type ExtentReader struct {
	// contains filtered or unexported fields
}

ExtentReader defines the struct of the extent reader.

func NewExtentReader

func NewExtentReader(inode uint64, key *proto.ExtentKey, dp *wrapper.DataPartition, followerRead bool) *ExtentReader

NewExtentReader returns a new extent reader.

func (*ExtentReader) Read

func (reader *ExtentReader) Read(req *ExtentRequest) (readBytes int, err error)

Read reads the extent request.

func (*ExtentReader) String

func (reader *ExtentReader) String() (m string)

String returns the string format of the extent reader.

type ExtentRequest

type ExtentRequest struct {
	FileOffset int
	Size       int
	Data       []byte
	ExtentKey  *proto.ExtentKey
}

ExtentRequest defines the struct for the request of read or write an extent.

func NewExtentRequest

func NewExtentRequest(offset, size int, data []byte, ek *proto.ExtentKey) *ExtentRequest

NewExtentRequest returns a new extent request.

func (*ExtentRequest) String

func (er *ExtentRequest) String() string

String returns the string format of the extent request.

type FlushRequest

type FlushRequest struct {
	// contains filtered or unexported fields
}

FlushRequest defines a flush request.

type GetExtentsFunc

type GetExtentsFunc func(inode uint64) (uint64, uint64, []proto.ExtentKey, error)

type GetReplyFunc

type GetReplyFunc func(conn *net.TCPConn) (err error, again bool)

type OpenRequest

type OpenRequest struct {
	// contains filtered or unexported fields
}

OpenRequest defines an open request.

type Packet

type Packet struct {
	proto.Packet
	// contains filtered or unexported fields
}

Packet defines a wrapper of the packet in proto.

func NewCreateExtentPacket

func NewCreateExtentPacket(dp *wrapper.DataPartition, inode uint64) *Packet

NewCreateExtentPacket returns a new packet to create extent.

func NewOverwritePacket

func NewOverwritePacket(dp *wrapper.DataPartition, extentID uint64, extentOffset int, inode uint64, fileOffset int) *Packet

NewOverwritePacket returns a new overwrite packet.

func NewReadPacket

func NewReadPacket(key *proto.ExtentKey, extentOffset, size int, inode uint64, fileOffset int, followerRead bool) *Packet

NewReadPacket returns a new read packet.

func NewReply

func NewReply(reqID int64, partitionID uint64, extentID uint64) *Packet

NewReply returns a new reply packet. TODO rename to NewReplyPacket?

func NewWritePacket

func NewWritePacket(inode uint64, fileOffset, storeMode int) *Packet

NewWritePacket returns a new write packet.

func (*Packet) String

func (p *Packet) String() string

String returns the string format of the packet.

type ReleaseRequest

type ReleaseRequest struct {
	// contains filtered or unexported fields
}

ReleaseRequest defines a release request.

type StreamConn

type StreamConn struct {
	// contains filtered or unexported fields
}

StreamConn defines the struct of the stream connection.

func NewStreamConn

func NewStreamConn(dp *wrapper.DataPartition, follower bool) *StreamConn

NewStreamConn returns a new stream connection.

func (*StreamConn) Send

func (sc *StreamConn) Send(req *Packet, getReply GetReplyFunc) (err error)

Send send the given packet over the network through the stream connection until success or the maximum number of retries is reached.

func (*StreamConn) String

func (sc *StreamConn) String() string

String returns the string format of the stream connection.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

One inode corresponds to one streamer. All the requests to the same inode will be queued. TODO rename streamer here is not a good name as it also handles overwrites, not just stream write.

func NewStreamer

func NewStreamer(client *ExtentClient, inode uint64) *Streamer

NewStreamer returns a new streamer.

func (*Streamer) GetExtentReader

func (s *Streamer) GetExtentReader(ek *proto.ExtentKey) (*ExtentReader, error)

GetExtentReader returns the extent reader. TODO: use memory pool

func (*Streamer) GetExtents

func (s *Streamer) GetExtents() error

TODO should we call it RefreshExtents instead?

func (*Streamer) IssueEvictRequest

func (s *Streamer) IssueEvictRequest() error

func (*Streamer) IssueFlushRequest

func (s *Streamer) IssueFlushRequest() error

func (*Streamer) IssueOpenRequest

func (s *Streamer) IssueOpenRequest() error

Open request shall grab the lock until request is sent to the request channel

func (*Streamer) IssueReleaseRequest

func (s *Streamer) IssueReleaseRequest() error

func (*Streamer) IssueTruncRequest

func (s *Streamer) IssueTruncRequest(size int) error

func (*Streamer) IssueWriteRequest

func (s *Streamer) IssueWriteRequest(offset int, data []byte, direct bool) (write int, err error)

func (*Streamer) String

func (s *Streamer) String() string

String returns the string format of the streamer.

type TruncRequest

type TruncRequest struct {
	// contains filtered or unexported fields
}

TruncRequest defines a truncate request.

type TruncateFunc

type TruncateFunc func(inode, size uint64) error

type WriteRequest

type WriteRequest struct {
	// contains filtered or unexported fields
}

WriteRequest defines a write request.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL