Documentation ¶
Index ¶
- Constants
- Variables
- func BufWrites(rawWriters []io.Writer, function func([]io.Writer))
- func ChannelToLineWriter(wg *sync.WaitGroup, stat *pb.InstructionStat, name string, reader io.Reader, ...)
- func ChannelToWriter(wg *sync.WaitGroup, name string, reader io.Reader, writer io.WriteCloser, ...) error
- func CleanPath(path string) string
- func Compare(a interface{}, b interface{}) (ret int)
- func ConvertLineReaderToRowReader(lineReader io.Reader, name string, errorOutput io.Writer) (rowReader io.Reader)
- func CopyMultipleReaders(readers []io.Reader, writer io.Writer) (inCounter int64, outCounter int64, e error)
- func DownloadUrl(fileUrl string) (filename string, content []byte, e error)
- func EncodeKeys(anyObject ...interface{}) ([]byte, error)
- func Error(w http.ResponseWriter, r *http.Request, httpStatus int, obj string) (err error)
- func Execute(ctx context.Context, executeWaitGroup *sync.WaitGroup, ...) error
- func ExecuteWithCleanup(parentContext context.Context, onExecute func() error, onCleanup func()) error
- func Fprintf(writer io.Writer, reader io.Reader, format string) error
- func Get(url string) ([]byte, error)
- func GleamGrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
- func Hash(bytes []byte) uint32
- func HashByKeys(data []interface{}) int
- func Json(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error)
- func LessThan(a interface{}, b interface{}) bool
- func LineReaderToChannel(wg *sync.WaitGroup, stat *pb.InstructionStat, name string, reader io.Reader, ...)
- func ListFiles(dir string, pattern string) (fileNames []string)
- func Now() (ts int64)
- func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error)
- func PartitionByKeys(shardCount int, data []interface{}) int
- func Post(url string, values url.Values) ([]byte, error)
- func PrintDelimited(stat *pb.InstructionStat, reader io.Reader, writer io.Writer, delimiter string, ...) error
- func ProcessMessage(reader io.Reader, f func([]byte) error) (err error)
- func ProcessRow(reader io.Reader, indexes []int, f func(*Row) error) (err error)
- func Range(from, to int) func(io.Writer, *pb.InstructionStat) error
- func ReadMessage(reader io.Reader) (m []byte, err error)
- func ReaderToChannel(wg *sync.WaitGroup, name string, readCloser io.ReadCloser, ...) error
- func Retry(fn func() error) error
- func TakeMessage(reader io.Reader, count int, f func([]byte) error) (err error)
- func TakeTsv(reader io.Reader, count int, f func([]string) error) (err error)
- func TimeDelayedRetry(fn func() error, waitTimes ...time.Duration) error
- func ToBytes(val interface{}) []byte
- func ToFloat64(val interface{}) float64
- func ToInt64(val interface{}) int64
- func ToString(val interface{}) string
- func TsvPrintf(writer io.Writer, reader io.Reader, format string) error
- func UserHomeDir() string
- func WriteEOFMessage(writer io.Writer) (err error)
- func WriteMessage(writer io.Writer, m []byte) (err error)
- type BufferedMessageWriter
- type Item
- type MessageControl
- type Piper
- type PriorityQueue
- func (pq *PriorityQueue) Dequeue() (interface{}, int)
- func (pq *PriorityQueue) Enqueue(x interface{}, sourceId int)
- func (pq *PriorityQueue) Len() int
- func (pq *PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq *PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) Top() interface{}
- type Row
- func (row *Row) AppendKey(objects ...interface{}) *Row
- func (row *Row) AppendValue(objects ...interface{}) *Row
- func (z *Row) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *Row) EncodeMsg(en *msgp.Writer) (err error)
- func (z *Row) MarshalMsg(b []byte) (o []byte, err error)
- func (z *Row) Msgsize() (s int)
- func (z *Row) UnmarshalMsg(bts []byte) (o []byte, err error)
- func (z *Row) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)
- func (row *Row) UseKeys(indexes []int) (err error)
- func (row Row) WriteTo(writer io.Writer) (err error)
Constants ¶
const (
BUFFER_SIZE = 1024 * 512
)
const (
MessageControlEOF = MessageControl(math.MinInt32)
)
Variables ¶
var (
Transport *http.Transport
SchemePrefix = "http://"
)
Functions ¶
func BufWrites ¶
BufWrites ensures all writers are bufio.Writer. Any bufio.Writer created here is flushed before returning.
func ChannelToLineWriter ¶
func ChannelToWriter ¶
func CopyMultipleReaders ¶
func CopyMultipleReaders(readers []io.Reader, writer io.Writer) (inCounter int64, outCounter int64, e error)
CopyMultipleReaders sets up asynchronous copying to merge multiple readers into one writer.
func EncodeKeys ¶
EncodeKeys encodes keys to a blob, for comparing or sorting.
func Execute ¶
func Execute(ctx context.Context, executeWaitGroup *sync.WaitGroup, stat *pb.InstructionStat, name string, command *exec.Cmd, reader io.Reader, writer io.Writer, prevIsPipe, isPipe bool, closeOutput bool, errWriter io.Writer) error
All data passing through a pipe consists of (size, msgpack_encoded) tuples. The input and output should both be in this msgpack format. Only the stdin and stdout of Pipe() are line-based text.
func ExecuteWithCleanup ¶
func Fprintf ¶
Fprintf reads MessagePack encoded messages from reader, and formats according to a format specifier and writes to writer.
func GleamGrpcDial ¶
func GleamGrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
func HashByKeys ¶
func HashByKeys(data []interface{}) int
func LineReaderToChannel ¶
func PartitionByKeys ¶
func PrintDelimited ¶
func PrintDelimited(stat *pb.InstructionStat, reader io.Reader, writer io.Writer, delimiter string, lineSperator string) error
PrintDelimited reads and formats MessagePack-encoded messages with delimiter and lineSeparator.
func ProcessMessage ¶
ProcessMessage reads and processes MessagePack-encoded messages until EOF.
func ProcessRow ¶
ProcessRow reads and processes rows until EOF.
func ReadMessage ¶
ReadMessage reads out the []byte for one message
func ReaderToChannel ¶
func TakeMessage ¶
TakeMessage reads and processes MessagePack-encoded messages. If count is less than 0, all messages are processed.
func TakeTsv ¶
TakeTsv reads and processes TSV lines. If count is less than 0, all lines are processed.
func TsvPrintf ¶
TsvPrintf reads TSV lines from reader, and formats according to a format specifier and writes to writer.
func UserHomeDir ¶
func UserHomeDir() string
func WriteEOFMessage ¶
Types ¶
type BufferedMessageWriter ¶
type BufferedMessageWriter struct {
// contains filtered or unexported fields
}
func NewBufferedMessageWriter ¶
func NewBufferedMessageWriter(w io.Writer, size int) *BufferedMessageWriter
func (*BufferedMessageWriter) Available ¶
func (b *BufferedMessageWriter) Available() int
func (*BufferedMessageWriter) Buffered ¶
func (b *BufferedMessageWriter) Buffered() int
func (*BufferedMessageWriter) Flush ¶
func (b *BufferedMessageWriter) Flush() error
func (*BufferedMessageWriter) WriteMessage ¶
func (b *BufferedMessageWriter) WriteMessage(m []byte) (err error)
type Item ¶
type Item struct {
// contains filtered or unexported fields
}
An Item is something we manage in a priority queue.
type MessageControl ¶
type MessageControl int32
type Piper ¶
type Piper struct {
Reader *io.PipeReader
Writer *io.PipeWriter
Counter int64
Error error
}
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
A PriorityQueue implements heap.Interface and holds Items.
func NewPriorityQueue ¶
func NewPriorityQueue(lessFunc func(a, b interface{}) bool) *PriorityQueue
func (*PriorityQueue) Dequeue ¶
func (pq *PriorityQueue) Dequeue() (interface{}, int)
func (*PriorityQueue) Enqueue ¶
func (pq *PriorityQueue) Enqueue(x interface{}, sourceId int)
func (*PriorityQueue) Len ¶
func (pq *PriorityQueue) Len() int
func (*PriorityQueue) Less ¶
func (pq *PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (*PriorityQueue) Swap ¶
func (pq *PriorityQueue) Swap(i, j int)
func (*PriorityQueue) Top ¶
func (pq *PriorityQueue) Top() interface{}
type Row ¶
type Row struct {
K []interface{} `msg:"K"`
V []interface{} `msg:"V"`
T int64 `msg:"T"`
}
func (*Row) AppendValue ¶
func (*Row) DecodeMsg ¶
DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.
func (*Row) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
func (*Row) Msgsize ¶
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*Row) UnmarshalMsg ¶
UnmarshalMsg implements msgp.Unmarshaler