Documentation ¶
Index ¶
- Variables
- func MakeAMQPReader(config map[string]interface{}) (kodex.Reader, error)
- func MakeBytesReader(config map[string]interface{}) (kodex.Reader, error)
- func MakeFileReader(config map[string]interface{}) (kodex.Reader, error)
- func MakeGenerateReader(config map[string]interface{}) (kodex.Reader, error)
- func MakeStdinReader(config map[string]interface{}) (kodex.Reader, error)
- type AMQPPayload
- type AMQPReader
- type BytesPayload
- type BytesReader
- type FilePayload
- type FileReader
- type GeneratePayload
- type GenerateReader
- type Generator
- type GeneratorMaker
- type IsGenerator
- type StdinPayload
- type StdinReader
Constants ¶
This section is empty.
Variables ¶
View Source
var AMQPReaderForm = forms.Form{ ErrorMsg: "invalid data encountered in the AMQP reader form", Fields: append([]forms.Field{ { Name: "consumer", Validators: []forms.Validator{ forms.IsOptional{Default: ""}, forms.IsString{}, }, }, }, writers.AMQPBaseForm.Fields...), }
View Source
var BytesReaderForm = forms.Form{ ErrorMsg: "invalid data encountered in the bytes reader form", Fields: []forms.Field{ { Name: "input", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsBytes{ Encoding: "base64", }, }, }, { Name: "format", Validators: []forms.Validator{ forms.IsIn{Choices: []interface{}{"json"}}, }, }, { Name: "compressed", Validators: []forms.Validator{ forms.IsOptional{Default: false}, forms.IsBoolean{}, }, }, { Name: "chunk-size", Validators: []forms.Validator{ forms.IsOptional{Default: 100}, forms.IsInteger{HasMin: true, Min: 0, HasMax: true, Max: 10000}, }, }, { Name: "headers", Validators: []forms.Validator{ forms.IsOptional{Default: map[string]interface{}{}}, forms.IsStringMap{}, }, }, }, }
View Source
var FileReaderForm = forms.Form{ ErrorMsg: "invalid data encountered in the file reader form", Fields: []forms.Field{ { Name: "path", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsString{}, }, }, { Name: "format", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsIn{Choices: []interface{}{"json"}}, }, }, { Name: "compressed", Validators: []forms.Validator{ forms.IsOptional{Default: false}, forms.IsBoolean{}, }, }, { Name: "chunk-size", Validators: []forms.Validator{ forms.IsOptional{Default: 10}, forms.IsInteger{HasMin: true, Min: 0, HasMax: true, Max: 10000}, }, }, { Name: "headers", Validators: []forms.Validator{ forms.IsOptional{Default: map[string]interface{}{}}, forms.IsStringMap{}, }, }, }, }
View Source
var GenerateForm = forms.Form{ ErrorMsg: "invalid data encountered in the generate reader form", Fields: []forms.Field{ { Name: "fields", Validators: []forms.Validator{ forms.IsStringMap{ Form: &forms.Form{ Fields: []forms.Field{ forms.Field{ Name: "*", Validators: []forms.Validator{ forms.IsStringMap{ Form: &GeneratorForm, }, IsGenerator{}, }, }, }, }, }, }, }, { Name: "frequency", Validators: []forms.Validator{ forms.IsOptional{Default: 1000}, forms.IsFloat{HasMin: true, HasMax: true, Min: 1e-3, Max: 1e6}, }, }, }, }
View Source
var GeneratorForm = forms.Form{ ErrorMsg: "invalid data encountered in the field config form", Fields: []forms.Field{ forms.Field{ Name: "type", Validators: []forms.Validator{ forms.IsString{}, forms.IsIn{Choices: keys(generators)}, }, }, forms.Field{ Name: "config", Validators: []forms.Validator{ forms.IsOptional{Default: map[string]interface{}{}}, forms.IsStringMap{}, }, }, }, }
View Source
var LiteralForm = forms.Form{ Fields: []forms.Field{ forms.Field{ Name: "value", Validators: []forms.Validator{ forms.IsRequired{}, }, }, }, }
View Source
var Readers = kodex.ReaderDefinitions{ "file": kodex.ReaderDefinition{ Maker: MakeFileReader, Form: FileReaderForm, Internal: true, }, "stdin": kodex.ReaderDefinition{ Maker: MakeStdinReader, Form: StdinReaderForm, Internal: true, }, "generate": kodex.ReaderDefinition{ Maker: MakeGenerateReader, Form: GenerateForm, Internal: true, }, "bytes": kodex.ReaderDefinition{ Maker: MakeBytesReader, Form: BytesReaderForm, Internal: true, }, "amqp": kodex.ReaderDefinition{ Maker: MakeAMQPReader, Form: AMQPReaderForm, Internal: false, }, }
View Source
var StdinReaderForm = forms.Form{ ErrorMsg: "invalid data encountered in the stdin reader form", Fields: []forms.Field{ { Name: "format", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsIn{Choices: []interface{}{"json"}}, }, }, { Name: "compressed", Validators: []forms.Validator{ forms.IsOptional{Default: false}, forms.IsBoolean{}, }, }, { Name: "chunk-size", Validators: []forms.Validator{ forms.IsOptional{Default: 10}, forms.IsInteger{HasMin: true, Min: 0, HasMax: true, Max: 10000}, }, }, { Name: "headers", Validators: []forms.Validator{ forms.IsOptional{Default: map[string]interface{}{}}, forms.IsStringMap{}, }, }, }, }
Functions ¶
func MakeGenerateReader ¶
Types ¶
type AMQPPayload ¶
type AMQPPayload struct {
// contains filtered or unexported fields
}
func (*AMQPPayload) Acknowledge ¶
func (f *AMQPPayload) Acknowledge() error
func (*AMQPPayload) EndOfStream ¶
func (f *AMQPPayload) EndOfStream() bool
func (*AMQPPayload) Headers ¶
func (f *AMQPPayload) Headers() map[string]interface{}
func (*AMQPPayload) Items ¶
func (f *AMQPPayload) Items() []*kodex.Item
func (*AMQPPayload) Reject ¶
func (f *AMQPPayload) Reject() error
type AMQPReader ¶
type AMQPReader struct { writers.AMQPBase ConsumerName string // contains filtered or unexported fields }
func (*AMQPReader) MakeAMQPPayload ¶
func (a *AMQPReader) MakeAMQPPayload(delivery amqp.Delivery) (*AMQPPayload, error)
func (*AMQPReader) Purge ¶
func (a *AMQPReader) Purge() error
type BytesPayload ¶
type BytesPayload struct {
// contains filtered or unexported fields
}
func MakeGeneratePayload ¶
func MakeGeneratePayload() *BytesPayload
func (*BytesPayload) Acknowledge ¶
func (f *BytesPayload) Acknowledge() error
func (*BytesPayload) EndOfStream ¶
func (f *BytesPayload) EndOfStream() bool
func (*BytesPayload) Headers ¶
func (f *BytesPayload) Headers() map[string]interface{}
func (*BytesPayload) Items ¶
func (f *BytesPayload) Items() []*kodex.Item
func (*BytesPayload) Reject ¶
func (f *BytesPayload) Reject() error
type BytesReader ¶
type BytesReader struct { Input []byte Reader *bufio.Reader Format string Compressed bool Headers map[string]interface{} ChunkSize int }
func (*BytesReader) MakeBytesPayload ¶
func (s *BytesReader) MakeBytesPayload() (*BytesPayload, error)
func (*BytesReader) Purge ¶
func (f *BytesReader) Purge() error
func (*BytesReader) Teardown ¶
func (s *BytesReader) Teardown() error
type FilePayload ¶
type FilePayload struct {
// contains filtered or unexported fields
}
func (*FilePayload) Acknowledge ¶
func (f *FilePayload) Acknowledge() error
func (*FilePayload) EndOfStream ¶
func (f *FilePayload) EndOfStream() bool
func (*FilePayload) Headers ¶
func (f *FilePayload) Headers() map[string]interface{}
func (*FilePayload) Items ¶
func (f *FilePayload) Items() []*kodex.Item
func (*FilePayload) Reject ¶
func (f *FilePayload) Reject() error
type FileReader ¶
type FileReader struct { Reader *bufio.Reader File *os.File GzReader *gzip.Reader Format string Compressed bool Headers map[string]interface{} Path string ChunkSize int }
func (*FileReader) MakeFilePayload ¶
func (s *FileReader) MakeFilePayload() (*FilePayload, error)
func (*FileReader) Purge ¶
func (f *FileReader) Purge() error
func (*FileReader) Teardown ¶
func (s *FileReader) Teardown() error
type GeneratePayload ¶
type GeneratePayload struct {
// contains filtered or unexported fields
}
func (*GeneratePayload) Acknowledge ¶
func (f *GeneratePayload) Acknowledge() error
func (*GeneratePayload) Headers ¶
func (f *GeneratePayload) Headers() map[string]interface{}
func (*GeneratePayload) Items ¶
func (f *GeneratePayload) Items() []*kodex.Item
func (*GeneratePayload) Reject ¶
func (f *GeneratePayload) Reject() error
func (*GeneratePayload) Teardown ¶
func (s *GeneratePayload) Teardown() error
type GenerateReader ¶
type GenerateReader struct {
// contains filtered or unexported fields
}
func (*GenerateReader) Purge ¶
func (f *GenerateReader) Purge() error
func (*GenerateReader) Teardown ¶
func (g *GenerateReader) Teardown() error
type GeneratorMaker ¶
type IsGenerator ¶
type IsGenerator struct{}
func (IsGenerator) MarshalJSON ¶
func (g IsGenerator) MarshalJSON() ([]byte, error)
func (IsGenerator) Validate ¶
func (g IsGenerator) Validate(input interface{}, values map[string]interface{}) (interface{}, error)
type StdinPayload ¶
type StdinPayload struct {
// contains filtered or unexported fields
}
func (*StdinPayload) Acknowledge ¶
func (f *StdinPayload) Acknowledge() error
func (*StdinPayload) EndOfStream ¶
func (f *StdinPayload) EndOfStream() bool
func (*StdinPayload) Headers ¶
func (f *StdinPayload) Headers() map[string]interface{}
func (*StdinPayload) Items ¶
func (f *StdinPayload) Items() []*kodex.Item
func (*StdinPayload) Reject ¶
func (f *StdinPayload) Reject() error
type StdinReader ¶
type StdinReader struct { Reader *bufio.Reader GzReader *gzip.Reader Format string Compressed bool Headers map[string]interface{} ChunkSize int }
func (*StdinReader) MakeStdinPayload ¶
func (s *StdinReader) MakeStdinPayload() (*StdinPayload, error)
func (*StdinReader) Purge ¶
func (f *StdinReader) Purge() error
func (*StdinReader) Teardown ¶
func (s *StdinReader) Teardown() error
Click to show internal directories.
Click to hide internal directories.