fx

package
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 6, 2023 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrCanceled is the error returned when the context is canceled.
	ErrCanceled = context.Canceled
	// ErrTimeout is the error returned when the context's deadline passes.
	ErrTimeout = context.DeadlineExceeded
)

Functions

func DoWithRetry

func DoWithRetry(fn func() error, opts ...RetryOption) error

DoWithRetry runs fn and retries it if it fails. By default it retries 3 times.
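
The retry behavior described above can be sketched with the standard library alone. Every name below (retryOptions, retryOption, withRetry, doWithRetry, failUntil, errTransient) is a hypothetical stand-in written for this example, not the package's code, and the sketch assumes that "3 times" means at most 3 calls in total.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a made-up error used only by this demo.
var errTransient = errors.New("transient failure")

// retryOptions, retryOption, withRetry and doWithRetry are hypothetical
// stand-ins for this sketch, not the package's actual implementation.
type retryOptions struct {
	times int
}

type retryOption func(*retryOptions)

// withRetry sets the maximum number of attempts.
func withRetry(times int) retryOption {
	return func(o *retryOptions) { o.times = times }
}

// doWithRetry calls fn until it succeeds or the attempts are used up.
// Assumption: "retry 3 times" is read as at most 3 calls in total.
func doWithRetry(fn func() error, opts ...retryOption) error {
	o := &retryOptions{times: 3}
	for _, opt := range opts {
		opt(o)
	}

	var err error
	for i := 0; i < o.times; i++ {
		if err = fn(); err == nil {
			return nil
		}
	}
	return err // the last error seen
}

// failUntil returns a function that fails with errTransient until call n.
func failUntil(n int, calls *int) func() error {
	return func() error {
		*calls++
		if *calls < n {
			return errTransient
		}
		return nil
	}
}

func main() {
	calls := 0
	err := doWithRetry(failUntil(3, &calls))
	fmt.Println(calls, err) // 3 <nil>
}
```

The functional-option shape mirrors the RetryOption signature listed under Types; whether the real function returns the last error or an aggregate of all errors is not stated on this page.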

func DoWithTimeout

func DoWithTimeout(fn func() error, timeout time.Duration, opts ...DoOption) error

DoWithTimeout runs fn with timeout control.
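
One common way to get timeout control of this kind is to run fn in a goroutine and select on a context deadline. The sketch below assumes that approach; doWithTimeout and demo are hypothetical names, and the context is passed as a plain parameter here instead of through a DoOption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithTimeout is a hypothetical stand-in for this sketch. It returns fn's
// error if fn finishes in time, or the context error otherwise.
func doWithTimeout(ctx context.Context, fn func() error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Buffered so the goroutine can always send and exit, even after a timeout.
	done := make(chan error, 1)
	go func() { done <- fn() }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded or context.Canceled
	}
}

// demo runs one slow task against a short timeout and one fast task
// against a generous timeout.
func demo() (slowTimedOut bool, fastErr error) {
	slow := func() error { time.Sleep(300 * time.Millisecond); return nil }
	fast := func() error { return nil }

	slowErr := doWithTimeout(context.Background(), slow, 20*time.Millisecond)
	fastErr = doWithTimeout(context.Background(), fast, time.Second)
	return errors.Is(slowErr, context.DeadlineExceeded), fastErr
}

func main() {
	timedOut, err := demo()
	fmt.Println(timedOut, err) // true <nil>
}
```

In this sketch fn keeps running in the background after the deadline passes; only the caller stops waiting. The returned context errors are the same values that ErrTimeout and ErrCanceled alias in the Variables section.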

func Parallel

func Parallel(fns ...func())

Parallel runs fns concurrently and waits for all of them to finish.
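
A minimal sketch of the run-all-then-wait pattern, assuming a sync.WaitGroup is used. The lowercase parallel and sumInParallel are hypothetical names for this example only.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// parallel is a hypothetical stand-in: it starts one goroutine per function
// and blocks until every one of them has returned.
func parallel(fns ...func()) {
	var wg sync.WaitGroup
	for _, fn := range fns {
		wg.Add(1)
		go func(f func()) {
			defer wg.Done()
			f()
		}(fn)
	}
	wg.Wait()
}

// sumInParallel adds three numbers from three concurrent functions.
func sumInParallel() int64 {
	var total int64
	parallel(
		func() { atomic.AddInt64(&total, 1) },
		func() { atomic.AddInt64(&total, 2) },
		func() { atomic.AddInt64(&total, 3) },
	)
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(sumInParallel()) // 6
}
```

Because the functions take no arguments and return nothing, shared results have to be written through closures, which is why the demo uses atomic operations.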

Types

type DoOption

type DoOption func() context.Context

DoOption defines the method to customize a DoWithTimeout call.

func WithContext

func WithContext(ctx context.Context) DoOption

WithContext customizes a DoWithTimeout call with given ctx.

type FilterFunc

type FilterFunc func(item interface{}) bool

FilterFunc defines the method to filter a Stream.

type ForAllFunc

type ForAllFunc func(pipe <-chan interface{})

ForAllFunc defines the method to handle all elements in a Stream.

type ForEachFunc

type ForEachFunc func(item interface{})

ForEachFunc defines the method to handle each element in a Stream.

type GenerateFunc

type GenerateFunc func(source chan<- interface{})

GenerateFunc defines the method to send elements into a Stream.

type KeyFunc

type KeyFunc func(item interface{}) interface{}

KeyFunc defines the method to generate keys for the elements in a Stream.

type LessFunc

type LessFunc func(a, b interface{}) bool

LessFunc defines the method to compare the elements in a Stream.

type MapFunc

type MapFunc func(item interface{}) interface{}

MapFunc defines the method to map each element to another object in a Stream.

type Option

type Option func(opts *rxOptions)

Option defines the method to customize a Stream.

func UnlimitedWorkers

func UnlimitedWorkers() Option

UnlimitedWorkers lets the caller use as many workers as there are tasks.

func WithWorkers

func WithWorkers(workers int) Option

WithWorkers lets the caller customize the concurrent workers.

type ParallelFunc

type ParallelFunc func(item interface{})

ParallelFunc defines the method to handle elements in parallel.

type ReduceFunc

type ReduceFunc func(pipe <-chan interface{}) (interface{}, error)

ReduceFunc defines the method to reduce all the elements in a Stream.

type RetryOption

type RetryOption func(*retryOptions)

RetryOption defines the method to customize DoWithRetry.

func WithRetry

func WithRetry(times int) RetryOption

WithRetry customizes a DoWithRetry call with the given number of retries.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

A Stream is a sequence of items that supports stream processing operations.

func Concat

func Concat(s Stream, others ...Stream) Stream

Concat returns a concatenated Stream.

func From

func From(generate GenerateFunc) Stream

From constructs a Stream from the given GenerateFunc.
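
To illustrate how a GenerateFunc might feed a stream, here is a sketch that uses a bare channel in place of Stream. The names generateFunc, from and firstThree are hypothetical, and closing the channel once generate returns is an assumption of this sketch.

```go
package main

import "fmt"

// generateFunc mirrors the GenerateFunc signature listed on this page.
type generateFunc func(source chan<- interface{})

// from is a hypothetical stand-in: it runs generate in its own goroutine and
// closes the channel once generate returns, so consumers can range over it.
func from(generate generateFunc) <-chan interface{} {
	source := make(chan interface{})
	go func() {
		defer close(source)
		generate(source)
	}()
	return source
}

// firstThree collects the items 1, 2, 3 sent by a generator.
func firstThree() []interface{} {
	generate := func(source chan<- interface{}) {
		for i := 1; i <= 3; i++ {
			source <- i
		}
	}

	var items []interface{}
	for item := range from(generate) {
		items = append(items, item)
	}
	return items
}

func main() {
	fmt.Println(firstThree()) // [1 2 3]
}
```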

func Just

func Just(items ...interface{}) Stream

Just converts the given arbitrary items to a Stream.

func Range

func Range(source <-chan interface{}) Stream

Range converts the given channel to a Stream.

func (Stream) AllMach

func (s Stream) AllMach(predicate func(item interface{}) bool) bool

AllMach returns whether all elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated.
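
The short-circuit and empty-stream rules above can be sketched over a plain slice. The names allMatch and isPositive are hypothetical; the extra return value only counts predicate calls so the early exit is visible.

```go
package main

import "fmt"

// allMatch is a hypothetical stand-in that works on a slice instead of a
// Stream. It stops at the first item that fails the predicate and also
// reports how many items were evaluated.
func allMatch(items []interface{}, predicate func(item interface{}) bool) (bool, int) {
	evaluated := 0
	for _, item := range items {
		evaluated++
		if !predicate(item) {
			return false, evaluated // no need to look at the rest
		}
	}
	return true, evaluated // also reached for an empty input
}

// isPositive assumes every item is an int.
func isPositive(item interface{}) bool { return item.(int) > 0 }

func main() {
	fmt.Println(allMatch([]interface{}{1, -2, 3}, isPositive)) // false 2
	fmt.Println(allMatch(nil, isPositive))                     // true 0
}
```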

func (Stream) AnyMach

func (s Stream) AnyMach(predicate func(item interface{}) bool) bool

AnyMach returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then false is returned and the predicate is not evaluated.

func (Stream) Buffer

func (s Stream) Buffer(n int) Stream

Buffer buffers the items into a queue of size n. It can balance the producer and the consumer when their processing throughputs differ.

func (Stream) Concat

func (s Stream) Concat(others ...Stream) Stream

Concat returns a Stream that concatenates this stream with the other streams.

func (Stream) Count

func (s Stream) Count() (count int)

Count counts the number of elements in the result.

func (Stream) Distinct

func (s Stream) Distinct(fn KeyFunc) Stream

Distinct removes duplicate items based on the given KeyFunc.

func (Stream) Done

func (s Stream) Done()

Done waits for all upstream operations to finish.

func (Stream) Filter

func (s Stream) Filter(fn FilterFunc, opts ...Option) Stream

Filter filters the items by the given FilterFunc.

func (Stream) First

func (s Stream) First() interface{}

First returns the first item, or nil if there are no items.

func (Stream) ForAll

func (s Stream) ForAll(fn ForAllFunc)

ForAll handles the streaming elements from the source; no further stream operations can follow it.

func (Stream) ForEach

func (s Stream) ForEach(fn ForEachFunc)

ForEach seals the Stream by applying the ForEachFunc to each item; no further operations can follow it.

func (Stream) Group

func (s Stream) Group(fn KeyFunc) Stream

Group groups the elements into different groups based on their keys.
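
Grouping by key can be sketched with a map from key to items. The names keyFunc, group and byParity are hypothetical, and the sketch assumes keys are comparable values, since they are used as map keys. How the real Group emits its groups into the resulting Stream is not described on this page.

```go
package main

import "fmt"

// keyFunc mirrors the KeyFunc signature listed on this page.
type keyFunc func(item interface{}) interface{}

// group is a hypothetical stand-in that works on a slice. Items that share a
// key end up in the same bucket, in their original relative order.
func group(items []interface{}, fn keyFunc) map[interface{}][]interface{} {
	groups := make(map[interface{}][]interface{})
	for _, item := range items {
		key := fn(item)
		groups[key] = append(groups[key], item)
	}
	return groups
}

// byParity assumes every item is an int.
func byParity(item interface{}) interface{} {
	if item.(int)%2 == 0 {
		return "even"
	}
	return "odd"
}

func main() {
	groups := group([]interface{}{1, 2, 3, 4, 5}, byParity)
	fmt.Println(groups["odd"], groups["even"]) // [1 3 5] [2 4]
}
```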

func (Stream) Head

func (s Stream) Head(n int64) Stream

Head returns the first n elements in the stream.

func (Stream) Last

func (s Stream) Last() (item interface{})

Last returns the last item, or nil if there are no items.

func (Stream) Map

func (s Stream) Map(fn MapFunc, opts ...Option) Stream

Map converts each item to one corresponding item, so it is a 1:1 mapping.
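
A sketch of a 1:1 map stage with a configurable worker count, using channels in place of Stream. The names mapFunc, mapWithWorkers and sumOfSquares are hypothetical, and the worker count is a plain parameter here instead of an Option. With more than one worker the output order is not guaranteed in this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// mapFunc mirrors the MapFunc signature listed on this page.
type mapFunc func(item interface{}) interface{}

// mapWithWorkers is a hypothetical stand-in: each of the workers reads items
// from in, applies fn, and writes exactly one result per item to the output.
func mapWithWorkers(in <-chan interface{}, fn mapFunc, workers int) <-chan interface{} {
	out := make(chan interface{})

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range in {
				out <- fn(item)
			}
		}()
	}

	// Close the output only after every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumOfSquares squares each number with 4 workers and adds up the results.
func sumOfSquares(nums ...int) int {
	in := make(chan interface{}, len(nums))
	for _, n := range nums {
		in <- n
	}
	close(in)

	square := func(item interface{}) interface{} {
		n := item.(int)
		return n * n
	}

	total := 0
	for v := range mapWithWorkers(in, square, 4) {
		total += v.(int)
	}
	return total
}

func main() {
	fmt.Println(sumOfSquares(1, 2, 3, 4)) // 30
}
```

The demo sums the results so that its output does not depend on the order in which workers finish.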

func (Stream) Merge

func (s Stream) Merge() Stream

Merge merges all the items into a slice and generates a new stream.

func (Stream) NoneMatch

func (s Stream) NoneMatch(predicate func(item interface{}) bool) bool

NoneMatch returns whether no elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated.

func (Stream) Parallel

func (s Stream) Parallel(fn ParallelFunc, opts ...Option)

Parallel applies the given ParallelFunc to each item concurrently with the given number of workers.

func (Stream) Reduce

func (s Stream) Reduce(fn ReduceFunc) (interface{}, error)

Reduce is a utility method to let the caller deal with the underlying channel.
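
Since a ReduceFunc receives the raw channel, the caller decides how to fold the items. The sketch below shows one such function that sums ints. The names reduceFunc, sum, reduceItems and errNotInt are hypothetical; reduceItems only exists to feed a channel for the demo.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotInt is a made-up error used only by this demo.
var errNotInt = errors.New("item is not an int")

// reduceFunc mirrors the ReduceFunc signature listed on this page.
type reduceFunc func(pipe <-chan interface{}) (interface{}, error)

// sum is a reduceFunc that adds up int items. It keeps draining the channel
// after a bad item so the producer is never left blocked on a send.
func sum(pipe <-chan interface{}) (interface{}, error) {
	total := 0
	var err error
	for item := range pipe {
		n, ok := item.(int)
		if !ok {
			err = errNotInt
			continue
		}
		total += n
	}
	if err != nil {
		return nil, err
	}
	return total, nil
}

// reduceItems is a hypothetical helper that sends items on a channel and
// hands that channel to fn.
func reduceItems(fn reduceFunc, items ...interface{}) (interface{}, error) {
	pipe := make(chan interface{})
	go func() {
		defer close(pipe)
		for _, item := range items {
			pipe <- item
		}
	}()
	return fn(pipe)
}

func main() {
	fmt.Println(reduceItems(sum, 1, 2, 3)) // 6 <nil>
}
```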

func (Stream) Reverse

func (s Stream) Reverse() Stream

Reverse reverses the elements in the stream.

func (Stream) Skip

func (s Stream) Skip(n int64) Stream

Skip returns a Stream that skips the first n elements.

func (Stream) Sort

func (s Stream) Sort(less LessFunc) Stream

Sort sorts the items from the underlying source.

func (Stream) Split

func (s Stream) Split(n int) Stream

Split splits the elements into chunks of size up to n; the last chunk may hold fewer than n elements.
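
The chunking rule can be sketched over a slice. The lowercase split is a hypothetical stand-in, and returning nil for n < 1 is an assumption of this sketch; the page does not say how the real method treats that case.

```go
package main

import "fmt"

// split is a hypothetical stand-in that works on a slice. Every chunk has n
// items except possibly the last one, which holds whatever is left over.
func split(items []interface{}, n int) [][]interface{} {
	if n < 1 {
		return nil // assumption: a non-positive size yields no chunks here
	}

	var chunks [][]interface{}
	for len(items) > 0 {
		size := n
		if len(items) < n {
			size = len(items)
		}
		chunks = append(chunks, items[:size])
		items = items[size:]
	}
	return chunks
}

func main() {
	fmt.Println(split([]interface{}{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}
```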

func (Stream) Tail

func (s Stream) Tail(n int64) Stream

Tail returns the last n elements in the stream.

func (Stream) Walk

func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream

Walk lets the caller handle each item; for each given item the caller may write zero, one, or more items.
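
Unlike a 1:1 map, a walk function decides how many items to emit per input. The sketch below drops odd numbers and emits each even number twice, using a channel in place of Stream and a single goroutine so the order stays deterministic. The names walkFunc, walk and expandEvens are hypothetical.

```go
package main

import "fmt"

// walkFunc mirrors the WalkFunc signature listed on this page.
type walkFunc func(item interface{}, pipe chan<- interface{})

// walk is a hypothetical stand-in: it calls fn for each input item and lets
// fn write any number of items to the output channel.
func walk(in <-chan interface{}, fn walkFunc) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for item := range in {
			fn(item, out)
		}
	}()
	return out
}

// expandEvens drops odd numbers and emits every even number twice.
func expandEvens(nums ...int) []int {
	in := make(chan interface{}, len(nums))
	for _, n := range nums {
		in <- n
	}
	close(in)

	duplicateEvens := func(item interface{}, pipe chan<- interface{}) {
		n := item.(int)
		if n%2 != 0 {
			return // odd: write nothing
		}
		pipe <- n // even: write two items
		pipe <- n
	}

	var result []int
	for v := range walk(in, duplicateEvens) {
		result = append(result, v.(int))
	}
	return result
}

func main() {
	fmt.Println(expandEvens(1, 2, 3, 4)) // [2 2 4 4]
}
```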

type WalkFunc

type WalkFunc func(item interface{}, pipe chan<- interface{})

WalkFunc defines the method to walk through all the elements in a Stream.
