Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyCommonCfgToRW(rw RW, cfg *RWCommonCfg)
- func FileExtFromCompressCfg(c *CompressRWCfg) string
- func NewCmdRWCfg() *cmd.Cfg
- func NewGzipReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr
- func NewGzipWritter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr
- func NewSnappyReader(r io.ReadCloser, _ *CompressRWCfg) *rwwrapper.ReaderWrapperr
- func NewSnappyWriter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr
- func NewZstdReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr
- func NewZstdWriter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr
- type BaseRW
- func (b *BaseRW) As(role RWRole)
- func (b *BaseRW) AsReader()
- func (b *BaseRW) AsStarter()
- func (b *BaseRW) AsWriter()
- func (b *BaseRW) AsyncChanCap() int
- func (b *BaseRW) AsyncChanLen() int
- func (b *BaseRW) Close() error
- func (b *BaseRW) EnableCalcHash(algo string)
- func (b *BaseRW) EnableChecksum(checksum string, algo string)
- func (b *BaseRW) EnableMonitorSpeed()
- func (b *BaseRW) EnableRateLimit(rl ratelimit.RxTxRateLimiter)
- func (b *BaseRW) EnableReadBuf(bufSize int, async bool, asyncChanBufSize int)
- func (b *BaseRW) EnableWriteBuf(bufSize int, deadline int, async bool, asyncChanBufSize int)
- func (b *BaseRW) Flush() error
- func (b *BaseRW) GetCfg() interface{}
- func (b *BaseRW) Hash() string
- func (b *BaseRW) Init() (err error)
- func (b *BaseRW) Is(role RWRole) bool
- func (b *BaseRW) IsAsyncOrDeadline() bool
- func (b *BaseRW) IsReader() bool
- func (b *BaseRW) IsStarter() bool
- func (b *BaseRW) IsWriter() bool
- func (b *BaseRW) NestReader(r io.ReadCloser) error
- func (b *BaseRW) NestWriter(w io.WriteCloser) error
- func (b *BaseRW) Nread() uint64
- func (b *BaseRW) Nwrite() uint64
- func (b *BaseRW) Read(p []byte) (int, error)
- func (b *BaseRW) Reader() io.ReadCloser
- func (b *BaseRW) RegisterReadHook(rh ...ReadHook)
- func (b *BaseRW) RegisterWriteHook(wh ...WriteHook)
- func (b *BaseRW) Role() RWRole
- func (b *BaseRW) Start() error
- func (b *BaseRW) Stop() error
- func (b *BaseRW) Type() interface{}
- func (b *BaseRW) Write(p []byte) (int, error)
- func (b *BaseRW) Writer() io.WriteCloser
- type Cfg
- type CmdRW
- type CompressLevel
- type CompressRW
- type CompressRWCfg
- type CompressType
- type CopyRW
- type CopyRWCfg
- type FileRW
- type FileRWCfg
- type FtpCfg
- type NopRW
- type NopRWCfg
- type OssCfg
- type Pipeline
- type RW
- type RWCfg
- type RWCommonCfg
- type RWGroup
- type RWGroupCfg
- type RWRole
- type RWType
- type ReadHook
- type SSHCfg
- type StoreCfg
- type StoreRW
- type StoreType
- type TailRW
- type TailRWCfg
- type WriteHook
Constants ¶
View Source
const ( RWTypeCompress RWType = "compress" CompressTypeNop CompressType = "nop" CompressTypeGzip CompressType = "gzip" CompressTypeSnappy CompressType = "snappy" CompressTypeZstd CompressType = "zstd" CompressLevelFast CompressLevel = "fast" CompressLevelBetter CompressLevel = "better" CompressLevelBest CompressLevel = "best" )
View Source
const ( RWTypeStore RWType = "store" StoreTypeOss = "oss" StoreTypeFtp = "ftp" StoreTypeSSH = "ssh" )
View Source
const PluginTypePipeline plugin.Type = "pipeline"
View Source
const PluginTypeRWGroup plugin.Type = "rwgroup"
Variables ¶
View Source
var ( ErrStoppedManually = errors.New("RW stopped manually") ErrChecksumNotMatch = errors.New("checksum not match") )
Functions ¶
func ApplyCommonCfgToRW ¶
func ApplyCommonCfgToRW(rw RW, cfg *RWCommonCfg)
func FileExtFromCompressCfg ¶
func FileExtFromCompressCfg(c *CompressRWCfg) string
func NewCmdRWCfg ¶
func NewGzipReader ¶
func NewGzipReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr
func NewGzipWritter ¶
func NewGzipWritter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr
func NewSnappyReader ¶
func NewSnappyReader(r io.ReadCloser, _ *CompressRWCfg) *rwwrapper.ReaderWrapperr
func NewSnappyWriter ¶
func NewSnappyWriter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr
func NewZstdReader ¶
func NewZstdReader(r io.ReadCloser, cfg *CompressRWCfg) *rwwrapper.ReaderWrapperr
func NewZstdWriter ¶
func NewZstdWriter(w io.WriteCloser, cfg *CompressRWCfg) *rwwrapper.WriterWrapperr
Types ¶
type BaseRW ¶
BaseRW implement RW interface werr: async或deadline只要写失败就存werr,如果werr!=nil,之后的async写都skip掉 rerr: async或buf的err
func (*BaseRW) AsyncChanCap ¶
func (*BaseRW) AsyncChanLen ¶
func (*BaseRW) EnableCalcHash ¶
func (*BaseRW) EnableChecksum ¶
func (*BaseRW) EnableMonitorSpeed ¶
func (b *BaseRW) EnableMonitorSpeed()
func (*BaseRW) EnableRateLimit ¶
func (b *BaseRW) EnableRateLimit(rl ratelimit.RxTxRateLimiter)
func (*BaseRW) EnableReadBuf ¶
func (*BaseRW) EnableWriteBuf ¶
func (*BaseRW) IsAsyncOrDeadline ¶
func (*BaseRW) NestReader ¶
func (b *BaseRW) NestReader(r io.ReadCloser) error
func (*BaseRW) NestWriter ¶
func (b *BaseRW) NestWriter(w io.WriteCloser) error
func (*BaseRW) Reader ¶
func (b *BaseRW) Reader() io.ReadCloser
func (*BaseRW) RegisterReadHook ¶
func (*BaseRW) RegisterWriteHook ¶
func (*BaseRW) Stop ¶
Stop 通知RW停止 不在Stop方法中做CloseReader和CloseWriter的操作的原因是想做到优雅关闭 由Starter检测到stop信号后停止read,然后再进行Close,参考Start方法中的example
存在一种特殊情况,例如 b.Reader()是TailRW或者其他类似的ReadCloser,当执行Read方法的时候,会存在阻塞的情况 如果不执行Close方法且一直没有新内容的话,那么Read会一直阻塞住 所以对b.Reader()和b.Writer()也执行一次runner.Stop通知,由TailRW自己在Stop方法中做Close操作.
func (*BaseRW) Writer ¶
func (b *BaseRW) Writer() io.WriteCloser
type CompressLevel ¶
type CompressLevel string
type CompressRW ¶
type CompressRW struct { RW *CompressRWCfg }
func NewCompressRW ¶
func NewCompressRW() *CompressRW
func (*CompressRW) GetCfg ¶
func (c *CompressRW) GetCfg() interface{}
func (*CompressRW) Init ¶
func (c *CompressRW) Init() error
func (*CompressRW) NestReader ¶
func (c *CompressRW) NestReader(r io.ReadCloser) error
func (*CompressRW) NestWriter ¶
func (c *CompressRW) NestWriter(w io.WriteCloser) error
func (*CompressRW) Nwrite ¶
func (c *CompressRW) Nwrite() uint64
func (*CompressRW) Type ¶
func (c *CompressRW) Type() interface{}
type CompressRWCfg ¶
type CompressRWCfg struct { Type CompressType `json:"type" validate:"required" yaml:"type"` Level CompressLevel `json:"level" validate:"required" yaml:"level"` Concurrency int `json:"concurrency" yaml:"concurrency"` }
func NewCompressRWCfg ¶
func NewCompressRWCfg() *CompressRWCfg
type CompressType ¶
type CompressType string
type CopyRWCfg ¶
type CopyRWCfg struct {
BufSize int `json:"bufSize" yaml:"bufSize"`
}
func NewCopyRWCfg ¶
func NewCopyRWCfg() *CopyRWCfg
type FileRWCfg ¶
type FileRWCfg struct { Path string `json:"path" yaml:"path"` Perm uint32 `json:"perm" yaml:"perm"` }
func NewFileRWCfg ¶
func NewFileRWCfg() *FileRWCfg
type RW ¶
type RW interface { runner.Runner io.ReadWriteCloser plugin.Plugin NestReader(io.ReadCloser) error NestWriter(io.WriteCloser) error Reader() io.ReadCloser Writer() io.WriteCloser Flush() error RegisterReadHook(...ReadHook) RegisterWriteHook(...WriteHook) Nwrite() uint64 Nread() uint64 Hash() string EnableMonitorSpeed() EnableCalcHash(hashAlgo string) EnableChecksum(checksum string, hashAlgo string) EnableRateLimit(ratelimit.RxTxRateLimiter) EnableWriteBuf(bufSize int, deadline int, async bool, asyncChanBufSize int) EnableReadBuf(bufSize int, async bool, asyncChanBufSize int) IsAsyncOrDeadline() bool AsyncChanLen() int AsyncChanCap() int AsStarter() IsStarter() bool AsReader() IsReader() bool AsWriter() IsWriter() bool As(RWRole) Is(RWRole) bool Role() RWRole }
type RWCfg ¶
type RWCfg struct { Type RWType `json:"rwType" validate:"required" yaml:"rwType"` Cfg interface{} `json:"cfg" validate:"required" yaml:"cfg"` CommonCfg *RWCommonCfg `json:"commonCfg" yaml:"commonCfg"` Role RWRole `json:"role" validate:"required" yaml:"role"` }
type RWCommonCfg ¶
type RWCommonCfg struct { RateLimiterCfg *ratelimit.RateLimiterCfg `json:"rateLimiterCfg" yaml:"rateLimiterCfg"` BufSize int `json:"bufSize" yaml:"bufSize"` Deadline int `json:"deadline" yaml:"deadline"` AsyncChanBufSize int `json:"asyncChanBufSize" yaml:"asyncChanBufSize"` EnableMonitorSpeed bool `json:"enableMonitorSpeed" yaml:"enableMonitorSpeed"` EnableCalcHash bool `json:"enableCalcHash" yaml:"enableCalcHash"` EnableRateLimit bool `json:"enableRateLimit" yaml:"enableRateLimit"` EnableAsync bool `json:"enableAsync" yaml:"enableAsync"` Checksum string `json:"checksum" yaml:"checksum"` HashAlgo string `json:"hashAlgo" yaml:"hashAlgo"` }
type RWGroup ¶
type RWGroup struct { runner.Runner *RWGroupCfg // contains filtered or unexported fields }
func NewRWGroup ¶
func NewRWGroup() *RWGroup
func (*RWGroup) FirstReader ¶
func (*RWGroup) LastWriter ¶
type RWGroupCfg ¶
type RWGroupCfg struct { Readers []*RWCfg `json:"readers" yaml:"readers"` Starter *RWCfg `json:"starter" yaml:"starter"` Writers []*RWCfg `json:"writers" yaml:"writers"` }
func NewRWGroupCfg ¶
func NewRWGroupCfg() *RWGroupCfg
func (*RWGroupCfg) FromReader ¶
func (c *RWGroupCfg) FromReader(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg
func (*RWGroupCfg) SetStarter ¶
func (c *RWGroupCfg) SetStarter(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg
func (*RWGroupCfg) ToWriter ¶
func (c *RWGroupCfg) ToWriter(typ RWType, cfg interface{}, commonCfg *RWCommonCfg) *RWGroupCfg
type RWType ¶
type RWType string
const RWTypeCmd RWType = "cmd"
const RWTypeCopy RWType = "copy"
const RWTypeFile RWType = "file"
const RWTypeNop RWType = "nop"
const RWTypeTail RWType = "tail"
type StoreCfg ¶ added in v0.0.7
type StoreCfg struct { Type StoreType `json:"type" validate:"required" yaml:"type"` Cfg interface{} `json:"cfg" validate:"required" yaml:"cfg"` URL string `json:"url" validate:"min=1" yaml:"url"` Checksum string `json:"checksum" yaml:"checksum"` HashAlgo string `json:"hashAlgo" yaml:"hashAlgo"` Retry int `json:"retry" validate:"gte=1" yaml:"retry"` Timeout int `json:"timeout" validate:"gte=1" yaml:"timeout"` }
func NewStoreCfg ¶
func NewStoreCfg() *StoreCfg
type StoreRW ¶
func NewStoreRW ¶
func NewStoreRW() *StoreRW
func (*StoreRW) EnableCalcHash ¶ added in v0.3.9
func (*StoreRW) EnableChecksum ¶ added in v0.3.9
type TailRWCfg ¶
func NewTailRWCfg ¶
func NewTailRWCfg() *TailRWCfg
Click to show internal directories.
Click to hide internal directories.