moxio

package
v0.0.11
Published: Apr 30, 2024 License: MIT Imports: 12 Imported by: 16

Documentation

Overview

Package moxio has common i/o functions.

Index

Constants

This section is empty.

Variables

var ErrLimit = errors.New("input exceeds maximum size") // Returned by LimitReader.

var ErrLineTooLong = errors.New("line from remote too long") // Returned by Bufpool.Readline.

Functions

func Base64Writer added in v0.0.6

func Base64Writer(w io.Writer) io.WriteCloser

Base64Writer wraps a writer, returning one that encodes written data as base64 on \r\n-separated lines of at most 78+2 characters.

func IsClosed

func IsClosed(err error) bool

IsClosed returns whether i/o failed, typically because the connection is closed or otherwise cannot be used for further i/o.

Used to prevent error logging for connections that are closed.

func IsStorageSpace

func IsStorageSpace(err error) bool

IsStorageSpace returns whether the error indicates a storage space issue, such as a full disk, no free inodes, or a reached quota.

func LinkOrCopy added in v0.0.6

func LinkOrCopy(log mlog.Log, dst, src string, srcReaderOpt io.Reader, sync bool) (rerr error)

LinkOrCopy attempts to make dst a hardlink of src. If that fails, it falls back to a regular file copy. If srcReaderOpt is not nil, it is used for reading. If sync is true and the file was copied, Sync is called on the file after writing to ensure it is on disk. Callers should also sync the directory of the destination file, but may want to do that after linking/copying multiple files. If dst was created and an error occurred, dst is removed.

func SyncDir

func SyncDir(log mlog.Log, dir string) error

SyncDir opens a directory and syncs its contents to disk.

func TLSInfo added in v0.0.9

func TLSInfo(conn *tls.Conn) (version, ciphersuite string)

TLSInfo returns human-readable strings about the TLS connection, for use in logging.

Types

type AtReader

type AtReader struct {
	R      io.ReaderAt
	Offset int64
}

AtReader turns an io.ReaderAt into an io.Reader by keeping track of the offset.

func (*AtReader) Read

func (r *AtReader) Read(buf []byte) (int, error)

type Bufpool

type Bufpool struct {
	// contains filtered or unexported fields
}

Bufpool caches byte slices for reuse during parsing of line-terminated commands.

func NewBufpool

func NewBufpool(max, size int) *Bufpool

NewBufpool makes a new pool, initially empty, but holding at most "max" buffers of "size" bytes each.

func (*Bufpool) Readline

func (b *Bufpool) Readline(log mlog.Log, r *bufio.Reader) (line string, rerr error)

Readline reads a \n- or \r\n-terminated line. Line is returned without \n or \r\n. If the line was too long, ErrLineTooLong is returned. If an EOF is encountered before a \n, io.ErrUnexpectedEOF is returned.

type LimitAtReader

type LimitAtReader struct {
	R     io.ReaderAt
	Limit int64
}

LimitAtReader is an io.ReaderAt that returns ErrLimit if a read would extend beyond Limit.

func (*LimitAtReader) ReadAt

func (r *LimitAtReader) ReadAt(buf []byte, offset int64) (int, error)

ReadAt passes the read on to R, but returns an error if the read data would extend beyond Limit.

type LimitReader

type LimitReader struct {
	R     io.Reader
	Limit int64
}

LimitReader reads up to Limit bytes, returning an error if more bytes are read. LimitReader can be used to enforce a maximum input length.

func (*LimitReader) Read

func (r *LimitReader) Read(buf []byte) (int, error)

Read reads bytes from the underlying reader.

type PrefixConn

type PrefixConn struct {
	PrefixReader io.Reader // If not nil, reads are fulfilled from here. It is cleared when a read returns io.EOF.
	net.Conn
}

PrefixConn is a net.Conn prefixed with a reader that is drained first. Used for STARTTLS, where a buffered read may already have consumed initial TLS data.

func (*PrefixConn) Read

func (c *PrefixConn) Read(buf []byte) (int, error)

Read reads from PrefixReader when it is not nil, and from the net.Conn otherwise.

type TraceReader

type TraceReader struct {
	// contains filtered or unexported fields
}

func NewTraceReader

func NewTraceReader(log mlog.Log, prefix string, r io.Reader) *TraceReader

NewTraceReader wraps reader "r" into a reader that logs all reads to "log" with log level trace, prefixed with "prefix".

func (*TraceReader) Read

func (r *TraceReader) Read(buf []byte) (int, error)

Read does a single Read on its underlying reader, logs data of successful reads, and returns the data read.

func (*TraceReader) SetTrace

func (r *TraceReader) SetTrace(level slog.Level)

type TraceWriter

type TraceWriter struct {
	// contains filtered or unexported fields
}

func NewTraceWriter

func NewTraceWriter(log mlog.Log, prefix string, w io.Writer) *TraceWriter

NewTraceWriter wraps "w" into a writer that logs all writes to "log" with log level trace, prefixed with "prefix".

func (*TraceWriter) SetTrace

func (w *TraceWriter) SetTrace(level slog.Level)

func (*TraceWriter) Write

func (w *TraceWriter) Write(buf []byte) (int, error)

Write logs a trace line for writing buf to the client, then writes to the client.

type Work added in v0.0.7

type Work[T, R any] struct {
	In  T
	Err error
	Out R
	// contains filtered or unexported fields
}

Work is a slot for work that needs to be done.

type WorkQueue added in v0.0.7

type WorkQueue[T, R any] struct {
	// contains filtered or unexported fields
}

WorkQueue can be used to execute a workload where many items are processed with a slow step, and where a pool of worker goroutines executing that slow step helps. For example, reading messages from the database file is fast and cannot easily be done concurrently, but reading each message file from disk and parsing its headers is the bottleneck. The work queue manages the goroutines that read the message files from disk and parse them.

func NewWorkQueue added in v0.0.7

func NewWorkQueue[T, R any](procs, size int, preparer func(in, out chan Work[T, R]), process func(T, R) error) *WorkQueue[T, R]

NewWorkQueue creates a new work queue with "procs" goroutines, and a total work queue size of "size" (e.g. 2*procs). The worker goroutines run "preparer", which should be a loop receiving work from "in" and sending the work result (with Err or Out set) on "out". The preparer function should return when the "in" channel is closed, the signal to stop. WorkQueue processes the results in the order they went in, so prepared work that was scheduled after earlier work that is not yet prepared will wait and be queued.

func (*WorkQueue[T, R]) Add added in v0.0.7

func (wq *WorkQueue[T, R]) Add(in T) error

Add adds new work to be prepared to the queue. If the queue is full, it waits until space becomes available, i.e. until the work at the head of the queue has been prepared. Add processes prepared items to make space available.

func (*WorkQueue[T, R]) Finish added in v0.0.7

func (wq *WorkQueue[T, R]) Finish() error

Finish waits for the remaining work to be prepared and processes the work.

func (*WorkQueue[T, R]) Stop added in v0.0.7

func (wq *WorkQueue[T, R]) Stop()

Stop shuts down the worker goroutines and waits until they have returned. Stop must always be called on a WorkQueue, otherwise the goroutines never stop.
