Documentation ¶
Index ¶
- Variables
- func Bytes(iv interface{}) []byte
- func GetGraphString(tk Task) string
- func IsFileExists(path string) bool
- func String(iv interface{}) string
- type ChannelOutput
- func (co *ChannelOutput) Channel() chan interface{}
- func (co *ChannelOutput) Close() error
- func (co *ChannelOutput) Destroy()
- func (co *ChannelOutput) IsSkip() bool
- func (co *ChannelOutput) Read() (interface{}, error)
- func (co *ChannelOutput) Ready() chan struct{}
- func (co *ChannelOutput) String() string
- func (co *ChannelOutput) Write(v interface{}) error
- type DeserializeFunc
- type EmptyInput
- type FileOutput
- func (out *FileOutput) Channel() chan interface{}
- func (out *FileOutput) Close() error
- func (out *FileOutput) Destroy()
- func (out *FileOutput) IsSkip() bool
- func (out *FileOutput) Read() (interface{}, error)
- func (out *FileOutput) Ready() chan struct{}
- func (out *FileOutput) String() string
- func (out *FileOutput) Write(v interface{}) error
- type FileStreaming
- func (fs *FileStreaming) Channel() chan interface{}
- func (fs *FileStreaming) Close() error
- func (fs *FileStreaming) Destroy()
- func (fs *FileStreaming) IsSkip() bool
- func (fs *FileStreaming) Read() (interface{}, error)
- func (fs *FileStreaming) Ready() chan struct{}
- func (fs *FileStreaming) String() string
- func (fs *FileStreaming) Write(v interface{}) error
- type Flow
- type Input
- type Line
- type Metric
- type Options
- type Output
- type Result
- type S3Output
- func (out *S3Output) Channel() chan interface{}
- func (out *S3Output) Close() error
- func (out *S3Output) Destroy()
- func (out *S3Output) IsSkip() bool
- func (out *S3Output) Read() (interface{}, error)
- func (out *S3Output) Ready() chan struct{}
- func (out *S3Output) String() string
- func (out *S3Output) Write(v interface{}) error
- type SerializeFunc
- type Serializer
- type Stats
- type Task
- type TaskInput
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrSerializeValue = errors.New("ErrSerializeValue") DefaultSerializer = &Serializer{ Serialize: defaultSerialize, Deserialize: defaultDeserialize, } )
View Source
var ( // If you don't want to print debug logs, please disable the output of this Logger Logger = log.New(os.Stdout, "", log.LstdFlags) // Polling interval when EOF is reached TailPollInterval = 250 * time.Millisecond )
View Source
var GraphName = "flow"
Functions ¶
func GetGraphString ¶
GetGraphString returns a graph string written in the DOT language.
func IsFileExists ¶
Types ¶
type ChannelOutput ¶
type ChannelOutput struct {
// contains filtered or unexported fields
}
func NewChannelOutput ¶
func NewChannelOutput(name string, ch chan interface{}) *ChannelOutput
func (*ChannelOutput) Channel ¶
func (co *ChannelOutput) Channel() chan interface{}
func (*ChannelOutput) Close ¶
func (co *ChannelOutput) Close() error
func (*ChannelOutput) Destroy ¶
func (co *ChannelOutput) Destroy()
func (*ChannelOutput) IsSkip ¶
func (co *ChannelOutput) IsSkip() bool
func (*ChannelOutput) Read ¶
func (co *ChannelOutput) Read() (interface{}, error)
func (*ChannelOutput) Ready ¶
func (co *ChannelOutput) Ready() chan struct{}
func (*ChannelOutput) String ¶
func (co *ChannelOutput) String() string
func (*ChannelOutput) Write ¶
func (co *ChannelOutput) Write(v interface{}) error
type DeserializeFunc ¶
type EmptyInput ¶
type EmptyInput struct{}
func (*EmptyInput) Channel ¶
func (in *EmptyInput) Channel() chan interface{}
func (*EmptyInput) Read ¶
func (in *EmptyInput) Read() (interface{}, error)
func (*EmptyInput) Ready ¶
func (in *EmptyInput) Ready() chan struct{}
func (*EmptyInput) String ¶
func (in *EmptyInput) String() string
type FileOutput ¶
type FileOutput struct {
// contains filtered or unexported fields
}
func NewFileOutput ¶
func NewFileOutput(path string, srz *Serializer) (*FileOutput, error)
func (*FileOutput) Channel ¶
func (out *FileOutput) Channel() chan interface{}
func (*FileOutput) Close ¶
func (out *FileOutput) Close() error
func (*FileOutput) Destroy ¶
func (out *FileOutput) Destroy()
func (*FileOutput) IsSkip ¶
func (out *FileOutput) IsSkip() bool
func (*FileOutput) Read ¶
func (out *FileOutput) Read() (interface{}, error)
func (*FileOutput) Ready ¶
func (out *FileOutput) Ready() chan struct{}
func (*FileOutput) String ¶
func (out *FileOutput) String() string
func (*FileOutput) Write ¶
func (out *FileOutput) Write(v interface{}) error
type FileStreaming ¶
type FileStreaming struct {
// contains filtered or unexported fields
}
FileStreaming provides streaming I/O.
func NewFileStreaming ¶
func NewFileStreaming(path string, srz *Serializer) (*FileStreaming, error)
func (*FileStreaming) Channel ¶
func (fs *FileStreaming) Channel() chan interface{}
func (*FileStreaming) Close ¶
func (fs *FileStreaming) Close() error
func (*FileStreaming) Destroy ¶
func (fs *FileStreaming) Destroy()
func (*FileStreaming) IsSkip ¶
func (fs *FileStreaming) IsSkip() bool
func (*FileStreaming) Read ¶
func (fs *FileStreaming) Read() (interface{}, error)
func (*FileStreaming) Ready ¶
func (fs *FileStreaming) Ready() chan struct{}
func (*FileStreaming) String ¶
func (fs *FileStreaming) String() string
func (*FileStreaming) Write ¶
func (fs *FileStreaming) Write(v interface{}) error
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
func (*Flow) StatsHandler ¶
func (fl *Flow) StatsHandler(w http.ResponseWriter, r *http.Request)
type Input ¶
type Input interface { Channel() chan interface{} Ready() chan struct{} Read() (interface{}, error) String() string }
func CombineInputs ¶
CombineInputs combines multiple inputs into a single input.
type Options ¶
type Options func(*options)
func WithInputs ¶
func WithOutputs ¶
func WithProcessor ¶
func WithWorker ¶
type Output ¶
type Output interface { Write(interface{}) error Close() error Destroy() Channel() chan interface{} Read() (interface{}, error) IsSkip() bool Ready() chan struct{} String() string }
Output is the output interface.
type Result ¶
type Result struct {
// contains filtered or unexported fields
}
type S3Output ¶
type S3Output struct {
// contains filtered or unexported fields
}
func NewS3Output ¶
func NewS3Output(c client.ConfigProvider, bucket, path string, srz *Serializer) (*S3Output, error)
type SerializeFunc ¶
type Serializer ¶
type Serializer struct { Serialize SerializeFunc Deserialize DeserializeFunc }
type Task ¶
type Task interface { // Name returns task name Name() string // In returns a reader that can accept the output of other tasks In(...int) Input // Out returns a writer that can input to other tasks Out(...int) Output // Requires returns the task list on which this task depends Requires() []Task // contains filtered or unexported methods }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.