terminal

package
v1.2.56
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

const (
	MsgStreamLinked = "stream has already been operated upon or closed"
	MsgConsumed     = "source already consumed or closed"
)

Variables

var (
	ParallelTargetSize = runtime.GOMAXPROCS(-1)
)
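As a small illustration of this default: in Go's runtime package, calling runtime.GOMAXPROCS with an argument below 1 only reports the current limit and leaves it unchanged. The lowercase names in this sketch are local stand-ins, not identifiers exported by this package.

```go
package main

import (
	"fmt"
	"runtime"
)

// parallelTargetSize is a local stand-in for ParallelTargetSize (assumption:
// it is initialized exactly as in the variable block above).
var parallelTargetSize = runtime.GOMAXPROCS(-1)

// queryLimit reports the current GOMAXPROCS limit without modifying it.
func queryLimit() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	// The initializer above was a pure query, so a later query still
	// returns the same value.
	fmt.Println(parallelTargetSize == queryLimit())
	fmt.Println(parallelTargetSize >= 1)
}
```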

Functions

func CopyInto

func CopyInto(ctx context.Context, wrappedSink Sink, spliterator spliterator.Spliterator)

Types

type AbstractPipeline

type AbstractPipeline struct {
	class.Class
	// contains filtered or unexported fields
}

Abstract base class for "pipeline" classes, which are the core implementations of the Stream interface and its primitive specializations. Manages construction and evaluation of stream pipelines.

An AbstractPipeline represents an initial portion of a stream pipeline, encapsulating a stream source and zero or more intermediate operations. The individual AbstractPipeline objects are often referred to as stages, where each stage describes either the stream source or an intermediate operation.

A concrete intermediate stage is generally built from an AbstractPipeline, a shape-specific pipeline class that extends it (e.g., IntPipeline) and is also abstract, and an operation-specific concrete class that extends that. AbstractPipeline contains most of the mechanics of evaluating the pipeline and implements methods that will be used by the operation; the shape-specific classes add helper methods for collecting results into the appropriate shape-specific containers.

After chaining a new intermediate operation, or executing a terminal operation, the stream is considered to be consumed, and no more intermediate or terminal operations are permitted on this stream instance.

Implementation note: for sequential streams, and for parallel streams without stateful intermediate operations, pipeline evaluation is done in a single pass that "jams" all the operations together. For parallel streams with stateful operations, execution is divided into segments, where each stateful operation marks the end of a segment, and each segment is evaluated separately and its result used as the input to the next segment. In all cases, the source data is not consumed until a terminal operation begins.

  • @param <E_IN> type of input elements
  • @param <E_OUT> type of output elements
  • @param <S> type of the subclass implementing BaseStream
  • @since 1.8
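The "consumed after use" rule can be sketched as follows. This is a minimal illustration, not the package's implementation: the stage type and its link method are hypothetical, and returning an error (rather than panicking) is an assumption. Only the message text is taken from the MsgStreamLinked constant listed above.

```go
package main

import (
	"errors"
	"fmt"
)

// msgStreamLinked copies the text of the MsgStreamLinked constant.
const msgStreamLinked = "stream has already been operated upon or closed"

// stage is a hypothetical stand-in for one pipeline stage.
type stage struct {
	previous         *stage
	linkedOrConsumed bool
}

// link chains a new intermediate stage onto s and marks s as consumed.
// Reporting reuse through an error value is an assumption of this sketch.
func (s *stage) link() (*stage, error) {
	if s.linkedOrConsumed {
		return nil, errors.New(msgStreamLinked)
	}
	s.linkedOrConsumed = true
	return &stage{previous: s}, nil
}

func main() {
	source := &stage{}

	// The first operation on a stage succeeds and yields a new stage.
	mapped, err := source.link()
	fmt.Println(mapped != nil, err)

	// Operating on the same stage again is rejected.
	_, err = source.link()
	fmt.Println(err)
}
```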

func (*AbstractPipeline) Evaluate

func (p *AbstractPipeline) Evaluate(ctx context.Context, terminalOp Operation, split spliterator.Spliterator) optional.Optional

Evaluate the pipeline with a terminal operation to produce a result.

  • @param <R> the type of result
  • @param terminalOp the terminal operation to be applied to the pipeline
  • @return the result

func (*AbstractPipeline) GetTargetSize

func (p *AbstractPipeline) GetTargetSize(sizeEstimate int) int

Returns the targetSize, initializing it via the supplied size estimate if not already initialized.

func (*AbstractPipeline) IsLinkedOrConsumed

func (p *AbstractPipeline) IsLinkedOrConsumed() bool

func (*AbstractPipeline) IsParallel

func (p *AbstractPipeline) IsParallel() bool

func (*AbstractPipeline) SetLinkedOrConsumed

func (p *AbstractPipeline) SetLinkedOrConsumed(linkedOrConsumed bool)

func (*AbstractPipeline) SetParallel

func (p *AbstractPipeline) SetParallel(parallel bool)

func (*AbstractPipeline) SetTargetSize

func (p *AbstractPipeline) SetTargetSize(targetSize int)

func (*AbstractPipeline) SuggestTargetSize

func (p *AbstractPipeline) SuggestTargetSize(sizeEstimate int) int

Returns a suggested target leaf size based on the initial size estimate.

  • @return suggested target leaf size
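A rough sketch of how GetTargetSize and SuggestTargetSize could fit together. Everything here is an assumption: the formula (estimate divided by a leaf target of four tasks per processor, with a minimum of 1) is borrowed from Java's AbstractTask, the pipeline type is hypothetical, and treating 0 as "not initialized" is a choice made for this example only.

```go
package main

import (
	"fmt"
	"runtime"
)

// leafTarget over-partitions to roughly four leaf tasks per processor.
func leafTarget() int {
	return runtime.GOMAXPROCS(0) << 2
}

// suggestTargetSize divides the size estimate by the number of desired
// leaves and never returns less than 1. leaves must be positive.
func suggestTargetSize(sizeEstimate, leaves int) int {
	if est := sizeEstimate / leaves; est > 0 {
		return est
	}
	return 1
}

// pipeline is a hypothetical holder for the cached target size.
type pipeline struct {
	targetSize int // 0 means "not initialized yet" in this sketch
}

// getTargetSize initializes the target size from the estimate on first use
// and returns the cached value afterwards.
func (p *pipeline) getTargetSize(sizeEstimate int) int {
	if p.targetSize == 0 {
		p.targetSize = suggestTargetSize(sizeEstimate, leafTarget())
	}
	return p.targetSize
}

func main() {
	p := &pipeline{}
	first := p.getTargetSize(1 << 20)
	// The second estimate is ignored because the value is already cached.
	fmt.Println(first == p.getTargetSize(1))
}
```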

type Operation

type Operation interface {
	/**
	 * Gets the stream flags of the operation.  Terminal operations may set a
	 * limited subset of the stream flags defined in {@link StreamOpFlag}, and
	 * these flags are combined with the previously combined stream and
	 * intermediate operation flags for the pipeline.
	 *
	 * @implSpec The default implementation returns zero.
	 *
	 * @return the stream flags for this operation
	 * @see StreamOpFlag
	 */
	GetOpFlags() int

	/**
	 * Performs a parallel evaluation of the operation using the specified
	 * {@code PipelineHelper}, which describes the upstream intermediate
	 * operations.
	 *
	 * @implSpec The default performs a sequential evaluation of the operation
	 * using the specified {@code PipelineHelper}.
	 *
	 * @param helper the pipeline helper
	 * @param spliterator the source spliterator
	 * @return the result of the evaluation
	 */
	EvaluateParallel(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional

	/**
	 * Performs a sequential evaluation of the operation using the specified
	 * {@code PipelineHelper}, which describes the upstream intermediate
	 * operations.
	 *
	 * @param helper the pipeline helper
	 * @param spliterator the source spliterator
	 * @return the result of the evaluation
	 */
	EvaluateSequential(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional
}

An operation in a stream pipeline that takes a stream as input and produces a result or side-effect. An Operation has an input type and stream shape, and a result type. An Operation also has a set of operation flags that describes how the operation processes elements of the stream (such as short-circuiting or respecting encounter order; see StreamOpFlag).

An Operation must provide a sequential and a parallel implementation of the operation relative to a given stream source and set of intermediate operations.

  • @param <E_IN> the type of input elements
  • @param <R> the type of the result
  • @since 1.8
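To make the sequential/parallel contract concrete, here is a simplified sketch. The operation interface below only mirrors the shape of Operation: it takes a plain []int instead of a spliterator.Spliterator and returns an int instead of an optional.Optional, and the evaluate helper that dispatches on a parallel flag is an assumption about how a pipeline might choose between the two methods.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// operation is a hypothetical, simplified analogue of Operation.
type operation interface {
	getOpFlags() int
	evaluateSequential(ctx context.Context, source []int) int
	evaluateParallel(ctx context.Context, source []int) int
}

// sumOp is a terminal operation that adds up all source elements.
type sumOp struct{}

// getOpFlags returns zero, matching the documented default.
func (sumOp) getOpFlags() int { return 0 }

func (sumOp) evaluateSequential(_ context.Context, source []int) int {
	total := 0
	for _, v := range source {
		total += v
	}
	return total
}

// evaluateParallel sums the two halves of the source concurrently.
func (op sumOp) evaluateParallel(ctx context.Context, source []int) int {
	if len(source) < 2 {
		return op.evaluateSequential(ctx, source)
	}
	mid := len(source) / 2
	var left, right int
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		left = op.evaluateSequential(ctx, source[:mid])
	}()
	go func() {
		defer wg.Done()
		right = op.evaluateSequential(ctx, source[mid:])
	}()
	wg.Wait()
	return left + right
}

// evaluate picks the evaluation mode based on the parallel flag.
func evaluate(ctx context.Context, parallel bool, op operation, source []int) int {
	if parallel {
		return op.evaluateParallel(ctx, source)
	}
	return op.evaluateSequential(ctx, source)
}

// sumBoth runs the same operation in both modes.
func sumBoth(source []int) (sequential, parallel int) {
	ctx := context.Background()
	return evaluate(ctx, false, sumOp{}, source), evaluate(ctx, true, sumOp{}, source)
}

func main() {
	fmt.Println(sumBoth([]int{1, 2, 3, 4, 5}))
}
```

Both modes must agree on the result; only the way the source is traversed differs.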

type ShortCircuitTask

type ShortCircuitTask interface {
	Task

	SharedResult() *sharedResult
	/**
	 * The result for this computation; this is shared among all tasks and set
	 * exactly once
	 */
	GetSharedResult() Sink

	/**
	 * Declares that a globally valid result has been found.  If another task has
	 * not already found the answer, the result is installed in
	 * {@code sharedResult}.  The {@code compute()} method will check
	 * {@code sharedResult} before proceeding with computation, so this causes
	 * the computation to terminate early.
	 *
	 * @param result the result found
	 */
	ShortCircuit(result Sink)

	/**
	 * Mark this task as canceled
	 */
	Cancel()

	/**
	 * Queries whether this task is canceled.  A task is considered canceled if
	 * it or any of its parents have been canceled.
	 *
	 * @return {@code true} if this task or any parent is canceled.
	 */
	TaskCanceled() bool

	/**
	 * Cancels all tasks which succeed this one in the encounter order.  This
	 * includes canceling the current task's right sibling, as well as the
	 * later right siblings of all its parents.
	 */
	CancelLaterNodes()
}

Abstract class for fork-join tasks used to implement short-circuiting stream ops, which can produce a result without processing all elements of the stream.

  • @param <P_IN> type of input elements to the pipeline
  • @param <P_OUT> type of output elements from the pipeline
  • @param <R> type of intermediate result, may be different from operation result type
  • @param <K> type of child and sibling tasks
  • @since 1.8
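The short-circuit idea (a result shared by all tasks, installed at most once, and checked before further work) can be sketched like this. The sharedResult type and findAny function are hypothetical; they use goroutines over fixed-size chunks instead of the package's task tree, and a mutex is just one of several ways to get set-once behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedResult holds the single answer shared by all workers.
type sharedResult struct {
	mu    sync.Mutex
	found bool
	value int
}

// shortCircuit installs v unless another worker already installed a result.
func (r *sharedResult) shortCircuit(v int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.found {
		r.found, r.value = true, v
	}
}

// get reports the shared result and whether one has been found.
func (r *sharedResult) get() (int, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.value, r.found
}

// findAny searches chunks of data concurrently and stops all workers early
// once any of them finds a matching element.
func findAny(data []int, chunk int, match func(int) bool) (int, bool) {
	if chunk < 1 {
		chunk = 1
	}
	var shared sharedResult
	var wg sync.WaitGroup
	for start := 0; start < len(data); start += chunk {
		end := start + chunk
		if end > len(data) {
			end = len(data)
		}
		wg.Add(1)
		go func(part []int) {
			defer wg.Done()
			for _, v := range part {
				if _, done := shared.get(); done {
					return // another worker already found an answer
				}
				if match(v) {
					shared.shortCircuit(v)
					return
				}
			}
		}(data[start:end])
	}
	wg.Wait()
	return shared.get()
}

func main() {
	data := make([]int, 100)
	for i := range data {
		data[i] = i
	}
	fmt.Println(findAny(data, 10, func(v int) bool { return v == 42 }))
}
```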

type Sink

type Sink interface {
	sink.Sink
	supplier.OptionalSupplier
}

A Sink that accumulates state as elements are accepted, and allows a result to be retrieved after the computation is finished.

  • @param <T> the type of elements to be accepted
  • @param <R> the type of the result
  • @since 1.8
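A minimal sketch of such an accumulating sink, assuming int elements. The maxSink type is hypothetical; its get method returns a (value, ok) pair as a stand-in for optional.Optional, so an empty computation can be distinguished from a real result.

```go
package main

import "fmt"

// maxSink is a hypothetical terminal sink that remembers the largest
// element it has accepted.
type maxSink struct {
	max  int
	seen bool
}

// accept folds one element into the accumulated state.
func (s *maxSink) accept(v int) {
	if !s.seen || v > s.max {
		s.max, s.seen = v, true
	}
}

// get returns the accumulated result; ok is false if nothing was accepted.
func (s *maxSink) get() (value int, ok bool) {
	return s.max, s.seen
}

func main() {
	s := &maxSink{}
	for _, v := range []int{3, 9, 4} {
		s.accept(v)
	}
	fmt.Println(s.get())
}
```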

func WrapAndCopyInto

func WrapAndCopyInto(ctx context.Context, sink Sink, spliterator spliterator.Spliterator) Sink

Applies the pipeline stages described by this PipelineHelper to the provided Spliterator and sends the results to the provided Sink.

Implementation specification: the implementation behaves as if:

    copyInto(wrapSink(sink), spliterator);

  • @param sink the Sink to receive the results
  • @param spliterator the spliterator describing the source input to process
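The "wrap, then copy" behavior can be illustrated with a small sketch. All names here are hypothetical and simplified: the source is a []int instead of a spliterator, each pipeline stage is a plain func(int) int, and the context parameter is omitted.

```go
package main

import "fmt"

// sink is a hypothetical minimal sink that accepts int elements.
type sink interface {
	accept(v int)
}

// collector is the terminal sink; it stores everything it receives.
type collector struct {
	items []int
}

func (c *collector) accept(v int) { c.items = append(c.items, v) }

// mapSink is one wrapped stage: it transforms each element before
// handing it to the downstream sink.
type mapSink struct {
	fn         func(int) int
	downstream sink
}

func (m mapSink) accept(v int) { m.downstream.accept(m.fn(v)) }

// wrapSink wraps the terminal sink with the given stages, from last to
// first, so that elements pass through the stages in declaration order.
func wrapSink(terminal sink, stages ...func(int) int) sink {
	wrapped := terminal
	for i := len(stages) - 1; i >= 0; i-- {
		wrapped = mapSink{fn: stages[i], downstream: wrapped}
	}
	return wrapped
}

// copyInto pushes every source element into the wrapped sink.
func copyInto(wrapped sink, source []int) {
	for _, v := range source {
		wrapped.accept(v)
	}
}

// wrapAndCopyInto behaves as copyInto(wrapSink(sink), source) and returns
// the terminal sink so the caller can read the result from it.
func wrapAndCopyInto(terminal *collector, source []int, stages ...func(int) int) *collector {
	copyInto(wrapSink(terminal, stages...), source)
	return terminal
}

func main() {
	addOne := func(v int) int { return v + 1 }
	double := func(v int) int { return v * 2 }
	out := wrapAndCopyInto(&collector{}, []int{1, 2, 3}, addOne, double)
	fmt.Println(out.items)
}
```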

type TODOOperation

type TODOOperation struct {
	class.Class
}

func (*TODOOperation) EvaluateParallel

func (op *TODOOperation) EvaluateParallel(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional

func (*TODOOperation) EvaluateSequential

func (op *TODOOperation) EvaluateSequential(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional

func (*TODOOperation) GetOpFlags

func (op *TODOOperation) GetOpFlags() int

type TODOShortCircuitTask

type TODOShortCircuitTask struct {
	TODOTask
	// contains filtered or unexported fields
}

func (*TODOShortCircuitTask) Cancel

func (task *TODOShortCircuitTask) Cancel()

func (*TODOShortCircuitTask) CancelLaterNodes

func (task *TODOShortCircuitTask) CancelLaterNodes()

func (*TODOShortCircuitTask) Compute

func (task *TODOShortCircuitTask) Compute(ctx context.Context)

Overrides the TODOTask version to include checks for early exits while splitting or computing.

func (*TODOShortCircuitTask) GetLocalResult

func (task *TODOShortCircuitTask) GetLocalResult() Sink

Retrieves the local result for this task. If this task is the root, retrieves the shared result instead.

func (*TODOShortCircuitTask) GetSharedResult

func (task *TODOShortCircuitTask) GetSharedResult() Sink

func (*TODOShortCircuitTask) SetLocalResult

func (task *TODOShortCircuitTask) SetLocalResult(localResult Sink)

Sets a local result for this task. If this task is the root, sets the shared result instead (if not already set).

  • @param localResult the result to set for this task

func (TODOShortCircuitTask) SetRawResult

func (task TODOShortCircuitTask) SetRawResult(result Sink)

Does nothing; instead, subclasses should use setLocalResult(Object) to manage results.

  • @param result must be null, or an exception is thrown (this is a safety tripwire to detect when setRawResult() is being used instead of setLocalResult())

func (*TODOShortCircuitTask) SharedResult

func (task *TODOShortCircuitTask) SharedResult() *sharedResult

func (*TODOShortCircuitTask) ShortCircuit

func (task *TODOShortCircuitTask) ShortCircuit(result Sink)

func (*TODOShortCircuitTask) TaskCanceled

func (task *TODOShortCircuitTask) TaskCanceled() bool
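The ShortCircuitTask interface documents TaskCanceled as "canceled if it or any of its parents have been canceled" and CancelLaterNodes as canceling the right sibling of the current task and the later right siblings of all its parents. A sketch of both walks over a binary task tree follows; the node type is hypothetical and, unlike a real task, makes no attempt to be safe for concurrent use.

```go
package main

import "fmt"

// node is a hypothetical task-tree node.
type node struct {
	parent, left, right *node
	canceled            bool
}

// split creates and attaches a left and a right child.
func (n *node) split() (*node, *node) {
	n.left, n.right = &node{parent: n}, &node{parent: n}
	return n.left, n.right
}

// cancel marks only this node as canceled.
func (n *node) cancel() { n.canceled = true }

// taskCanceled reports whether this node or any ancestor is canceled.
func (n *node) taskCanceled() bool {
	for cur := n; cur != nil; cur = cur.parent {
		if cur.canceled {
			return true
		}
	}
	return false
}

// cancelLaterNodes walks up to the root; whenever the path arrives from a
// left child, the right sibling (which comes later in encounter order)
// is canceled.
func (n *node) cancelLaterNodes() {
	for cur, parent := n, n.parent; parent != nil; cur, parent = parent, parent.parent {
		if parent.left == cur && parent.right != nil {
			parent.right.cancel()
		}
	}
}

func main() {
	root := &node{}
	a, b := root.split()
	a1, a2 := a.split()
	b1, _ := b.split()

	a1.cancelLaterNodes()

	// a2 and b are canceled directly; b1 is canceled through its parent b.
	fmt.Println(a2.taskCanceled(), b.taskCanceled(), b1.taskCanceled())
	// a1 itself and its ancestors are unaffected.
	fmt.Println(a1.taskCanceled(), a.taskCanceled(), root.taskCanceled())
}
```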

func (*TODOShortCircuitTask) WithParent

func (task *TODOShortCircuitTask) WithParent(parent ShortCircuitTask, spliterator spliterator.Spliterator) *TODOShortCircuitTask

Constructor for non-root nodes.

  • @param parent parent task in the computation tree
  • @param spliterator the Spliterator for the portion of the computation tree described by this task

func (*TODOShortCircuitTask) WithSpliterator

func (task *TODOShortCircuitTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOShortCircuitTask

Constructor for root tasks.

  • @param helper the PipelineHelper describing the stream pipeline up to this operation
  • @param spliterator the Spliterator describing the source for this pipeline

type TODOSink

type TODOSink struct {
	sink.TODO
}

func (*TODOSink) Get

func (sink *TODOSink) Get() optional.Optional

type TODOTask

type TODOTask struct {
	class.Class
	// contains filtered or unexported fields
}

func (*TODOTask) Compute

func (task *TODOTask) Compute(ctx context.Context)

func (*TODOTask) DoLeaf

func (task *TODOTask) DoLeaf(ctx context.Context) Sink

func (*TODOTask) Fork

func (task *TODOTask) Fork(ctx context.Context)

func (*TODOTask) GetLeafTarget

func (task *TODOTask) GetLeafTarget() int

func (*TODOTask) GetLocalResult

func (task *TODOTask) GetLocalResult() Sink

Retrieves a result previously stored with setLocalResult.

  • @return local result for this node previously stored with setLocalResult

func (*TODOTask) GetParent

func (task *TODOTask) GetParent() Task

func (*TODOTask) GetSpliterator

func (task *TODOTask) GetSpliterator() spliterator.Spliterator

func (*TODOTask) Invoke

func (task *TODOTask) Invoke(ctx context.Context) Sink

func (*TODOTask) IsLeaf

func (task *TODOTask) IsLeaf() bool

func (*TODOTask) IsLeftmostNode

func (task *TODOTask) IsLeftmostNode() bool
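The Task interface describes a "leftmost" node as one whose path from the root follows only left-child links. A minimal sketch of that check, using a hypothetical node type rather than TODOTask:

```go
package main

import "fmt"

// node is a hypothetical task-tree node with parent and child links.
type node struct {
	parent, left, right *node
}

// isLeftmostNode walks toward the root and fails as soon as the path
// passes through a link that is not a left-child link. The root itself
// counts as leftmost.
func (n *node) isLeftmostNode() bool {
	for cur := n; cur.parent != nil; cur = cur.parent {
		if cur.parent.left != cur {
			return false
		}
	}
	return true
}

func main() {
	root := &node{}
	root.left, root.right = &node{parent: root}, &node{parent: root}
	l := root.left
	l.left, l.right = &node{parent: l}, &node{parent: l}

	fmt.Println(l.left.isLeftmostNode())     // reached via left links only
	fmt.Println(l.right.isLeftmostNode())    // last link is a right link
	fmt.Println(root.right.isLeftmostNode()) // right child of the root
}
```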

func (*TODOTask) IsRoot

func (task *TODOTask) IsRoot() bool

func (*TODOTask) Join

func (task *TODOTask) Join() Sink

func (*TODOTask) LeftChild

func (task *TODOTask) LeftChild() Task

func (*TODOTask) MakeChild

func (task *TODOTask) MakeChild(spliterator spliterator.Spliterator) Task

func (*TODOTask) OnCompletion

func (task *TODOTask) OnCompletion(caller Task)

func (*TODOTask) RightChild

func (task *TODOTask) RightChild() Task

func (*TODOTask) SetLeftChild

func (task *TODOTask) SetLeftChild(task_ Task)

func (*TODOTask) SetLocalResult

func (task *TODOTask) SetLocalResult(localResult Sink)

Associates the result with the task; it can be retrieved with GetLocalResult.

  • @param localResult local result for this node

func (*TODOTask) SetRawResult

func (task *TODOTask) SetRawResult(result Sink)

Does nothing; instead, subclasses should use setLocalResult(Object) to manage results.

  • @param result must be null, or an exception is thrown (this is a safety tripwire to detect when setRawResult() is being used instead of setLocalResult())

func (*TODOTask) SetRightChild

func (task *TODOTask) SetRightChild(task_ Task)

func (*TODOTask) TargetSize

func (task *TODOTask) TargetSize() int

func (*TODOTask) WithParent

func (task *TODOTask) WithParent(parent Task, spliterator spliterator.Spliterator) *TODOTask

Constructor for non-root nodes.

  • @param parent this node's parent task
  • @param spliterator Spliterator describing the subtree rooted at this node, obtained by splitting the parent Spliterator

func (*TODOTask) WithSpliterator

func (task *TODOTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOTask

Constructor for root nodes.

  • @param helper the PipelineHelper describing the stream pipeline up to this operation
  • @param spliterator the Spliterator describing the source for this pipeline

type Task

type Task interface {
	GetSpliterator() spliterator.Spliterator
	/**
	 * Returns the parent of this task, or null if this task is the root
	 *
	 * @return the parent of this task, or null if this task is the root
	 */
	GetParent() Task

	/**
	 * The left child.
	 * null if no children
	 * if non-null rightChild is non-null
	 *
	 * @return the left child
	 */
	LeftChild() Task

	SetLeftChild(task Task)

	/**
	 * The right child.
	 * null if no children
	 * if non-null leftChild is non-null
	 *
	 * @return the right child
	 */
	RightChild() Task

	SetRightChild(task Task)

	/**
	 * Indicates whether this task is a leaf node.  (Only valid after
	 * {@link #compute} has been called on this node).  If the node is not a
	 * leaf node, then children will be non-null and numChildren will be
	 * positive.
	 *
	 * @return {@code true} if this task is a leaf node
	 */
	IsLeaf() bool

	/**
	 * Indicates whether this task is the root node
	 *
	 * @return {@code true} if this task is the root node.
	 */
	IsRoot() bool

	/**
	 * Target leaf size, common to all tasks in a computation
	 *
	 * @return target leaf size.
	 */
	TargetSize() int

	/**
	 * Default target of leaf tasks for parallel decomposition.
	 * To allow load balancing, we over-partition, currently to approximately
	 * four tasks per processor, which enables others to help out
	 * if leaf tasks are uneven or some processors are otherwise busy.
	 *
	 * @return the default target size of leaf tasks
	 */
	GetLeafTarget() int

	/**
	 * Constructs a new node of type T whose parent is the receiver; must call
	 * the TODOTask(T, Spliterator) constructor with the receiver and the
	 * provided Spliterator.
	 *
	 * @param spliterator {@code Spliterator} describing the subtree rooted at
	 *        this node, obtained by splitting the parent {@code Spliterator}
	 * @return newly constructed child node
	 */
	MakeChild(spliterator spliterator.Spliterator) Task

	/**
	 * Computes the result associated with a leaf node.  Will be called by
	 * {@code compute()} and the result passed to {@code setLocalResult()}
	 *
	 * @return the computed result of a leaf node
	 */
	DoLeaf(ctx context.Context) Sink

	/**
	 * Retrieves a result previously stored with {@link #setLocalResult}
	 *
	 * @return local result for this node previously stored with
	 * {@link #setLocalResult}
	 */
	GetLocalResult() Sink

	/**
	 * Associates the result with the task, can be retrieved with
	 * {@link #GetLocalResult}
	 *
	 * @param localResult local result for this node
	 */
	SetLocalResult(localResult Sink)

	/**
	 * Decides whether or not to split a task further or compute it
	 * directly. If computing directly, calls {@code doLeaf} and passes
	 * the result to {@code setRawResult}. Otherwise splits off
	 * subtasks, forking one and continuing as the other.
	 *
	 * <p> The method is structured to conserve resources across a
	 * range of uses.  The loop continues with one of the child tasks
	 * when split, to avoid deep recursion. To cope with spliterators
	 * that may be systematically biased toward left-heavy or
	 * right-heavy splits, we alternate which child is forked versus
	 * continued in the loop.
	 */
	Compute(ctx context.Context)

	/**
	 * {@inheritDoc}
	 *
	 * @implNote
	 * Clears spliterator and children fields.  Overriders MUST call
	 * {@code super.onCompletion} as the last thing they do if they want these
	 * cleared.
	 */
	OnCompletion(caller Task)

	/**
	 * Returns whether this node is a "leftmost" node -- whether the path from
	 * the root to this node involves only traversing leftmost child links.  For
	 * a leaf node, this means it is the first leaf node in the encounter order.
	 *
	 * @return {@code true} if this node is a "leftmost" node
	 */
	IsLeftmostNode() bool

	/**
	 * Arranges to asynchronously execute this task in the pool the
	 * current task is running in, if applicable, or using the {@link
	 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
	 * it is not necessarily enforced, it is a usage error to fork a
	 * task more than once unless it has completed and been
	 * reinitialized.  Subsequent modifications to the state of this
	 * task or any data it operates on are not necessarily
	 * consistently observable by any thread other than the one
	 * executing it unless preceded by a call to {@link #join} or
	 * related methods, or a call to {@link #isDone} returning {@code
	 * true}.
	 *
	 * @return {@code this}, to simplify usage
	 */
	Fork(ctx context.Context)

	/**
	 * Returns the result of the computation when it
	 * {@linkplain #isDone is done}.
	 * This method differs from {@link #get()} in that abnormal
	 * completion results in {@code RuntimeException} or {@code Error},
	 * not {@code ExecutionException}, and that interrupts of the
	 * calling thread do <em>not</em> cause the method to abruptly
	 * return by throwing {@code InterruptedException}.
	 *
	 * @return the computed result
	 */
	Join() Sink

	/**
	 * Commences performing this task, awaits its completion if
	 * necessary, and returns its result, or throws an (unchecked)
	 * {@code RuntimeException} or {@code Error} if the underlying
	 * computation did so.
	 *
	 * @return the computed result
	 */
	Invoke(ctx context.Context) Sink
}
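The splitting strategy described for Compute (split until the work is at most the target size, fork one half, continue with the other in a loop, and alternate which half is forked) can be sketched with goroutines. This is an illustration only: compute and doLeaf here are hypothetical functions over a []int that sum the elements, not the package's Task methods.

```go
package main

import "fmt"

// doLeaf computes the result of a leaf directly; here it sums the part.
func doLeaf(part []int) int {
	total := 0
	for _, v := range part {
		total += v
	}
	return total
}

// compute keeps halving data until it is no larger than targetSize. At each
// split one half is forked into a goroutine and the loop continues with the
// other half, which avoids deep recursion in the current goroutine.
func compute(data []int, targetSize int) int {
	if targetSize < 1 {
		targetSize = 1
	}
	var pending []chan int
	forkRight := false
	for len(data) > targetSize {
		mid := len(data) / 2
		forked, continued := data[:mid], data[mid:]
		if forkRight {
			forked, continued = continued, forked
		}
		// Alternate the forked side to cope with biased splits.
		forkRight = !forkRight

		ch := make(chan int, 1)
		go func(part []int) { ch <- compute(part, targetSize) }(forked)
		pending = append(pending, ch)
		data = continued
	}

	// The remaining piece is small enough to be a leaf.
	total := doLeaf(data)

	// Join: collect the results of every forked subtask.
	for _, ch := range pending {
		total += <-ch
	}
	return total
}

func main() {
	data := make([]int, 100)
	for i := range data {
		data[i] = i + 1
	}
	fmt.Println(compute(data, 7))
}
```

Because addition is commutative and associative, the order in which forked results are joined does not matter in this example; an order-sensitive reduction would need to combine left and right results in encounter order.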
