Documentation ¶
Overview ¶
Example (Jq_Pipeline) ¶
package main

import (
	"bytes"
	"fmt"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/jq"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`Loading...
Still loading...
{"message": "hello"}
{"message":"world"}
{"message":"robert"}`)

	lines, _ := streamline.New(data).
		WithPipeline(pipeline.MultiPipeline{
			// Pipeline to discard non-JSON lines
			pipeline.Filter(func(line []byte) bool {
				return bytes.HasPrefix(line, []byte{'{'})
			}),
			// Execute JQ query for each line
			jq.Pipeline(".message"),
		}).
		Lines()
	fmt.Println(lines)
}
Output: ["hello" "world" "robert"]
Example (Streamexec_Attach) ¶
package main

import (
	"fmt"
	"os/exec"
	"strings"

	"go.bobheadxi.dev/streamline/streamexec"
)

func main() {
	cmd := exec.Command("echo", "hello world\nthis is a line\nand another line!")

	stream, _ := streamexec.Attach(cmd, streamexec.Combined).Start()
	_ = stream.Stream(func(line string) error {
		if !strings.Contains(line, "hello") {
			fmt.Println("received output:", line)
		}
		return nil
	})
}
Output:
received output: this is a line
received output: and another line!
Example (Streamexec_JQQuery) ¶
package main

import (
	"bytes"
	"fmt"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/jq"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`Loading...
Still loading...
{ "message": "this is the real data!" }`)

	stream := streamline.New(data).
		// Pipeline to discard loading indicators
		WithPipeline(pipeline.Filter(func(line []byte) bool {
			return !bytes.Contains(line, []byte("..."))
		}))

	message, _ := jq.Query(stream, ".message")
	fmt.Println(string(message))
}
Output: "this is the real data!"
Index ¶
- type LineHandler
- type Stream
- func (o *Stream) Bytes() ([]byte, error)
- func (o *Stream) Lines() ([]string, error)
- func (o *Stream) Read(p []byte) (int, error)
- func (o *Stream) Stream(dst LineHandler[string]) error
- func (o *Stream) StreamBytes(dst LineHandler[[]byte]) error
- func (o *Stream) String() (string, error)
- func (s *Stream) WithPipeline(p pipeline.Pipeline) *Stream
- func (o *Stream) WriteTo(dst io.Writer) (int64, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type LineHandler ¶
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream enables live, line-by-line manipulation and handling of data through (*Stream).WithPipeline(...) and Stream's various aggregation methods. Stream also supports standard library features like io.Copy and io.ReadAll.
Stream's aggregation methods ((*Stream).Stream(...), (*Stream).Lines(...), etc.) may only be used once. Incremental consumers like (*Stream).Read(...) may need to be called multiple times to consume all data, but should not be combined with other methods on the same Stream.
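The single-use rule follows from how readers behave in general: once the underlying input has been drained, there is nothing left for a second consumer. The sketch below illustrates this with the standard library only. The helpers `collectLines` and `consumeTwice` are hypothetical stand-ins for an aggregation method, not streamline's actual implementation.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// collectLines drains r line by line. It is a hypothetical stand-in for an
// aggregation method such as (*Stream).Lines, not streamline's own code.
func collectLines(r io.Reader) ([]string, error) {
	var lines []string
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	// Scanner.Err returns nil if the input simply ended with io.EOF.
	return lines, scanner.Err()
}

// consumeTwice aggregates the same reader twice and reports how many lines
// each pass produced.
func consumeTwice(s string) (first, second int) {
	input := strings.NewReader(s)
	a, _ := collectLines(input)
	b, _ := collectLines(input) // input is already drained at this point
	return len(a), len(b)
}

func main() {
	first, second := consumeTwice("one\ntwo\nthree")
	fmt.Println(first, second) // prints "3 0"
}
```

The second pass sees no data, which is why aggregating the same input twice is not meaningful.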
func (*Stream) Bytes ¶
func (o *Stream) Bytes() ([]byte, error)
Bytes collects all processed output as a byte slice.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
func (*Stream) Lines ¶
func (o *Stream) Lines() ([]string, error)
Lines collects all processed output as a slice of strings.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
func (*Stream) Read ¶
func (o *Stream) Read(p []byte) (int, error)
Read populates p with processed data. This makes Stream usable anywhere an io.Reader is accepted.
WARNING: This implementation is currently VERY inefficient when a Pipeline is configured. Where possible, avoid Read() and use WriteTo() instead, e.g. by calling io.Copy rather than io.ReadAll.
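The reason io.Copy avoids Read() is that it checks whether its source implements io.WriterTo and, if so, delegates to WriteTo, whereas io.ReadAll only ever calls Read. The following standard-library sketch makes that visible by counting calls. `countingSource`, `copyCounts`, and `readAllCounts` are hypothetical names for this illustration; the type merely stands in for anything that, like Stream, implements both Read and WriteTo.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// countingSource is a hypothetical type that records which method a consumer
// calls. It implements both io.Reader and io.WriterTo.
type countingSource struct {
	src      *strings.Reader
	reads    int
	writeTos int
}

func (c *countingSource) Read(p []byte) (int, error) {
	c.reads++
	return c.src.Read(p)
}

func (c *countingSource) WriteTo(dst io.Writer) (int64, error) {
	c.writeTos++
	return c.src.WriteTo(dst)
}

// copyCounts reports the Read and WriteTo calls made by io.Copy.
func copyCounts(s string) (reads, writeTos int) {
	c := &countingSource{src: strings.NewReader(s)}
	_, _ = io.Copy(io.Discard, c)
	return c.reads, c.writeTos
}

// readAllCounts reports the Read and WriteTo calls made by io.ReadAll.
func readAllCounts(s string) (reads, writeTos int) {
	c := &countingSource{src: strings.NewReader(s)}
	_, _ = io.ReadAll(c)
	return c.reads, c.writeTos
}

func main() {
	reads, writeTos := copyCounts("hello\nworld\n")
	fmt.Printf("io.Copy:    Read calls=%d, WriteTo calls=%d\n", reads, writeTos)

	reads, writeTos = readAllCounts("hello\nworld\n")
	fmt.Printf("io.ReadAll: Read calls=%d, WriteTo calls=%d\n", reads, writeTos)
}
```

With io.Copy the source's Read method is never called, so a consumer that can accept an io.Writer destination sidesteps the slow path entirely.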
func (*Stream) Stream ¶
func (o *Stream) Stream(dst LineHandler[string]) error
Stream passes lines read from the input to the handler as it processes them.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
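The blocking and error-propagation behavior described above can be sketched with the standard library alone. This is an analogy under stated assumptions, not streamline's implementation: `streamLines`, `brokenReader`, `errBroken`, and `run` are hypothetical names, and stopping on the first handler error is an assumption of this sketch.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

// errBroken is a hypothetical input failure used for illustration.
var errBroken = errors.New("input broke")

// brokenReader is a hypothetical reader that always fails with errBroken.
type brokenReader struct{}

func (brokenReader) Read([]byte) (int, error) { return 0, errBroken }

// streamLines passes each line to handle and blocks until the input returns
// an error. io.EOF is treated as a normal end of input; any other error is
// returned to the caller.
func streamLines(r io.Reader, handle func(line string) error) error {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		if err := handle(scanner.Text()); err != nil {
			return err // assumption: a handler error stops processing
		}
	}
	// Scanner.Err returns nil when the input ended with io.EOF.
	return scanner.Err()
}

// run counts the lines handled for input s. If fail is true, the input
// returns errBroken after s instead of ending cleanly.
func run(s string, fail bool) (int, error) {
	var input io.Reader = strings.NewReader(s)
	if fail {
		input = io.MultiReader(input, brokenReader{})
	}
	count := 0
	err := streamLines(input, func(string) error {
		count++
		return nil
	})
	return count, err
}

func main() {
	fmt.Println(run("a\nb\n", false)) // prints "2 <nil>"
	fmt.Println(run("a\nb\n", true))  // prints "2 input broke"
}
```

In both runs every available line reaches the handler; only the non-EOF failure surfaces as the returned error.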
func (*Stream) StreamBytes ¶
func (o *Stream) StreamBytes(dst LineHandler[[]byte]) error
StreamBytes passes lines read from the input to the handler as byte slices as it processes them.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
func (*Stream) String ¶
func (o *Stream) String() (string, error)
String collects all processed output as a string.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
func (*Stream) WithPipeline ¶
func (s *Stream) WithPipeline(p pipeline.Pipeline) *Stream
WithPipeline configures this Stream to process the input data with the given Pipeline in all output methods ((*Stream).Stream(...), (*Stream).Lines(...), io.Copy, etc.).
To apply multiple Pipelines, provide a pipeline.MultiPipeline.
Example ¶
package main

import (
	"fmt"
	"strconv"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`3
4
4.8
7
5
2`)

	lines, _ := streamline.New(data).
		// Pipeline to keep only even integers (odd and non-integer lines are discarded)
		WithPipeline(pipeline.Filter(func(line []byte) bool {
			v, err := strconv.Atoi(string(line))
			return err == nil && v%2 == 0
		})).
		Lines()
	fmt.Println(lines)
}
Output: [4 2]