pipe

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2018 License: MIT Imports: 4 Imported by: 0

README

pipe/s

A pipers bag - generic functions to gain concurrency - batteries included :-)

Software License Go Report Card Build Status GoDoc

    go get -u github.com/GoLangsam/pipe

Please feel free and encouraged to suggest, improve, comment or ask - You'll be welcome!

Overview

  • an evolution:

    • sss a naive approach - as seen in popular slides, talks and blogs.
    • ss a better way to code it
  • the essence

  • toys and tests

  • notes and explanations

    • readme contains further background documentation
  • internals

  • extended

    • xxl demand-driven channel - lazy evaluation
    • xxs supply-driven counter-part
    • xxsl the super luxury version has both: demand- and supply-driven channels

sss - simply super stupid small

ss - still simply small

s - smart & useful - batteries included

Batteries

examples

internal


Think deep - code happy - be simple - see clear :-)

Support on Beerpay

Hey dude! Help me out for a couple of 🍻!

Beerpay Beerpay

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AnyChan added in v0.1.2

func AnyChan(inp ...Any) (out <-chan Any)

AnyChan returns a channel to receive all inputs before close.

func AnyChanFuncErr added in v0.1.2

func AnyChanFuncErr(gen func() (Any, error)) (out <-chan Any)

AnyChanFuncErr returns a channel to receive all results of generator `gen` until `err != nil` before close.

func AnyChanFuncNok added in v0.1.2

func AnyChanFuncNok(gen func() (Any, bool)) (out <-chan Any)

AnyChanFuncNok returns a channel to receive all results of generator `gen` until `!ok` before close.

func AnyChanSlice added in v0.1.2

func AnyChanSlice(inp ...[]Any) (out <-chan Any)

AnyChanSlice returns a channel to receive all inputs before close.

func AnyDaisyChaiN added in v0.1.2

func AnyDaisyChaiN(inp chan Any, somany int,
	procs ...func(into chan<- Any, from <-chan Any),
) (
	out chan Any)

AnyDaisyChaiN returns a channel to receive all inp after having passed `somany` times thru the process(es) (`from` right `into` left) before close.

Note: If `somany` is less than 1 or no `tubes` are provided, `out` shall receive elements from `inp` unaltered (as a convenience), thus making null values useful.

Note: AnyDaisyChaiN(inp, 1, procs) <==> AnyDaisyChain(inp, procs)

func AnyDaisyChain added in v0.1.2

func AnyDaisyChain(inp chan Any,
	procs ...func(into chan<- Any, from <-chan Any),
) (
	out chan Any)

AnyDaisyChain returns a channel to receive all inp after having passed thru the process(es) (`from` right `into` left) before close.

Note: If no `tubes` are provided, `out` shall receive elements from `inp` unaltered (as a convenience), thus making a null value useful.

func AnyDone added in v0.1.2

func AnyDone(inp <-chan Any) (done <-chan struct{})

AnyDone returns a channel to receive one signal before close after `inp` has been drained.

func AnyDoneFunc added in v0.1.2

func AnyDoneFunc(inp <-chan Any, act func(a Any)) (done <-chan struct{})

AnyDoneFunc returns a channel to receive one signal after `act` has been applied to every `inp` before close.

func AnyDoneSlice added in v0.1.2

func AnyDoneSlice(inp <-chan Any) (done <-chan []Any)

AnyDoneSlice returns a channel to receive a slice with every Any received on `inp` before close.

Note: Unlike AnyDone, DoneAnySlice sends the fully accumulated slice, not just an event, once upon close of inp.

func AnyDoneWait added in v0.1.2

func AnyDoneWait(inp chan<- Any, wg AnyWaiter) (done <-chan struct{})

AnyDoneWait returns a channel to receive one signal after wg.Wait() has returned and inp has been closed before close.

Note: Use only *after* You've started flooding the facilities.

func AnyFan2 added in v0.1.2

func AnyFan2(ori <-chan Any, inp ...Any) (out <-chan Any)

AnyFan2 returns a channel to receive everything from the given original channel `ori` as well as all inputs before close.

func AnyFan2Chan added in v0.1.2

func AnyFan2Chan(ori <-chan Any, inp <-chan Any) (out <-chan Any)

AnyFan2Chan returns a channel to receive everything from the given original channel `ori` as well as from the the input channel `inp` before close. Note: AnyFan2Chan is nothing but AnyFanIn2

func AnyFan2FuncErr added in v0.1.2

func AnyFan2FuncErr(ori <-chan Any, gen func() (Any, error)) (out <-chan Any)

AnyFan2FuncErr returns a channel to receive everything from the given original channel `ori` as well as all results of generator `gen` until `err != nil` before close.

func AnyFan2FuncNok added in v0.1.2

func AnyFan2FuncNok(ori <-chan Any, gen func() (Any, bool)) (out <-chan Any)

AnyFan2FuncNok returns a channel to receive everything from the given original channel `ori` as well as all results of generator `gen` until `!ok` before close.

func AnyFan2Slice added in v0.1.2

func AnyFan2Slice(ori <-chan Any, inp ...[]Any) (out <-chan Any)

AnyFan2Slice returns a channel to receive everything from the given original channel `ori` as well as all inputs before close.

func AnyFanIn added in v0.1.2

func AnyFanIn(inps ...<-chan Any) (out <-chan Any)

AnyFanIn returns a channel to receive all inputs arriving on variadic inps before close.

Ref: https://blog.golang.org/pipelines
Ref: https://github.com/QuentinPerez/go-stuff/channel/Fan-out-Fan-in/main.go

func AnyFanIn2 added in v0.1.2

func AnyFanIn2(inp1, inp2 <-chan Any) (out <-chan Any)

AnyFanIn2 returns a channel to receive all to receive all from both `inp1` and `inp2` before close.

func AnyFanOut added in v0.1.2

func AnyFanOut(inp <-chan Any, size int) (outS [](<-chan Any))

AnyFanOut returns a slice (of size = size) of channels each of which shall receive any inp before close.

func AnyFini added in v0.1.2

func AnyFini() func(inp <-chan Any) (done <-chan struct{})

AnyFini returns a closure around `AnyDone(_)`.

func AnyFiniFunc added in v0.1.2

func AnyFiniFunc(act func(a Any)) func(inp <-chan Any) (done <-chan struct{})

AnyFiniFunc returns a closure around `AnyDoneFunc(_, act)`.

func AnyFiniSlice added in v0.1.2

func AnyFiniSlice() func(inp <-chan Any) (done <-chan []Any)

AnyFiniSlice returns a closure around `AnyDoneSlice(_)`.

func AnyFiniWait added in v0.1.2

func AnyFiniWait(wg AnyWaiter) func(inp chan<- Any) (done <-chan struct{})

AnyFiniWait returns a closure around `DoneAnyWait(_, wg)`.

func AnyFork added in v0.1.2

func AnyFork(inp <-chan Any) (out1, out2 <-chan Any)

AnyFork returns two channels either of which is to receive every result of inp before close.

func AnyForkSeen added in v0.1.2

func AnyForkSeen(inp <-chan Any) (new, old <-chan Any)

AnyForkSeen returns two channels, `new` and `old`, where `new` is to receive all `inp` not been seen before and `old` all `inp` seen before (internally growing a `sync.Map` to discriminate) until close.

func AnyForkSeenAttr added in v0.1.2

func AnyForkSeenAttr(inp <-chan Any, attr func(a Any) interface{}) (new, old <-chan Any)

AnyForkSeenAttr returns two channels, `new` and `old`, where `new` is to receive all `inp` whose attribute `attr` has not been seen before and `old` all `inp` seen before (internally growing a `sync.Map` to discriminate) until close.

func AnyJoin added in v0.1.2

func AnyJoin(out chan<- Any, inp ...Any) (done <-chan struct{})

AnyJoin sends inputs on the given out channel and returns a done channel to receive one signal when inp has been drained

func AnyJoinChan added in v0.1.2

func AnyJoinChan(out chan<- Any, inp <-chan Any) (done <-chan struct{})

AnyJoinChan sends inputs on the given out channel and returns a done channel to receive one signal when inp has been drained

func AnyJoinSlice added in v0.1.2

func AnyJoinSlice(out chan<- Any, inp ...[]Any) (done <-chan struct{})

AnyJoinSlice sends inputs on the given out channel and returns a done channel to receive one signal when inp has been drained

func AnyMakeChan added in v0.1.2

func AnyMakeChan() (out chan Any)

AnyMakeChan returns a new open channel (simply a 'chan Any' that is). Note: No 'Any-producer' is launched here yet! (as is in all the other functions).

This is useful to easily create corresponding variables such as:

var myAnyPipelineStartsHere := AnyMakeChan() // ... lot's of code to design and build Your favourite "myAnyWorkflowPipeline"

// ...
// ... *before* You start pouring data into it, e.g. simply via:
for drop := range water {

myAnyPipelineStartsHere <- drop

}

close(myAnyPipelineStartsHere)

Hint: especially helpful, if Your piping library operates on some hidden (non-exported) type
(or on a type imported from elsewhere - and You don't want/need or should(!) have to care.)

Note: as always (except for AnyPipeBuffer) the channel is unbuffered.

func AnyMerge added in v0.1.2

func AnyMerge(less func(i, j Any) bool, inps ...<-chan Any) (out <-chan Any)

AnyMerge returns a channel to receive all inputs sorted and free of duplicates. Each input channel needs to be sorted ascending and free of duplicates. The passed binary boolean function `less` defines the applicable order.

Note: If no inputs are given, a closed channel is returned.

func AnyPair added in v0.1.2

func AnyPair(inp <-chan Any) (out1, out2 <-chan Any)

AnyPair returns a pair of channels to receive every result of inp before close.

Note: Yes, it is a VERY simple fanout - but sometimes all You need.

func AnyPipeAdjust added in v0.1.2

func AnyPipeAdjust(inp <-chan Any, sizes ...int) (out <-chan Any)

AnyPipeAdjust returns a channel to receive all `inp` buffered by a AnySendProxy process before close.

func AnyPipeBuffered added in v0.1.2

func AnyPipeBuffered(inp <-chan Any, cap int) (out <-chan Any)

AnyPipeBuffer returns a buffered channel with capacity `cap` to receive all `inp` before close.

func AnyPipeDone added in v0.1.2

func AnyPipeDone(inp <-chan Any) (out <-chan Any, done <-chan struct{})

AnyPipeDone returns a channel to receive every `inp` before close and a channel to signal this closing.

func AnyPipeEnter added in v0.1.2

func AnyPipeEnter(inp <-chan Any, wg AnyWaiter) (out <-chan Any)

AnyPipeEnter returns a channel to receive all `inp` and registers throughput as arrival on the given `sync.WaitGroup` until close.

func AnyPipeFunc added in v0.1.2

func AnyPipeFunc(inp <-chan Any, act func(a Any) Any) (out <-chan Any)

AnyPipeFunc returns a channel to receive every result of action `act` applied to `inp` before close. Note: it 'could' be PipeAnyMap for functional people, but 'map' has a very different meaning in go lang.

func AnyPipeLeave added in v0.1.2

func AnyPipeLeave(inp <-chan Any, wg AnyWaiter) (out <-chan Any)

AnyPipeLeave returns a channel to receive all `inp` and registers throughput as departure on the given `sync.WaitGroup` until close.

func AnyPipeSeen added in v0.1.2

func AnyPipeSeen(inp <-chan Any) (out <-chan Any)

AnyPipeSeen returns a channel to receive all `inp` not been seen before while silently dropping everything seen before (internally growing a `sync.Map` to discriminate) until close. Note: AnyPipeFilterNotSeenYet might be a better name, but is fairly long.

func AnyPipeSeenAttr added in v0.1.2

func AnyPipeSeenAttr(inp <-chan Any, attr func(a Any) interface{}) (out <-chan Any)

AnyPipeSeenAttr returns a channel to receive all `inp` whose attribute `attr` has not been seen before while silently dropping everything seen before (internally growing a `sync.Map` to discriminate) until close. Note: AnyPipeFilterAttrNotSeenYet might be a better name, but is fairly long.

func AnyPlug added in v0.1.2

func AnyPlug(inp <-chan Any, stop <-chan struct{}) (out <-chan Any, done <-chan struct{})

AnyPlug returns a channel to receive every `inp` before close and a channel to signal this closing. Upon receipt of a stop signal, output is immediately closed, and for graceful termination any remaining input is drained before done is signalled.

func AnyPlugAfter added in v0.1.2

func AnyPlugAfter(inp <-chan Any, after <-chan time.Time) (out <-chan Any, done <-chan struct{})

AnyPlugAfter returns a channel to receive every `inp` before close and a channel to signal this closing. Upon receipt of a time signal (e.g. from `time.After(...)`), output is immediately closed, and for graceful termination any remaining input is drained before done is signalled.

func AnySame added in v0.1.2

func AnySame(same func(a, b Any) bool, inp1, inp2 <-chan Any) (out <-chan bool)

AnySame reads values from two channels in lockstep and iff they have the same contents then `true` is sent on the returned bool channel before close.

func AnySendProxy added in v0.1.2

func AnySendProxy(out chan<- Any, sizes ...int) chan<- Any

AnySendProxy returns a channel to serve as a sending proxy to 'out'. Uses a goroutine to receive values from 'out' and store them in an expanding buffer, so that sending to 'out' never blocks.

Note: the expanding buffer is implemented via "container/ring"

Note: AnySendProxy is kept for the Sieve example and other dynamic use to be discovered even so it does not fit the pipe tube pattern as AnyPipeAdjust does.

func AnyStrew added in v0.1.2

func AnyStrew(inp <-chan Any, size int) (outS [](<-chan Any))

AnyStrew returns a slice (of size = size) of channels one of which shall receive each inp before close.

func AnyTubeAdjust added in v0.1.2

func AnyTubeAdjust(sizes ...int) (tube func(inp <-chan Any) (out <-chan Any))

AnyTubeAdjust returns a closure around AnyPipeAdjust (_, sizes ...int).

func AnyTubeBuffered added in v0.1.2

func AnyTubeBuffered(cap int) (tube func(inp <-chan Any) (out <-chan Any))

AnyTubeBuffered returns a closure around PipeAnyBuffer (_, cap).

func AnyTubeEnter added in v0.1.2

func AnyTubeEnter(wg AnyWaiter) (tube func(inp <-chan Any) (out <-chan Any))

AnyTubeEnter returns a closure around AnyPipeEnter (_, wg) registering throughput on the given `sync.WaitGroup` as arrival.

func AnyTubeFunc added in v0.1.2

func AnyTubeFunc(act func(a Any) Any) (tube func(inp <-chan Any) (out <-chan Any))

AnyTubeFunc returns a closure around PipeAnyFunc (_, act).

func AnyTubeLeave added in v0.1.2

func AnyTubeLeave(wg AnyWaiter) (tube func(inp <-chan Any) (out <-chan Any))

AnyTubeLeave returns a closure around AnyPipeLeave (_, wg) registering throughput on the given `sync.WaitGroup` as departure.

func AnyTubeSeen added in v0.1.2

func AnyTubeSeen() (tube func(inp <-chan Any) (out <-chan Any))

AnyTubeSeen returns a closure around AnyPipeSeen() (silently dropping every Any seen before).

func AnyTubeSeenAttr added in v0.1.2

func AnyTubeSeenAttr(attr func(a Any) interface{}) (tube func(inp <-chan Any) (out <-chan Any))

AnyTubeSeenAttr returns a closure around AnyPipeSeenAttr() (silently dropping every Any whose attribute `attr` was seen before).

Types

type Any

type Any generic.Type

Any is the generic type flowing thru the pipe network.

type AnyWaiter

type AnyWaiter interface {
	Add(delta int)
	Done()
	Wait()
}

AnyWaiter - as implemented by `*sync.WaitGroup` - attends Flapdoors and keeps counting who enters and who leaves.

Use DoneAnyWait to learn about when the facilities are closed.

Note: You may also use Your provided `*sync.WaitGroup.Wait()` to know when to close the facilities. Just: DoneAnyWait is more convenient as it also closes the primary channel.

Just make sure to have _all_ entrances and exits attended, and `Wait()` only *after* You've started flooding the facilities.

type ProcAny

type ProcAny func(into chan<- Any, from <-chan Any)

ProcAny is the signature of the inner process of any linear pipe-network

Example: the identity core:

samesame := func(into chan<- Any, from <-chan Any) { into <- <-from } Note: type ProcAny is provided for documentation purpose only. The implementation uses the explicit function signature in order to avoid some genny-related issue.

Note: In https://talks.golang.org/2012/waza.slide#40

Rob Pike uses a ProcAny named `worker`.

Directories

Path Synopsis
examples
httpsyet/v0
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
httpsyet/v1
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
httpsyet/v2
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
httpsyet/v3
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
httpsyet/v4
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
Package httpsyet provides the configuration and execution for crawling a list of sites for links that can be updated to HTTPS.
internal
cmd/bundledotgo
Bundle creates a single-source-file version of a source package suitable for inclusion in a particular target package.
Bundle creates a single-source-file version of a source package suitable for inclusion in a particular target package.
s
xxl
xxs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL