streamline

v0.4.0
Published: Jan 15, 2023 License: MIT Imports: 6 Imported by: 1

README

streamline

Handle your data, line by line.

go get go.bobheadxi.dev/streamline

This package is currently experimental, and the API may change.

Overview

When working with data streams in Go, you typically get an io.Reader, which is great for arbitrary data. In many cases, though, especially when scripting, you end up with data and outputs that are structured line by line. Typically, you might set up a bufio.Reader or bufio.Scanner to read data line by line, but for cases like exec.Cmd you will also need boilerplate to configure the command and set up pipes.
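For comparison, the standard-library boilerplate mentioned above looks roughly like the following sketch. It uses only bufio and io; the helpers readLines and readLinesFromString are hypothetical names introduced here for illustration and are not part of streamline.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// readLines is a hypothetical helper (not part of streamline) that reads r
// line by line with bufio.Scanner and returns every line it saw.
func readLines(r io.Reader) ([]string, error) {
	var lines []string
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	// Scanner.Err reports nil when the input simply ended with io.EOF.
	return lines, scanner.Err()
}

// readLinesFromString wraps readLines for plain string input.
func readLinesFromString(s string) ([]string, error) {
	return readLines(strings.NewReader(s))
}

func main() {
	lines, err := readLinesFromString("first\nsecond\nthird")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for i, line := range lines {
		fmt.Printf("%d: %s\n", i+1, line)
	}
}
```

With exec.Cmd, the same loop would additionally need a pipe from the command (for example via StdoutPipe) and explicit Start and Wait calls, which is the extra setup the paragraph above refers to.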

streamline offers a variety of primitives that aim to make working with data line by line a breeze:

  • streamline.Stream offers the ability to add hooks that handle an io.Reader line-by-line with (*Stream).Stream(LineHandler[string]) and (*Stream).StreamBytes(LineHandler[[]byte]).
  • pipeline.Pipeline offers a way to build pipelines that transform the data in a streamline.Stream, such as cleaning and mapping data.
    • jq.Pipeline can be used to map every line to the output of a JQ query, for example.
  • pipe.NewStream offers a way to create a buffered pipe between a writer and a Stream.
    • Package streamexec uses this to attach a Stream to an exec.Cmd.

Background

Some of the ideas in this package started in sourcegraph/run, where we were trying to build utilities that made it easier to write bash-esque scripts using Go, namely being able to do things you would often do in scripts, such as grepping and iterating over lines.

Documentation

Overview

Example (Jq_Pipeline)
package main

import (
	"bytes"
	"fmt"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/jq"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`Loading...
Still loading...
{"message": "hello"}
{"message":"world"}
{"message":"robert"}`)

	lines, _ := streamline.New(data).
		WithPipeline(pipeline.MultiPipeline{
			// Pipeline to discard non-JSON lines
			pipeline.Filter(func(line []byte) bool {
				return bytes.HasPrefix(line, []byte{'{'})
			}),
			// Execute JQ query for each line
			jq.Pipeline(".message"),
		}).
		Lines()
	fmt.Println(lines)
}
Output:

["hello" "world" "robert"]
Example (Streamexec_Attach)
package main

import (
	"fmt"
	"os/exec"
	"strings"

	"go.bobheadxi.dev/streamline/streamexec"
)

func main() {
	cmd := exec.Command("echo", "hello world\nthis is a line\nand another line!")

	stream, _ := streamexec.Attach(cmd, streamexec.Combined).Start()
	_ = stream.Stream(func(line string) error {
		if !strings.Contains(line, "hello") {
			fmt.Println("received output:", line)
		}
		return nil
	})
}
Output:

received output: this is a line
received output: and another line!
Example (Streamexec_JQQuery)
package main

import (
	"bytes"
	"fmt"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/jq"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`Loading...
Still loading...
{
	"message": "this is the real data!"
}`)

	stream := streamline.New(data).
		// Pipeline to discard loading indicators
		WithPipeline(pipeline.Filter(func(line []byte) bool {
			return !bytes.Contains(line, []byte("..."))
		}))

	message, _ := jq.Query(stream, ".message")
	fmt.Println(string(message))
}
Output:

"this is the real data!"

Types

type LineHandler

type LineHandler[T string | []byte] func(line T) error
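The type constraint means a handler receives each line either as a string or as a []byte. The sketch below mirrors that shape with a local type and drives it with bufio.Scanner. The names lineHandler, forEachLine, and countUntil are hypothetical, and stopping at the first handler error is an assumption of this sketch, not documented streamline behaviour.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

// lineHandler mirrors the shape of streamline.LineHandler for illustration.
type lineHandler[T string | []byte] func(line T) error

// forEachLine is a hypothetical driver: it calls handle once per line of
// input and stops at the first error the handler returns (an assumption).
func forEachLine(input string, handle lineHandler[string]) error {
	scanner := bufio.NewScanner(strings.NewReader(input))
	for scanner.Scan() {
		if err := handle(scanner.Text()); err != nil {
			return err
		}
	}
	return scanner.Err()
}

// countUntil counts the lines handled before a line equal to stop appears.
func countUntil(input, stop string) (int, error) {
	count := 0
	err := forEachLine(input, func(line string) error {
		if line == stop {
			return errors.New("stop line reached")
		}
		count++
		return nil
	})
	return count, err
}

func main() {
	n, err := countUntil("a\nb\nSTOP\nc", "STOP")
	fmt.Println(n, err)
}
```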

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream enables live, line-by-line manipulation and handling of data through (*Stream).WithPipeline(...) and Stream's various aggregation methods. Stream also supports standard library features like io.Copy and io.ReadAll.

Stream's aggregation methods ((*Stream).Stream(...), (*Stream).Lines(...), etc.) may only be used once. Incremental consumers like (*Stream).Read(...) may need to be called multiple times to consume all data, but should not be used in conjunction with other methods.

func New

func New(input io.Reader) *Stream

New creates a Stream that consumes, processes, and emits data from the input.

func (*Stream) Bytes

func (o *Stream) Bytes() ([]byte, error)

Bytes collects all processed output as a byte slice.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

func (*Stream) Lines

func (o *Stream) Lines() ([]string, error)

Lines collects all processed output as a slice of strings.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.
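The error rule shared by the aggregation methods (io.EOF ends collection normally, any other error is propagated) can be illustrated with the standard library alone. This is a minimal sketch, not streamline's implementation: failingReader, collectLines, collectClean, and collectBroken are hypothetical names, and returning the lines gathered before the failure is a choice made by the sketch.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
)

// failingReader is a hypothetical input that yields data, then returns err.
type failingReader struct {
	data string
	err  error
}

func (f *failingReader) Read(p []byte) (int, error) {
	if len(f.data) == 0 {
		return 0, f.err
	}
	n := copy(p, f.data)
	f.data = f.data[n:]
	return n, nil
}

// collectLines blocks until the input reports an error. io.EOF counts as
// normal completion; any other error is returned to the caller.
func collectLines(r io.Reader) ([]string, error) {
	var lines []string
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	return lines, scanner.Err()
}

// collectClean reads data from an input that ends with io.EOF.
func collectClean(data string) ([]string, error) {
	return collectLines(&failingReader{data: data, err: io.EOF})
}

// collectBroken reads data from an input that ends with a custom error.
func collectBroken(data, msg string) ([]string, error) {
	return collectLines(&failingReader{data: data, err: errors.New(msg)})
}

func main() {
	lines, err := collectClean("a\nb")
	fmt.Println(lines, err)
	lines, err = collectBroken("a\nb", "boom")
	fmt.Println(lines, err)
}
```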

func (*Stream) Read

func (o *Stream) Read(p []byte) (int, error)

Read populates p with processed data. It allows Stream to effectively be compatible with anything that accepts an io.Reader.

WARNING: This implementation is currently VERY inefficient with Pipeline configured. Prefer to avoid calling Read() by calling WriteTo() instead, e.g. via io.Copy instead of io.ReadAll.
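The reason io.Copy avoids Read is that it calls the source's WriteTo method when the source implements io.WriterTo, whereas io.ReadAll pulls data through Read. The sketch below demonstrates this with a hypothetical trackedSource type that counts calls; it stands in for any type offering both methods and says nothing about how Stream implements them.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// trackedSource is a hypothetical type that offers both Read and WriteTo
// and records how often each one is called.
type trackedSource struct {
	data         *strings.Reader
	readCalls    int
	writeToCalls int
}

func (t *trackedSource) Read(p []byte) (int, error) {
	t.readCalls++
	return t.data.Read(p)
}

func (t *trackedSource) WriteTo(dst io.Writer) (int64, error) {
	t.writeToCalls++
	return t.data.WriteTo(dst)
}

// copyCalls drains input with io.Copy and reports the output and call counts.
func copyCalls(input string) (out string, reads, writeTos int) {
	src := &trackedSource{data: strings.NewReader(input)}
	var sb strings.Builder
	_, _ = io.Copy(&sb, src)
	return sb.String(), src.readCalls, src.writeToCalls
}

// readAllCalls drains input with io.ReadAll and reports the same details.
func readAllCalls(input string) (out string, reads, writeTos int) {
	src := &trackedSource{data: strings.NewReader(input)}
	b, _ := io.ReadAll(src)
	return string(b), src.readCalls, src.writeToCalls
}

func main() {
	out, reads, writeTos := copyCalls("hello\nworld")
	fmt.Printf("io.Copy:    %q reads=%d writeTos=%d\n", out, reads, writeTos)
	out, reads, writeTos = readAllCalls("hello\nworld")
	fmt.Printf("io.ReadAll: %q reads=%d writeTos=%d\n", out, reads, writeTos)
}
```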

func (*Stream) Stream

func (o *Stream) Stream(dst LineHandler[string]) error

Stream passes lines read from the input to the handler as it processes them.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

func (*Stream) StreamBytes

func (o *Stream) StreamBytes(dst LineHandler[[]byte]) error

StreamBytes passes lines read from the input to the handler as it processes them.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

func (*Stream) String

func (o *Stream) String() (string, error)

String collects all processed output as a string.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

func (*Stream) WithPipeline

func (s *Stream) WithPipeline(p pipeline.Pipeline) *Stream

WithPipeline configures this Stream to process the input data with the given Pipeline in all output methods ((*Stream).Stream(...), (*Stream).Lines(...), io.Copy, etc.).

If you want to use multiple Pipelines, you can provide a pipeline.MultiPipeline.

Example
package main

import (
	"fmt"
	"strconv"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`3
4
4.8
7
5
2`)

	lines, _ := streamline.New(data).
		// Pipeline to keep only even integers, discarding odd and non-integer lines
		WithPipeline(pipeline.Filter(func(line []byte) bool {
			v, err := strconv.Atoi(string(line))
			return err == nil && v%2 == 0
		})).
		Lines()
	fmt.Println(lines)
}
Output:

[4 2]
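To make the idea of combining several per-line steps concrete, here is a standard-library sketch of chaining a filter and a map over lines. It does not use streamline's pipeline package: stage, chain, apply, and jsonLinesUpper are hypothetical names, and the real pipeline.Pipeline and pipeline.MultiPipeline types may be shaped differently.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is a hypothetical per-line step (not streamline's pipeline.Pipeline):
// it returns the possibly rewritten line and whether to keep it.
type stage func(line string) (string, bool)

// chain runs stages in order and drops the line as soon as one rejects it.
func chain(stages ...stage) stage {
	return func(line string) (string, bool) {
		for _, s := range stages {
			var keep bool
			if line, keep = s(line); !keep {
				return "", false
			}
		}
		return line, true
	}
}

// apply feeds each line of input through s and collects the survivors.
func apply(input string, s stage) []string {
	var out []string
	for _, line := range strings.Split(input, "\n") {
		if mapped, keep := s(line); keep {
			out = append(out, mapped)
		}
	}
	return out
}

// jsonLinesUpper keeps lines that start with '{' and upper-cases them.
func jsonLinesUpper(input string) []string {
	onlyJSON := func(line string) (string, bool) {
		return line, strings.HasPrefix(line, "{")
	}
	upper := func(line string) (string, bool) {
		return strings.ToUpper(line), true
	}
	return apply(input, chain(onlyJSON, upper))
}

func main() {
	fmt.Println(jsonLinesUpper("Loading...\n{\"message\":\"hello\"}\n{\"message\":\"world\"}"))
}
```

Order matters in such a chain: a filter placed first spares later steps from seeing lines they cannot handle, which is why the Jq_Pipeline example filters non-JSON lines before running the JQ query.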

func (*Stream) WriteTo

func (o *Stream) WriteTo(dst io.Writer) (int64, error)

WriteTo writes processed data to dst. It makes Stream implement io.WriterTo, which io.Copy uses in place of Read when the source provides it.
