range_partition

package
v0.0.0-...-e603270 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

Index

Constants

View Source
const (
	BitDelete       byte = 1 << 0    // Set if the key has been deleted.
	BitValuePointer byte = 1 << 1    // Set if the value is NOT stored directly next to key.
	ValueThrottle        = (4 << 10) // 4 * KB
)
View Source
const (
	KB = 1024
	MB = KB * 1024
	GB = MB * 1024
)
View Source
const (
	MAX_GC_ONCE = 3
)

Variables

View Source
var (
	ErrBlockedWrites = errors.New("Writes are blocked, possibly due to DropAll or Close")
)

Functions

func ShouldWriteValueToLSM

func ShouldWriteValueToLSM(e *Entry) bool

Types

type CommitEnds

type CommitEnds struct {
	LogEnd  uint32
	RowEnd  uint32
	MetaEnd uint32
}

type DefaultPickupPolicy

type DefaultPickupPolicy struct {
	// contains filtered or unexported fields
}

func (DefaultPickupPolicy) PickupTables

func (p DefaultPickupPolicy) PickupTables(tbls []*table.Table, maxCapacity uint64) ([]*table.Table, uint64)

In size-tiered compaction, newer and smaller SSTables are successively merged into older and larger SSTables

type Entry

type Entry struct {
	Key       []byte //slice of inner
	Value     []byte //slice of inner
	ExpiresAt uint64 //copy from inner
	Meta      uint32 //copy from inner

	//used in memory, not in log
	ExtentID uint64
	Offset   uint32
	End      uint32
	// contains filtered or unexported fields
}

func DecodeEntry

func DecodeEntry(data []byte) (*Entry, error)

func NewDeleteEntry

func NewDeleteEntry(userKey []byte) *Entry

func NewPutEntry

func NewPutEntry(userKey []byte, expireAt uint64, lenOfValue uint32) *Entry

func NewPutKVEntry

func NewPutKVEntry(k, v []byte, expireAt uint64) *Entry

func (*Entry) Decode

func (entry *Entry) Decode() error

func (*Entry) Encode

func (entry *Entry) Encode() []byte

func (*Entry) FinishWrite

func (entry *Entry) FinishWrite() error

func (*Entry) Format

func (entry *Entry) Format() string

func (*Entry) Size

func (entry *Entry) Size() int

func (*Entry) UpdateTS

func (entry *Entry) UpdateTS(ts uint64)

func (*Entry) Valid

func (entry *Entry) Valid() bool

func (*Entry) WriteValue

func (entry *Entry) WriteValue(d []byte) error

type GcTask

type GcTask struct {
	ForceGC bool
	ExIDs   []uint64 //if not forceGC, pickup will choose automatically
}

type MemTable

type MemTable struct {
	*skiplist.Skiplist
	/*
		this OriginDiscard will be written to SST's discard field
		when flushed. it means this key-value pair will be discarded
		when GC happens.
	*/
	OriginDiscard map[uint64]int64
}

func NewMemTable

func NewMemTable(capacity int64) *MemTable

type Option

type Option struct {
	MaxSkipList          int64 //max size of skiplist, when compacting, the max is 2 * MaxSkipList
	WriteChCapacity      int   //write channel length
	MustSync             bool
	MaxExtentSize        uint32
	CompressionType      table.CompressionType
	AssertKeys           bool
	MaxUnCommitedLogSize uint64
}

type OptionFunc

type OptionFunc func(*Option)

func DefaultOption

func DefaultOption() OptionFunc

func MaxExtentSize

func MaxExtentSize(n uint32) OptionFunc

func TestOption

func TestOption() OptionFunc

func WithAssertKeys

func WithAssertKeys() OptionFunc

func WithCompression

func WithCompression(codec string) OptionFunc

func WithMaxSkipList

func WithMaxSkipList(n int64) OptionFunc

func WithMaxUnCommitedLogSize

func WithMaxUnCommitedLogSize(n uint64) OptionFunc

MaxUnCommitedLogSize is the max size of uncommitted log, the unit is Byte

func WithSync

func WithSync(b bool) OptionFunc

func WriteChCapacity

func WriteChCapacity(n int) OptionFunc

type PickupTables

type PickupTables interface {
	PickupTables(tbls []*table.Table, maxCapacity uint64) (tables []*table.Table, eID uint64)
}

given all tables in range partition, if eID is not 0, caller can call stream.Truncate(eID) to truncate the stream, in our system, every extentID is bigger than 0. RETURN: if tables is nil, do not compact if table's estimated size is near maxCapacity, minor compaction policy will not choose this table

type RangePartition

type RangePartition struct {
	utils.SafeMutex //protect mt,imm when swapping mt, imm

	PartID   uint64
	StartKey []byte
	EndKey   []byte
	// contains filtered or unexported fields
}

func OpenRangePartition

func OpenRangePartition(id uint64, metaStream streamclient.StreamClient, rowStream streamclient.StreamClient,
	logStream streamclient.StreamClient,
	startKey []byte, endKey []byte,
	opts ...OptionFunc,
) (*RangePartition, error)

func (*RangePartition) CanSplit

func (rp *RangePartition) CanSplit() error

split相关, 提供相关参数给上层 ADD more policy here, to support different Split policy

func (*RangePartition) CheckTableOrder

func (rp *RangePartition) CheckTableOrder(out []*table.Table)

检查是否所有tables全局有序, 否则panic

func (*RangePartition) Close

func (rp *RangePartition) Close() error

func (*RangePartition) Connect

func (rp *RangePartition) Connect() error

read metaStream, connect commitlog, rowDataStreamand blobDataStream

func (*RangePartition) Delete

func (rp *RangePartition) Delete(key []byte) error

func (*RangePartition) Get

func (rp *RangePartition) Get(userKey []byte) ([]byte, error)

func (*RangePartition) GetSplitPoint

func (rp *RangePartition) GetSplitPoint() []byte

split相关, 提供相关参数给上层 ADD more policy here, to support different Split policy

func (*RangePartition) Head

func (rp *RangePartition) Head(userKey []byte) (*pspb.HeadInfo, error)

func (*RangePartition) IsUserKeyInRange

func (rp *RangePartition) IsUserKeyInRange(userKey []byte) bool

func (*RangePartition) LogRowMetaStreamEnd

func (rp *RangePartition) LogRowMetaStreamEnd() CommitEnds

func (*RangePartition) Range

func (rp *RangePartition) Range(prefix []byte, start []byte, limit uint32) [][]byte

func (*RangePartition) SubmitCompaction

func (rp *RangePartition) SubmitCompaction() error

submit a major compaction task

func (*RangePartition) SubmitGC

func (rp *RangePartition) SubmitGC(task GcTask) error

func (*RangePartition) Write

func (rp *RangePartition) Write(k, v []byte) error

func (*RangePartition) WriteAsync

func (rp *RangePartition) WriteAsync(k, v []byte, f func(error))

func (*RangePartition) WriteEntries

func (rp *RangePartition) WriteEntries(entries []*Entry) error

func (*RangePartition) WriteEntryAsync

func (rp *RangePartition) WriteEntryAsync(e *Entry, f func(error))

Directories

Path Synopsis
y

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL