parl

v0.4.10
Published: Apr 14, 2022 License: ISC Imports: 15 Imported by: 14

README

Parl


A Go library for parallel programming of command-line utilities and system services

features: moderator, sqlite interface, package-selectable logging, debouncer, self-signed certificate authority, watchers, thread management, file system scan and operations, …


© 2022–present Harald Rudell (https://haraldrudell.github.io/haraldrudell/)
ISC License

“justice, peace and massive virtual parallelism”

How to use

import "github.com/haraldrudell/parl"

Documentation

Documentation in the Go Package Index

On March 16th, 2022, parl was open-sourced under an ISC License
Parl is about 9,000 lines of Go code with first line written on November 21, 2018

© 2022–present Harald Rudell harald.rudell@gmail.com (https://haraldrudell.github.io/haraldrudell/)

Documentation

Overview

Package parl handles inter-thread communication and controls parallelism

parl has sub-packages augmenting the Go standard library:

perrors pfs plog pnet pos pruntime psql pstrings
psyscall pterm ptime

parl has feature packages:

ev — handling of goroutine-based functions
goid — unique goroutine IDs
mains — functions for writing command-line utilities and services
parlca — self-signed certificate authority
progress — monitor work progress for a large number of threads
sqlite — local SQL database
threadprof — profiling and counters for what threads are doing
// statuser: thread hang detector
tracer — event lists by task rather than by time or thread

parl features per-writer thread-safe logging with topic and per-package output control:

Logging is to stderr except for the Out function. parl logging uses a comma as thousands separator for numbers. A single argument is output as a string; two or more arguments are treated as Printf. The location matched against the regular expression is the full package path, an optional type receiver and the function name: “github.com/haraldrudell/mypackage.(*MyType).MyFunc”

Out(string, ...interface{}) — Standard output
Log(string, ...interface{}) — Always outputs to stderr
parl.D(string, ...interface{}) — Same as Log, intended for temporary use

Info(string, ...interface{}) — Informational progress messages
SetSilent(true) — removes Info output
IsSilent() — determines if Info printing applies

Debug(string, ...interface{}) — only prints for locations where SetDebug(true)
SetDebug(true) — Control Debug() globally, code location for all prints, long stack traces
SetRegexp(regExp string) (err error) — Regular expression controlling local Debug() printing
IsThisDebug() — Determines if debug is active for the executing function

Console(string, ...interface{}) — terminal interactivity output

parl.Recover() and parl.Recover2() thread recovery and mains.Executable.Recover() process recovery:

Threads can provide their errors via the perrors.ParlError thread-safe error store, plain error channels, parl.NBChan[error] or parl.ClosableChan[error]. parl.Recover and parl.Recover2 convert thread panic to error along with regular errors, annotating, retrieving and storing those errors and invoking error handling functions for them. mains.Recover is similar for the process.

func thread(errCh *parl.NBChan[error]) { // real-time non-blocking error channel
  defer errCh.Close() // non-blocking close effective on send complete
  var err error
  defer parl.Recover2(parl.Annotation(), &err, errCh.Send)
  errCh.Ch() <- err // non-blocking
  if err = someFunc(); err != nil {
    err = perrors.Errorf("someFunc: %w", err) // labels and attaches a stack
    return
…
func myThreadSafeThread(wg *sync.WaitGroup, errs *perrors.ParlError) { // ParlError: thread-safe error store
  defer wg.Done()
  var err error
  defer parl.Recover(parl.Annotation(), &err, errs.AddErrorProc)
…

parl package features:

AtomicBool — Thread-safe boolean
Closer — Deferrable, panic-free channel close
ClosableChan — Initialization-free channel with observable deferrable panic-free close
Moderator — A ticketing system for limited parallelism
NBChan — A non-blocking channel with trillion-size dynamic buffer
OnceChan — Initialization-free observable shutdown semaphore implementing Context
SerialDo — Serialization of invocations
WaitGroup — Observable WaitGroup
Debouncer — Invocation debouncer, pre-generics
Sprintf — Supporting thousands separator


Index

Constants

const (
	Rfc3339s   = "2006-01-02 15:04:05-07:00"
	Rfc3339ms  = "2006-01-02 15:04:05.999-07:00"
	Rfc3339us  = "2006-01-02 15:04:05.999999-07:00"
	Rfc3339ns  = "2006-01-02 15:04:05.999999999-07:00"
	Rfc3339sz  = "2006-01-02T15:04:05Z"
	Rfc3339msz = "2006-01-02T15:04:05.999Z"
	Rfc3339usz = "2006-01-02T15:04:05.999999Z"
	Rfc3339nsz = "2006-01-02T15:04:05.999999999Z"
)
const (
	SerialDoReady         = 0 + iota
	SerialDoLaunch        // from idle, now time
	SerialDoPending       // queued up invocation, request time
	SerialDoPendingLaunch // launch of pending invocation, request time
	SerialDoIdle          // busy since
)

Variables

This section is empty.

Functions

func AddToPanic

func AddToPanic(panicValue interface{}, additionalErr error) (err error)

func Annotation

func Annotation() (annotation string)

func Closer added in v0.4.0

func Closer[T any](ch chan T, errp *error)

Closer is a deferrable function that closes a channel recovering from panic
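To illustrate what a deferrable, panic-free close can look like, here is a minimal sketch using only the standard library. closeChannel and closeTwice are hypothetical names, and the code is an assumption about the general technique, not parl's implementation.

```go
package main

import "fmt"

// closeChannel is a hypothetical stand-in for a panic-free close.
// If close panics (nil or already-closed channel), the panic is
// recovered and, when errp is non-nil, stored at errp as an error.
func closeChannel[T any](ch chan T, errp *error) {
	defer func() {
		if r := recover(); r != nil && errp != nil {
			*errp = fmt.Errorf("close: %v", r)
		}
	}()
	close(ch)
}

// closeTwice closes one channel twice and returns the error
// produced by the second close.
func closeTwice() (err error) {
	ch := make(chan int)
	closeChannel(ch, nil)  // first close succeeds
	closeChannel(ch, &err) // a plain close(ch) would panic here
	return
}

func main() {
	fmt.Println(closeTwice() != nil) // true
}
```

Because the function takes the error pointer as an argument, it can be used directly in a defer statement of a function with a named error result.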

func Console

func Console(format string, a ...interface{})

Console always prints and is intended for command-line interactivity. If debug is enabled, the code location is appended.

func D

func D(format string, a ...interface{})

D prints to stderr with code location. Thread-safe. D is meant for temporary output intended to be removed before check-in.

func Debug

func Debug(format string, a ...interface{})

Debug outputs only if debug is configured or the code location package matches regexp

func EnsureError

func EnsureError(panicValue interface{}) (err error)

func Errorf

func Errorf(format string, a ...interface{}) (err error)

parl.Errorf offers stack traces and other rich error features of packages perrors and errorglue

func HandleErrp added in v0.2.2

func HandleErrp(fn func(), errp *error)

HandleErrp recovers from panics when executing fn. A panic is stored at errp using error116.AppendError()

func HandlePanic

func HandlePanic(fn func()) (err error)

HandlePanic recovers from panics when executing fn. A panic is returned in err

func HandleParlError added in v0.2.2

func HandleParlError(fn func(), storeError func(error))

HandleParlError recovers from panics when executing fn. A panic is provided to the storeError function. storeError can be the thread-safe error116.ParlError.AddErrorProc()

func Info

func Info(format string, a ...interface{})

Info prints unless silence has been configured with SetSilent(true). IsSilent determines the state of silence. If debug is enabled, the code location is appended.

func IsSilent

func IsSilent() (isSilent bool)

IsSilent returns true if Info does not print

func IsThisDebug

func IsThisDebug() bool

IsThisDebug returns whether debug logging is configured for the executing function

func Log

func Log(format string, a ...interface{})

Log invocations always print. If debug is enabled, the code location is appended.

func New

func New(s string) error

parl.New offers stack traces and other rich error features of packages perrors and errorglue

func NewDebouncer

func NewDebouncer(d time.Duration, receiver ReceiverFunc, sender SenderFunc, ctx context.Context) (err error)

Debouncer debounces event streams of Value

func Out

func Out(format string, a ...interface{})

Out prints expected output to stdout

func Recover

func Recover(annotation string, errp *error, onError func(error))

Recover recovers from a panic, invoking a function no more than once. If *errp does not hold an error and there is no panic, onError is not invoked. Otherwise, onError is invoked exactly once. *errp is updated with a possible panic.

func Recover2

func Recover2(annotation string, errp *error, onError func(error))

Recover2 recovers from a panic and may invoke onError multiple times. onError is invoked if there is an error at *errp and on a possible panic. *errp is updated with a possible panic.

func SetDebug

func SetDebug(debug bool)

If SetDebug is true, all Debug prints everywhere produce output. More selective debug printing can be achieved using SetRegexp, which matches package names.

func SetRegexp

func SetRegexp(regExp string) (err error)

func SetSilent

func SetSilent(silent bool)

SetSilent(true) removes Info output

func Sprintf added in v0.2.0

func Sprintf(format string, a ...interface{}) string

Sprintf is a printer that supports comma in large numbers
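As an illustration of what a thousands separator does to an integer, here is a small standard-library sketch. groupThousands is a hypothetical helper written for this example; it is not the library's Sprintf and handles integers only.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// groupThousands formats n in base 10 with a comma between each
// group of three digits. Illustration only.
func groupThousands(n int64) string {
	s := strconv.FormatInt(n, 10)
	sign := ""
	if strings.HasPrefix(s, "-") {
		sign, s = "-", s[1:]
	}
	var b strings.Builder
	for i, digit := range s {
		// insert a comma when the remaining digit count is a multiple of 3
		if i > 0 && (len(s)-i)%3 == 0 {
			b.WriteByte(',')
		}
		b.WriteRune(digit)
	}
	return sign + b.String()
}

func main() {
	fmt.Println(groupThousands(1234567)) // 1,234,567
}
```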

Types

type AdbAdressProvider added in v0.4.0

type AdbAdressProvider interface {
	// AdbSocketAddress retrieves the tcp socket address used
	// by a near Adbette implementation
	AdbSocketAddress() (socketAddress AdbSocketAddress)
}

AdbAdressProvider retrieves the address from an adb server or device so that custom devices can be created

type AdbRequest added in v0.3.0

type AdbRequest string

AdbRequest is a string formatted as an adb request. AdbRequest is only required for implementations using the Adbette protocol implementation

type AdbResponseID added in v0.4.0

type AdbResponseID string

type AdbSocketAddress added in v0.4.0

type AdbSocketAddress string

AdbSocketAddress is a tcp socket address accessible to the local host. The format is two parts separated by a colon. The first part is an IP address or hostname. The second part is a numeric port number. The empty string "" represents "localhost:5037". If the port part is missing, such as "localhost", it implies port 5037. If the host part is missing, it implies "localhost". Note that localhost is valid both for IPv4 and IPv6.
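The defaulting rules for the address can be sketched with net.SplitHostPort and net.JoinHostPort from the standard library. normalizeAdbAddress and the two default constants are hypothetical names introduced for this example; how the library itself parses addresses is not shown in this documentation. The sketch assumes host names or IPv4 addresses; a bracketed IPv6 literal without a port is not handled.

```go
package main

import (
	"fmt"
	"net"
)

// Defaults taken from the description above.
const (
	defaultAdbHost = "localhost"
	defaultAdbPort = "5037"
)

// normalizeAdbAddress fills in a missing host or port part.
func normalizeAdbAddress(address string) string {
	host, port, err := net.SplitHostPort(address)
	if err != nil {
		// no port part present: treat the whole string as the host
		host, port = address, ""
	}
	if host == "" {
		host = defaultAdbHost
	}
	if port == "" {
		port = defaultAdbPort
	}
	return net.JoinHostPort(host, port)
}

func main() {
	for _, a := range []string{"", "localhost", ":5555", "10.0.0.2:5555"} {
		fmt.Printf("%q -> %s\n", a, normalizeAdbAddress(a))
	}
}
```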

type AdbSyncRequest added in v0.4.0

type AdbSyncRequest string

type Adbette added in v0.3.0

type Adbette interface {
	// SendReadOkay sends a request to a remote adb endpoint.
	// if anything else than OKAY is received back from the
	// remote endpoint, err is non-nil.
	SendReadOkay(request AdbRequest) (err error)
	// ReadString reads utf-8 text up to 64 KiB-1 in length
	ReadString() (s string, err error)
	// ConnectToDevice sends a forwarding request to an adb
	// server to connect to one of its devices
	ConnectToDevice(serial AndroidSerial) (err error)
	// Shell executes a shell command on a device connected to the adb server.
	// out is a combination of stderr and stdout.
	// The status code from an on-device command cannot be obtained
	Shell(command string, reader ...func(conn io.ReadWriteCloser) (err error)) (out string, err error)
	// TrackDevices orders a server to emit serial numbers as they become available
	TrackDevices() (err error)
	// Devices lists the currently online serials
	Devices() (serials []AndroidSerial, err error)
	// DeviceStati returns all available serials and their status
	// The two slices correspond and are of the same length
	DeviceStati() (serials []AndroidSerial, stati []AndroidStatus, err error)
	// Closer closes an adb connection, meant to be a deferred function
	Closer(errp *error)
	// SetSync sets the protocol mode of an adb device connection to sync
	SetSync() (err error)
	// LIST is a sync request that lists file system entries in a directory of an adb device
	LIST(remoteDir string, dentReceiver func(mode uint32, size uint32, time uint32, byts []byte) (err error)) (err error)
	// RECV fetches the contents of a file on an adb device
	RECV(remotePath string, blobReceiver func(data []byte) (err error)) (err error)
	// CancelError is a value that a LIST or RECV callback routine can return to
	// cancel further invocations
	CancelError() (cancelError error)

	SendBlob(syncRequest AdbSyncRequest, data []byte) (err error)
	ReadBlob() (byts []byte, err error)
	ReadResponseID() (responseID AdbResponseID, err error)
	ReadBytes(byts []byte) (err error)
	SendBytes(byts []byte) (err error)
}

Adbette is a minimal implementation of the adb Android debug bridge protocol. Adbette includes both adb server and Android device functions. Adbette is extensible in that additional protocol features are easily implemented without concern for protocol details. To shut down an Adbette and release its resources, invoke the Closer method.

type Adbetter added in v0.3.0

type Adbetter interface {
	NewConnection(address AdbSocketAddress, ctx context.Context) (conn Adbette, err error)
}

Adbetter is a factory instance for connections featuring Adbette. The context is used for terminating a connection attempt. The context is not retained in the connection.

type AndroidSerial added in v0.3.0

type AndroidSerial string

AndroidSerial uniquely identifies an Android device. It is typically a string of a dozen or so 8-bit characters consisting of lower- and upper-case letters and digits: a-zA-Z0-9

type AndroidStatus added in v0.3.0

type AndroidStatus string

AndroidStatus indicates the current status of a device known to a Server or Serverette. It is a single word of ANSI-set characters

const AndroidOnline AndroidStatus = "device"

AndroidOnline is the Android device status that indicates an online device

type AtomicBool

type AtomicBool struct {
	// contains filtered or unexported fields
}

AtomicBool is a thread-safe flag. AtomicBool requires no initialization

var isDone parl.AtomicBool
if isDone.Set() // isDone was not set, but is set now
…
if !isDone.IsTrue() // isDone is not set

func (*AtomicBool) Clear

func (ab *AtomicBool) Clear() (wasSet bool)

func (*AtomicBool) IsTrue

func (ab *AtomicBool) IsTrue() (isTrue bool)

func (*AtomicBool) Set

func (ab *AtomicBool) Set() (wasNotSet bool)
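A zero-value-ready flag with these three methods can be sketched on top of sync/atomic. The type name flag and its field are hypothetical, and the return values follow the signatures listed above; this is an assumption about the general technique, not the library's source.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flag is a hypothetical thread-safe boolean that needs no initialization.
type flag struct{ v int32 }

// Set sets the flag and reports whether this call changed it
// from unset to set.
func (f *flag) Set() (wasNotSet bool) {
	return atomic.CompareAndSwapInt32(&f.v, 0, 1)
}

// Clear unsets the flag and reports whether it was set before the call.
func (f *flag) Clear() (wasSet bool) {
	return atomic.CompareAndSwapInt32(&f.v, 1, 0)
}

// IsTrue reports the current state.
func (f *flag) IsTrue() (isTrue bool) {
	return atomic.LoadInt32(&f.v) == 1
}

func main() {
	var isDone flag
	fmt.Println(isDone.Set(), isDone.Set(), isDone.IsTrue(), isDone.Clear())
	// true false true true
}
```

Because Set reports whether the caller was the one who flipped the flag, it can be used to let exactly one of several competing goroutines perform a one-time action.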

type ClosableChan added in v0.4.0

type ClosableChan[T any] struct {
	// contains filtered or unexported fields
}

ClosableChan wraps channel close. ClosableChan is an initialization-free channel with a deferrable, thread-safe, idempotent and observable Close method. Close closes the channel exactly once, recovering panics. IsClosed reports whether the Close method did execute close.

var errCh parl.ClosableChan[error]
go thread(&errCh)
err, ok := <-errCh.Ch()
if errCh.IsClosed() { // can be inspected
…
func thread(errCh *parl.ClosableChan[error]) {
  defer errCh.Close(nil) // will not terminate the process
  errCh.Ch() <- err

func NewClosableChan added in v0.4.5

func NewClosableChan[T any](ch ...chan T) (cl *ClosableChan[T])

NewClosableChan ensures a chan does not throw

func (*ClosableChan[T]) Ch added in v0.4.0

func (cl *ClosableChan[T]) Ch() (ch chan T)

Ch retrieves the channel

func (*ClosableChan[T]) Close added in v0.4.0

func (cl *ClosableChan[T]) Close(errp ...*error) (err error, didClose bool)

Close ensures the channel is closed. Close does not panic. Close is thread-safe. Close does not return until the channel is closed. Upon return, all invocations have a possible close error in err. If errp is non-nil, it is updated with a possible error. didClose indicates whether this invocation closed the channel.

func (*ClosableChan[T]) IsClosed added in v0.4.0

func (cl *ClosableChan[T]) IsClosed() (isClosed bool)

IsClosed indicates whether the Close method has been invoked

type Counter added in v0.3.0

type Counter interface {
	Inc() (counter Counter)
	Dec() (counter Counter)
	CounterValue(reset bool) (values CounterValues)
}

type CounterID added in v0.3.0

type CounterID string

type CounterValues added in v0.3.0

type CounterValues interface {
	Get() (value uint64, running uint64, max uint64, incRate uint64, decRate uint64)
	Value() (value uint64)
	Running() (running uint64)
	Max() (max uint64)
	IncRate() (incRate uint64)
	DecRate() (decRate uint64)
}

type Counters added in v0.3.0

type Counters interface {
	GetOrCreateCounter(name CounterID) (counter Counter)
	GetCounters() (orderedKeys []CounterID, m map[CounterID]Counter)
	ResetCounters()
}

Counters are extremely useful for determining that your code abides by the intended parallelism and for identifying hangs or abnormalities. Printing counters every second can verify adequate progress and possibly identify blocking of threads or swapping and garbage collection outages. Inc increases two counters while Dec decreases one, which enables tracking both the number of invocations and how many remain running by doing Inc and a deferred Dec.

type CountersFactory added in v0.3.0

type CountersFactory interface {
	NewCounters(useCounters bool) (counters Counters)
}

type Dent added in v0.3.0

type Dent interface {
	// Name is utf-8 base path in device file system.
	// Name is base name, ie. file name and extension.
	Name() (name string)
	// Modified time, the time file contents was changed, second precision, continuous time
	Modified() (modified time.Time)
	// IsDir indicates directory.
	// LIST only supports symbolic link, directory and regular file types
	IsDir() (isDir bool)
	// IsRegular indicates regular file, ie. not a directory or symbolic link.
	// LIST only supports symbolic link, directory and regular file types
	IsRegular() (isRegular bool) // ie.not directory or symlink
	// Perm returns os.FileMode data.
	// 9-bit Unix permissions per os.ModePerm.
	// LIST also supports directory and symlink bits
	Perm() (perm fs.FileMode)
	// Size is limited to 4 GiB-1
	Size() (size uint32)
}

Dent is the information returned by adb ls or LIST

type Devicette added in v0.3.0

type Devicette interface {
	// Serial returns the serial number for this device
	Serial() (serial AndroidSerial)
	// Shell executes a shell command on the device.
	// The resulting socket can be obtained either using the reader callback,
	// which is a socket connection to the device,
	// or by collecting the out string.
	Shell(command string, reader func(conn io.ReadWriteCloser) (err error)) (out string, err error)
	/*
		Pull copies a remote file or directory on the Android device to a local file system location.
		the local file must not exist.
		Pull refuses certain files like product apks. shell cat works
	*/
	Pull(remotePath, nearPath string) (err error)
	/*
		List has some peculiarities:
		If remoteDir is not an existing directory, an empty list is returned.
		Entries with insufficient permissions are ignored.
		Update: . and .. are removed, adb LIST otherwise does return those.
		File mode: the only present type bits beyond 9-bit Unix permissions are
		symlink, regular file and directory.
		File size is limited to 4 GiB-1.
		Modification time resolution is second and range is confined to a 32-bit Unix timestamp.
	*/
	List(remoteDir string) (dFileInfo []Dent, err error)
}

Devicette is a generic implementation of the capabilities of a device implementing the adb Android debug bridge protocol

type DevicetteFactory added in v0.4.0

type DevicetteFactory interface {
	// NewDevicette creates a Devicette interacting with remote adb Android Debug Bridge
	// devices via an adb server available at the socket address address
	NewDevicette(address AdbSocketAddress, serial AndroidSerial, ctx context.Context) (devicette Devicette)
}

DevicetteFactory describes how Devicette objects are obtained.

type ErrorMethod added in v0.4.9

type ErrorMethod uint8
const (
	// EMsharedChannel outputs thread errors in real-time using a channel shared by multiple threads
	EMsharedChannel ErrorMethod = iota + 1
	// EMdedicatedChannel outputs thread errors in real-time using a channel unique to the thread
	EMdedicatedChannel
	EMsharedParlErrors
	EMdedicatedParlErrors
)

type FSLocation

type FSLocation interface {
	Directory() (directory string)
}

type FanOut

type FanOut struct {
	ErrCh   chan error
	Results chan interface{}
	// contains filtered or unexported fields
}

func NewFanOut

func NewFanOut() (fo *FanOut)

func (*FanOut) Do

func (cr *FanOut) Do(name string, proc FanProc)

Do executes a procedure in a goroutine that has no result other than a possible non-nil error

func (*FanOut) Run

func (cr *FanOut) Run(name string, thunk FanThunk)

Run executes a thunk in a goroutine with a possible non-nil result and a possible non-nil error

func (*FanOut) Wait

func (cr *FanOut) Wait()

Wait waits for all Do and Run invocations to complete, then shuts down

type FanProc

type FanProc func() (err error)

type FanThunk

type FanThunk func() (result interface{}, err error)

type History added in v0.3.0

type History interface {
	Event(event string, ID0 ...goid.ThreadID)
	GetEvents() (events map[goid.ThreadID][]string)
}

History may be deprecated in favor of Tracer

type HistoryFactory added in v0.3.0

type HistoryFactory interface {
	NewThreadHistory(useEvents bool, useHistory bool) (history History)
}

type Moderator

type Moderator struct {
	// contains filtered or unexported fields
}

Moderator invokes functions at a limited level of parallelism. It is a ticketing system

m := NewModerator(20, ctx)
m.Do(func() (err error) { // waiting here for a ticket
  // got a ticket!
  …
  return or panic // ticket automatically returned
m.String() → waiting: 2(20)

func NewModerator

func NewModerator(parallelism uint64, ctx context.Context) (mo *Moderator)

NewModerator creates a new Moderator used to limit parallelism

func (*Moderator) Do

func (mo *Moderator) Do(fn func() error) (err error)

Do calls fn limited by the moderator’s parallelism. If the moderator is shut down, ErrModeratorShutdown is returned

func (*Moderator) Status

func (mo *Moderator) Status() (parallelism uint64, active uint64, waiting uint64, isShutdown bool)

func (*Moderator) String

func (mo *Moderator) String() (s string)

type NBChan added in v0.4.0

type NBChan[T any] struct {
	perrors.ParlError // thread panics
	// contains filtered or unexported fields
}

NBChan is a non-blocking send channel with trillion-size buffer. NBChan can be used as an error channel where the thread does not block from a delayed or missing reader. NBChan is initialization-free, thread-safe, idempotent, deferrable and observable. Ch(), Send(), Close(), CloseNow(), IsClosed() and Count() are not blocked by channel send and are panic-free. Close() and CloseNow() are deferrable. WaitForClose() waits until the underlying channel has been closed. NBChan implements a thread-safe error store perrors.ParlError. NBChan.GetError() returns thread panics and close errors. No errors are added to the error store after the channel has closed. NBChan does not normally generate errors. When it does, the errors are thread panics and close errors.

var errCh parl.NBChan[error]
go thread(&errCh)
err, ok := <-errCh.Ch()
errCh.WaitForClose()
errCh.GetError()
…
func thread(errCh *parl.NBChan[error]) {
  defer errCh.Close() // non-blocking close effective on send complete
  var err error
  defer parl.Recover(parl.Annotation(), &err, errCh.AddErrorProc)
  errCh.Ch() <- err // non-blocking
  if err = someFunc(); err != nil {
    err = perrors.Errorf("someFunc: %w", err)
    return

func NewNBChan added in v0.4.0

func NewNBChan[T any](ch ...chan T) (nbChan *NBChan[T])

NewNBChan instantiates a non-blocking trillion-size buffer channel. NewNBChan allows initialization based on an existing channel. NBChan does not need initialization and can be used like:

var nbChan NBChan[error]
go thread(&nbChan)

func (*NBChan[T]) Ch added in v0.4.0

func (nb *NBChan[T]) Ch() (ch chan T)

Ch obtains the channel

func (*NBChan[T]) Close added in v0.4.0

func (nb *NBChan[T]) Close() (didClose bool)

Close orders the channel to close once pending sends complete. Close is thread-safe, non-blocking and panic-free.

func (*NBChan[T]) CloseNow added in v0.4.5

func (nb *NBChan[T]) CloseNow(errp ...*error) (err error, didClose bool)

CloseNow closes without waiting for sends to complete. CloseNow does not panic. CloseNow is thread-safe. CloseNow does not return until the channel is closed. Upon return, all invocations have a possible close error in err. If errp is non-nil, it is updated with error status.

func (*NBChan[T]) Count added in v0.4.0

func (nb *NBChan[T]) Count() (unsentCount int)

Count returns number of unsent values

func (*NBChan[T]) IsClosed added in v0.4.0

func (nb *NBChan[T]) IsClosed() (isClosed bool)

IsClosed indicates whether the channel has actually closed

func (*NBChan[T]) Send added in v0.4.0

func (nb *NBChan[T]) Send(value T)

Send sends non-blocking on the channel

func (*NBChan[T]) WaitForClose added in v0.4.5

func (nb *NBChan[T]) WaitForClose()

type OnceChan added in v0.3.0

type OnceChan struct {
	// contains filtered or unexported fields
}

OnceChan is a semaphore implementing the Context with Cancel interface. Whenever a context is required for cancellation, a OnceChan can be used in its place. Unlike context, OnceChan requires no initialization. Similar to Context, OnceChan can be waited on like a channel using Done(). OnceChan can be inspected using IsDone() or Err(). OnceChan is cancelled using .Cancel()

var semaphore OnceChan
go func() {
  <-semaphore.Done()
}()
…
semaphore.Cancel()
…
semaphore.IsDone()
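An initialization-free, cancel-once signal of this kind can be sketched with sync.Once. onceSignal is a hypothetical type that covers only Done, Cancel and IsDone; it does not implement the full context.Context interface and is not the library's source.

```go
package main

import (
	"fmt"
	"sync"
)

// onceSignal is a hypothetical zero-value-ready shutdown signal.
type onceSignal struct {
	initOnce  sync.Once
	closeOnce sync.Once
	ch        chan struct{}
}

// channel lazily creates the channel so the zero value is usable.
func (s *onceSignal) channel() chan struct{} {
	s.initOnce.Do(func() { s.ch = make(chan struct{}) })
	return s.ch
}

// Done returns a channel that is closed once Cancel has been invoked.
func (s *onceSignal) Done() <-chan struct{} { return s.channel() }

// Cancel closes the channel; repeated invocations are harmless.
func (s *onceSignal) Cancel() {
	ch := s.channel()
	s.closeOnce.Do(func() { close(ch) })
}

// IsDone reports whether Cancel has been invoked, without blocking.
func (s *onceSignal) IsDone() bool {
	select {
	case <-s.channel():
		return true
	default:
		return false
	}
}

func main() {
	var semaphore onceSignal
	fmt.Println(semaphore.IsDone()) // false
	semaphore.Cancel()
	semaphore.Cancel() // a second Cancel does not panic
	<-semaphore.Done() // returns immediately
	fmt.Println(semaphore.IsDone()) // true
}
```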

func (*OnceChan) Cancel added in v0.3.0

func (oc *OnceChan) Cancel()

func (*OnceChan) Deadline added in v0.3.0

func (oc *OnceChan) Deadline() (deadline time.Time, ok bool)

func (*OnceChan) Done added in v0.3.0

func (oc *OnceChan) Done() (ch <-chan struct{})

func (*OnceChan) Err added in v0.3.0

func (oc *OnceChan) Err() (err error)

func (*OnceChan) IsDone added in v0.3.0

func (oc *OnceChan) IsDone() (isDone bool)

func (*OnceChan) Value added in v0.3.0

func (oc *OnceChan) Value(key any) (value any)

type Password

type Password interface {
	HasPassword() (hasPassword bool)
	Password() (password string)
}

type ReceiverFunc

type ReceiverFunc func(c <-chan time.Time, done <-chan struct{}) (which TriggeringChan, value Value)

ReceiverFunc takes two channels, listens to them and a typed channel, returns what channel triggered and a possible untyped value

type SenderFunc

type SenderFunc func([]Value)

SenderFunc takes an untyped value, type asserts and sends on a typed channel

type SerialDo

type SerialDo struct {
	ErrCh chan error
	ID    string
	Wg    sync.WaitGroup
	// contains filtered or unexported fields
}

SerialDo invokes a method in sequence

func NewSerialDo

func NewSerialDo(thunk func(), eventReceiver SerialDoFunc, ctx context.Context) (sdo *SerialDo)

NewSerialDo creates a SerialDo. Errors are sent on sdo.ErrCh

func (*SerialDo) Do

func (sdo *SerialDo) Do(now time.Time) (nowPending bool)

Do invokes thunk serially with a maximum queue of one invocation; additional invocation requests prior to idle are dropped. Non-blocking. Thread-safe.

func (*SerialDo) Shutdown

func (sdo *SerialDo) Shutdown()

type SerialDoEvent

type SerialDoEvent uint8

type SerialDoFunc

type SerialDoFunc func(SerialDoEvent, *SerialDo, *time.Time)

type ServerFactory added in v0.3.0

type ServerFactory interface {
	// Adb connects to an adb Android Debug Bridge server on a specified tcp socket
	Adb(address AdbSocketAddress, ctx context.Context) (server Serverette)
	// AdbLocalhost connects to an adb Android Debug Bridge server on the local computer
	AdbLocalhost(ctx context.Context) (server Serverette)
}

ServerFactory describes how AdbServer objects are obtained. Such servers may use different protocol implementations from Adbette

type Serverette added in v0.3.0

type Serverette interface {
	AdbAdressProvider // AdbSocketAddress()
	// DeviceSerialList lists serials for the currently online Android devices
	DeviceSerialList() (serials []AndroidSerial, err error)
	// DeviceStati lists all serials currently known to the server along with
	// their current status.
	// The two slices correspond and are of the same length
	DeviceStati() (serials []AndroidSerial, stati []AndroidStatus, err error)
	// TrackDevices emits serial numbers for devices that come online.
	// serials are sent on the serials channel.
	// if err is non-nil, set-up failed.
	// The errs channel closes when watching stops
	// Watching is stopped by calling cancel function or when the server’s context terminates
	TrackDevices() (tracker Trackerette, err error)
}

Serverette is a generic representation of an adb server running on a host.

command-line adb: As of Android 12, Android Debug Bridge version 1.0.41 Version 32.0.0-8006631 supports the following commands: devices connect disconnect pair forward ppp reverse mdns push pull sync shell emu install install-multiple install-multiple-package uninstall bugreport jdwp logcat disable-verity enable-verity keygen wait-for* get-state get-serialno get-devpath remount reboot sideload root unroot usb tcpip start-server kill-server reconnect attach detach

type ServeretteFactory added in v0.3.0

type ServeretteFactory interface {
	// Adb connects to an adb Android Debug Bridge server on a specified tcp socket.
	// address is a string default "localhost:5037" and default port ":5037".
	// adbetter is a factory for Adbette connections.
	NewServerette(address AdbSocketAddress, adbetter Adbetter, ctx context.Context) (server Serverette)
}

ServeretteFactory is a Server connection factory for Adbette implementations

type Statuser added in v0.3.0

type Statuser interface {
	Set(status string) (statuser Statuser)
	Shutdown()
}

Statuser prints threads that do not react within a specified time frame. This is useful during early multi-threaded design when it is uncertain why work is not progressing. Is it your code or their code? Once the program concludes without hung threads, Tracer is the better tool to identify issues.

type StatuserFactory added in v0.3.0

type StatuserFactory interface {
	NewStatuser(useStatuser bool, d time.Duration) (statuser Statuser)
}

type Thread added in v0.4.9

type Thread interface {
	NotifyStart()
	Send(ID channelID, value interface{})
	Receive(ID channelID) (value interface{})
	AddError(err error)
	// Done signals thread possibly without error
	Done()
	// DoneFailure signals thread ending with error
	DoneFailure(err error)
}

type ThreadControl added in v0.4.9

type ThreadControl interface {
	SetErrorMethod()
	AddChannel(ID channelID)
}

type ThreadGroup added in v0.4.9

type ThreadGroup interface {
	Add()
	AddThread(input bool, errorMethod ErrorMethod) (threadID ThreadID, thread Thread, threadControl ThreadControl)
	WaitForThread(threadID ThreadID)
	WaitPeriod(duration time.Duration) (isDone bool, threadList []ThreadControl)
	Wait()
}

type ThreadGroupFactory added in v0.4.9

type ThreadGroupFactory[T any] interface {
	NewThreadGroup() (threadGroup ThreadGroup)
}

type ThreadID added in v0.4.9

type ThreadID string

type ThreadInput added in v0.4.9

type ThreadInput[T any] interface {
	Value() (value T)
}

type ThreadInputFactory added in v0.4.9

type ThreadInputFactory[T any] interface {
	NewThreadInput(value T) (input ThreadInput[T])
}

type ThreadResult added in v0.4.9

type ThreadResult[T any] interface {
	Result() (result T)
}

type ThreadResultFactory added in v0.4.9

type ThreadResultFactory[T any] interface {
	NewThreadResult(thread Thread, result T) ThreadResult[T]
}

type Timer

type Timer struct {
	Label string
	// contains filtered or unexported fields
}

Timer is a simple request timer

func NewTimer

func NewTimer(label string) (t *Timer)

NewTimer gets a simple timer with duration or string output

func (*Timer) End

func (t *Timer) End() (d time.Duration)

End gets duration

func (*Timer) Endms

func (t *Timer) Endms() (ms string)

Endms gets a string with the duration in ms

type Tracer added in v0.3.0

type Tracer interface {
	// AssignTaskToThread assigns a Thread to a task
	AssignTaskToThread(threadID goid.ThreadID, task TracerTaskID) (tracer Tracer)
	// RecordTaskEvent adds an event to the task threadID is currently assigned to.
	// If threadID is not assigned, a new task is created.
	RecordTaskEvent(threadID goid.ThreadID, text string) (tracer Tracer)
	// Records returns the current map of tasks and their events
	Records(clear bool) (records map[TracerTaskID][]TracerRecord)
}

Tracer lists events in terms of tasks rather than per time or thread. A task is executed by threads assigned to it. Threads are uniquely identified by threadID. A task can have zero or one threads assigned at any one time. A thread can be assigned to zero or one tasks. Each task has an ID, a name and a list of events and thread assignments. Tracer can record branching in the code and return that for a particular item being processed. For an item processed incorrectly, or when threads hang, Tracer will find unfavorable branching and last known locations.
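The task and thread bookkeeping described above can be sketched as a small in-memory structure. This is an illustration under stated assumptions, not parl's implementation: ThreadID, TaskID, Record, sketchTracer and the generated "task-N" IDs are all hypothetical names, and recording an assignment as an event is one possible reading of "a list of events and thread assignments".

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Hypothetical stand-ins for goid.ThreadID and parl.TracerTaskID.
type ThreadID string
type TaskID string

// Record is one timestamped event belonging to a task.
type Record struct {
	At   time.Time
	Text string
}

// sketchTracer groups events by task instead of by time or thread.
type sketchTracer struct {
	mu       sync.Mutex
	assigned map[ThreadID]TaskID // thread → the task it currently works on
	records  map[TaskID][]Record // task → its events in order
	nextID   int                 // used to name tasks created implicitly
}

func newSketchTracer() *sketchTracer {
	return &sketchTracer{assigned: map[ThreadID]TaskID{}, records: map[TaskID][]Record{}}
}

// AssignTaskToThread makes task the current task of threadID.
// A thread previously assigned to the task is released, so a task
// has at most one thread and a thread has at most one task.
func (t *sketchTracer) AssignTaskToThread(threadID ThreadID, task TaskID) *sketchTracer {
	t.mu.Lock()
	defer t.mu.Unlock()
	for id, current := range t.assigned {
		if current == task {
			delete(t.assigned, id)
		}
	}
	t.assigned[threadID] = task
	t.records[task] = append(t.records[task],
		Record{At: time.Now(), Text: "assigned to thread " + string(threadID)})
	return t
}

// RecordTaskEvent appends text to the task threadID is assigned to.
// An unassigned thread gets a new task with a generated ID.
func (t *sketchTracer) RecordTaskEvent(threadID ThreadID, text string) *sketchTracer {
	t.mu.Lock()
	defer t.mu.Unlock()
	task, ok := t.assigned[threadID]
	if !ok {
		t.nextID++
		task = TaskID(fmt.Sprintf("task-%d", t.nextID))
		t.assigned[threadID] = task
	}
	t.records[task] = append(t.records[task], Record{At: time.Now(), Text: text})
	return t
}

// Records returns a copy of all tasks and their events,
// optionally clearing the stored events afterwards.
func (t *sketchTracer) Records(clear bool) map[TaskID][]Record {
	t.mu.Lock()
	defer t.mu.Unlock()
	out := make(map[TaskID][]Record, len(t.records))
	for task, events := range t.records {
		out[task] = append([]Record(nil), events...)
	}
	if clear {
		t.records = map[TaskID][]Record{}
	}
	return out
}

func main() {
	tracer := newSketchTracer()
	tracer.AssignTaskToThread("1", "item-42").
		RecordTaskEvent("1", "parsed header").
		RecordTaskEvent("1", "took slow path")
	for _, r := range tracer.Records(false)["item-42"] {
		fmt.Println(r.Text)
	}
}
```

Reading back the events of a single task, as main does for "item-42", is what makes it possible to see which branches were taken for one item and where its thread was last seen.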

type TracerFactory added in v0.3.0

type TracerFactory interface {
	// NewTracer creates tracer storage.
	// use false indicates a nil Tracer whose output will not be used
	NewTracer(use bool) (tracer Tracer)
}

type TracerRecord added in v0.3.0

type TracerRecord interface {
	Values() (at time.Time, text string)
}

type TracerTaskID added in v0.3.0

type TracerTaskID string

type Trackerette added in v0.3.0

type Trackerette interface {
	// Serials emits serial numbers as online devices become available
	Serials() (serials <-chan AndroidSerial)
	// Errs is available once Serials closes. It returns any errors
	Errs() (err error)
	// Cancel shuts down the Tracker
	Cancel()
}

Trackerette represents a server connection emitting device serial numbers

type TriggeringChan

type TriggeringChan uint8
const (
	TimerCh TriggeringChan = iota
	DoneCh
	ValueCh
)

type Value

type Value interface{}

Value is an event value that is being debounced

type WaitGroup added in v0.3.0

type WaitGroup struct {
	sync.WaitGroup
	// contains filtered or unexported fields
}

parl.WaitGroup is like a sync.WaitGroup that can be inspected. The Waiting method returns the number of threads waited for. parl.WaitGroup requires no initialization.

var wg parl.WaitGroup
wg.Add(1)
…
wg.Waiting()
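One way an inspectable wait group can be built is by embedding sync.WaitGroup and mirroring its counter in an atomic integer. The sketch below assumes that approach; inspectableWG is a hypothetical type, not parl's implementation, and it models only the Add, Done, Counter and IsZero methods listed in this reference.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// inspectableWG is a hypothetical stand-in: a sync.WaitGroup whose
// counter can be read. The zero value is ready to use.
type inspectableWG struct {
	sync.WaitGroup
	count int64 // mirror of the WaitGroup counter, accessed atomically
}

// Add adjusts both the mirror and the embedded WaitGroup.
// The mirror is updated first so that it is already current
// when Wait returns.
func (wg *inspectableWG) Add(delta int) {
	atomic.AddInt64(&wg.count, int64(delta))
	wg.WaitGroup.Add(delta)
}

// Done decrements the counter by one.
func (wg *inspectableWG) Done() {
	wg.Add(-1)
}

// Counter returns the number of threads currently waited for.
func (wg *inspectableWG) Counter() int {
	return int(atomic.LoadInt64(&wg.count))
}

// IsZero reports whether no threads are being waited for.
func (wg *inspectableWG) IsZero() bool {
	return wg.Counter() == 0
}

func main() {
	var wg inspectableWG
	release := make(chan struct{})
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-release // block until main lets the goroutines finish
		}()
	}
	fmt.Println(wg.Counter(), wg.IsZero())
	close(release)
	wg.Wait()
	fmt.Println(wg.Counter(), wg.IsZero())
}
```

Overriding Add and Done on the outer type is what keeps the mirror in sync; Wait is inherited unchanged from the embedded sync.WaitGroup.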

func (*WaitGroup) Add added in v0.3.0

func (wg *WaitGroup) Add(delta int)

func (*WaitGroup) Counter added in v0.4.5

func (wg *WaitGroup) Counter() (counter int)

func (*WaitGroup) Done added in v0.3.0

func (wg *WaitGroup) Done()

func (*WaitGroup) IsZero added in v0.4.5

func (wg *WaitGroup) IsZero() (isZero bool)

Directories

Path Synopsis
Package errorglue contains helpful declarations that are not important
Package ev provides standardized goroutine management. Events contain thread completions, failures and any type of data items.
Package evx contains declarations not essential to event handling
Package goid provides goid.GoID(), unique goroutine identifiers
m := map[goid.ThreadID]SomeInterface{}
m[goid.GoID()] = …
mains module
omaps module
Package parlca provides a self-signed certificate authority
Package error116 enriches error values with string data, stack traces, associated errors, less severe warnings, thread-safe containers and comprehensive error string representations.
pfs module
Package parlnet provides IP-related functions with few dependencies beyond the net package
Package parlos provides simplified functions related to the os package
process module
Package progress provides printable progress reporting for multi-threaded operations
Package pruntime provides an interface to the Go standard library’s runtime package using only serializable simple types.
Stack traces and code locations have several formats:
codeLocation := pruntime.NewCodeLocation(0)
codeLocation.Base() // package and type → mypackage.(*MyType).MyFunc
codeLocation.PackFunc() // very brief → mypackage.MyFunc
codeLocation.Name() // function name only → MyFunc
codeLocation.Short() // line, no package path → mypackage.(*MyType).MyFunc-myfile.go:19
codeLocation.Long() // uniquely identifiable → codeberg.org/haraldrudell/mypackage.(*MyType).MyFunc-myfile.go:19
codeLocation.Full() // everything → codeberg.org/haraldrudell/mypackage.(*MyType).MyFunc-/fs/mypackage/myfile.go:19
codeLocation.String() // two lines → "codeberg.org/haraldrudell/mypackage.(*MyType).MyFunc\n /fs/mypackage/myfile.go:19"
Stack can determine where a goroutine was created and whether this is the main thread:
pruntime.GoRoutineID() → 1
pruntime.NewStack(0).Creator.Short() → main.main-pruntime.go:30
fmt.Println(pruntime.NewStack(0).IsMainThread) → true
pruntime.NewStack(0).Frames[0].Args → (0x104c12c60?)
Package sqldb interfaces database/sql
pterm module
Package parltime provides time utility functions
sqliter module
Package ghandi interfaces Android devices
Package threadprof provides profiling of threading
watchfs module
yaml module
yamler module
