Documentation ¶
Index ¶
- type LineReader
- type Stream
- func (s *Stream) Bytes() ([]byte, error)
- func (s *Stream) Lines() ([]string, error)
- func (s *Stream) Read(p []byte) (int, error)
- func (s *Stream) Stream(dst func(line string)) error
- func (s *Stream) StreamBytes(dst func(line []byte) error) error
- func (s *Stream) String() (string, error)
- func (s *Stream) WithLineSeparator(separator byte) *Stream
- func (s *Stream) WithPipeline(p pipeline.Pipeline) *Stream
- func (s *Stream) WriteTo(dst io.Writer) (int64, error)
Types ¶
type LineReader ¶ added in v0.9.0
type LineReader interface {
	// ReadSlice should have the behaviour of (*bufio.Reader).ReadSlice.
	ReadSlice(delim byte) ([]byte, error)
	io.WriterTo
	io.Reader
}
LineReader is a reader that can read up to a line delimiter. It is used only for an internal type assertion that checks whether an io.Reader provided to streamline.New already implements the desired functionality. It is exported for reference and should not be depended upon, since the interface may change in the future.
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream enables live, line-by-line manipulation and handling of data through (*Stream).WithPipeline(...) and Stream's various aggregation methods. Stream also supports standard library features like io.Copy and io.ReadAll.
Stream's aggregation methods ((*Stream).Stream(...), (*Stream).Lines(...), etc.) may only be used once. Incremental consumers like (*Stream).Read(...) may need to be called multiple times to consume all data, but should not be combined with other methods.
Example (Jq) ¶
package main

import (
	"bytes"
	"fmt"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/jq"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`Loading...
Still loading...
{ "message": "this is the real data!" }`)

	stream := streamline.New(data).
		// Pipeline to discard loading indicators
		WithPipeline(pipeline.Filter(func(line []byte) bool {
			return !bytes.Contains(line, []byte("..."))
		}))

	// stream is just an io.Reader
	message, _ := jq.Query(stream, ".message")
	fmt.Println(string(message))
}
Output: "this is the real data!"
Example (Streamexec) ¶
package main

import (
	"fmt"
	"os/exec"

	"go.bobheadxi.dev/streamline/streamexec"
)

func main() {
	cmd := exec.Command("echo", "hello world\nthis is a line\nand another line!")

	stream, _ := streamexec.Start(cmd, streamexec.Combined)
	_ = stream.Stream(func(line string) {
		fmt.Println("received output:", line)
	})
}
Output:
received output: hello world
received output: this is a line
received output: and another line!
func New ¶
New creates a Stream that consumes, processes, and emits data from the input. If the input also implements LineReader, then it will use the input directly - otherwise, it will wrap the input in a bufio.Reader.
func (*Stream) Bytes ¶
Bytes collects all processed output as a byte slice.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
func (*Stream) Lines ¶
Lines collects all processed output as a slice of strings.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
func (*Stream) Read ¶
Read populates p with processed data. It allows Stream to effectively be compatible with anything that accepts an io.Reader.
func (*Stream) Stream ¶
Stream passes lines read from the input to the handler as it processes them. It is intended for simple use cases; to return errors from the line handler, use StreamBytes instead.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
func (*Stream) StreamBytes ¶
StreamBytes passes lines read from the input to the handler as it processes them, and allows the handler to return an error.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
Handlers must not retain line.
func (*Stream) String ¶
String collects all processed output as a string.
This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
func (*Stream) WithLineSeparator ¶ added in v0.7.0
WithLineSeparator configures a custom line separator for this stream. The default is '\n'.
func (*Stream) WithPipeline ¶
WithPipeline configures this Stream to process the input data with the given Pipeline in all output methods ((*Stream).Stream(...), (*Stream).Lines(...), io.Copy, etc.).
If one or more Pipelines are already configured on this Stream, the given Pipeline is applied sequentially after the preconfigured pipelines.
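Sequential application means a later pipeline only sees what earlier pipelines let through, in the form they produced. The sketch below models that ordering with the standard library alone; stage, chain, process, dropLoading and upper are hypothetical names and do not represent the library's Pipeline type.

```go
package main

import (
	"bytes"
	"fmt"
)

// stage is a hypothetical per-line step: it returns the (possibly modified)
// line and whether to keep it.
type stage func(line []byte) ([]byte, bool)

// chain applies stages in the order given. A later stage only sees lines that
// earlier stages kept, in the form those stages produced.
func chain(stages ...stage) stage {
	return func(line []byte) ([]byte, bool) {
		for _, s := range stages {
			var keep bool
			if line, keep = s(line); !keep {
				return nil, false
			}
		}
		return line, true
	}
}

// process runs every input line through s and collects the lines it keeps.
func process(lines []string, s stage) []string {
	var out []string
	for _, l := range lines {
		if res, keep := s([]byte(l)); keep {
			out = append(out, string(res))
		}
	}
	return out
}

// dropLoading discards lines that contain "...".
func dropLoading(line []byte) ([]byte, bool) {
	return line, !bytes.Contains(line, []byte("..."))
}

// upper converts a line to upper case and always keeps it.
func upper(line []byte) ([]byte, bool) {
	return bytes.ToUpper(line), true
}

func main() {
	input := []string{"Loading...", "hello", "world"}
	fmt.Println(process(input, chain(dropLoading, upper)))
}
```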
Example ¶
package main

import (
	"fmt"
	"strconv"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader("3\n4\n4.8\n7\n5\n2")

	lines, _ := streamline.New(data).
		// Pipeline to discard odd and non-integer numbers
		WithPipeline(pipeline.Filter(func(line []byte) bool {
			v, err := strconv.Atoi(string(line))
			return err == nil && v%2 == 0
		})).
		Lines()
	fmt.Println(lines)
}
Output: [4 2]
Example (Jq) ¶
package main

import (
	"bytes"
	"fmt"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/jq"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`Loading...
Still loading...
{"message": "hello"}
{"message":"world"}
{"message":"robert"}`)

	lines, _ := streamline.New(data).
		WithPipeline(pipeline.MultiPipeline{
			// Pipeline to discard non-JSON lines
			pipeline.Filter(func(line []byte) bool {
				return bytes.HasPrefix(line, []byte{'{'})
			}),
			// Execute JQ query for each line
			jq.Pipeline(".message"),
		}).
		Lines()
	fmt.Println(lines)
}
Output: ["hello" "world" "robert"]
Directories ¶
Path | Synopsis |
---|---|
jq | Package jq provides compatibility with JQ queries for working with streams of JSON data. |
pipe | Package pipe provides implementations of unbounded and in-memory pipes, which provide a writer that the caller can use to collect data and a streamline.Stream instance that can be used to consume the data. |
pipeline | Package pipeline provides simple implementations of Pipeline for use in pre-processing streamline.Stream output. |
streamexec | Package streamexec provides integrations with os/exec to stream command output. |