util

package
v0.0.0-...-a4aaf8c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2024 License: Apache-2.0 Imports: 26 Imported by: 172

Documentation

Index

Constants

View Source
const (
	BUFFER_SIZE = 1024 * 512
)
View Source
const (
	MessageControlEOF = MessageControl(math.MinInt32)
)

Variables

View Source
var (
	Transport    *http.Transport
	SchemePrefix = "http://"
)

Functions

func BufWrites

func BufWrites(rawWriters []io.Writer, function func([]io.Writer))

BufWrites ensures all writers are bufio.Writer. For any bufio.Writer created here, it is flushed before returning.

func ChannelToLineWriter

func ChannelToLineWriter(wg *sync.WaitGroup, stat *pb.InstructionStat, name string, reader io.Reader, writer io.WriteCloser, errorOutput io.Writer)

func ChannelToWriter

func ChannelToWriter(wg *sync.WaitGroup, name string, reader io.Reader, writer io.WriteCloser, errorOutput io.Writer) error

func CleanPath

func CleanPath(path string) string

func Compare

func Compare(a interface{}, b interface{}) (ret int)

func ConvertLineReaderToRowReader

func ConvertLineReaderToRowReader(lineReader io.Reader, name string, errorOutput io.Writer) (rowReader io.Reader)

func CopyMultipleReaders

func CopyMultipleReaders(readers []io.Reader, writer io.Writer) (inCounter int64, outCounter int64, e error)

CopyMultipleReaders sets up asynchronous merging of multiple channels into one channel.

func DownloadUrl

func DownloadUrl(fileUrl string) (filename string, content []byte, e error)

func EncodeKeys

func EncodeKeys(anyObject ...interface{}) ([]byte, error)

EncodeKeys encodes keys to a blob, for comparing or sorting.

func Error

func Error(w http.ResponseWriter, r *http.Request, httpStatus int, obj string) (err error)

func Execute

func Execute(ctx context.Context, executeWaitGroup *sync.WaitGroup, stat *pb.InstructionStat,
	name string, command *exec.Cmd,
	reader io.Reader, writer io.Writer, prevIsPipe, isPipe bool, closeOutput bool,
	errWriter io.Writer) error

All data passing through a pipe are (size, msgpack_encoded) tuples. The input and output should both be in this msgpack format. Only the stdin and stdout of Pipe() are line-based text.

func ExecuteWithCleanup

func ExecuteWithCleanup(parentContext context.Context, onExecute func() error, onCleanup func()) error

func Fprintf

func Fprintf(writer io.Writer, reader io.Reader, format string) error

Fprintf reads MessagePack encoded messages from reader, and formats according to a format specifier and writes to writer.

func Get

func Get(url string) ([]byte, error)

func GleamGrpcDial

func GleamGrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error)

func Hash

func Hash(bytes []byte) uint32

func HashByKeys

func HashByKeys(data []interface{}) int

func Json

func Json(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error)

func LessThan

func LessThan(a interface{}, b interface{}) bool

func LineReaderToChannel

func LineReaderToChannel(wg *sync.WaitGroup, stat *pb.InstructionStat, name string, reader io.Reader, ch io.WriteCloser, closeOutput bool, errorOutput io.Writer)

func ListFiles

func ListFiles(dir string, pattern string) (fileNames []string)

func Now

func Now() (ts int64)

func ParseServerToGrpcAddress

func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error)

func PartitionByKeys

func PartitionByKeys(shardCount int, data []interface{}) int

func Post

func Post(url string, values url.Values) ([]byte, error)

func PrintDelimited

func PrintDelimited(stat *pb.InstructionStat, reader io.Reader, writer io.Writer, delimiter string, lineSperator string) error

PrintDelimited reads and formats MessagePack-encoded messages with delimiter and lineSeparator.

func ProcessMessage

func ProcessMessage(reader io.Reader, f func([]byte) error) (err error)

ProcessMessage reads and processes MessagePack-encoded messages until EOF.

func ProcessRow

func ProcessRow(reader io.Reader, indexes []int, f func(*Row) error) (err error)

ProcessRow reads and processes rows until EOF.

func Range

func Range(from, to int) func(io.Writer, *pb.InstructionStat) error

func ReadMessage

func ReadMessage(reader io.Reader) (m []byte, err error)

ReadMessage reads out the []byte for one message

func ReaderToChannel

func ReaderToChannel(wg *sync.WaitGroup, name string, readCloser io.ReadCloser, writer io.WriteCloser, closeOutput bool, errorOutput io.Writer) error

func Retry

func Retry(fn func() error) error

func TakeMessage

func TakeMessage(reader io.Reader, count int, f func([]byte) error) (err error)

TakeMessage reads and processes MessagePack-encoded messages. If count is less than 0, all messages are processed.

func TakeTsv

func TakeTsv(reader io.Reader, count int, f func([]string) error) (err error)

TakeTsv reads and processes TSV lines. If count is less than 0, all lines are processed.

func TimeDelayedRetry

func TimeDelayedRetry(fn func() error, waitTimes ...time.Duration) error

func ToBytes

func ToBytes(val interface{}) []byte

func ToFloat64

func ToFloat64(val interface{}) float64

func ToInt64

func ToInt64(val interface{}) int64

func ToString

func ToString(val interface{}) string

func TsvPrintf

func TsvPrintf(writer io.Writer, reader io.Reader, format string) error

TsvPrintf reads TSV lines from reader, and formats according to a format specifier and writes to writer.

func UserHomeDir

func UserHomeDir() string

func WriteEOFMessage

func WriteEOFMessage(writer io.Writer) (err error)

func WriteMessage

func WriteMessage(writer io.Writer, m []byte) (err error)

Types

type BufferedMessageWriter

type BufferedMessageWriter struct {
	// contains filtered or unexported fields
}

func NewBufferedMessageWriter

func NewBufferedMessageWriter(w io.Writer, size int) *BufferedMessageWriter

func (*BufferedMessageWriter) Available

func (b *BufferedMessageWriter) Available() int

func (*BufferedMessageWriter) Buffered

func (b *BufferedMessageWriter) Buffered() int

func (*BufferedMessageWriter) Flush

func (b *BufferedMessageWriter) Flush() error

func (*BufferedMessageWriter) WriteMessage

func (b *BufferedMessageWriter) WriteMessage(m []byte) (err error)

type Item

type Item struct {
	// contains filtered or unexported fields
}

An Item is something we manage in a priority queue.

type MessageControl

type MessageControl int32

type Piper

type Piper struct {
	Reader  *io.PipeReader
	Writer  *io.PipeWriter
	Counter int64
	Error   error
}

func NewPiper

func NewPiper() *Piper

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

A PriorityQueue implements heap.Interface and holds Items.

func NewPriorityQueue

func NewPriorityQueue(lessFunc func(a, b interface{}) bool) *PriorityQueue

func (*PriorityQueue) Dequeue

func (pq *PriorityQueue) Dequeue() (interface{}, int)

func (*PriorityQueue) Enqueue

func (pq *PriorityQueue) Enqueue(x interface{}, sourceId int)

func (*PriorityQueue) Len

func (pq *PriorityQueue) Len() int

func (*PriorityQueue) Less

func (pq *PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (*PriorityQueue) Swap

func (pq *PriorityQueue) Swap(i, j int)

func (*PriorityQueue) Top

func (pq *PriorityQueue) Top() interface{}

type Row

type Row struct {
	K []interface{} `msg:"K"`
	V []interface{} `msg:"V"`
	T int64         `msg:"T"`
}

func DecodeRow

func DecodeRow(encodedBytes []byte) (*Row, error)

DecodeRow decodes one row of data from a blob

func NewRow

func NewRow(timestamp int64, objects ...interface{}) *Row

func ReadRow

func ReadRow(reader io.Reader) (row *Row, err error)

ReadRow reads and decodes one row of data.

func (*Row) AppendKey

func (row *Row) AppendKey(objects ...interface{}) *Row

func (*Row) AppendValue

func (row *Row) AppendValue(objects ...interface{}) *Row

func (*Row) DecodeMsg

func (z *Row) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable. We treat empty fields as if we read a Nil from the wire.

func (*Row) EncodeMsg

func (z *Row) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Row) MarshalMsg

func (z *Row) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Row) Msgsize

func (z *Row) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Row) UnmarshalMsg

func (z *Row) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

func (*Row) UnmarshalMsgWithCfg

func (z *Row) UnmarshalMsgWithCfg(bts []byte, cfg *msgp.RuntimeConfig) (o []byte, err error)

func (*Row) UseKeys

func (row *Row) UseKeys(indexes []int) (err error)

UseKeys uses the fields specified by indexes[] as key fields and the remaining fields as value fields.

func (Row) WriteTo

func (row Row) WriteTo(writer io.Writer) (err error)

WriteTo encodes and writes a row of data to the writer.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL