streamline

v1.2.0
Published: Jan 27, 2023 License: MIT Imports: 6 Imported by: 1

README

streamline

Handle your data, line by line.

go get go.bobheadxi.dev/streamline

Overview

streamline offers a variety of primitives to make working with data line by line a breeze:

  • streamline.Stream offers the ability to add hooks that handle an io.Reader line-by-line with (*Stream).Stream, (*Stream).StreamBytes, and other utilities.
  • pipeline.Pipeline offers a way to build pipelines that transform the data in a streamline.Stream, such as cleaning, filtering, mapping, or sampling data.
  • pipe.NewStream offers a way to create a buffered pipe between a writer and a Stream.
    • streamexec.Start uses this to attach a Stream to an exec.Cmd to work with command output.

When working with data streams in Go, you typically get an io.Reader, which is great for arbitrary data. In many cases, though, especially when scripting, you either end up with data and output that is structured line by line, or you want to handle data line by line, for example to send it to a structured logging library. You can set up a bufio.Reader or bufio.Scanner to do this, but for cases like exec.Cmd you also need boilerplate to configure the command and set up pipes, and for additional functionality like transforming, filtering, or sampling output you need to write your own handlers. streamline aims to provide succinct ways to do all of the above and more.

Add prefixes to command output
With bufio.Scanner:
func PrefixCommandOutput(cmd *exec.Cmd) error {
    reader, writer := io.Pipe()
    cmd.Stdout = writer
    cmd.Stderr = writer
    if err := cmd.Start(); err != nil {
        return err
    }
    errC := make(chan error)
    go func() {
        err := cmd.Wait()
        writer.Close()
        errC <- err
    }()
    scanner := bufio.NewScanner(reader)
    for scanner.Scan() {
        println("PREFIX: ", scanner.Text())
    }
    if err := scanner.Err(); err != nil {
        return err
    }
    return <-errC
}

With streamline/streamexec:
func PrefixCommandOutput(cmd *exec.Cmd) error {
    stream, err := streamexec.Start(cmd)
    if err != nil {
        return err
    }
    return stream.Stream(func(line string) {
        println("PREFIX: ", line)
    })
}
Process JSON on the fly
With bufio.Scanner:
func GetMessages(r io.Reader) error {
    scanner := bufio.NewScanner(r)
    for scanner.Scan() {
        var result bytes.Buffer
        cmd := exec.Command("jq", ".msg")
        cmd.Stdin = bytes.NewReader(scanner.Bytes())
        cmd.Stdout = &result
        if err := cmd.Run(); err != nil {
            return err
        }
        line := result.String()
        println(strings.TrimSuffix(line, "\n"))
    }
    return scanner.Err()
}

With streamline:
func GetMessages(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(jq.Pipeline(".msg")).
        Stream(func(line string) {
            println(line)
        })
}
Sample noisy output
With bufio.Scanner:
func PrintEvery10th(r io.Reader) error {
    scanner := bufio.NewScanner(r)
    var count int
    for scanner.Scan() {
        count++
        if count%10 != 0 {
            continue
        }
        println(scanner.Text())
    }
    return scanner.Err()
}

With streamline:
func PrintEvery10th(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(pipeline.Sample(10)).
        Stream(func(line string) {
            println(line)
        })
}

Background

Some of the ideas in this package started in sourcegraph/run, where we were trying to build utilities that made it easier to write bash-esque scripts in Go, namely being able to do the things you would often do in scripts, such as grepping and iterating over lines. streamline generalizes those ideas for working with newline-delimited data so that they work for arbitrary inputs.

Documentation

Types

type LineReader added in v0.9.0

type LineReader interface {
	// ReadSlice should have the behaviour of (*bufio.Reader).ReadSlice.
	ReadSlice(delim byte) ([]byte, error)

	io.WriterTo
	io.Reader
}

LineReader is a reader that implements the ability to read up to a line delimiter. It is used for internal assertion only, to determine if io.Readers provided to streamline.New already implement the desired functionality - it is exported for reference, and should not be depended upon, since the interface may change in the future.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream enables live, line-by-line manipulation and handling of data through (*Stream).WithPipeline(...) and Stream's various aggregation methods. Stream also supports standard library features like io.Copy and io.ReadAll.

Stream's aggregation methods ((*Stream).Stream(...), (*Stream).Lines(...), etc.) may only be used once. Incremental consumers like (*Stream).Read(...) may need to be called multiple times to consume all data, but should not be used in conjunction with other methods.

Example (Jq)
package main

import (
	"bytes"
	"fmt"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/jq"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`Loading...
Still loading...
{
	"message": "this is the real data!"
}`)

	stream := streamline.New(data).
		// Pipeline to discard loading indicators
		WithPipeline(pipeline.Filter(func(line []byte) bool {
			return !bytes.Contains(line, []byte("..."))
		}))

	// stream is just an io.Reader
	message, _ := jq.Query(stream, ".message")

	fmt.Println(string(message))
}
Output:

"this is the real data!"
Example (Streamexec)
package main

import (
	"fmt"
	"os/exec"

	"go.bobheadxi.dev/streamline/streamexec"
)

func main() {
	cmd := exec.Command("echo", "hello world\nthis is a line\nand another line!")

	stream, _ := streamexec.Start(cmd, streamexec.Combined)
	_ = stream.Stream(func(line string) {
		fmt.Println("received output:", line)
	})
}
Output:

received output: hello world
received output: this is a line
received output: and another line!

func New

func New(input io.Reader) *Stream

New creates a Stream that consumes, processes, and emits data from the input. If the input also implements LineReader, then it will use the input directly - otherwise, it will wrap the input in a bufio.Reader.

func (*Stream) Bytes

func (s *Stream) Bytes() ([]byte, error)

Bytes collects all processed output as a byte slice.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

func (*Stream) Lines

func (s *Stream) Lines() ([]string, error)

Lines collects all processed output as a slice of strings.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

func (*Stream) Read

func (s *Stream) Read(p []byte) (int, error)

Read populates p with processed data. It allows Stream to effectively be compatible with anything that accepts an io.Reader.

func (*Stream) Stream

func (s *Stream) Stream(dst func(line string)) error

Stream passes lines read from the input to the handler as it processes them. It is intended for simple use cases. If the line handler needs to return errors, use StreamBytes instead.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

func (*Stream) StreamBytes

func (s *Stream) StreamBytes(dst func(line []byte) error) error

StreamBytes passes lines read from the input to the handler as it processes them, and allows the handler to return an error.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

Handlers must not retain line.

func (*Stream) String

func (s *Stream) String() (string, error)

String collects all processed output as a string.

This method will block until the input returns an error. Unless the error is io.EOF, it will also propagate the error.

func (*Stream) WithLineSeparator added in v0.7.0

func (s *Stream) WithLineSeparator(separator byte) *Stream

WithLineSeparator configures a custom line separator for this stream. The default is '\n'.
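
For intuition about what a single-byte separator means, the sketch below splits input on an arbitrary byte using only bufio.Scanner. splitOn and fields are hypothetical helpers, not part of streamline. The sketch assumes the separator is dropped from each emitted line, as the newline is in the example outputs in this document.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// splitOn returns a bufio.SplitFunc that ends a token at each sep byte and
// drops the separator itself. It is a hypothetical helper for illustration.
func splitOn(sep byte) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		if atEOF && len(data) == 0 {
			return 0, nil, nil
		}
		if i := bytes.IndexByte(data, sep); i >= 0 {
			return i + 1, data[:i], nil
		}
		if atEOF {
			// Final token without a trailing separator.
			return len(data), data, nil
		}
		return 0, nil, nil // no separator yet: ask the scanner for more data
	}
}

// fields collects every sep-delimited token in input.
func fields(input string, sep byte) []string {
	scanner := bufio.NewScanner(strings.NewReader(input))
	scanner.Split(splitOn(sep))
	var out []string
	for scanner.Scan() {
		out = append(out, scanner.Text())
	}
	return out
}

func main() {
	fmt.Println(fields("a;b;c", ';'))
}
```

A common case for a non-newline separator is NUL-delimited output, where the separator byte would be 0.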

func (*Stream) WithPipeline

func (s *Stream) WithPipeline(p pipeline.Pipeline) *Stream

WithPipeline configures this Stream to process the input data with the given Pipeline in all output methods ((*Stream).Stream(...), (*Stream).Lines(...), io.Copy, etc.).

If one or more Pipelines are already configured on this Stream, the given Pipeline is applied sequentially after the preconfigured pipelines.

Example
package main

import (
	"fmt"
	"strconv"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader("3\n4\n4.8\n7\n5\n2")

	lines, _ := streamline.New(data).
		// Pipeline to keep even integers, discarding odd and non-integer values
		WithPipeline(pipeline.Filter(func(line []byte) bool {
			v, err := strconv.Atoi(string(line))
			return err == nil && v%2 == 0
		})).
		Lines()
	fmt.Println(lines)
}
Output:

[4 2]
Example (Jq)
package main

import (
	"bytes"
	"fmt"
	"strings"

	"go.bobheadxi.dev/streamline"
	"go.bobheadxi.dev/streamline/jq"
	"go.bobheadxi.dev/streamline/pipeline"
)

func main() {
	data := strings.NewReader(`Loading...
Still loading...
{"message": "hello"}
{"message":"world"}
{"message":"robert"}`)

	lines, _ := streamline.New(data).
		WithPipeline(pipeline.MultiPipeline{
			// Pipeline to discard non-JSON lines
			pipeline.Filter(func(line []byte) bool {
				return bytes.HasPrefix(line, []byte{'{'})
			}),
			// Execute JQ query for each line
			jq.Pipeline(".message"),
		}).
		Lines()
	fmt.Println(lines)
}
Output:

["hello" "world" "robert"]

func (*Stream) WriteTo

func (s *Stream) WriteTo(dst io.Writer) (int64, error)

WriteTo writes processed data to dst. It implements io.WriterTo, which io.Copy uses when the source provides it, so a Stream can be passed to io.Copy directly.

Directories

Path        Synopsis
jq          Package jq provides compatibility with JQ queries for working with streams of JSON data.
pipe        Package pipe provides implementations of unbounded and in-memory pipes, which provides a writer that the caller can use to collect data and a streamline.Stream instance that can be used to consume the data.
pipeline    Package pipeline provides simple implementations of Pipeline for use in pre-processing streamline.Stream output.
streamexec  Package streamexec provides integrations with os/exec to stream command output.
