Documentation ¶
Index ¶
- Constants
- Variables
- func CopyInto(ctx context.Context, wrappedSink Sink, spliterator spliterator.Spliterator)
- type AbstractPipeline
- func (p *AbstractPipeline) Evaluate(ctx context.Context, terminalOp Operation, split spliterator.Spliterator) optional.Optional
- func (p *AbstractPipeline) GetTargetSize(sizeEstimate int) int
- func (p *AbstractPipeline) IsLinkedOrConsumed() bool
- func (p *AbstractPipeline) IsParallel() bool
- func (p *AbstractPipeline) SetLinkedOrConsumed(linkedOrConsumed bool)
- func (p *AbstractPipeline) SetParallel(parallel bool)
- func (p *AbstractPipeline) SetTargetSize(targetSize int)
- func (p *AbstractPipeline) SuggestTargetSize(sizeEstimate int) int
- type Operation
- type ShortCircuitTask
- type Sink
- type TODOOperation
- type TODOShortCircuitTask
- func (task *TODOShortCircuitTask) Cancel()
- func (task *TODOShortCircuitTask) CancelLaterNodes()
- func (task *TODOShortCircuitTask) Compute(ctx context.Context)
- func (task *TODOShortCircuitTask) GetLocalResult() Sink
- func (task *TODOShortCircuitTask) GetSharedResult() Sink
- func (task *TODOShortCircuitTask) SetLocalResult(localResult Sink)
- func (task TODOShortCircuitTask) SetRawResult(result Sink)
- func (task *TODOShortCircuitTask) SharedResult() *sharedResult
- func (task *TODOShortCircuitTask) ShortCircuit(result Sink)
- func (task *TODOShortCircuitTask) TaskCanceled() bool
- func (task *TODOShortCircuitTask) WithParent(parent ShortCircuitTask, spliterator spliterator.Spliterator) *TODOShortCircuitTask
- func (task *TODOShortCircuitTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOShortCircuitTask
- type TODOSink
- type TODOTask
- func (task *TODOTask) Compute(ctx context.Context)
- func (task *TODOTask) DoLeaf(ctx context.Context) Sink
- func (task *TODOTask) Fork(ctx context.Context)
- func (task *TODOTask) GetLeafTarget() int
- func (task *TODOTask) GetLocalResult() Sink
- func (task *TODOTask) GetParent() Task
- func (task *TODOTask) GetSpliterator() spliterator.Spliterator
- func (task *TODOTask) Invoke(ctx context.Context) Sink
- func (task *TODOTask) IsLeaf() bool
- func (task *TODOTask) IsLeftmostNode() bool
- func (task *TODOTask) IsRoot() bool
- func (task *TODOTask) Join() Sink
- func (task *TODOTask) LeftChild() Task
- func (task *TODOTask) MakeChild(spliterator spliterator.Spliterator) Task
- func (task *TODOTask) OnCompletion(caller Task)
- func (task *TODOTask) RightChild() Task
- func (task *TODOTask) SetLeftChild(task_ Task)
- func (task *TODOTask) SetLocalResult(localResult Sink)
- func (task *TODOTask) SetRawResult(result Sink)
- func (task *TODOTask) SetRightChild(task_ Task)
- func (task *TODOTask) TargetSize() int
- func (task *TODOTask) WithParent(parent Task, spliterator spliterator.Spliterator) *TODOTask
- func (task *TODOTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOTask
- type Task
Constants ¶
View Source
const ( MsgStreamLinked = "stream has already been operated upon or closed" MsgConsumed = "source already consumed or closed" )
Variables ¶
View Source
var (
ParallelTargetSize = runtime.GOMAXPROCS(-1)
)
Functions ¶
func CopyInto ¶
func CopyInto(ctx context.Context, wrappedSink Sink, spliterator spliterator.Spliterator)
Types ¶
type AbstractPipeline ¶
*
- Abstract base class for "pipeline" classes, which are the core
- implementations of the Stream interface and its primitive specializations.
- Manages construction and evaluation of stream pipelines. *
- <p>An {@code AbstractPipeline} represents an initial portion of a stream
- pipeline, encapsulating a stream source and zero or more intermediate
- operations. The individual {@code AbstractPipeline} objects are often
- referred to as <em>stages</em>, where each stage describes either the stream
- source or an intermediate operation. *
- <p>A concrete intermediate stage is generally built from an
- {@code AbstractPipeline}, a shape-specific pipeline class which extends it
- (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific
- concrete class which extends that. {@code AbstractPipeline} contains most of
- the mechanics of evaluating the pipeline, and implements methods that will be
- used by the operation; the shape-specific classes add helper methods for
- dealing with collection of results into the appropriate shape-specific
- containers. *
- <p>After chaining a new intermediate operation, or executing a terminal
- operation, the stream is considered to be consumed, and no more intermediate
- or terminal operations are permitted on this stream instance. *
- @implNote
- <p>For sequential streams, and parallel streams without
- <a href="package-summary.html#StreamOps">stateful intermediate
- operations</a>, parallel streams, pipeline evaluation is done in a single
- pass that "jams" all the operations together. For parallel streams with
- stateful operations, execution is divided into segments, where each
- stateful operations marks the end of a segment, and each segment is
- evaluated separately and the result used as the input to the next
- segment. In all cases, the source data is not consumed until a terminal
- operation begins. *
- @param <E_IN> type of input elements
- @param <E_OUT> type of output elements
- @param <S> type of the subclass implementing {@code BaseStream}
- @since 1.8
func (*AbstractPipeline) Evaluate ¶
func (p *AbstractPipeline) Evaluate(ctx context.Context, terminalOp Operation, split spliterator.Spliterator) optional.Optional
*
- Evaluate the pipeline with a terminal operation to produce a result. *
- @param <R> the type of result
- @param terminalOp the terminal operation to be applied to the pipeline.
- @return the result
func (*AbstractPipeline) GetTargetSize ¶
func (p *AbstractPipeline) GetTargetSize(sizeEstimate int) int
*
- Returns the targetSize, initializing it via the supplied
- size estimate if not already initialized.
func (*AbstractPipeline) IsLinkedOrConsumed ¶
func (p *AbstractPipeline) IsLinkedOrConsumed() bool
func (*AbstractPipeline) IsParallel ¶
func (p *AbstractPipeline) IsParallel() bool
func (*AbstractPipeline) SetLinkedOrConsumed ¶
func (p *AbstractPipeline) SetLinkedOrConsumed(linkedOrConsumed bool)
func (*AbstractPipeline) SetParallel ¶
func (p *AbstractPipeline) SetParallel(parallel bool)
func (*AbstractPipeline) SetTargetSize ¶
func (p *AbstractPipeline) SetTargetSize(targetSize int)
func (*AbstractPipeline) SuggestTargetSize ¶
func (p *AbstractPipeline) SuggestTargetSize(sizeEstimate int) int
*
- Returns a suggested target leaf size based on the initial size estimate. *
- @return suggested target leaf size
type Operation ¶
type Operation interface { /** * Gets the stream flags of the operation. Terminal operations may set a * limited subset of the stream flags defined in {@link StreamOpFlag}, and * these flags are combined with the previously combined stream and * intermediate operation flags for the pipeline. * * @implSpec The default implementation returns zero. * * @return the stream flags for this operation * @see StreamOpFlag */ GetOpFlags() int /** * Performs a parallel evaluation of the operation using the specified * {@code PipelineHelper}, which describes the upstream intermediate * operations. * * @implSpec The default performs a sequential evaluation of the operation * using the specified {@code PipelineHelper}. * * @param helper the pipeline helper * @param spliterator the source spliterator * @return the result of the evaluation */ EvaluateParallel(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional /** * Performs a sequential evaluation of the operation using the specified * {@code PipelineHelper}, which describes the upstream intermediate * operations. * * @param helper the pipeline helper * @param spliterator the source spliterator * @return the result of the evaluation */ EvaluateSequential(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional }
*
- An operation in a stream pipeline that takes a stream as input and produces
- a result or side-effect. A {@code Op} has an input type and stream
- shape, and a result type. A {@code Op} also has a set of
- <em>operation flags</em> that describes how the operation processes elements
- of the stream (such as short-circuiting or respecting encounter order; see
- {@link StreamOpFlag}). *
- <p>A {@code Op} must provide a sequential and parallel implementation
- of the operation relative to a given stream source and set of intermediate
- operations. *
- @param <E_IN> the type of input elements
- @param <R> the type of the result
- @since 1.8
type ShortCircuitTask ¶
type ShortCircuitTask interface { Task GetSharedResult() Sink /** * Declares that a globally valid result has been found. If another task has * not already found the answer, the result is installed in * {@code sharedResult}. The {@code compute()} method will check * {@code sharedResult} before proceeding with computation, so this causes * the computation to terminate early. * * @param result the result found */ ShortCircuit(result Sink) /** * Mark this task as canceled */ Cancel() /** * Queries whether this task is canceled. A task is considered canceled if * it or any of its parents have been canceled. * * @return {@code true} if this task or any parent is canceled. */ TaskCanceled() bool /** * Cancels all tasks which succeed this one in the encounter order. This * includes canceling all the current task's right sibling, as well as the * later right siblings of all its parents. */ CancelLaterNodes() }
*
- Abstract class for fork-join tasks used to implement short-circuiting
- stream ops, which can produce a result without processing all elements of the
- stream. *
- @param <P_IN> type of input elements to the pipeline
- @param <P_OUT> type of output elements from the pipeline
- @param <R> type of intermediate result, may be different from operation
- result type
- @param <K> type of child and sibling tasks
- @since 1.8
type Sink ¶
type Sink interface { sink.Sink supplier.OptionalSupplier }
*
- A {@link Sink} which accumulates state as elements are accepted, and allows
- a result to be retrieved after the computation is finished. *
- @param <T> the type of elements to be accepted
- @param <R> the type of the result *
- @since 1.8
func WrapAndCopyInto ¶
func WrapAndCopyInto(ctx context.Context, sink Sink, spliterator spliterator.Spliterator) Sink
*
- Applies the pipeline stages described by this {@code PipelineHelper} to
- the provided {@code Spliterator} and send the results to the provided
- {@code Sink}. *
- @implSpec
- The implementation behaves as if:
- <pre>{@code
- copyInto(wrapSink(sink), spliterator);
- }</pre> *
- @param sink the {@code Sink} to receive the results
- @param spliterator the spliterator describing the source input to process
type TODOOperation ¶
func (*TODOOperation) EvaluateParallel ¶
func (op *TODOOperation) EvaluateParallel(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional
func (*TODOOperation) EvaluateSequential ¶
func (op *TODOOperation) EvaluateSequential(ctx context.Context, spliterator spliterator.Spliterator) optional.Optional
func (*TODOOperation) GetOpFlags ¶
func (op *TODOOperation) GetOpFlags() int
type TODOShortCircuitTask ¶
type TODOShortCircuitTask struct { TODOTask // contains filtered or unexported fields }
func (*TODOShortCircuitTask) Cancel ¶
func (task *TODOShortCircuitTask) Cancel()
func (*TODOShortCircuitTask) CancelLaterNodes ¶
func (task *TODOShortCircuitTask) CancelLaterNodes()
func (*TODOShortCircuitTask) Compute ¶
func (task *TODOShortCircuitTask) Compute(ctx context.Context)
*
- Overrides TODOTask version to include checks for early
- exits while splitting or computing.
func (*TODOShortCircuitTask) GetLocalResult ¶
func (task *TODOShortCircuitTask) GetLocalResult() Sink
*
- Retrieves the local result for this task. If this task is the root,
- retrieves the shared result instead.
func (*TODOShortCircuitTask) GetSharedResult ¶
func (task *TODOShortCircuitTask) GetSharedResult() Sink
func (*TODOShortCircuitTask) SetLocalResult ¶
func (task *TODOShortCircuitTask) SetLocalResult(localResult Sink)
*
- Sets a local result for this task. If this task is the root, set the
- shared result instead (if not already set). *
- @param localResult The result to set for this task
func (TODOShortCircuitTask) SetRawResult ¶
func (task TODOShortCircuitTask) SetRawResult(result Sink)
*
- Does nothing; instead, subclasses should use
- {@link #setLocalResult(Object)}} to manage results. *
- @param result must be null, or an exception is thrown (this is a safety
- tripwire to detect when {@code setRawResult()} is being used
- instead of {@code setLocalResult()}
func (*TODOShortCircuitTask) SharedResult ¶
func (task *TODOShortCircuitTask) SharedResult() *sharedResult
func (*TODOShortCircuitTask) ShortCircuit ¶
func (task *TODOShortCircuitTask) ShortCircuit(result Sink)
func (*TODOShortCircuitTask) TaskCanceled ¶
func (task *TODOShortCircuitTask) TaskCanceled() bool
func (*TODOShortCircuitTask) WithParent ¶
func (task *TODOShortCircuitTask) WithParent(parent ShortCircuitTask, spliterator spliterator.Spliterator) *TODOShortCircuitTask
*
- Constructor for non-root nodes. *
- @param parent parent task in the computation tree
- @param spliterator the {@code Spliterator} for the portion of the
- computation tree described by this task
func (*TODOShortCircuitTask) WithSpliterator ¶
func (task *TODOShortCircuitTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOShortCircuitTask
*
- Constructor for root tasks. *
- @param helper the {@code PipelineHelper} describing the stream pipeline
- up to this operation
- @param spliterator the {@code Spliterator} describing the source for this
- pipeline
type TODOTask ¶
func (*TODOTask) GetLeafTarget ¶
func (*TODOTask) GetLocalResult ¶
*
- Retrieves a result previously stored with {@link #setLocalResult} *
- @return local result for this node previously stored with
- {@link #setLocalResult}
func (*TODOTask) GetSpliterator ¶
func (task *TODOTask) GetSpliterator() spliterator.Spliterator
func (*TODOTask) IsLeftmostNode ¶
func (*TODOTask) MakeChild ¶
func (task *TODOTask) MakeChild(spliterator spliterator.Spliterator) Task
func (*TODOTask) OnCompletion ¶
func (*TODOTask) RightChild ¶
func (*TODOTask) SetLeftChild ¶
func (*TODOTask) SetLocalResult ¶
*
- Associates the result with the task, can be retrieved with
- {@link #GetLocalResult} *
- @param localResult local result for this node
func (*TODOTask) SetRawResult ¶
*
- Does nothing; instead, subclasses should use
- {@link #setLocalResult(Object)}} to manage results. *
- @param result must be null, or an exception is thrown (this is a safety
- tripwire to detect when {@code setRawResult()} is being used
- instead of {@code setLocalResult()}
func (*TODOTask) SetRightChild ¶
func (*TODOTask) TargetSize ¶
func (*TODOTask) WithParent ¶
func (task *TODOTask) WithParent(parent Task, spliterator spliterator.Spliterator) *TODOTask
*
- Constructor for non-root nodes. *
- @param parent this node's parent task
- @param spliterator {@code Spliterator} describing the subtree rooted at
- this node, obtained by splitting the parent {@code Spliterator}
func (*TODOTask) WithSpliterator ¶
func (task *TODOTask) WithSpliterator(spliterator spliterator.Spliterator) *TODOTask
*
- Constructor for root nodes. *
- @param helper The {@code PipelineHelper} describing the stream pipeline
- up to this operation
- @param spliterator The {@code Spliterator} describing the source for this
- pipeline
type Task ¶
type Task interface { GetSpliterator() spliterator.Spliterator /** * Returns the parent of this task, or null if this task is the root * * @return the parent of this task, or null if this task is the root */ GetParent() Task /** * The left child. * null if no children * if non-null rightChild is non-null * * @return the right child */ LeftChild() Task SetLeftChild(task Task) /** * The right child. * null if no children * if non-null rightChild is non-null * * @return the right child */ RightChild() Task SetRightChild(task Task) /** * Indicates whether this task is a leaf node. (Only valid after * {@link #compute} has been called on this node). If the node is not a * leaf node, then children will be non-null and numChildren will be * positive. * * @return {@code true} if this task is a leaf node */ IsLeaf() bool /** * Indicates whether this task is the root node * * @return {@code true} if this task is the root node. */ IsRoot() bool /** * Target leaf size, common to all tasks in a computation * * @return target leaf size. */ TargetSize() int /** * Default target of leaf tasks for parallel decomposition. * To allow load balancing, we over-partition, currently to approximately * four tasks per processor, which enables others to help out * if leaf tasks are uneven or some processors are otherwise busy. * * @return the default target size of leaf tasks */ GetLeafTarget() int /** * Constructs a new node of type T whose parent is the receiver; must call * the TODOTask(T, Spliterator) constructor with the receiver and the * provided Spliterator. * * @param spliterator {@code Spliterator} describing the subtree rooted at * this node, obtained by splitting the parent {@code Spliterator} * @return newly constructed child node */ MakeChild(spliterator spliterator.Spliterator) Task /** * Computes the result associated with a leaf node. Will be called by * {@code compute()} and the result passed to @{code setLocalResult()} * * @return the computed result of a leaf node */ DoLeaf(ctx context.Context) Sink /** * Retrieves a result previously stored with {@link #setLocalResult} * * @return local result for this node previously stored with * {@link #setLocalResult} */ GetLocalResult() Sink /** * Associates the result with the task, can be retrieved with * {@link #GetLocalResult} * * @param localResult local result for this node */ SetLocalResult(localResult Sink) /** * Decides whether or not to split a task further or compute it * directly. If computing directly, calls {@code doLeaf} and pass * the result to {@code setRawResult}. Otherwise splits off * subtasks, forking one and continuing as the other. * * <p> The method is structured to conserve resources across a * range of uses. The loop continues with one of the child tasks * when split, to avoid deep recursion. To cope with spliterators * that may be systematically biased toward left-heavy or * right-heavy splits, we alternate which child is forked versus * continued in the loop. */ Compute(ctx context.Context) /** * {@inheritDoc} * * @implNote * Clears spliterator and children fields. Overriders MUST call * {@code super.onCompletion} as the last thing they do if they want these * cleared. */ OnCompletion(caller Task) /** * Returns whether this node is a "leftmost" node -- whether the path from * the root to this node involves only traversing leftmost child links. For * a leaf node, this means it is the first leaf node in the encounter order. * * @return {@code true} if this node is a "leftmost" node */ IsLeftmostNode() bool /** * Arranges to asynchronously execute this task in the pool the * current task is running in, if applicable, or using the {@link * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While * it is not necessarily enforced, it is a usage error to fork a * task more than once unless it has completed and been * reinitialized. Subsequent modifications to the state of this * task or any data it operates on are not necessarily * consistently observable by any thread other than the one * executing it unless preceded by a call to {@link #join} or * related methods, or a call to {@link #isDone} returning {@code * true}. * * @return {@code this}, to simplify usage */ Fork(ctx context.Context) /** * Returns the result of the computation when it * {@linkplain #isDone is done}. * This method differs from {@link #get()} in that abnormal * completion results in {@code RuntimeException} or {@code Error}, * not {@code ExecutionException}, and that interrupts of the * calling thread do <em>not</em> cause the method to abruptly * return by throwing {@code InterruptedException}. * * @return the computed result */ Join() Sink /** * Commences performing this task, awaits its completion if * necessary, and returns its result, or throws an (unchecked) * {@code RuntimeException} or {@code Error} if the underlying * computation did so. * * @return the computed result */ Invoke(ctx context.Context) Sink }
Click to show internal directories.
Click to hide internal directories.