Documentation ¶
Index ¶
- Constants
- Variables
- func BufWrites(rawWriters []io.Writer, funciton func([]io.Writer))
- func ChannelToLineWriter(wg *sync.WaitGroup, name string, reader io.Reader, writer io.WriteCloser, ...)
- func ChannelToWriter(wg *sync.WaitGroup, name string, reader io.Reader, writer io.WriteCloser, ...) error
- func CleanPath(path string) string
- func Compare(a interface{}, b interface{}) (ret int)
- func CopyMultipleReaders(readers []io.Reader, writer io.Writer) error
- func DecodeRow(encodedBytes []byte) (objects []interface{}, err error)
- func DecodeRowKeys(encodedBytes []byte, indexes []int) (keys []interface{}, err error)
- func DecodeRowKeysValues(encodedBytes []byte, indexes []int) (keys, values []interface{}, err error)
- func DecodeRowTo(encodedBytes []byte, objects ...interface{}) error
- func DownloadUrl(fileUrl string) (filename string, content []byte, e error)
- func EncodeRow(anyObject ...interface{}) ([]byte, error)
- func Error(w http.ResponseWriter, r *http.Request, httpStatus int, obj string) (err error)
- func Execute(executeWaitGroup *sync.WaitGroup, name string, command *exec.Cmd, ...)
- func FprintRowsFromChannel(ch io.Reader, writer io.Writer, delimiter string, lineSperator string) error
- func Fprintf(inChan io.Reader, writer io.Writer, format string) error
- func Get(url string) ([]byte, error)
- func Hash(bytes []byte) uint32
- func HashByKeys(data []interface{}) int
- func Json(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error)
- func LessThan(a interface{}, b interface{}) bool
- func LineReaderToChannel(wg *sync.WaitGroup, name string, reader io.Reader, ch io.WriteCloser, ...)
- func LinkChannel(wg *sync.WaitGroup, inChan, outChan chan []byte)
- func ListFiles(dir string, pattern string) (fileNames []string)
- func PartitionByKeys(shardCount int, data []interface{}) int
- func Post(url string, values url.Values) ([]byte, error)
- func ProcessMessage(reader io.Reader, f func([]byte) error) (err error)
- func Range(from, to int) func(io.Writer) error
- func ReadMessage(reader io.Reader) (m []byte, err error)
- func ReadRow(ch io.Reader) (row []interface{}, err error)
- func ReaderToChannel(wg *sync.WaitGroup, name string, reader io.ReadCloser, writer io.WriteCloser, ...) error
- func Retry(fn func() error) error
- func TakeMessage(reader io.Reader, count int, f func([]byte) error) (err error)
- func TakeTsv(reader io.Reader, count int, f func([]string) error) (err error)
- func TimeDelayedRetry(fn func() error, waitTimes ...time.Duration) error
- func TsvPrintf(inChan io.Reader, writer io.Writer, format string) error
- func UserHomeDir() string
- func WriteEOFMessage(writer io.Writer) (err error)
- func WriteMessage(writer io.Writer, m []byte) (err error)
- func WriteRow(outChan io.Writer, anyObject ...interface{}) error
- type BufferedMessageWriter
- type ChannelStatus
- type ExecutorStatus
- type Item
- type Piper
- type PriorityQueue
- func (pq *PriorityQueue) Dequeue() (interface{}, int)
- func (pq *PriorityQueue) Enqueue(x interface{}, sourceId int)
- func (pq *PriorityQueue) Len() int
- func (pq *PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq *PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) Top() interface{}
Constants ¶
View Source
const (
BUFFER_SIZE = 1024 * 512
)
Variables ¶
View Source
var ( Transport *http.Transport SchemePrefix = "http://" )
Functions ¶
func BufWrites ¶
BufWrites ensures all writers are bufio.Writer For any bufio.Writer created here, flush it before returning.
func ChannelToLineWriter ¶
func ChannelToWriter ¶
func CopyMultipleReaders ¶
setup asynchronously to merge multiple channels into one channel
func DecodeRowKeys ¶
DecodeRowKeys decode key fields by index[], starting from 1
func DecodeRowKeysValues ¶
func DecodeRowKeysValues(encodedBytes []byte, indexes []int) (keys, values []interface{}, err error)
DecodeRowKeysValues decode a row of data, with the indexes[] specified fields as key fields and the rest of fields as value fields
func DecodeRowTo ¶
func Execute ¶
func Execute(executeWaitGroup *sync.WaitGroup, name string, command *exec.Cmd, reader io.Reader, writer io.Writer, prevIsPipe, isPipe bool, closeOutput bool, errWriter io.Writer)
all data passing through pipe are all (size, msgpack_encoded) tuples The input and output should all be this msgpack format. Only the stdin and stdout of Pipe() is line based text.
func FprintRowsFromChannel ¶
func HashByKeys ¶
func HashByKeys(data []interface{}) int
func LineReaderToChannel ¶
func LinkChannel ¶
func PartitionByKeys ¶
func ReaderToChannel ¶
func UserHomeDir ¶
func UserHomeDir() string
func WriteEOFMessage ¶
Types ¶
type BufferedMessageWriter ¶
type BufferedMessageWriter struct {
// contains filtered or unexported fields
}
func NewBufferedMessageWriter ¶
func NewBufferedMessageWriter(w io.Writer, size int) *BufferedMessageWriter
func (*BufferedMessageWriter) Available ¶
func (b *BufferedMessageWriter) Available() int
func (*BufferedMessageWriter) Buffered ¶
func (b *BufferedMessageWriter) Buffered() int
func (*BufferedMessageWriter) Flush ¶
func (b *BufferedMessageWriter) Flush() error
func (*BufferedMessageWriter) WriteMessage ¶
func (b *BufferedMessageWriter) WriteMessage(m []byte) (err error)
type ChannelStatus ¶
func NewChannelStatus ¶
func NewChannelStatus() *ChannelStatus
func (*ChannelStatus) ReportAdd ¶
func (s *ChannelStatus) ReportAdd(delta int)
func (*ChannelStatus) ReportClose ¶
func (s *ChannelStatus) ReportClose()
func (*ChannelStatus) ReportStart ¶
func (s *ChannelStatus) ReportStart()
type ExecutorStatus ¶
type ExecutorStatus struct { InputChannelStatuses []*ChannelStatus OutputChannelStatuses []*ChannelStatus RequestTime time.Time StartTime time.Time StopTime time.Time }
func (*ExecutorStatus) IsClosed ¶
func (s *ExecutorStatus) IsClosed() bool
func (*ExecutorStatus) TimeTaken ¶
func (s *ExecutorStatus) TimeTaken() time.Duration
type Item ¶
type Item struct {
// contains filtered or unexported fields
}
An Item is something we manage in a priority queue.
type Piper ¶
type Piper struct { Reader *io.PipeReader Writer *io.PipeWriter Counter int64 Error error }
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
A PriorityQueue implements heap.Interface and holds Items.
func NewPriorityQueue ¶
func NewPriorityQueue(lessFunc func(a, b interface{}) bool) *PriorityQueue
func (*PriorityQueue) Dequeue ¶
func (pq *PriorityQueue) Dequeue() (interface{}, int)
func (*PriorityQueue) Enqueue ¶
func (pq *PriorityQueue) Enqueue(x interface{}, sourceId int)
func (*PriorityQueue) Len ¶
func (pq *PriorityQueue) Len() int
func (*PriorityQueue) Less ¶
func (pq *PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (*PriorityQueue) Swap ¶
func (pq *PriorityQueue) Swap(i, j int)
func (*PriorityQueue) Top ¶
func (pq *PriorityQueue) Top() interface{}
Source Files ¶
Click to show internal directories.
Click to hide internal directories.