vfsindex

package module
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2020 License: Apache-2.0 Imports: 28 Imported by: 1

README

vfs-index

vfs-index is a simple indexer for simple data collections (JSON, CSV, MessagePack, ...) on the OS's virtual file system. This indexer does not require a daemon process and uses no locking.

$ cat test.json
[
  {
    "id": 130988433,
    "name": "2011_04_24-1.m4v"
  },
  {
    "id": 130988434,
    "name": "2011_04_24-2.mp4"
  },
  {
    "id": 130988435,
    "name": "2011_04_24-3.mp4"
  },

If this data is large (100 million rows?), searching it with jq is very slow.

Index this data. If indexing is stopped by Ctrl-C, it will resume the next time the index command is run.

$ go get github.com/kazu/vfs-index/cmd/vfs-index
$ vfs-index index --index=../idx --column=name --table=test --data=./
100% |██████████████████████████████████████████████████| (2316/2316, 2072 it/s) [1s:0s]
$

search data

$ vfs-index search  -q='name.search("2011_04")' --index=../idx --column=name --table=test --data=./
{"id":130988433,"name":"2011_04_24-1.m4v"}
{"id":130988434,"name":"2011_04_24-2.mp4"}
{"id":130988435,"name":"2011_04_24-3.mp4"}
index merging done [==============================================================] 67507 / 67507

search by number attributes/field

$ vfs-index search  -q='id == 130988433' --index=../idx --column=name --table=test --data=./
{"id":130988433,"name":"2011_04_24-1.m4v"}

merge index

$ vfs-index merge --index=../idx --column=name --table=test --data=./
index merging done [==============================================================] 67507 / 67507

support query.

If an attribute/field is a number, you can select it by arithmetic comparison. Supported comparison operators: == >= < <= > .

$ vfs-index search -q='id == 130988471' --index=../idx --table=test --data=./

string search

$ vfs-index search -q='name.search("フロンターレ")' --index=../idx --table=test --data=./

use in golang.

import package

import vfs "github.com/kazu/vfs-index"

indexing

func DefaultOption() vfs.Option {
	return vfs.RootDir("/Users/xtakei/vfs-idx")
}


idx, e := vfs.Open("/Users/xtakei/example/data", DefaultOption())
e = idx.Regist("test", "id")

searching

func DefaultOption() vfs.Option {
	return vfs.RootDir("/Users/xtakei/vfs-idx")
}

// search number index
idx, e := vfs.Open("/Users/xtakei/example/data", DefaultOption())
sCond := idx.On("test", vfs.ReaderColumn("id"), vfs.Output(vfs.MapInfOutput))

record := sCond.Select(func(m vfs.SearchCondElem2) bool {
    return m.Op("id", "<",  122878513)
}).First()

// search by matching substring
sCondName := idx.On("test", vfs.ReaderColumn("name"), vfs.Output(vfs.MapInfOutput))
matches2 := sCondName.Match("ロシア人").All()

stop index merging after 1 minute.


idx, e := vfs.Open("/Users/xtakei/example/data", DefaultOption())
sCond := idx.On("test", vfs.ReaderColumn("id"), vfs.Output(vfs.MapInfOutput))
sCond.StartMerging()
time.Sleep(1 * time.Minute)
sCond.CancelAndWait()

TODO

  • write file list
  • read file list
  • write number column index
  • read number column index
  • write tri-gram column index
  • support string search
  • support index merging
  • add comment
  • csv support
  • msgpack support
  • support compression data ( lz4(?) )

Documentation

Index

Constants

View Source
const (
	// Index record status bytes — presumably encoded into index file paths
	// (see ColumnPathWithStatus, which takes a status byte); TODO confirm.
	RECORD_WRITING byte = iota // record index write in progress
	RECORD_WRITTEN             // record index write completed
	RECORD_MERGING             // record index merge in progress
	RECORD_MERGED              // record index merge completed
)
View Source
const (
	// Output formats for search results (selected via the Output option).
	JsonOutput = iota // render results as JSON
	MapInfOutput      // render results as maps — presumably map[string]interface{}; TODO confirm
)
View Source
const (
	// Key traversal states. Intermediate states are retired but kept
	// commented out so the numeric values stay stable.
	KeyStateGot int = 1
	// KeyStateRun int = 2
	// KeyStateFlase int = 3
	KeyStateFinish int = 4
)
View Source
const (
	// Sizing limits for index caches (see IdxCaches) — exact semantics
	// not visible here; TODO confirm against cache implementation.
	MAX_IDX_CACHE      = 512 // maximum cached index entries
	MIN_NEGATIVE_CACHE = 8   // minimum size of the negative-result cache
)
View Source
const (
	// RECORDS_INIT is the initial capacity for Records allocations —
	// presumably used by NewRecords; TODO confirm.
	RECORDS_INIT = 64
)

Variables

View Source
// Sentinel errors returned by the indexer; compare with errors.Is.
var (
	ErrInvalidTableName = errors.New("table name is invalid")
	ErrNotFoundFile     = errors.New("file not found")
	ErrInvalidIdxName   = errors.New("idx file name is invalid")
	// FIX: corrected message typo ("dosent" -> "doesn't").
	ErrNotHasColumn = errors.New("record doesn't have this column")
	ErrNotIndexDir  = errors.New("Indexfile must be Index top directory")
	ErrStopTraverse = errors.New("stop traverse") // used to stop directory traversal early

	ErrParameterInvalid = errors.New("parameter invalid")
	ErrNotSupported     = errors.New("not supported")

	ErrMustCsvHeader = errors.New("not set csv header")
)
View Source
// CsvHeader holds the header line of the most recently tokenized CSV file
// (assigned by the csv Tokenizer in DefaultDecoder).
var CsvHeader string
View Source
// DefaultDecoder lists the built-in format handlers. GetDecoder selects one
// by matching a file's extension against FileType.
var DefaultDecoder []Decoder = []Decoder{
	Decoder{
		FileType: "csv",
		Encoder: func(v interface{}) ([]byte, error) {
			return json.Marshal(v)
		},
		Decoder: func(raw []byte, v interface{}) error {
			// NOTE(review): CSV record decoding is not implemented; csv data
			// is only tokenized into offset/size records below.
			return nil
		},
		Tokenizer: func(ctx context.Context, rio io.Reader, f *File) <-chan *Record {
			ch := make(chan *Record, 5)

			go func() {
				// Close exactly once on every exit path. FIX: the original
				// registered a deferred close only inside the error branch,
				// then fell through to tokenize a nil buffer and closed ch a
				// second time at the end (panic: close of closed channel).
				defer close(ch)

				buf, err := ioutil.ReadAll(rio)
				if err != nil {
					return
				}

				lines := strings.Split(string(buf), "\n")
				CsvHeader = lines[0]
				lines = lines[1:]
				// Data records start just past the header line and its newline.
				cur := len(CsvHeader) + 1
				for _, line := range lines {
					select {
					case <-ctx.Done():
						// Honor cancellation, matching the json tokenizer.
						return
					case ch <- &Record{fileID: f.id, offset: int64(cur), size: int64(len(line))}:
					}
					cur += len(line) + 1 // +1 for the newline delimiter
				}
			}()
			return ch
		},
	},
	Decoder{
		FileType: "json",
		Encoder: func(v interface{}) ([]byte, error) {
			b, e := json.Marshal(v)
			if e != nil {
				return b, e
			}
			// Pretty-print with tab indentation for readability.
			var out bytes.Buffer
			json.Indent(&out, b, "", "\t")
			return out.Bytes(), e

		},
		Decoder: func(raw []byte, v interface{}) error {
			e := json.Unmarshal(raw, v)
			if e != nil {
				return e
			}
			// Normalize float64 (encoding/json's default number type) to
			// uint64 in map results so numeric columns behave as integers.
			if value, ok := v.(*(map[string]interface{})); ok {
				for key, v := range *value {
					if f64, ok := v.(float64); ok {
						(*value)[key] = uint64(f64)
					}
				}
			}
			return nil

		},
		Tokenizer: func(ctx context.Context, rio io.Reader, f *File) <-chan *Record {
			ch := make(chan *Record, 100)
			go func() {
				dec := json.NewDecoder(rio)

				var rec *Record

				nest := int(0)
				defer close(ch)
				for {
					token, err := dec.Token()
					if err != nil {
						// FIX: the original broke only on io.EOF; any other
						// decode error made dec.Token() return the same error
						// forever, spinning this goroutine indefinitely.
						break
					}
					// Track object nesting: a top-level '{' opens a record,
					// its matching '}' completes and emits it.
					switch token {
					case json.Delim('{'):
						nest++
						if nest == 1 {
							rec = &Record{fileID: f.id, offset: dec.InputOffset() - 1}
						}

					case json.Delim('}'):
						nest--
						if nest == 0 {
							rec.size = dec.InputOffset() - rec.offset
							ch <- rec
						}
					}
					select {
					case <-ctx.Done():
						return
					default:
					}
				}

			}()
			return ch
		},
	},
}
View Source
// GGlobCache caches glob expansion results keyed by pattern — presumably to
// avoid repeated filesystem walks; TODO confirm against callers.
var GGlobCache map[string][]string = map[string][]string{}
View Source
// LogWriter is the destination for Log output; defaults to standard error.
var LogWriter io.StringWriter = os.Stderr
View Source
var Opt optionState = optionState{
	// contains filtered or unexported fields
}
View Source
// StringOp maps comparison-operator strings used in query expressions to
// their CondOp values.
var StringOp map[string]CondOp = map[string]CondOp{
	"==": CondOpEq,
	"<=": CondOpLe,
	"<":  CondOpLt,
	">=": CondOpGe,
	// FIX: ">" previously mapped to CondOpGe, making ">" behave like ">=".
	">": CondOpGt,
}
View Source
// ZERO_TIME is the zero time.Time value — presumably used as an "unset"
// timestamp sentinel; TODO confirm against callers.
var ZERO_TIME time.Time = time.Time{}

Functions

func AddingDir

func AddingDir(s string, n int) string

func ColumnPath

func ColumnPath(tdir, col string, isNum bool) string

func ColumnPathWithStatus

func ColumnPathWithStatus(tdir, col string, isNum bool, s, e string, status byte) string

func EmptySkip

func EmptySkip(o map[int]bool) map[int]bool

func EncodeTri

func EncodeTri(s string) (result []string)

func FileExist

func FileExist(filename string) bool

func FileListPath

func FileListPath(tabledir string) string

func FileListPathWithAdding

func FileListPathWithAdding(tabledir string, s, e uint64, usePid bool) string

func FileMtime

func FileMtime(filename string) time.Time

func GetInode

func GetInode(info os.FileInfo) uint64

func InitIdxCaches

func InitIdxCaches(i *IdxCaches)

func JoinExt

func JoinExt(s ...string) string

func LessEqString

func LessEqString(s, d string) (isLess bool)

func Log

func Log(l LogLevel, f string, args ...interface{})

func SafeRename

func SafeRename(src, dst string) error

func TriKeys

func TriKeys(s string) (result []uint64)

func TrimFilePathSuffix

func TrimFilePathSuffix(path string) string

Types

type BufWriterIO

type BufWriterIO struct {
	*os.File
	// contains filtered or unexported fields
}

func NewBufWriterIO

func NewBufWriterIO(o *os.File, n int) *BufWriterIO

func (*BufWriterIO) Flush

func (b *BufWriterIO) Flush() (e error)

func (*BufWriterIO) Write

func (b *BufWriterIO) Write(p []byte) (n int, e error)

func (*BufWriterIO) WriteAt

func (b *BufWriterIO) WriteAt(p []byte, offset int64) (n int, e error)

type Column

type Column struct {
	Table   string
	Name    string
	Dir     string
	Flist   *FileList
	IsNum   bool
	Dirties Records
	// contains filtered or unexported fields
}

func NewColumn

func NewColumn(flist *FileList, table, col string) *Column

func (*Column) IsNumViaIndex

func (c *Column) IsNumViaIndex() bool

func (*Column) Key2Path

func (c *Column) Key2Path(key uint64, state byte) string

func (*Column) MergingIndex

func (c *Column) MergingIndex(ctx context.Context) error

func (*Column) Path

func (c *Column) Path() string

func (*Column) RecordEqInt

func (c *Column) RecordEqInt(v int) (record *Record)

func (*Column) TableDir

func (c *Column) TableDir() string

func (*Column) Update

func (c *Column) Update(d time.Duration) error

func (*Column) WriteDirties

func (c *Column) WriteDirties()

type CondFn

type CondFn func(f *IndexFile) CondType

type CondOp

// CondOp identifies a comparison operator in a search condition.
type CondOp byte
const (
	CondOpEq CondOp = iota // ==
	CondOpLe               // <=
	CondOpLt               // <
	CondOpGe               // >=
	CondOpGt               // >
)

type CondType

// CondType is the result of evaluating a CondFn against an IndexFile.
type CondType byte
const (
	CondTrue CondType = iota // condition holds
	CondSkip                 // skip this index file
	CondFalse                // condition fails
	CondLazy                 // defer the decision — TODO confirm semantics
)

type Decoder

// Decoder bundles the per-format hooks for one supported file type.
type Decoder struct {
	FileType  string                                                  // file extension handled (e.g. "csv", "json")
	Decoder   func([]byte, interface{}) error                         // decode raw record bytes into v
	Encoder   func(interface{}) ([]byte, error)                       // encode v into bytes
	Tokenizer func(context.Context, io.Reader, *File) <-chan *Record // split a data stream into records
}

func GetDecoder

func GetDecoder(fname string) (dec Decoder, e error)

GetDecoder ... returns the format Decoder/Encoder matching fname (file name)

type File

type File struct {
	// contains filtered or unexported fields
}

func FileFromFbs

func FileFromFbs(r io.Reader) *File

func NewFile

func NewFile(id uint64, name string, index_at int64) *File

func (*File) Records

func (f *File) Records(ctx context.Context, dir string) <-chan *Record

FIXME: support other format

func (*File) ToFbs

func (f *File) ToFbs(l *FileList) []byte

func (*File) Write

func (f *File) Write(l *FileList) error

type FileList

// FileList tracks the data files found under a directory and when they
// were last indexed.
type FileList struct {
	Dir       string    // data directory
	IndexedAt time.Time // time of last indexing
	Files     []*File   // known data files
}

func CreateFileList

func CreateFileList(tdir string) (flist *FileList, err error)

func OpenFileList

func OpenFileList(tdir string) (flist *FileList)

func (*FileList) FPath

func (l *FileList) FPath(id uint64) (path string, e error)

func (*FileList) Reload

func (l *FileList) Reload() error

func (*FileList) Store

func (l *FileList) Store()

func (*FileList) Update

func (flist *FileList) Update()

type GetCol

type GetCol func() *Column

type GetColumn

type GetColumn func() *Column

type GetValue

type GetValue func(string, CondOp) *SearchFinder

type GetValue2

type GetValue2 func(string, CondOp) *SearchFinder2

type GlobCache

type GlobCache struct {
	Keys  []string
	ReqCh chan GlobRequest
	// contains filtered or unexported fields
}

func (*GlobCache) Add

func (g *GlobCache) Add(key, value string)

func (*GlobCache) Finish

func (g *GlobCache) Finish(key string)

func (*GlobCache) Get

func (g *GlobCache) Get(pat string) *query.PathInfoList

func (*GlobCache) GetCh

func (g *GlobCache) GetCh(pat string) <-chan string

func (*GlobCache) PrepareRead

func (g *GlobCache) PrepareRead(pat string) *query.PathInfoList

func (GlobCache) Run

func (g GlobCache) Run(key string)

func (*GlobCache) Start

func (g *GlobCache) Start() error

type GlobRequest

type GlobRequest struct {
	// contains filtered or unexported fields
}

type IdxCache

// IdxCache caches one index entry: its first/last key range and the
// position of its record.
type IdxCache struct {
	FirstEnd Range
	Pos      RecordPos
}

type IdxCaches

type IdxCaches struct {
	// contains filtered or unexported fields
}

func NewIdxCaches

func NewIdxCaches() *IdxCaches

type IdxInfo

type IdxInfo struct {
	// contains filtered or unexported fields
}

type IdxPathInfo

type IdxPathInfo string

func (IdxPathInfo) Greater

func (s IdxPathInfo) Greater(d IdxPathInfo) bool

func (IdxPathInfo) Info

func (p IdxPathInfo) Info() (col string, isNum bool, first, last, fileID uint64, offset int64)

func (IdxPathInfo) IsMerged

func (p IdxPathInfo) IsMerged() bool

func (IdxPathInfo) Less

func (s IdxPathInfo) Less(d IdxPathInfo) bool

col, isNum, first, last, fileID , offset

func (IdxPathInfo) TDir

func (p IdxPathInfo) TDir() string

func (IdxPathInfo) Table

func (p IdxPathInfo) Table() string

type IdxWriter

// IdxWriter writes column index entries. ValueEncoder converts a record
// into the key strings to index; IsNum marks a numeric column.
type IdxWriter struct {
	IsNum        bool
	ValueEncoder func(r *Record) []string
}

type IndexFile

type IndexFile struct {
	Path  string
	Ftype IndexFileType
	// contains filtered or unexported fields
}

IndexFile ... file entity of index file in index table directories

func ListMergedIndex added in v0.1.2

func ListMergedIndex(c *Column, fn CondFn, opts ...SelectOption) (result []*IndexFile)

func NewIndexFile

func NewIndexFile(c *Column, path string) *IndexFile

func OpenIndexFile

func OpenIndexFile(c *Column) (idxFile *IndexFile)

func (*IndexFile) FindByKey

func (f *IndexFile) FindByKey(key uint64) (result []*IndexFile)

func (*IndexFile) FindNearByKey

func (f *IndexFile) FindNearByKey(key uint64, less bool) (results []*IndexFile)

func (*IndexFile) First

func (f *IndexFile) First() *IndexFile

First ... Find first IndexFile.

func (*IndexFile) FirstRecord

func (f *IndexFile) FirstRecord() *Record

func (*IndexFile) IdxInfo

func (f *IndexFile) IdxInfo() IndexPathInfo

func (*IndexFile) Init

func (f *IndexFile) Init()

func (*IndexFile) IsType

func (f *IndexFile) IsType(t IndexFileType) bool

func (*IndexFile) KeyRecord

func (f *IndexFile) KeyRecord() (result *query.InvertedMapNum)

func (*IndexFile) KeyRecords

func (f *IndexFile) KeyRecords() *query.KeyRecordList

func (*IndexFile) Last

func (f *IndexFile) Last() *IndexFile

Last ... Find last IndexFile.

func (*IndexFile) LastRecord

func (f *IndexFile) LastRecord() *Record

func (*IndexFile) RecordByKey

func (f *IndexFile) RecordByKey(key uint64) RecordFn

func (*IndexFile) RecordNearByKey

func (f *IndexFile) RecordNearByKey(key uint64, less bool) RecordFn

func (*IndexFile) Select

func (f *IndexFile) Select(opts ...SelectOption) (err error)

type IndexFileType

// IndexFileType is a bit-flag classification of an index file entry.
type IndexFileType int
const (
	IdxFileType_None IndexFileType = 0
	// NOTE: iota is 1 on this line, so Dir == 2 and later flags double from there.
	IdxFileType_Dir  IndexFileType = 1 << iota
	IdxFileType_Merge
	IdxFileType_Write
	IdxFileType_MyColum
	IdxFileType_NoComplete
)

type IndexPathInfo

type IndexPathInfo struct {
	// contains filtered or unexported fields
}

func NewIndexInfo

func NewIndexInfo(fileID uint64, offset int64, first uint64, last uint64) IndexPathInfo

type Indexer

type Indexer struct {
	Root string
	Cols map[string]*Column
	// contains filtered or unexported fields
}

func Open

func Open(dpath string, opts ...Option) (*Indexer, error)

Open ... open index. dpath is the data directory.

func (*Indexer) On

func (idx *Indexer) On(table string, opts ...Option) *SearchCond

On ... return SearchCond(Search Element) , table is table name, column is set by ReaderColumn("column name")

func (*Indexer) OpenCol

func (idx *Indexer) OpenCol(flist *FileList, table, col string) *Column

func (*Indexer) Regist

func (idx *Indexer) Regist(table, col string) error

Regist ... index the specified table and col (column)

type LogLevel

// LogLevel selects logging verbosity for Log.
type LogLevel int
const (
	LOG_ERROR LogLevel = iota
	LOG_WARN
	LOG_DEBUG
)
// CurrentLogLoevel (sic — typo is part of the exported API, renaming would
// break callers) is the active log verbosity; defaults to LOG_DEBUG.
var CurrentLogLoevel LogLevel = LOG_DEBUG

type Option

type Option func(*optionState)

type ReaderOpt map[string]string

func MergeDuration

func MergeDuration(d time.Duration) Option

func MergeOnSearch

func MergeOnSearch(enable bool) Option

MergeOnSearch ... enable merging of the index during search

func Output

func Output(t Outputer) Option

func ReaderColumn

func ReaderColumn(s string) Option

ReaderColumn ... configure the column name for search/read

func RegitConcurrent

func RegitConcurrent(n int) Option

func RootDir

func RootDir(s string) Option

RootDir ... set index top directory

type Outputer

type Outputer byte

type ProgressBar

type ProgressBar struct {
	// contains filtered or unexported fields
}

func NewProgressBar

func NewProgressBar(opts ...mpb.ContainerOption) (bar ProgressBar)

func (*ProgressBar) Add

func (p *ProgressBar) Add(name string, total int) (bar *mpb.Bar)

func (*ProgressBar) Done

func (p *ProgressBar) Done()

type Range

type Range struct {
	// contains filtered or unexported fields
}

type RangeCur

type RangeCur struct {
	Range
	// contains filtered or unexported fields
}

type Record

type Record struct {
	// contains filtered or unexported fields
}

func NewRecord

func NewRecord(id uint64, offset, size int64) *Record

func RecordFromFbs

func RecordFromFbs(r io.Reader) *Record

func (*Record) IsExist

func (r *Record) IsExist(c *Column) bool

func (*Record) Raw

func (r *Record) Raw(c *Column) (data []byte)

func (*Record) StrValue

func (r *Record) StrValue(c *Column) string

func (*Record) ToFbs

func (r *Record) ToFbs(inf interface{}) []byte

func (*Record) Uint64Value

func (r *Record) Uint64Value(c *Column) uint64

func (*Record) Write

func (r *Record) Write(c *Column) error

Write ... write column index

type RecordFn

type RecordFn func(map[int]bool) []*query.Record

type RecordPos

type RecordPos struct {
	// contains filtered or unexported fields
}

type Records

type Records []*Record

func NewRecords

func NewRecords(n int) Records

func (Records) Add

func (recs Records) Add(r *Record) Records

type ResultOpt

type ResultOpt func(*Column, []*query.Record) interface{}

func ResultOutput

func ResultOutput(name string) ResultOpt

type RowIndex

type RowIndex struct {
	// contains filtered or unexported fields
}

type SearchCond

type SearchCond struct {
	Err error
	// contains filtered or unexported fields
}

SearchCond ... search condition object.

func (*SearchCond) CancelAndWait

func (cond *SearchCond) CancelAndWait()

CancelAndWait ... cancel the background routine (mainly index merging) and wait for it to finish

func (*SearchCond) Column

func (cond *SearchCond) Column() *Column

func (*SearchCond) FindBy

func (cond *SearchCond) FindBy(col string, kInf interface{}) (sinfo *SearchFinder)

func (*SearchCond) Match

func (f *SearchCond) Match(s string) *SearchFinder

func (*SearchCond) Query deprecated

func (f *SearchCond) Query(s string) (r *SearchFinder)

Deprecated: should use Query2

func (*SearchCond) Query2

func (f *SearchCond) Query2(s string) (r *SearchFinder2)

func (*SearchCond) Select2

func (cond *SearchCond) Select2(fn func(SearchCondElem2) bool) (sfinder *SearchFinder2)

func (*SearchCond) StartMerging

func (cond *SearchCond) StartMerging()

type SearchCondElem2

type SearchCondElem2 struct {
	Column GetCol
	// contains filtered or unexported fields
}

func (SearchCondElem2) Op

func (cond SearchCondElem2) Op(col, op string, v interface{}) (result bool)

type SearchFinder

type SearchFinder struct {
	// contains filtered or unexported fields
}

func EmptySearchFinder

func EmptySearchFinder() *SearchFinder

func (*SearchFinder) All

func (s1 *SearchFinder) All(opts ...ResultOpt) []interface{}

func (*SearchFinder) And

func (s1 *SearchFinder) And(s2 *SearchFinder) (s *SearchFinder)

func (*SearchFinder) First

func (s1 *SearchFinder) First(opts ...ResultOpt) interface{}

func (*SearchFinder) KeyRecord

func (s *SearchFinder) KeyRecord() *query.KeyRecord

func (*SearchFinder) Last

func (s1 *SearchFinder) Last(opts ...ResultOpt) interface{}

func (*SearchFinder) Records

func (s *SearchFinder) Records() []*query.Record

type SearchFinder2

type SearchFinder2 struct {
	// contains filtered or unexported fields
}

func NewSearchFinder2

func NewSearchFinder2(c *Column) *SearchFinder2

func (*SearchFinder2) All

func (sf *SearchFinder2) All(opts ...ResultOpt) interface{}

func (*SearchFinder2) And

func (sf *SearchFinder2) And(i int, key uint64) SkipFn

func (*SearchFinder2) Count

func (sf *SearchFinder2) Count() int

func (*SearchFinder2) First

func (sf *SearchFinder2) First(opts ...ResultOpt) interface{}

func (*SearchFinder2) Last

func (sf *SearchFinder2) Last(opts ...ResultOpt) interface{}

func (*SearchFinder2) Records

func (sf *SearchFinder2) Records() (recs []*query.Record)

type SearchMode

type SearchMode byte
const (
	SEARCH_INIT SearchMode = iota
	SEARCH_START
	SEARCH_ASC
	SEARCH_DESC
	SEARCH_ALL
	SEARCH_FINISH
)

type SelectOpt

type SelectOpt struct {
	// contains filtered or unexported fields
}

func (*SelectOpt) Merge

func (opt *SelectOpt) Merge(opts []SelectOption)

type SelectOption

type SelectOption func(*SelectOpt)

func OptAsc

func OptAsc(isAsc bool) SelectOption

func OptCcondFn

func OptCcondFn(c CondFn) SelectOption

func OptRange

func OptRange(start, last uint64) SelectOption

func OptTraverse

func OptTraverse(fn TraverseFn) SelectOption

type SetKey

type SetKey func(interface{}) []uint64

type SkipFn

type SkipFn func(map[int]bool) map[int]bool

type TraverseFn

type TraverseFn func(f *IndexFile) error

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL