stream

package
v0.5.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2022 License: Apache-2.0, MIT Imports: 7 Imported by: 0

README

stream

GoDoc Release Software License Build Status Coverage Status

Usage

Write and Read concurrently, and independently.

To explain further, if you need to write to multiple places you can use io.MultiWriter, if you need multiple Readers on something you can use io.TeeReader. If you want concurrency you can use io.Pipe().

However all of these methods "tie" each Read/Write together, your readers can't read from different places in the stream, each write must be distributed to all readers in sequence.

This package provides a way for multiple Readers to read off the same Writer, without waiting for the others. This is done by writing to a "File" interface which buffers the input so it can be read at any time from many independent readers. Readers can even be created while writing or after the stream is closed. They will all see a consistent view of the stream and will block until the section of the stream they request is written, all while being unaffected by the actions of the other readers.

The use case for this stems from my other project djherbis/fscache. I needed a byte caching mechanism which allowed many independent clients to have access to the data while it was being written, rather than re-generating the byte stream for each of them or waiting for a complete copy of the stream which could be stored and then re-used.

import(
	"io"
	"log"
	"os"
	"time"

	"github.com/djherbis/stream"
)

func main(){
	w, err := stream.New("mystream")
	if err != nil {
		log.Fatal(err)
	}

	go func(){
		io.WriteString(w, "Hello World!")
		<-time.After(time.Second)
		io.WriteString(w, "Streaming updates...")
		w.Close()
	}()

	waitForReader := make(chan struct{})
	go func(){
		// Read from the stream
		r, err := w.NextReader()
		if err != nil {
			log.Fatal(err)
		}
		io.Copy(os.Stdout, r) // Hello World! (1 second) Streaming updates...
		r.Close()
		close(waitForReader)
	}()

  // Full copy of the stream!
	r, err := w.NextReader() 
	if err != nil {
		log.Fatal(err)
	}
	io.Copy(os.Stdout, r) // Hello World! (1 second) Streaming updates...

	// r supports io.ReaderAt too.
	p := make([]byte, 4)
	r.ReadAt(p, 1) // Read "ello" into p

	r.Close()

	<-waitForReader // don't leave main before go-routine finishes
}

Installation

go get github.com/djherbis/stream

Documentation

Overview

Package stream provides a way to read and write to a synchronous buffered pipe, with multiple reader support.

Index

Constants

This section is empty.

Variables

View Source
var ErrNotFoundInMem = errors.New("not found")

ErrNotFoundInMem is returned when an in-memory FileSystem cannot find a file.

View Source
var ErrRemoving = errors.New("cannot open a new reader while removing file")

ErrRemoving is returned when requesting a Reader on a Stream which is being Removed.

Functions

This section is empty.

Types

type File

type File interface {
	Name() string // The name used to Create/Open the File
	io.Reader     // Reader must continue reading after EOF on subsequent calls after more Writes.
	io.ReaderAt   // Similarly to Reader
	io.Writer     // Concurrent reading/writing must be supported.
	io.Closer     // Close should do any cleanup when done with the File.
}

File is a backing data-source for a Stream.

type FileSystem

type FileSystem interface {
	Create(name string) (File, error) // Create must return a new File for Writing
	Open(name string) (File, error)   // Open must return an existing File for Reading
	Remove(name string) error         // Remove deletes an existing File
}

FileSystem is used to manage Files

var StdFileSystem FileSystem = stdFS{}

StdFileSystem is backed by the os package.

func NewMemFS

func NewMemFS() FileSystem

NewMemFS returns a New in-memory FileSystem

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a concurrent-safe Stream Reader.

func (*Reader) Close

func (r *Reader) Close() error

Close closes this Reader on the Stream. This must be called when done with the Reader or else the Stream cannot be Removed.

func (*Reader) Name

func (r *Reader) Name() string

Name returns the name of the underlying File in the FileSystem.

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

Read reads from the Stream. If the end of an open Stream is reached, Read blocks until more data is written or the Stream is Closed.

func (*Reader) ReadAt

func (r *Reader) ReadAt(p []byte, off int64) (n int, err error)

ReadAt lets you Read from specific offsets in the Stream. ReadAt blocks while waiting for the requested section of the Stream to be written, unless the Stream is closed in which case it will always return immediately.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is used to concurrently Write and Read from a File.

func New

func New(name string) (*Stream, error)

New creates a new Stream from the StdFileSystem with Name "name".

func NewStream

func NewStream(name string, fs FileSystem) (*Stream, error)

NewStream creates a new Stream with Name "name" in FileSystem fs.

func (*Stream) Close

func (s *Stream) Close() error

Close will close the active stream. This will cause Readers to return EOF once they have read the entire stream.

func (*Stream) Name

func (s *Stream) Name() string

Name returns the name of the underlying File in the FileSystem.

func (*Stream) NextReader

func (s *Stream) NextReader() (*Reader, error)

NextReader will return a concurrent-safe Reader for this stream. Each Reader will see a complete and independent view of the stream, and can Read will the stream is written to.

func (*Stream) Remove

func (s *Stream) Remove() error

Remove will block until the Stream and all its Readers have been Closed, at which point it will delete the underlying file. NextReader() will return ErrRemoving if called after Remove.

func (*Stream) Write

func (s *Stream) Write(p []byte) (int, error)

Write writes p to the Stream. It's concurrent safe to be called with Stream's other methods.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL