pio

package
v0.4.173 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2024 License: ISC Imports: 16 Imported by: 0

Documentation

Overview

Package pio provides a context-cancelable stream copier, a closable buffer, line-based reader and other io functions

EofReader is an empty reader returning EOF. Thread-safe

ReadWriteCloserSlice is a read-writer with a slice as intermediate storage. thread-safe.

TeeWriter is a writer that copies its writes to one or more other writers.

Index

Constants

This section is empty.

Variables

View Source
var EofReader io.Reader = &eofReader{}

EofReader is an empty reader returning EOF. Thread-safe

View Source
var ErrCopyShutdown = errors.New("Copy received Shutdown")
View Source
var ErrInvalidWrite = errors.New("invalid write result")

errInvalidWrite means that a write returned an impossible count

Functions

func CopyContext added in v0.4.108

func CopyContext(dst io.Writer, src io.Reader, buf []byte, ctx context.Context) (written int64, err error)

CopyContext is like io.Copy but is cancelable via context

  • CopyContext closes both reader and writer if their runtime type is closable
  • context cancel is detected on any io.File.Read or io.File.Write invocation and carried out by parallel io.File.Close if either reader or writer is closable
  • may launch 1 thread while copying
  • err may be context.Canceled

func CopyThread added in v0.4.95

func CopyThread(
	label string, reader io.Reader, writer io.Writer,
	errCh chan<- error, ctx context.Context,
)

CopyThread copies from an io.Reader to an io.Writer.

  • label is used for thread identification on panics
  • errCh receives result and makes thread awaitable
  • if ctx, a CancelContext, is non-nil and error occurs, ctx is cancelled
  • CopyThread itself never fails

func InitWriteCloserToChan

func InitWriteCloserToChan(wcp *WriteCloserToChan)

func NewContextReaderX added in v0.4.143

func NewContextReaderX(ctx context.Context, reader io.Reader) io.Reader

NewContextReader instantiates ContextReader

func NewNetConnTap added in v0.4.159

func NewNetConnTap(conn net.Conn, readsWriter, writesWriter io.Writer, addError parl.AddError) (socketTap io.ReadWriter)

NewNetConnTap returns a data tap for a bidirectional data stream

  • data from readWriter.Read is written to reads.Write if non-nil
  • data written to readWriter.Write is written to writes.Write if non-nil
  • a non-nil errs receives all errors from delegated Read Write reads and writes
  • if errs is nil, an error from the reads and writes taps is panic
  • ReadWriterTap impements idempotent Close
  • if any of readWriter, reads or writes implements io.Close, they are closed on socketTap.Close
  • the consumer may invoke socketTap.Close to ensure reads and writes are closed
  • errors in reads or writes do not affect the socketTap consumer

func NewPioError added in v0.4.159

func NewPioError(source PIOErrorSource, e error) (err error)

func NewReadWriterTap added in v0.4.159

func NewReadWriterTap(readWriter io.ReadWriter, reads, writes io.Writer, errs func(err error)) (socketTap io.ReadWriter)

NewReadWriterTap returns a data tap for a bidirectional data stream

  • data from readWriter.Read is written to reads.Write if non-nil
  • data written to readWriter.Write is written to writes.Write if non-nil
  • a non-nil errs receives all errors from delegated Read Write reads and writes
  • if errs is nil, an error from the reads and writes taps is panic
  • ReadWriterTap impements idempotent Close
  • if any of readWriter, reads or writes implements io.Close, they are closed on socketTap.Close
  • the consumer may invoke socketTap.Close to ensure reads and writes are closed
  • errors in reads or writes do not affect the socketTap consumer

func NewTLSTap added in v0.4.159

func NewTLSTap(conn *tls.Conn, readsWriter, writesWriter io.Writer, addError parl.AddError) (socketTap io.ReadWriter)

NewNetConnTap returns a data tap for a bidirectional data stream

  • data from readWriter.Read is written to reads.Write if non-nil
  • data written to readWriter.Write is written to writes.Write if non-nil
  • a non-nil errs receives all errors from delegated Read Write reads and writes
  • if errs is nil, an error from the reads and writes taps is panic
  • ReadWriterTap impements idempotent Close
  • if any of readWriter, reads or writes implements io.Close, they are closed on socketTap.Close
  • the consumer may invoke socketTap.Close to ensure reads and writes are closed
  • errors in reads or writes do not affect the socketTap consumer

func NewTeeWriter added in v0.4.38

func NewTeeWriter(closeCallback func() (err error), writers ...io.Writer) (teeWriter io.WriteCloser)

TeeWriter is a writer that copies its writes to one or more other writers.

func NewWriteCloserToChan

func NewWriteCloserToChan() (writeCloser io.WriteCloser)

func NewWriteCloserToChanLine

func NewWriteCloserToChanLine(withNewline ...bool) (writeCloser io.WriteCloser)

Types

type CloseCallback added in v0.4.135

type CloseCallback func(err error) (e error)

type CloseCallbacker added in v0.4.135

type CloseCallbacker struct {
	// contains filtered or unexported fields
}

CloseCallbacker implements a close callback for io.Closer 231128 unused

func NewCloseCallbacker added in v0.4.135

func NewCloseCallbacker(closer io.Closer, closeCallback CloseCallback) (closeCallbacker *CloseCallbacker)

func (*CloseCallbacker) Close added in v0.4.135

func (c *CloseCallbacker) Close() (err error)

func (*CloseCallbacker) IsClosed added in v0.4.135

func (c *CloseCallbacker) IsClosed() (isClosed bool)

func (*CloseCallbacker) Wait added in v0.4.135

func (c *CloseCallbacker) Wait()

type CloserBuffer added in v0.4.135

type CloserBuffer struct {
	// Available() AvailableBuffer() Bytes() Cap() Grow() Len() Next()
	// Read() ReadByte() ReadBytes() ReadFrom() ReadRune() ReadString()
	// Reset() String() Truncate() UnreadByte() UnreadRune()
	// Write() WriteByte() WriteRune() WriteString() WriteTo()
	bytes.Buffer
	// contains filtered or unexported fields
}

CloserBuffer extends byte.Buffer to be io.Closer

func NewCloserBuffer added in v0.4.135

func NewCloserBuffer(buffer ...*bytes.Buffer) (closer *CloserBuffer)

NewCloserBuffer returns an bytes.Buffer implementing io.Closer

func (*CloserBuffer) Close added in v0.4.135

func (b *CloserBuffer) Close() (err error)

Close should only be invoked once. Close is not required for releasing resources.

func (*CloserBuffer) Read added in v0.4.135

func (b *CloserBuffer) Read(p []byte) (n int, err error)

Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.

func (*CloserBuffer) ReadByte added in v0.4.135

func (b *CloserBuffer) ReadByte() (c byte, err error)

ReadByte reads and returns the next byte from the input or any error encountered. If ReadByte returns an error, no input byte was consumed, and the returned byte value is undefined.

func (*CloserBuffer) ReadFrom added in v0.4.135

func (b *CloserBuffer) ReadFrom(r io.Reader) (n int64, err error)

ReadFrom reads data from r until EOF or error. The return value n is the number of bytes read. Any error except EOF encountered during the read is also returned.

func (*CloserBuffer) ReadRune added in v0.4.135

func (b *CloserBuffer) ReadRune() (r rune, size int, err error)

ReadRune reads a single encoded Unicode character and returns the rune and its size in bytes. If no character is available, err will be set.

func (*CloserBuffer) Reset added in v0.4.135

func (b *CloserBuffer) Reset()

Reset resets the buffer to be empty, but it retains the underlying storage for use by future writes.

func (*CloserBuffer) UnreadByte added in v0.4.135

func (b *CloserBuffer) UnreadByte() (err error)

UnreadByte causes the next call to ReadByte to return the last byte read.

func (*CloserBuffer) UnreadRune added in v0.4.135

func (b *CloserBuffer) UnreadRune() (err error)

UnreadRune causes the next call to ReadRune to return the last rune read.

func (*CloserBuffer) Write added in v0.4.135

func (b *CloserBuffer) Write(p []byte) (n int, err error)

Write writes len(p) bytes from p to the underlying data stream. It returns the number of bytes written from p (0 <= n <= len(p)) and any error encountered that caused the write to stop early. Write must return a non-nil error if it returns n < len(p).

func (*CloserBuffer) WriteByte added in v0.4.135

func (b *CloserBuffer) WriteByte(c byte) (err error)

WriteByte writes a byte to w.

func (*CloserBuffer) WriteString added in v0.4.135

func (b *CloserBuffer) WriteString(s string) (n int, err error)

WriteString writes the contents of the string s to w, which accepts a slice of bytes.

func (*CloserBuffer) WriteTo added in v0.4.135

func (b *CloserBuffer) WriteTo(w io.Writer) (n int64, err error)

WriteTo writes data to w until there's no more data to write or when an error occurs. The return value n is the number of bytes written. Any error encountered during the write is also returned.

type ContextCloser added in v0.4.108

type ContextCloser struct {
	// contains filtered or unexported fields
}

ContextCloser is an idempotent io.Closer

func NewContextCloser added in v0.4.108

func NewContextCloser(closer io.Closer) (contextCloser *ContextCloser)

NewContextCloser returns a an idempotent io.Closer

  • closer may be nil
  • panic-free idempotent observable

func (*ContextCloser) Close added in v0.4.108

func (c *ContextCloser) Close() (err error)

Close closes the io.Closer

  • if Close has already been invoked, noop, no error
  • if io.Closer is nil, noop, no error
  • panic-free idempotent

func (*ContextCloser) IsCloseable added in v0.4.108

func (c *ContextCloser) IsCloseable() (isCloseable bool)

IsCloseable indicates whether an io.Closer is present that can be closed

type ContextCopier added in v0.4.108

type ContextCopier struct {
	// contains filtered or unexported fields
}

ContextCopier is an io.Copy cancelable via context

func NewContextCopier added in v0.4.108

func NewContextCopier(buf ...[]byte) (copier *ContextCopier)

NewContextCopier copies src to dst aborting if context is canceled

  • buf is buffer that can be used
  • if reader implements WriteTo or writer implements ReadFrom, no buffer is required
  • if a buffer is reqiired ans missing, 1 MiB is allocated
  • Copy methods does copying
  • Shutdown method or context cancel aborts Copy in progress
  • if the runtime type of reader or writer is io.Closable, a thread is active during copying

func (*ContextCopier) Copy added in v0.4.134

func (c *ContextCopier) Copy(
	dst io.Writer,
	src io.Reader,
	ctx context.Context,
) (n int64, err error)

Copy copies from src to dst until end of data, error, Shutdown or context cancel

  • Shutdown method or context cancel aborts copy in progress
  • on context cancel, error returned is context.Canceled
  • on Shutdown, error returned has ErrCopyShutdown
  • if the runtime type of dst or src is io.Closable, a thread is active during copying
  • such reader or writer will be closed

func (*ContextCopier) Shutdown added in v0.4.134

func (c *ContextCopier) Shutdown()

Shutdown order the thread to exit and wait for its result

  • every Copy invocation will have a Shutdown either by consumer or the deferred copyEnd method

type ContextReader added in v0.4.108

type ContextReader struct {

	// idempotent pannic-free closer if reader implemented [io.Closer]
	//	- Close() IsClosable()
	ContextCloser
	// contains filtered or unexported fields
}

ContextReader is an io.ReadCloser that aborts on context cancel

  • on context cancel, Read returns error context.Canceled
  • If the runtime type of reader implements io.Close, it ContextReader can close it

func NewContextReader added in v0.4.108

func NewContextReader(reader io.Reader, ctx context.Context) (contextReader *ContextReader)

NewContextReader returns an io.ReadCloser that aborts on context cancel

  • on context cancel, Read returns error context.Canceled
  • If the runtime type of reader implements io.Close, it can be closed

func (*ContextReader) Read added in v0.4.108

func (c *ContextReader) Read(p []byte) (n int, err error)

Read is like io.Reader.Read but cancels if the context is canceled

type ContextReaderX added in v0.4.143

type ContextReaderX struct {
	io.Reader
	// contains filtered or unexported fields
}

ContextReader reader terminated by context

func (*ContextReaderX) Read added in v0.4.143

func (cr *ContextReaderX) Read(p []byte) (n int, err error)

type ContextWriter added in v0.4.108

type ContextWriter struct {

	// idempotent pannic-free closer if reader implemented [io.Closer]
	//	- Close() IsClosable()
	ContextCloser
	// contains filtered or unexported fields
}

ContextWriter is an io.WriteCloser that aborts on context cancel

  • on context cancel, Write returns error context.Canceled
  • If the runtime type of writer implements io.Close, it ContextWriter can close it

func NewContextWriter added in v0.4.108

func NewContextWriter(writer io.Writer, ctx context.Context) (contextWriter *ContextWriter)

NewContextWriter returns an io.WriteCloser that aborts on context cancel

  • on context cancel, Write returns error context.Canceled
  • If the runtime type of reader implements io.Close, it can be closed

func (*ContextWriter) Write added in v0.4.108

func (c *ContextWriter) Write(p []byte) (n int, err error)

Write is like io.Writer.Write but cancels if the context is canceled

type LineFilterFunc added in v0.4.135

type LineFilterFunc func(line *[]byte, isLastLine bool) (skipLine bool, err error)

LineFilterFunc receives lines as they are written to the writer

  • can modify the line
  • can skip the line using skipLine
  • can return error

type LineFilterWriter added in v0.4.135

type LineFilterWriter struct {
	// contains filtered or unexported fields
}

LineFilterWriter is a writer that filters each line using a filter function

func NewLineFilterWriter added in v0.4.135

func NewLineFilterWriter(writeCloser io.WriteCloser, filter LineFilterFunc) (lineWriter *LineFilterWriter)

NewLineFilterWriter is a writer that filters each line using a filter function

func (*LineFilterWriter) Close added in v0.4.135

func (wc *LineFilterWriter) Close() (err error)

Close closes

func (*LineFilterWriter) Write added in v0.4.135

func (wc *LineFilterWriter) Write(p []byte) (n int, err error)

Write saves data in slice and returns all bytes written or ErrFileAlreadyClosed

type LineReader added in v0.4.38

type LineReader struct {
	// contains filtered or unexported fields
}

LineReader reads a io.Reader stream returing one line per Read invocation

func NewLineReader added in v0.4.38

func NewLineReader(reader io.Reader) (lineReader *LineReader)

NewLineReader reads a io.Reader stream returing one line per Read invocation

func (*LineReader) Read added in v0.4.38

func (rr *LineReader) Read(p []byte) (n int, err error)

Read returns a byte-sequence ending with newline if size of p is sufficient.

  • if size of p is too short, the text will not end with newline
  • if EOF without newline, text has no newline and err is io.EOF

func (*LineReader) ReadLine added in v0.4.38

func (rr *LineReader) ReadLine(p []byte) (line []byte, isEOF bool, err error)

ReadLine returns full lines, extending p as necessary

  • len(line) is number of bytes
  • max line length 1 MiB
  • line will end with newLine unless 1 MiB or isEOF
  • EOF is returned as isEOF true

type NetConnTap added in v0.4.159

type NetConnTap struct {
	net.Conn
	// contains filtered or unexported fields
}

func (*NetConnTap) Close added in v0.4.159

func (t *NetConnTap) Close() (err error)

func (*NetConnTap) Read added in v0.4.159

func (t *NetConnTap) Read(p []byte) (n int, err error)

func (*NetConnTap) Write added in v0.4.159

func (t *NetConnTap) Write(p []byte) (n int, err error)

type PIOError added in v0.4.159

type PIOError struct {
	// contains filtered or unexported fields
}

ReadError indicates an

func (*PIOError) Error added in v0.4.159

func (e *PIOError) Error() (msg string)

func (*PIOError) PIOErrorSource added in v0.4.159

func (e *PIOError) PIOErrorSource() (source PIOErrorSource)

func (*PIOError) Unwrap added in v0.4.159

func (e *PIOError) Unwrap() (err error)

type PIOErrorSource added in v0.4.159

type PIOErrorSource uint8

error source for a data tap

const (
	// error during delegated Read
	PeRead PIOErrorSource = iota + 1
	// error during delegated Write
	PeWrite
	// error during delegated Close
	PeClose
	// error from reads [io.Writer]
	PeReads
	// error from writes [io.Writer]
	PeWrites
)

func (PIOErrorSource) String added in v0.4.159

func (s PIOErrorSource) String() (s2 string)

type ReadWriteCloserSlice

type ReadWriteCloserSlice struct {
	// contains filtered or unexported fields
}

ReadWriteCloserSlice is a read-writer with a slice as intermediate storage. thread-safe.

  • Close closes the writer side indicating no further data will be added
  • Write and Close may return error that can be checked: errors.Is(err, pio.ErrFileAlreadyClosed)
  • read will eventually return io.EOF after a Close
  • there are no other errors

func NewReadWriteCloserSlice

func NewReadWriteCloserSlice() (readWriteCloser *ReadWriteCloserSlice)

func (*ReadWriteCloserSlice) Buffer added in v0.4.95

func (r *ReadWriteCloserSlice) Buffer() (buffer []byte)

func (*ReadWriteCloserSlice) Close

func (r *ReadWriteCloserSlice) Close() (err error)

Close closes thw Write part, may return ErrFileAlreadyClosed

func (*ReadWriteCloserSlice) Read

func (r *ReadWriteCloserSlice) Read(p []byte) (n int, err error)

Read returns at most len(p) bytes read in n and possibly io.EOF

  • Read is blocking
  • n may be less than len(p)
  • if len(p) > 0, non-error return will have n > 0

func (*ReadWriteCloserSlice) Write

func (r *ReadWriteCloserSlice) Write(p []byte) (n int, err error)

Write saves data in slice and returns all bytes written or ErrFileAlreadyClosed

type ReadWriterTap added in v0.4.159

type ReadWriterTap struct {
	io.ReadWriter
	// contains filtered or unexported fields
}

func (*ReadWriterTap) Close added in v0.4.159

func (t *ReadWriterTap) Close() (err error)

func (*ReadWriterTap) Read added in v0.4.159

func (t *ReadWriterTap) Read(p []byte) (n int, err error)

func (*ReadWriterTap) Write added in v0.4.159

func (t *ReadWriterTap) Write(p []byte) (n int, err error)

type TLSTap added in v0.4.159

type TLSTap struct {
	*tls.Conn
	// contains filtered or unexported fields
}

func (*TLSTap) Close added in v0.4.159

func (t *TLSTap) Close() (err error)

func (*TLSTap) Read added in v0.4.159

func (t *TLSTap) Read(p []byte) (n int, err error)

func (*TLSTap) Write added in v0.4.159

func (t *TLSTap) Write(p []byte) (n int, err error)

type Tap added in v0.4.159

type Tap struct {
	IsClosed parl.Awaitable
	// contains filtered or unexported fields
}

Tap returns a socket tap producing two streams of data read from and written to a socket

func NewTap added in v0.4.159

func NewTap(readsWriter, writesWriter io.Writer, addError parl.AddError) (tap *Tap)

func (*Tap) Close added in v0.4.159

func (t *Tap) Close(closer any) (err error)

func (*Tap) Read added in v0.4.159

func (t *Tap) Read(reader io.Reader, p []byte) (n int, err error)

func (*Tap) Write added in v0.4.159

func (t *Tap) Write(writer io.Writer, p []byte) (n int, err error)

type TeeWriter added in v0.4.38

type TeeWriter struct {
	// contains filtered or unexported fields
}

TeeWriter is a writer that copies its writes to one or more other writers.

func (*TeeWriter) Close added in v0.4.38

func (w *TeeWriter) Close() (err error)

func (*TeeWriter) Write added in v0.4.38

func (tw *TeeWriter) Write(p []byte) (n int, err error)

type WriteCloserToChan

type WriteCloserToChan struct {
	// contains filtered or unexported fields
}

func (*WriteCloserToChan) Ch

func (wc *WriteCloserToChan) Ch() (readCh <-chan []byte)

func (*WriteCloserToChan) Close

func (wc *WriteCloserToChan) Close() (err error)

func (*WriteCloserToChan) Write

func (wc *WriteCloserToChan) Write(p []byte) (n int, err error)

type WriteCloserToChanLine

type WriteCloserToChanLine struct {
	// contains filtered or unexported fields
}

func (*WriteCloserToChanLine) Ch

func (wc *WriteCloserToChanLine) Ch() (readCh <-chan string)

func (*WriteCloserToChanLine) Close

func (wc *WriteCloserToChanLine) Close() (err error)

func (*WriteCloserToChanLine) Write

func (wc *WriteCloserToChanLine) Write(p []byte) (n int, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL