pipeline

package
v0.4.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 26, 2024 License: MIT Imports: 36 Imported by: 3

Documentation

Index

Constants

View Source
const (
	RWTypeCompress RWType = "compress"

	CompressTypeNop    CompressType = "nop"
	CompressTypeGzip   CompressType = "gzip"
	CompressTypeSnappy CompressType = "snappy"
	CompressTypeZstd   CompressType = "zstd"

	CompressLevelFast   CompressLevel = "fast"
	CompressLevelBetter CompressLevel = "better"
	CompressLevelBest   CompressLevel = "best"
)
View Source
const (
	RWTypeStore RWType = "store"

	StoreTypeOss = "oss"
	StoreTypeFtp = "ftp"
	StoreTypeSSH = "ssh"
)
View Source
const PluginTypePipeline plugin.Type = "pipeline"
View Source
const PluginTypeRWGroup plugin.Type = "rwgroup"

Variables

View Source
var (
	ErrStoppedManually  = errors.New("RW stopped manually")
	ErrChecksumNotMatch = errors.New("checksum not match")
)

Functions

func ApplyCommonCfgToRW

func ApplyCommonCfgToRW(rw RW, cfg *RWCommonCfg)

func FileExtFromCompressCfg

func FileExtFromCompressCfg(c *CompressRWCfg) string

func NewCmdRWCfg

func NewCmdRWCfg() *cmd.Cfg

func NewGzipReader

func NewGzipReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr

func NewSnappyWriter

func NewSnappyWriter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr

func NewZstdReader

func NewZstdReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr

Types

type BaseRW

type BaseRW struct {
	runner.Runner
	// contains filtered or unexported fields
}

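BaseRW implements the RW interface. werr: for async or deadline writes, any write failure is stored in werr; once werr != nil, all subsequent async writes are skipped. rerr: the error from the async or buffered path.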

func NewBaseRW

func NewBaseRW(name string) *BaseRW

func (*BaseRW) As

func (b *BaseRW) As(role RWRole)

func (*BaseRW) AsReader

func (b *BaseRW) AsReader()

func (*BaseRW) AsStarter

func (b *BaseRW) AsStarter()

func (*BaseRW) AsWriter

func (b *BaseRW) AsWriter()

func (*BaseRW) AsyncChanCap

func (b *BaseRW) AsyncChanCap() int

func (*BaseRW) AsyncChanLen

func (b *BaseRW) AsyncChanLen() int

func (*BaseRW) Close

func (b *BaseRW) Close() error

func (*BaseRW) EnableCalcHash

func (b *BaseRW) EnableCalcHash(algo string)

func (*BaseRW) EnableChecksum

func (b *BaseRW) EnableChecksum(checksum string, algo string)

func (*BaseRW) EnableMonitorSpeed

func (b *BaseRW) EnableMonitorSpeed()

func (*BaseRW) EnableRateLimit

func (b *BaseRW) EnableRateLimit(rl ratelimit.RxTxRateLimiter)

func (*BaseRW) EnableReadBuf

func (b *BaseRW) EnableReadBuf(bufSize int, async bool, asyncChanBufSize int)

func (*BaseRW) EnableWriteBuf

func (b *BaseRW) EnableWriteBuf(bufSize int, deadline int, async bool, asyncChanBufSize int)

func (*BaseRW) Flush

func (b *BaseRW) Flush() error

func (*BaseRW) GetCfg

func (b *BaseRW) GetCfg() interface{}

func (*BaseRW) Hash

func (b *BaseRW) Hash() string

func (*BaseRW) Init

func (b *BaseRW) Init() (err error)

func (*BaseRW) Is

func (b *BaseRW) Is(role RWRole) bool

func (*BaseRW) IsAsyncOrDeadline

func (b *BaseRW) IsAsyncOrDeadline() bool

func (*BaseRW) IsReader

func (b *BaseRW) IsReader() bool

func (*BaseRW) IsStarter

func (b *BaseRW) IsStarter() bool

func (*BaseRW) IsWriter

func (b *BaseRW) IsWriter() bool

func (*BaseRW) NestReader

func (b *BaseRW) NestReader(r io.ReadCloser) error

func (*BaseRW) NestWriter

func (b *BaseRW) NestWriter(w io.WriteCloser) error

func (*BaseRW) Nread

func (b *BaseRW) Nread() uint64

func (*BaseRW) Nwrite

func (b *BaseRW) Nwrite() uint64

func (*BaseRW) Read

func (b *BaseRW) Read(p []byte) (int, error)

func (*BaseRW) Reader

func (b *BaseRW) Reader() io.ReadCloser

func (*BaseRW) RegisterReadHook

func (b *BaseRW) RegisterReadHook(rh ...ReadHook)

func (*BaseRW) RegisterWriteHook

func (b *BaseRW) RegisterWriteHook(wh ...WriteHook)

func (*BaseRW) Role

func (b *BaseRW) Role() RWRole

func (*BaseRW) Start

func (b *BaseRW) Start() error

Start: if the current RW can act as a Starter, it must implement the Start method.

func (*BaseRW) Stop

func (b *BaseRW) Stop() error

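Stop notifies the RW to stop. CloseReader and CloseWriter are deliberately not performed inside Stop so that shutdown is graceful: the Starter stops reading once it detects the stop signal and only then performs Close; see the example in the Start method.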

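There is one special case: when b.Reader() is a TailRW or a similar ReadCloser, a call to Read may block. If Close is not called and no new content ever arrives, Read blocks forever. For that reason a runner.Stop notification is also sent to b.Reader() and b.Writer(), and TailRW performs the Close itself in its Stop method.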

func (*BaseRW) Type

func (b *BaseRW) Type() interface{}

func (*BaseRW) Write

func (b *BaseRW) Write(p []byte) (int, error)

func (*BaseRW) Writer

func (b *BaseRW) Writer() io.WriteCloser

type Cfg

type Cfg struct {
	RWs []*RWCfg `json:"rws" validate:"min=1" yaml:"rws"`
}

func NewCfg

func NewCfg() *Cfg

func (*Cfg) Add added in v0.0.8

func (c *Cfg) Add(role RWRole, typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *Cfg

type CmdRW

type CmdRW struct {
	RW
	*cmd.Cfg
}

func NewCmdRW

func NewCmdRW() *CmdRW

func (*CmdRW) GetCfg

func (c *CmdRW) GetCfg() interface{}

func (*CmdRW) Init

func (c *CmdRW) Init() error

func (*CmdRW) Start

func (c *CmdRW) Start() error

func (*CmdRW) Stop

func (c *CmdRW) Stop() error

func (*CmdRW) Type

func (c *CmdRW) Type() interface{}

type CompressLevel

type CompressLevel string

type CompressRW

type CompressRW struct {
	RW
	*CompressRWCfg
}

func NewCompressRW

func NewCompressRW() *CompressRW

func (*CompressRW) GetCfg

func (c *CompressRW) GetCfg() interface{}

func (*CompressRW) Init

func (c *CompressRW) Init() error

func (*CompressRW) NestReader

func (c *CompressRW) NestReader(r io.ReadCloser) error

func (*CompressRW) NestWriter

func (c *CompressRW) NestWriter(w io.WriteCloser) error

func (*CompressRW) Nwrite

func (c *CompressRW) Nwrite() uint64

func (*CompressRW) Type

func (c *CompressRW) Type() interface{}

type CompressRWCfg

type CompressRWCfg struct {
	Type        CompressType  `json:"type"        validate:"required" yaml:"type"`
	Level       CompressLevel `json:"level"       validate:"required" yaml:"level"`
	Concurrency int           `json:"concurrency" yaml:"concurrency"`
}

func NewCompressRWCfg

func NewCompressRWCfg() *CompressRWCfg

type CompressType

type CompressType string

type CopyRW

type CopyRW struct {
	RW
	*CopyRWCfg
}

func NewCopyRW

func NewCopyRW() *CopyRW

func (*CopyRW) GetCfg

func (cp *CopyRW) GetCfg() interface{}

func (*CopyRW) Init

func (cp *CopyRW) Init() error

func (*CopyRW) Start

func (cp *CopyRW) Start() error

func (*CopyRW) Type

func (cp *CopyRW) Type() interface{}

type CopyRWCfg

type CopyRWCfg struct {
	BufSize int `json:"bufSize" yaml:"bufSize"`
}

func NewCopyRWCfg

func NewCopyRWCfg() *CopyRWCfg

type FileRW

type FileRW struct {
	RW
	*FileRWCfg
	// contains filtered or unexported fields
}

func NewFileRW

func NewFileRW() *FileRW

func (*FileRW) GetCfg

func (f *FileRW) GetCfg() interface{}

func (*FileRW) Init

func (f *FileRW) Init() error

func (*FileRW) Type

func (f *FileRW) Type() interface{}

type FileRWCfg

type FileRWCfg struct {
	Path string `json:"path" yaml:"path"`
	Perm uint32 `json:"perm" yaml:"perm"`
}

func NewFileRWCfg

func NewFileRWCfg() *FileRWCfg

type FtpCfg

type FtpCfg struct {
	Addr string `json:"addr" validate:"min=1" yaml:"addr"`
	User string `json:"user" validate:"min=1" yaml:"user"`
	Pwd  string `json:"pwd"  validate:"min=1" yaml:"pwd"`
}

type NopRW

type NopRW struct {
	RW
	*NopRWCfg
}

func NewNopRW

func NewNopRW() *NopRW

func (*NopRW) GetCfg

func (n *NopRW) GetCfg() interface{}

func (*NopRW) Type

func (n *NopRW) Type() interface{}

type NopRWCfg

type NopRWCfg struct{}

func NewNopRWCfg

func NewNopRWCfg() *NopRWCfg

type OssCfg

type OssCfg struct {
	Ak     string `json:"ak"     validate:"min=1" yaml:"ak"`
	Sk     string `json:"sk"     validate:"min=1" yaml:"sk"`
	Region string `json:"region" yaml:"region"`
	Append bool   `json:"append" yaml:"append"`
}

type Pipeline

type Pipeline struct {
	runner.Runner
	*Cfg
	// contains filtered or unexported fields
}

func New

func New() *Pipeline

func (*Pipeline) GetCfg

func (p *Pipeline) GetCfg() interface{}

func (*Pipeline) Init

func (p *Pipeline) Init() error

func (*Pipeline) Start

func (p *Pipeline) Start() error

func (*Pipeline) Stop

func (p *Pipeline) Stop() error

func (*Pipeline) Type

func (p *Pipeline) Type() interface{}

type RW

type RW interface {
	runner.Runner
	io.ReadWriteCloser
	plugin.Plugin

	NestReader(io.ReadCloser) error
	NestWriter(io.WriteCloser) error
	Reader() io.ReadCloser
	Writer() io.WriteCloser

	Flush() error

	RegisterReadHook(...ReadHook)
	RegisterWriteHook(...WriteHook)

	Nwrite() uint64
	Nread() uint64

	Hash() string

	EnableMonitorSpeed()
	EnableCalcHash(hashAlgo string)
	EnableChecksum(checksum string, hashAlgo string)
	EnableRateLimit(ratelimit.RxTxRateLimiter)
	EnableWriteBuf(bufSize int, deadline int, async bool, asyncChanBufSize int)
	EnableReadBuf(bufSize int, async bool, asyncChanBufSize int)

	IsAsyncOrDeadline() bool
	AsyncChanLen() int
	AsyncChanCap() int

	AsStarter()
	IsStarter() bool
	AsReader()
	IsReader() bool
	AsWriter()
	IsWriter() bool
	As(RWRole)
	Is(RWRole) bool
	Role() RWRole
}

func Create

func Create(role RWRole, typ RWType, cfg interface{}, commonCfg *RWCommonCfg) RW

type RWCfg

type RWCfg struct {
	Type      RWType       `json:"rwType"    validate:"required" yaml:"rwType"`
	Cfg       interface{}  `json:"cfg"       validate:"required" yaml:"cfg"`
	CommonCfg *RWCommonCfg `json:"commonCfg" yaml:"commonCfg"`
	Role      RWRole       `json:"role"      validate:"required" yaml:"role"`
}

type RWCommonCfg

type RWCommonCfg struct {
	RateLimiterCfg     *ratelimit.RateLimiterCfg `json:"rateLimiterCfg"     yaml:"rateLimiterCfg"`
	BufSize            int                       `json:"bufSize"            yaml:"bufSize"`
	Deadline           int                       `json:"deadline"           yaml:"deadline"`
	AsyncChanBufSize   int                       `json:"asyncChanBufSize"   yaml:"asyncChanBufSize"`
	EnableMonitorSpeed bool                      `json:"enableMonitorSpeed" yaml:"enableMonitorSpeed"`
	EnableCalcHash     bool                      `json:"enableCalcHash"     yaml:"enableCalcHash"`
	EnableRateLimit    bool                      `json:"enableRateLimit"    yaml:"enableRateLimit"`
	EnableAsync        bool                      `json:"enableAsync"        yaml:"enableAsync"`
	Checksum           string                    `json:"checksum"           yaml:"checksum"`
	HashAlgo           string                    `json:"hashAlgo"           yaml:"hashAlgo"`
}

type RWGroup

type RWGroup struct {
	runner.Runner
	*RWGroupCfg
	// contains filtered or unexported fields
}

func NewRWGroup

func NewRWGroup() *RWGroup

func (*RWGroup) FirstReader

func (g *RWGroup) FirstReader() RW

func (*RWGroup) Init

func (g *RWGroup) Init() error

func (*RWGroup) LastWriter

func (g *RWGroup) LastWriter() RW

func (*RWGroup) Readers

func (g *RWGroup) Readers() []RW

func (*RWGroup) Size

func (g *RWGroup) Size() int

func (*RWGroup) Start

func (g *RWGroup) Start() error

func (*RWGroup) Starter

func (g *RWGroup) Starter() RW

func (*RWGroup) Stop

func (g *RWGroup) Stop() error

func (*RWGroup) Writers

func (g *RWGroup) Writers() []RW

type RWGroupCfg

type RWGroupCfg struct {
	Readers []*RWCfg `json:"readers" yaml:"readers"`
	Starter *RWCfg   `json:"starter" yaml:"starter"`
	Writers []*RWCfg `json:"writers" yaml:"writers"`
}

func NewRWGroupCfg

func NewRWGroupCfg() *RWGroupCfg

func (*RWGroupCfg) FromReader

func (c *RWGroupCfg) FromReader(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

func (*RWGroupCfg) SetStarter

func (c *RWGroupCfg) SetStarter(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

func (*RWGroupCfg) ToWriter

func (c *RWGroupCfg) ToWriter(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg

type RWRole

type RWRole string
const (
	RWRoleStarter RWRole = "starter"
	RWRoleReader  RWRole = "reader"
	RWRoleWriter  RWRole = "writer"
)

type RWType

type RWType string
const RWTypeCmd RWType = "cmd"
const RWTypeCopy RWType = "copy"
const RWTypeFile RWType = "file"
const RWTypeNop RWType = "nop"
const RWTypeTail RWType = "tail"

type ReadHook

type ReadHook func(n int, bs []byte, err error, cost int64, misc ...interface{}) error

type SSHCfg

type SSHCfg struct {
	Addr       string `json:"addr"       validate:"min=1"  yaml:"addr"`
	User       string `json:"user"       validate:"min=1"  yaml:"user"`
	Pwd        string `json:"pwd"        yaml:"pwd"`
	PrivateKey string `json:"privateKey" yaml:"privateKey"`
}

type StoreCfg added in v0.0.7

type StoreCfg struct {
	Type     StoreType   `json:"type"     validate:"required" yaml:"type"`
	Cfg      interface{} `json:"cfg"      validate:"required" yaml:"cfg"`
	URL      string      `json:"url"      validate:"min=1"    yaml:"url"`
	Checksum string      `json:"checksum" yaml:"checksum"`
	HashAlgo string      `json:"hashAlgo" yaml:"hashAlgo"`
	Retry    int         `json:"retry"    validate:"gte=1"    yaml:"retry"`
	Timeout  int         `json:"timeout"  validate:"gte=1"    yaml:"timeout"`
}

func NewStoreCfg

func NewStoreCfg() *StoreCfg

type StoreRW

type StoreRW struct {
	RW
	*StoreCfg
	// contains filtered or unexported fields
}

func NewStoreRW

func NewStoreRW() *StoreRW

func (*StoreRW) Close

func (s *StoreRW) Close() error

func (*StoreRW) EnableCalcHash added in v0.3.9

func (s *StoreRW) EnableCalcHash(hashAlgo string)

func (*StoreRW) EnableChecksum added in v0.3.9

func (s *StoreRW) EnableChecksum(checksum string, hashAlgo string)

func (*StoreRW) GetCfg

func (s *StoreRW) GetCfg() interface{}

func (*StoreRW) Init

func (s *StoreRW) Init() error

func (*StoreRW) Start

func (s *StoreRW) Start() error

func (*StoreRW) Type

func (s *StoreRW) Type() interface{}

type StoreType

type StoreType string

type TailRW

type TailRW struct {
	RW
	*TailRWCfg
	// contains filtered or unexported fields
}

func NewTailRW

func NewTailRW() *TailRW

func (*TailRW) GetCfg

func (t *TailRW) GetCfg() interface{}

func (*TailRW) Init

func (t *TailRW) Init() error

func (*TailRW) Stop

func (t *TailRW) Stop() error

func (*TailRW) Type

func (t *TailRW) Type() interface{}

type TailRWCfg

type TailRWCfg struct {
	Path string `json:"path" yaml:"path"`
	Pos  int64  `json:"pos"  yaml:"pos"`
}

func NewTailRWCfg

func NewTailRWCfg() *TailRWCfg

type WriteHook

type WriteHook func(n int, bs []byte, err error, cost int64, misc ...interface{}) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL