stream

v1.4.0
Published: Apr 25, 2021 License: MIT Imports: 6 Imported by: 6

README

Usage

Write and Read concurrently, and independently.

To explain further: if you need to write to multiple places, you can use io.MultiWriter; if you need multiple Readers on something, you can use io.TeeReader; and if you want concurrency, you can use io.Pipe().

However, all of these methods "tie" each Read/Write together: your readers can't read from different places in the stream, and each write must be distributed to all readers in sequence.
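To make the coupling concrete, here is a minimal sketch using only the standard library. The helper name teeProgress is made up for this illustration. It shows that the secondary destination of an io.TeeReader only ever holds the bytes the primary reader has already pulled through, so the two consumers cannot move through the data independently.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// teeProgress (hypothetical helper) reads n bytes through an io.TeeReader and
// reports how many bytes the secondary destination has received at that point.
func teeProgress(src string, n int) int {
	var mirror bytes.Buffer
	tee := io.TeeReader(strings.NewReader(src), &mirror)

	// Only bytes pulled through tee are copied into mirror; mirror cannot
	// run ahead of, or lag behind, the primary reader.
	buf := make([]byte, n)
	if _, err := io.ReadFull(tee, buf); err != nil {
		return -1
	}
	return mirror.Len()
}

func main() {
	fmt.Println(teeProgress("Hello World!", 5)) // prints 5
}
```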

This package provides a way for multiple Readers to read off the same Writer, without waiting for the others. This is done by writing to a "File" interface which buffers the input so it can be read at any time from many independent readers. Readers can even be created while writing or after the stream is closed. They will all see a consistent view of the stream and will block until the section of the stream they request is written, all while being unaffected by the actions of the other readers.
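The idea can be sketched with the standard library alone. The code below is a simplified illustration and is not how this package is implemented: the types sharedBuffer and cursor are invented for the example. Each cursor keeps its own offset into a shared, append-only buffer and waits on a condition variable when it catches up with the writer.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// sharedBuffer is a hypothetical append-only buffer shared by many readers.
type sharedBuffer struct {
	mu     sync.Mutex
	cond   *sync.Cond
	data   []byte
	closed bool
}

func newSharedBuffer() *sharedBuffer {
	b := &sharedBuffer{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

func (b *sharedBuffer) Write(p []byte) (int, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.data = append(b.data, p...)
	b.cond.Broadcast() // wake readers waiting for new bytes
	return len(p), nil
}

func (b *sharedBuffer) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.closed = true
	b.cond.Broadcast() // wake readers so they can observe the close
	return nil
}

// cursor is one reader's private position in the shared buffer.
type cursor struct {
	buf *sharedBuffer
	off int
}

func (c *cursor) Read(p []byte) (int, error) {
	b := c.buf
	b.mu.Lock()
	defer b.mu.Unlock()
	// Block until unread bytes exist or the buffer has been closed.
	for c.off >= len(b.data) && !b.closed {
		b.cond.Wait()
	}
	if c.off >= len(b.data) {
		return 0, io.EOF
	}
	n := copy(p, b.data[c.off:])
	c.off += n
	return n, nil
}

// readBoth starts one reader before the writes and one after the close.
func readBoth() (early, late string) {
	b := newSharedBuffer()
	done := make(chan string)
	go func() {
		data, _ := io.ReadAll(&cursor{buf: b})
		done <- string(data)
	}()
	io.WriteString(b, "Hello ")
	io.WriteString(b, "World!")
	b.Close()
	early = <-done
	data, _ := io.ReadAll(&cursor{buf: b})
	return early, string(data)
}

func main() {
	early, late := readBoth()
	fmt.Println(early == late, late)
}
```

Both readers see the full contents regardless of when they were created, which is the property described above.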

The use case for this stems from my other project djherbis/fscache. I needed a byte caching mechanism which allowed many independent clients to have access to the data while it was being written, rather than re-generating the byte stream for each of them or waiting for a complete copy of the stream which could be stored and then re-used.

package main

import (
	"io"
	"log"
	"os"
	"time"

	"github.com/djherbis/stream"
)

func main() {
	w, err := stream.New("mystream")
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		io.WriteString(w, "Hello World!")
		<-time.After(time.Second)
		io.WriteString(w, "Streaming updates...")
		w.Close()
	}()

	waitForReader := make(chan struct{})
	go func() {
		// Read from the stream
		r, err := w.NextReader()
		if err != nil {
			log.Fatal(err)
		}
		io.Copy(os.Stdout, r) // Hello World! (1 second) Streaming updates...
		r.Close()
		close(waitForReader)
	}()

	// Full copy of the stream!
	r, err := w.NextReader()
	if err != nil {
		log.Fatal(err)
	}
	io.Copy(os.Stdout, r) // Hello World! (1 second) Streaming updates...

	// r supports io.ReaderAt too.
	p := make([]byte, 4)
	r.ReadAt(p, 1) // Read "ello" into p

	r.Close()

	<-waitForReader // don't leave main before the goroutine finishes
}

Installation

go get github.com/djherbis/stream

Documentation

Overview

Package stream provides a way to read and write to a synchronous buffered pipe, with multiple reader support.

Constants

This section is empty.

Variables

var ErrCanceled = errors.New("stream has been canceled")

ErrCanceled indicates that the stream has been canceled.

var ErrNotFoundInMem = errors.New("not found")

ErrNotFoundInMem is returned when an in-memory FileSystem cannot find a file.

var ErrRemoving = errors.New("cannot open a new reader while removing file")

ErrRemoving is returned when requesting a Reader on a Stream which is being Removed.

var ErrUnsupported = errors.New("unsupported")

ErrUnsupported is returned when an operation is not supported.

Functions

This section is empty.

Types

type File

type File interface {
	Name() string // The name used to Create/Open the File
	io.Reader     // Reader must continue reading after EOF on subsequent calls after more Writes.
	io.ReaderAt   // Similarly to Reader
	io.Writer     // Concurrent reading/writing must be supported.
	io.Closer     // Close should do any cleanup when done with the File.
}

File is a backing data-source for a Stream.

type FileSystem

type FileSystem interface {
	Create(name string) (File, error) // Create must return a new File for Writing
	Open(name string) (File, error)   // Open must return an existing File for Reading
	Remove(name string) error         // Remove deletes an existing File
}

FileSystem is used to manage Files.

var StdFileSystem FileSystem = stdFS{}

StdFileSystem is backed by the os package.

func NewMemFS

func NewMemFS() FileSystem

NewMemFS returns a new in-memory FileSystem.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a concurrent-safe Stream Reader.

func (*Reader) Close

func (r *Reader) Close() error

Close closes this Reader on the Stream. This must be called when done with the Reader or else the Stream cannot be Removed.

func (*Reader) Name

func (r *Reader) Name() string

Name returns the name of the underlying File in the FileSystem.

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

Read reads from the Stream. If the end of an open Stream is reached, Read blocks until more data is written or the Stream is Closed.

func (*Reader) ReadAt

func (r *Reader) ReadAt(p []byte, off int64) (n int, err error)

ReadAt lets you Read from specific offsets in the Stream. ReadAt blocks while waiting for the requested section of the Stream to be written, unless the Stream is closed in which case it will always return immediately.

func (*Reader) Seek added in v1.3.0

func (r *Reader) Seek(offset int64, whence int) (int64, error)

Seek changes the offset of the next Read in the stream. Seeking to Start/Current does not block for the stream to reach that position, so it cannot guarantee that position exists. Seeking to End will block until the stream is closed and then seek to that position, UNLESS Stream.SetSeekEnd has specified the size, in which case Seek End will be relative to that size. Reads will still block if reading from unwritten portions of the stream. Seek is safe to call concurrently with all other methods, though calling it concurrently with Read will lead to an undefined order of the calls (e.g. it may Seek then Read, or Read then Seek, changing which bytes are Read). Similarly, calling SetSeekEnd concurrently with calls to Seek may lead to a Seek to End either blocking or using the size given to SetSeekEnd.

func (*Reader) Size added in v1.2.0

func (r *Reader) Size() (int64, bool)

Size returns the current size of the entire stream (not the remaining bytes to be read), and true iff the size is valid (not canceled), and final (won't change). Can be safely called concurrently with all other methods.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is used to concurrently Write and Read from a File.

func New

func New(name string) (*Stream, error)

New creates a new Stream from the StdFileSystem with Name "name".

func NewMemStream added in v1.3.0

func NewMemStream() *Stream

NewMemStream creates an in-memory stream with no name, and no underlying fs. This should replace uses of NewStream("name", NewMemFS()). Remove() is unsupported as there is no fs to remove it from.

func NewStream

func NewStream(name string, fs FileSystem) (*Stream, error)

NewStream creates a new Stream with Name "name" in FileSystem fs.

func (*Stream) Cancel added in v1.3.0

func (s *Stream) Cancel() error

Cancel signals that this Stream is forcibly ending: NextReader() will fail, existing Readers will fail Reads, and all Readers and the Writer are Closed. This call is non-blocking, and Remove() after this call is non-blocking.

func (*Stream) Close

func (s *Stream) Close() error

Close will close the active stream. This will cause Readers to return EOF once they have read the entire stream.

func (*Stream) Name

func (s *Stream) Name() string

Name returns the name of the underlying File in the FileSystem.

func (*Stream) NextReader

func (s *Stream) NextReader() (*Reader, error)

NextReader will return a concurrent-safe Reader for this stream. Each Reader will see a complete and independent view of the stream, and can Read while the stream is written to.

func (*Stream) Remove

func (s *Stream) Remove() error

Remove will block until the Stream and all its Readers have been Closed, at which point it will delete the underlying file. NextReader() will return ErrRemoving if called after Remove.

func (*Stream) SetSeekEnd added in v1.4.0

func (s *Stream) SetSeekEnd(size int64) error

SetSeekEnd is required in order to support Range Requests. Range Requests require the length of a Stream to be returned by using Reader.Seek with io.SeekEnd. You must set this value to the expected final length of the Stream in order for Range Requests / SeekEnd to work correctly.

This method must be called before any such Seek in order to work, and will error if called after a Seek with io.SeekEnd. SeekEnd will still work correctly even if this is not called, but it will block until the entire stream is written in order to actually seek to the true end position.

This value can only be set once; subsequent sets will not update the SeekEnd position and will return an error.

This method currently has no other effects on the Stream and in particular does not prevent Writing a different number of bytes and Closing the stream, though this is undefined behavior, and this library reserves the right to define that behavior in the future.

func (*Stream) ShutdownWithErr added in v1.3.0

func (s *Stream) ShutdownWithErr(err error)

ShutdownWithErr causes NextReader to stop creating new Readers and instead return err. This method also blocks until all Readers and the Writer have closed.

func (*Stream) Write

func (s *Stream) Write(p []byte) (int, error)

Write writes p to the Stream. It is safe to call concurrently with Stream's other methods.
