util

package
v0.0.0-...-624a38e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2016 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BUFFER_SIZE = 1024 * 512
)

Variables

View Source
var (
	Transport    *http.Transport
	SchemePrefix = "http://"
)

Functions

func BufWrites

func BufWrites(rawWriters []io.Writer, funciton func([]io.Writer))

BufWrites ensures all writers are bufio.Writer. Any bufio.Writer created here is flushed before returning.

func ChannelToLineWriter

func ChannelToLineWriter(wg *sync.WaitGroup, name string, reader io.Reader, writer io.WriteCloser, errorOutput io.Writer)

func ChannelToWriter

func ChannelToWriter(wg *sync.WaitGroup, name string, reader io.Reader, writer io.WriteCloser, errorOutput io.Writer) error

func CleanPath

func CleanPath(path string) string

func Compare

func Compare(a interface{}, b interface{}) (ret int)

func CopyMultipleReaders

func CopyMultipleReaders(readers []io.Reader, writer io.Writer) error

Sets up an asynchronous merge of multiple channels into one channel.

func DecodeRow

func DecodeRow(encodedBytes []byte) (objects []interface{}, err error)

DecodeRow decodes one row of data from a blob.

func DecodeRowKeys

func DecodeRowKeys(encodedBytes []byte, indexes []int) (keys []interface{}, err error)

DecodeRowKeys decodes the key fields selected by indexes[], with indexes starting from 1.

func DecodeRowKeysValues

func DecodeRowKeysValues(encodedBytes []byte, indexes []int) (keys, values []interface{}, err error)

DecodeRowKeysValues decodes a row of data, treating the fields specified by indexes[] as key fields and the remaining fields as value fields.

func DecodeRowTo

func DecodeRowTo(encodedBytes []byte, objects ...interface{}) error

func DownloadUrl

func DownloadUrl(fileUrl string) (filename string, content []byte, e error)

func EncodeRow

func EncodeRow(anyObject ...interface{}) ([]byte, error)

EncodeRow encodes one row of data to a blob.

func Error

func Error(w http.ResponseWriter, r *http.Request, httpStatus int, obj string) (err error)

func Execute

func Execute(executeWaitGroup *sync.WaitGroup, name string, command *exec.Cmd,
	reader io.Reader, writer io.Writer, prevIsPipe, isPipe bool, closeOutput bool, errWriter io.Writer)

All data passing through a pipe are (size, msgpack_encoded) tuples. The input and output should both be in this msgpack format. Only the stdin and stdout of Pipe() are line-based text.

func FprintRowsFromChannel

func FprintRowsFromChannel(ch io.Reader, writer io.Writer, delimiter string, lineSperator string) error

func Fprintf

func Fprintf(inChan io.Reader, writer io.Writer, format string) error

func Get

func Get(url string) ([]byte, error)

func Hash

func Hash(bytes []byte) uint32

func HashByKeys

func HashByKeys(data []interface{}) int

func Json

func Json(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error)

func LessThan

func LessThan(a interface{}, b interface{}) bool

func LineReaderToChannel

func LineReaderToChannel(wg *sync.WaitGroup, name string, reader io.Reader, ch io.WriteCloser, closeOutput bool, errorOutput io.Writer)

func LinkChannel

func LinkChannel(wg *sync.WaitGroup, inChan, outChan chan []byte)

func ListFiles

func ListFiles(dir string, pattern string) (fileNames []string)

func PartitionByKeys

func PartitionByKeys(shardCount int, data []interface{}) int

func Post

func Post(url string, values url.Values) ([]byte, error)

func ProcessMessage

func ProcessMessage(reader io.Reader, f func([]byte) error) (err error)

func Range

func Range(from, to int) func(io.Writer) error

func ReadMessage

func ReadMessage(reader io.Reader) (m []byte, err error)

func ReadRow

func ReadRow(ch io.Reader) (row []interface{}, err error)

ReadRow reads and decodes one row of data.

func ReaderToChannel

func ReaderToChannel(wg *sync.WaitGroup, name string, reader io.ReadCloser, writer io.WriteCloser, closeOutput bool, errorOutput io.Writer) error

func Retry

func Retry(fn func() error) error

func TakeMessage

func TakeMessage(reader io.Reader, count int, f func([]byte) error) (err error)

func TakeTsv

func TakeTsv(reader io.Reader, count int, f func([]string) error) (err error)

func TimeDelayedRetry

func TimeDelayedRetry(fn func() error, waitTimes ...time.Duration) error

func TsvPrintf

func TsvPrintf(inChan io.Reader, writer io.Writer, format string) error

func UserHomeDir

func UserHomeDir() string

func WriteEOFMessage

func WriteEOFMessage(writer io.Writer) (err error)

func WriteMessage

func WriteMessage(writer io.Writer, m []byte) (err error)

func WriteRow

func WriteRow(outChan io.Writer, anyObject ...interface{}) error

WriteRow encodes and writes a row of data.

Types

type BufferedMessageWriter

type BufferedMessageWriter struct {
	// contains filtered or unexported fields
}

func NewBufferedMessageWriter

func NewBufferedMessageWriter(w io.Writer, size int) *BufferedMessageWriter

func (*BufferedMessageWriter) Available

func (b *BufferedMessageWriter) Available() int

func (*BufferedMessageWriter) Buffered

func (b *BufferedMessageWriter) Buffered() int

func (*BufferedMessageWriter) Flush

func (b *BufferedMessageWriter) Flush() error

func (*BufferedMessageWriter) WriteMessage

func (b *BufferedMessageWriter) WriteMessage(m []byte) (err error)

type ChannelStatus

type ChannelStatus struct {
	Length    int64
	StartTime time.Time
	StopTime  time.Time
	Name      string
}

func NewChannelStatus

func NewChannelStatus() *ChannelStatus

func (*ChannelStatus) ReportAdd

func (s *ChannelStatus) ReportAdd(delta int)

func (*ChannelStatus) ReportClose

func (s *ChannelStatus) ReportClose()

func (*ChannelStatus) ReportStart

func (s *ChannelStatus) ReportStart()

type ExecutorStatus

type ExecutorStatus struct {
	InputChannelStatuses  []*ChannelStatus
	OutputChannelStatuses []*ChannelStatus
	RequestTime           time.Time
	StartTime             time.Time
	StopTime              time.Time
}

func (*ExecutorStatus) IsClosed

func (s *ExecutorStatus) IsClosed() bool

func (*ExecutorStatus) TimeTaken

func (s *ExecutorStatus) TimeTaken() time.Duration

type Item

type Item struct {
	// contains filtered or unexported fields
}

An Item is something we manage in a priority queue.

type Piper

type Piper struct {
	Reader  *io.PipeReader
	Writer  *io.PipeWriter
	Counter int64
	Error   error
}

func NewPiper

func NewPiper() *Piper

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

A PriorityQueue implements heap.Interface and holds Items.

func NewPriorityQueue

func NewPriorityQueue(lessFunc func(a, b interface{}) bool) *PriorityQueue

func (*PriorityQueue) Dequeue

func (pq *PriorityQueue) Dequeue() (interface{}, int)

func (*PriorityQueue) Enqueue

func (pq *PriorityQueue) Enqueue(x interface{}, sourceId int)

func (*PriorityQueue) Len

func (pq *PriorityQueue) Len() int

func (*PriorityQueue) Less

func (pq *PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (*PriorityQueue) Swap

func (pq *PriorityQueue) Swap(i, j int)

func (*PriorityQueue) Top

func (pq *PriorityQueue) Top() interface{}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL