pipelines

Version: v0.1.1
Published: Jun 23, 2022 License: Apache-2.0 Imports: 17 Imported by: 1

README

gobase

Go packages from sterlingdevils, intended for use by multiple projects. Most are based on channels.

Documentation

Overview

Package chanbasedcontainer implements an ordered container with only channels as the API.

The input channel is used to add things to the container. The output channel will contain the head of the container when read. The delete channel is used to remove things from the container before they are read out of the output channel.

This uses Go 1.18 generics. Things must implement the Indexable interface, which has a method that returns a comparable key.

Things come in and go out of the channels in order. Things can be removed while in the container by passing their key to the delete channel.
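The behaviour described above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the package's implementation: the Keyer constraint is copied from the type listing further down, while item, run, and demo are hypothetical names. A single goroutine owns the ordered slice, offers the head on the output channel, drops duplicate keys, and removes entries whose key arrives on the delete channel.

```go
package main

import "fmt"

// Keyer mirrors the constraint used by ContainerPipe: a method returning a comparable key.
type Keyer[K comparable] interface {
	Key() K
}

// item is a hypothetical element type used only for this sketch.
type item struct {
	id   int
	data string
}

func (i item) Key() int { return i.id }

// run owns the ordered slice. It accepts new items on in, removes items whose
// key arrives on del, and offers the current head on out.
// It returns once in is closed and the container is empty.
func run[K comparable, T Keyer[K]](in <-chan T, del <-chan K, out chan<- T) {
	defer close(out)
	var order []T
	seen := make(map[K]bool)
	for in != nil || len(order) > 0 {
		// Enable the send case only when there is a head to offer;
		// a send on a nil channel is never selected.
		var head T
		var send chan<- T
		if len(order) > 0 {
			head, send = order[0], out
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // stop receiving and drain what is left
				continue
			}
			if !seen[v.Key()] { // duplicate keys are dropped
				seen[v.Key()] = true
				order = append(order, v)
			}
		case k := <-del:
			if seen[k] {
				delete(seen, k)
				for i, v := range order {
					if v.Key() == k {
						order = append(order[:i], order[i+1:]...)
						break
					}
				}
			}
		case send <- head:
			delete(seen, head.Key())
			order = order[1:]
		}
	}
}

// demo adds three items, deletes the second by key, and reads the rest in order.
func demo() []string {
	in := make(chan item)
	del := make(chan int)
	out := make(chan item)
	go run[int, item](in, del, out)

	in <- item{1, "first"}
	in <- item{2, "second"}
	in <- item{3, "third"}
	del <- 2 // remove the second item before it is read
	close(in)

	var got []string
	for v := range out {
		got = append(got, v.data)
	}
	return got
}

func main() {
	fmt.Println(demo()) // [first third]
}
```

Keeping all state inside one goroutine means no mutex is needed; the channels are the only synchronisation points, which matches the "only channels as the API" idea.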

Package udp implements a UDP socket component that uses go channels for sending and receiving UDP packets.

The main interface is two channels, one input channel and one output channel. Any Packet put onto the input channel will be sent out a UDP network socket. Any received UDP packets from the network socket will be placed onto the output channel.

The input channel is passed into this component so the caller can control the lifetime of the channel. It should be closed to cause the input channel processing routine to finish.

For the SERVER mode, the component will listen on the passed-in port. The underlying socket does not contain a destination address, so it needs to be set in the Packet that is put onto the input channel.

For the CLIENT mode, the component will Dial the address passed in. The underlying socket contains that address, and the send will ignore any address set in the Packet. All outgoing Packets will be sent to the address passed in the New call.

New(port) will handle creating the WaitGroup and input channel. NewwithParams(...) gives the caller more options.
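The difference between the two modes comes from how the underlying socket is created. The sketch below uses only the standard net package and is not the component's code; roundTrip is a hypothetical helper, and it assumes a working loopback interface. A socket from ListenUDP is unconnected, so each send must name a destination, while a socket from DialUDP has its peer fixed at dial time.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// roundTrip sends msg from a connected (client-style) socket to a listening
// (server-style) socket on the loopback interface and returns the echoed reply.
func roundTrip(msg string) (string, error) {
	// Server style: ListenUDP yields an unconnected socket, so every send
	// has to name its destination explicitly. Port 0 lets the OS pick a port.
	srv, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
	if err != nil {
		return "", err
	}
	defer srv.Close()

	// Client style: DialUDP yields a connected socket; the peer is fixed here.
	cli, err := net.DialUDP("udp", nil, srv.LocalAddr().(*net.UDPAddr))
	if err != nil {
		return "", err
	}
	defer cli.Close()

	// Guard against blocking forever if a datagram is lost.
	deadline := time.Now().Add(2 * time.Second)
	srv.SetDeadline(deadline)
	cli.SetDeadline(deadline)

	if _, err := cli.Write([]byte(msg)); err != nil { // no address needed
		return "", err
	}

	buf := make([]byte, 65507)
	n, from, err := srv.ReadFromUDP(buf)
	if err != nil {
		return "", err
	}
	if _, err := srv.WriteToUDP(buf[:n], from); err != nil { // address required
		return "", err
	}

	n, err = cli.Read(buf)
	if err != nil {
		return "", err
	}
	return string(buf[:n]), nil
}

func main() {
	reply, err := roundTrip("ping")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(reply)
}
```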

Example
package main

import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/sterlingdevils/pipelines"
)

type DataHold struct {
	data []byte
}

func (d DataHold) Data() []byte {
	return d.data
}

func readOut(wg *sync.WaitGroup, o chan string) {
	defer wg.Done()
	count := 0
	for n := range o {
		count++
		fmt.Println(count)
		_ = n
	}
}

func main() {
	os.Chdir("/tmp")
	fd := pipelines.FileDump{}.New()
	ochan := make(chan string)
	fd.Outchan = &ochan

	var wg sync.WaitGroup

	wg.Add(1)
	go readOut(&wg, ochan)

	// Send a Packet
	fd.InChan() <- pipelines.Packet{DataSlice: []byte("Hello, World!")}
	fd.InChan() <- pipelines.Packet{DataSlice: []byte("Gimme Jimmy")}
	fd.InChan() <- pipelines.Packet{DataSlice: []byte("See what happens with special characters\nOn this line")}

	fd.InChan() <- DataHold{data: []byte("This is another type of input")}

	time.Sleep(1 * time.Second)

	fd.Close()
	close(ochan)

	wg.Wait()
}
Output:

1
2
3
4
Example (Testsend)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

type DataType string

type StringNode struct {
	data DataType
}

func (n StringNode) Size() int {
	return len(n.data)
}

func (n StringNode) Data() []byte {
	return []byte(n.data)
}

func main() {
	n := StringNode{data: "potatoes"}
	r := pipelines.RateLimiterPipe[StringNode]{}.New(4, n.Size())
	r.InChan() <- n
	t := <-r.OutChan()
	fmt.Println(t.data)
}
Output:

potatoes
Example (Testsend2)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

type DataType string

type StringNode struct {
	data DataType
}

func (n StringNode) Size() int {
	return len(n.data)
}

func (n StringNode) Data() []byte {
	return []byte(n.data)
}

func main() {
	n := StringNode{data: "potatoes"}
	r := pipelines.RateLimiterPipe[StringNode]{}.New(1, n.Size())
	r.InChan() <- n
	t := <-r.OutChan()
	r.InChan() <- n
	t = <-r.OutChan()
	fmt.Println(t.data)
}
Output:

potatoes

Constants

const (
	RETRYTIME  = 13 * time.Second
	EXPIRETIME = 35 * time.Second
)
const (
	// SERVER is used to create a listen socket
	SERVER = ConnType(1)
	// CLIENT is used to create a connected socket (using Dial)
	CLIENT = ConnType(2)
)

Socket Connection type

const (
	CHANSIZE = 0
)
const (
	// Max size of a Packet Data slice
	MaxPacketSize = 65507
)

udp constants for the protocol
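The value 65507 is the largest payload that fits in one UDP datagram over IPv4: the 16-bit IPv4 total-length field allows 65535 bytes, from which the minimum 20-byte IPv4 header and the 8-byte UDP header are subtracted. A small sketch of that arithmetic follows; the constant and function names are illustrative and not part of the package.

```go
package main

import "fmt"

const (
	maxIPv4Datagram = 65535 // largest value of the 16-bit IPv4 total-length field
	ipv4HeaderMin   = 20    // IPv4 header without options
	udpHeader       = 8     // fixed UDP header size
)

// maxUDPPayload returns the largest UDP payload that fits in one IPv4 datagram.
func maxUDPPayload() int {
	return maxIPv4Datagram - ipv4HeaderMin - udpHeader
}

func main() {
	fmt.Println(maxUDPPayload()) // 65507
}
```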

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncSkipPipe added in v0.1.1

type AsyncSkipPipe[T any] struct {
	// contains filtered or unexported fields
}

func (*AsyncSkipPipe[_]) Close added in v0.1.1

func (b *AsyncSkipPipe[_]) Close()

Close

func (AsyncSkipPipe[T]) InChan added in v0.1.1

func (b AsyncSkipPipe[T]) InChan() chan<- T

InChan returns the write-only input channel

func (AsyncSkipPipe[T]) New added in v0.1.1

func (b AsyncSkipPipe[T]) New() *AsyncSkipPipe[T]

New creates a new AsyncSkipPipe

func (AsyncSkipPipe[T]) NewWithChannel added in v0.1.1

func (AsyncSkipPipe[T]) NewWithChannel(in chan T) *AsyncSkipPipe[T]

func (AsyncSkipPipe[T]) NewWithPipeline added in v0.1.1

func (b AsyncSkipPipe[T]) NewWithPipeline(p Pipeline[T]) *AsyncSkipPipe[T]

func (AsyncSkipPipe[T]) OutChan added in v0.1.1

func (b AsyncSkipPipe[T]) OutChan() <-chan T

OutChan returns the read-only output channel

func (AsyncSkipPipe[T]) PipelineChan added in v0.1.1

func (b AsyncSkipPipe[T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

type BufferPipe added in v0.1.0

type BufferPipe[T any] struct {
	// contains filtered or unexported fields
}

func (*BufferPipe[_]) Close added in v0.1.0

func (b *BufferPipe[_]) Close()

Close

func (BufferPipe[T]) InChan added in v0.1.0

func (b BufferPipe[T]) InChan() chan<- T

InChan

func (BufferPipe[T]) New added in v0.1.0

func (BufferPipe[T]) New(size int) (*BufferPipe[T], error)

func (BufferPipe[T]) NewWithChannel added in v0.1.0

func (BufferPipe[T]) NewWithChannel(size int, in chan T) (*BufferPipe[T], error)

func (BufferPipe[T]) NewWithPipeline added in v0.1.0

func (b BufferPipe[T]) NewWithPipeline(size int, p Pipeline[T]) (*BufferPipe[T], error)

func (BufferPipe[T]) OutChan added in v0.1.0

func (b BufferPipe[T]) OutChan() <-chan T

OutChan

func (BufferPipe[T]) PipelineChan added in v0.1.0

func (b BufferPipe[T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

type Closer added in v0.0.6

type Closer interface {
	// Close is used to clear up any resources made by the component
	Close()
}

type ConnType added in v0.1.0

type ConnType int

ConnType are constants for UDP socket type

type ContainerPipe added in v0.1.0

type ContainerPipe[K comparable, T Keyer[K]] struct {
	// contains filtered or unexported fields
}
Example (Duptest)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key; this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()
	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	// This should be dropped as a dup
	r.InChan() <- &noder{key: 1, data: "This is a test"}

	readAndPrint(1, r.OutChan())

	r.Close()
}
Output:

1, I don't care what it is
Example (Fullnopointers)

This example tests passing a non-pointer value all the way through the container

package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// Node2 does not use a pointer receiver on Key,  this is to test non-pointer things
type node2 struct {
	key int
}

func (n node2) Key() int {
	return n.key
}

func main() {
	// Notice the small difference in T, we are no longer a pointer to Node
	r := pipelines.ContainerPipe[int, node2]{}.New()

	ni := node2{key: 1}

	r.InChan() <- ni
	no := <-r.OutChan()

	// Ok, we have our input node and the output after it went thru the container
	// lets change the key on the input node and make sure the output is not changed
	ni.key = 2
	fmt.Println(no.key)

	r.Close()
}
Output:

1
Example (Fullpointers)

This example tests passing a pointer all the way through the container

package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key; this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

func main() {
	// Notice that T is a pointer to a Node
	r := pipelines.ContainerPipe[int, *noder]{}.New()

	ni := &noder{key: 1, data: "I don't care what it is"}

	r.InChan() <- ni
	no := <-r.OutChan()

	// Ok, we have our input node and the output after it went thru the container
	// lets change the key on the input node and make sure it changed on the output
	ni.key = 2
	fmt.Println(no.key)

	r.Close()
}
Output:

2
Example (Testdeloffirst)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key; this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()
	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- &noder{key: 2, data: "This is a test"}
	r.InChan() <- &noder{key: 3, data: "This is a test again"}
	r.DelChan() <- 1

	readAndPrint(2, r.OutChan())

	r.Close()
}
Output:

2, This is a test
3, This is a test again
Example (Testdelofsecond)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key; this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()
	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- &noder{key: 2, data: "This is a test"}
	r.InChan() <- &noder{key: 3, data: "This is a test again"}
	r.DelChan() <- 2

	readAndPrint(2, r.OutChan())

	r.Close()
}
Output:

1, I don't care what it is
3, This is a test again
Example (TestdelonNotThere)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key; this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()
	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- &noder{key: 2, data: "This is a test"}
	r.DelChan() <- 3

	readAndPrint(2, r.OutChan())

	r.Close()
}
Output:

1, I don't care what it is
2, This is a test
Example (Testtwoitemsinorder)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key; this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()
	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- &noder{key: 2, data: "This is a test"}

	readAndPrint(2, r.OutChan())

	r.Close()
}
Output:

1, I don't care what it is
2, This is a test

func (ContainerPipe[_, _]) ApproxSize added in v0.1.0

func (c ContainerPipe[_, _]) ApproxSize() int32

ApproxSize returns an approximate count of the items in the container. The value is only updated at the start of each main-loop iteration, so it may lag behind the true size.

Example

Checks that ApproxSize returns something close

package main

import (
	"fmt"
	"time"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key; this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

func main() {
	numwrite := 100
	r := pipelines.ContainerPipe[int, *noder]{}.New()

	s1 := r.ApproxSize()

	r.InChan() <- &noder{key: 1}
	s2 := r.ApproxSize()

	<-r.OutChan()
	time.Sleep(10 * time.Millisecond) // Have to wait a little for mainloop to cycle
	s3 := r.ApproxSize()

	for i := 0; i < numwrite; i++ {
		r.InChan() <- &noder{key: i}
	}
	time.Sleep(10 * time.Millisecond) // Have to wait a little for mainloop to cycle
	s4 := r.ApproxSize()

	fmt.Println(s1, s2, s3, s4)

	r.Close()
}
Output:

0 1 0 100

func (*ContainerPipe[_, _]) Close added in v0.1.0

func (c *ContainerPipe[_, _]) Close()

Close the ChanBasedContainer

Example

ExampleContainerPipe_Close

package main

import (
	"github.com/sterlingdevils/pipelines"
)

// Node2 does not use a pointer receiver on Key,  this is to test non-pointer things
type node2 struct {
	key int
}

func (n node2) Key() int {
	return n.key
}

func main() {
	r := pipelines.ContainerPipe[int, node2]{}.New()
	r.Close()
}
Output:

func (ContainerPipe[K, _]) DelChan added in v0.1.0

func (c ContainerPipe[K, _]) DelChan() chan<- K

DelChan

func (ContainerPipe[_, T]) InChan added in v0.1.0

func (c ContainerPipe[_, T]) InChan() chan<- T

InChan

Example

ExampleContainerPipe_InChan

package main

import (
	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key; this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()
	r.InChan() <- &noder{key: 7, data: "This is a test"}
	r.Close()
}
Output:

func (ContainerPipe[K, T]) New added in v0.1.0

func (c ContainerPipe[K, T]) New() *ContainerPipe[K, T]

New returns a reference to a container. For performance, T should be a pointer.

Example
package main

import (
	"github.com/sterlingdevils/pipelines"
)

// Node2 does not use a pointer receiver on Key,  this is to test non-pointer things
type node2 struct {
	key int
}

func (n node2) Key() int {
	return n.key
}

func main() {
	_ = pipelines.ContainerPipe[int, node2]{}.New()
}
Output:

func (ContainerPipe[K, T]) NewWithChan added in v0.1.0

func (ContainerPipe[K, T]) NewWithChan(in chan T) *ContainerPipe[K, T]

func (ContainerPipe[K, T]) NewWithPipeline added in v0.1.0

func (c ContainerPipe[K, T]) NewWithPipeline(p Pipeline[T]) *ContainerPipe[K, T]

func (ContainerPipe[_, T]) OutChan added in v0.1.0

func (c ContainerPipe[_, T]) OutChan() <-chan T

OutChan

func (ContainerPipe[_, T]) PipelineChan added in v0.1.0

func (c ContainerPipe[_, T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

type ConverterPipe added in v0.1.0

type ConverterPipe[I any, O any] struct {
	// contains filtered or unexported fields
}
Example (Fixedoutput)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// ignore the input and just return hello
func returnHello(i int) (string, error) {
	return "Hello", nil
}

func main() {
	cvt := pipelines.ConverterPipe[int, string]{}.New(returnHello)

	cvt.InChan() <- 5
	o := <-cvt.OutChan()

	fmt.Println(o)

}
Output:

Hello
Example (Numchar)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	cvt := pipelines.ConverterPipe[int, string]{}.New(
		func(i int) (string, error) {
			return fmt.Sprintf("%010d", i), nil
		})

	cvt.InChan() <- 5
	o := <-cvt.OutChan()

	fmt.Println(o)

}
Output:

0000000005

func (*ConverterPipe[_, _]) Close added in v0.1.0

func (c *ConverterPipe[_, _]) Close()

Close

func (ConverterPipe[I, O]) InChan added in v0.1.0

func (c ConverterPipe[I, O]) InChan() chan<- I

InChan

func (ConverterPipe[I, O]) New added in v0.1.0

func (c ConverterPipe[I, O]) New(fun func(I) (O, error)) *ConverterPipe[I, O]
Example
package main

import (
	"fmt"
	"reflect"
	"strconv"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	cvt := pipelines.ConverterPipe[int, string]{}.New(
		func(i int) (string, error) {
			return strconv.Itoa(i), nil
		})

	cvt.InChan() <- 5
	o := <-cvt.OutChan()

	fmt.Println(o, reflect.TypeOf(o))

}
Output:

5 string

func (ConverterPipe[I, O]) NewWithChannel added in v0.1.0

func (ConverterPipe[I, O]) NewWithChannel(in chan I, fun func(I) (O, error)) *ConverterPipe[I, O]

func (ConverterPipe[I, O]) NewWithPipeline added in v0.1.0

func (c ConverterPipe[I, O]) NewWithPipeline(p Pipeline[I], fun func(I) (O, error)) *ConverterPipe[I, O]

func (ConverterPipe[I, O]) OutChan added in v0.1.0

func (c ConverterPipe[I, O]) OutChan() <-chan O

OutChan

func (ConverterPipe[_, O]) PipelineChan added in v0.1.0

func (c ConverterPipe[_, O]) PipelineChan() chan O

PipelineChan returns a R/W channel that is used for pipelining

type DataSizer added in v0.1.0

type DataSizer interface {
	Dataer
	Sizer
}

type Dataer added in v0.0.6

type Dataer interface {
	// Data returns a byte slice to the data it holds
	Data() []byte
}

type DirScan added in v0.1.0

type DirScan struct {
	Dir      string
	ScanTime time.Duration
	// contains filtered or unexported fields
}

DirScan holds a directory to scan and a channel to put the filenames onto

func (*DirScan) Close added in v0.1.0

func (d *DirScan) Close()

Close will close the data channel

func (DirScan) New added in v0.1.0

func (DirScan) New(dir string, scantime time.Duration, chanSize int) (*DirScan, error)

New creates a new dir scanner and starts a scanning loop that sends filenames to a channel. The scanner runs in its own goroutine. As the writer, DirScan owns the channel it returns and closes it when Close() is called.

func (DirScan) OutChan added in v0.1.0

func (d DirScan) OutChan() <-chan string

OutChan returns the output channel as read-only

func (DirScan) PipelineChan added in v0.1.0

func (d DirScan) PipelineChan() chan string

PipelineChan returns a R/W channel that is used for pipelining

type File added in v0.1.0

type File struct {
	Reference string
	// contains filtered or unexported fields
}

func (File) Data added in v0.1.0

func (f File) Data() []byte

type FileDump added in v0.1.0

type FileDump struct {
	Outchan *chan string

	Metricfunc func(name string, val int)
	// contains filtered or unexported fields
}

func (*FileDump) Close added in v0.1.0

func (b *FileDump) Close()

Close

func (FileDump) InChan added in v0.1.0

func (b FileDump) InChan() chan<- Dataer

InChan returns a write-only channel from which the incoming packets will be read

func (FileDump) New added in v0.1.0

func (f FileDump) New() *FileDump

New creates a new FileDump

func (FileDump) NewWithChannel added in v0.1.0

func (FileDump) NewWithChannel(in chan Dataer) *FileDump

func (FileDump) NewWithPipeline added in v0.1.0

func (f FileDump) NewWithPipeline(p Pipeline[Dataer]) *FileDump

func (FileDump) SetMetric added in v0.1.0

func (b FileDump) SetMetric(name string, val int)

type FileNamer added in v0.1.0

type FileNamer interface {
	FileName() string
}

type FileNamerDataer added in v0.1.0

type FileNamerDataer interface {
	Dataer
	FileNamer
}

type FileReadPipe added in v0.1.0

type FileReadPipe struct {
	// contains filtered or unexported fields
}
Example (Fileconsume)
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	err := os.WriteFile("/tmp/dat1", []byte("This is a test"), 0666)
	if err != nil {
		log.Fatalln(err)
	}

	fp := pipelines.FileReadPipe{}.New()

	fp.InChan() <- "/tmp/dat1"

	fmt.Println(string((<-fp.OutChan()).Data()))

	fp.Close()

}
Output:

This is a test

func (*FileReadPipe) Close added in v0.1.0

func (f *FileReadPipe) Close()

Close

func (*FileReadPipe) InChan added in v0.1.0

func (f *FileReadPipe) InChan() chan<- string

InChan returns the write-only input channel for file names

func (FileReadPipe) New added in v0.1.0

func (f FileReadPipe) New() *FileReadPipe

New creates a new filereadpipe

func (FileReadPipe) NewWithChannel added in v0.1.0

func (FileReadPipe) NewWithChannel(in chan string) *FileReadPipe

func (FileReadPipe) NewWithPipeline added in v0.1.0

func (f FileReadPipe) NewWithPipeline(p Pipeline[string]) *FileReadPipe

func (*FileReadPipe) OutChan added in v0.1.0

func (f *FileReadPipe) OutChan() <-chan Dataer

OutChan returns the read-only output channel

func (*FileReadPipe) PipelineChan added in v0.1.0

func (f *FileReadPipe) PipelineChan() chan Dataer

PipelineChan returns a R/W channel that is used for pipelining

type FileWriterPipe added in v0.1.0

type FileWriterPipe struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"os"

	"github.com/sterlingdevils/pipelines"
)

type FileHolder struct {
	filename string
	data     []byte
}

func (f FileHolder) FileName() string {
	return f.filename
}

func (f FileHolder) Data() []byte {
	return f.data
}

type FileHolder2 struct {
	filename  string
	data      []byte
	Arbitrary []int
}

func (f FileHolder2) FileName() string {
	return f.filename
}

func (f FileHolder2) Data() []byte {
	return f.data
}

func main() {
	os.Chdir("/tmp")
	fd := pipelines.FileWriterPipe{}.New()

	// Send a file
	fd.InChan() <- FileHolder{filename: "a.out", data: []byte("This is one type of input")}

	// Send a file as another type
	fd.InChan() <- FileHolder2{filename: "b.out", data: []byte("This is another type of input")}

	fd.Close()
}
Output:

func (*FileWriterPipe) Close added in v0.1.0

func (b *FileWriterPipe) Close()

Close

func (FileWriterPipe) InChan added in v0.1.0

func (b FileWriterPipe) InChan() chan<- FileNamerDataer

InChan returns a write-only channel from which the incoming packets will be read

func (FileWriterPipe) New added in v0.1.0

func (f FileWriterPipe) New() *FileWriterPipe

New creates a new FileWriterPipe

func (FileWriterPipe) NewWithChannel added in v0.1.0

func (FileWriterPipe) NewWithChannel(in chan FileNamerDataer) *FileWriterPipe

func (FileWriterPipe) NewWithPipeline added in v0.1.0

func (f FileWriterPipe) NewWithPipeline(p Pipeline[FileNamerDataer]) *FileWriterPipe

type GeneratorPipe added in v0.1.0

type GeneratorPipe[T any] struct {
	Metricfunc func(gobase.MetricsProto)
	// contains filtered or unexported fields
}
Example (Increment)
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	i := 0
	gen := pipelines.GeneratorPipe[int]{}.New(
		func() int {
			i++
			return i
		})

	for j := 0; j < 10; j++ {
		o := <-gen.OutChan()
		fmt.Println(o)
	}

	gen.Close()
}
Output:

1
2
3
4
5
6
7
8
9
10

func (*GeneratorPipe[T]) Close added in v0.1.0

func (g *GeneratorPipe[T]) Close()

Close

func (GeneratorPipe[T]) New added in v0.1.0

func (GeneratorPipe[T]) New(fun func() T) *GeneratorPipe[T]

func (GeneratorPipe[T]) OutChan added in v0.1.0

func (g GeneratorPipe[T]) OutChan() <-chan T

OutChan

func (GeneratorPipe[T]) PipelineChan added in v0.1.0

func (g GeneratorPipe[T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

type KeyablePacket added in v0.1.0

type KeyablePacket struct {
	// Addr holds a UDP address (with port) for the packet
	// will be ignored if UDP is created in CLIENT mode
	Addr net.UDPAddr
	// Data contains the data
	DataSlice []byte
}

KeyablePacket holds a UDP address and the data from a UDP packet. The input and output channels carry values of this type.

func (KeyablePacket) Address added in v0.1.0

func (p KeyablePacket) Address() net.UDPAddr

func (KeyablePacket) Data added in v0.1.0

func (p KeyablePacket) Data() []byte

func (KeyablePacket) Key added in v0.1.0

func (p KeyablePacket) Key() uint64

func (KeyablePacket) Size added in v0.1.0

func (p KeyablePacket) Size() int

type Keyer added in v0.0.6

type Keyer[K comparable] interface {
	Key() K
}
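Whether a type satisfies Keyer depends on the receiver of its Key method, which is why the examples on this page instantiate ContainerPipe with *noder but with plain node2. The sketch below repeats the interface locally so it is self-contained; ptrNode, valNode, and keyOf are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Keyer is repeated here so the sketch compiles on its own.
type Keyer[K comparable] interface {
	Key() K
}

// ptrNode defines Key on the pointer receiver, so only *ptrNode satisfies Keyer[int].
type ptrNode struct{ key int }

func (n *ptrNode) Key() int { return n.key }

// valNode defines Key on the value receiver, so both valNode and *valNode satisfy Keyer[int].
type valNode struct{ key int }

func (n valNode) Key() int { return n.key }

// keyOf accepts any Keyer and returns its key.
func keyOf[K comparable, T Keyer[K]](v T) K {
	return v.Key()
}

func main() {
	p := &ptrNode{key: 1}
	v := valNode{key: 2}

	fmt.Println(keyOf[int](p)) // T is inferred as *ptrNode
	fmt.Println(keyOf[int](v)) // T is inferred as valNode

	// keyOf[int](ptrNode{key: 3}) would not compile:
	// the value type ptrNode has no Key method in its method set.
}
```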

type LogPipe added in v0.1.0

type LogPipe[T any] struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	lg := pipelines.LogPipe[int]{}.New("example")
	lg.InChan() <- 5
	fmt.Println(<-lg.OutChan())
}
Output:

5

func (*LogPipe[_]) Close added in v0.1.0

func (b *LogPipe[_]) Close()

Close

func (LogPipe[T]) InChan added in v0.1.0

func (b LogPipe[T]) InChan() chan<- T

InChan returns the write-only input channel

func (LogPipe[T]) New added in v0.1.0

func (b LogPipe[T]) New(name string) *LogPipe[T]

New creates a new logger. name is used to put a unique label on each log.

func (LogPipe[T]) NewWithChannel added in v0.1.0

func (LogPipe[T]) NewWithChannel(name string, in chan T) *LogPipe[T]

func (LogPipe[T]) NewWithPipeline added in v0.1.0

func (b LogPipe[T]) NewWithPipeline(name string, p Pipeline[T]) *LogPipe[T]

func (LogPipe[T]) OutChan added in v0.1.0

func (b LogPipe[T]) OutChan() <-chan T

OutChan returns the read-only output channel

func (LogPipe[T]) PipelineChan added in v0.1.0

func (b LogPipe[T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

type NullConsumePipe added in v0.1.0

type NullConsumePipe[T any] struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	nc := pipelines.NullConsumePipe[any]{}.New()

	inchan := nc.InChan()

	inchan <- 1
	inchan <- 2
	inchan <- 3
	inchan <- 4
	inchan <- 5

	nc.Close()

	fmt.Println("reached end")

}
Output:

reached end

func (*NullConsumePipe[_]) Close added in v0.1.0

func (b *NullConsumePipe[_]) Close()

Close

func (NullConsumePipe[T]) InChan added in v0.1.0

func (b NullConsumePipe[T]) InChan() chan<- T

InChan returns the write-only input channel

func (NullConsumePipe[T]) New added in v0.1.0

func (n NullConsumePipe[T]) New() *NullConsumePipe[T]

New creates a new NullConsumePipe

func (NullConsumePipe[T]) NewWithChannel added in v0.1.0

func (NullConsumePipe[T]) NewWithChannel(in chan T) *NullConsumePipe[T]

func (NullConsumePipe[T]) NewWithPipeline added in v0.1.0

func (n NullConsumePipe[T]) NewWithPipeline(p Pipeline[T]) *NullConsumePipe[T]

type OnlyOncePipe added in v0.1.1

type OnlyOncePipe[T comparable] struct {
	// contains filtered or unexported fields
}

func (*OnlyOncePipe[_]) Close added in v0.1.1

func (b *OnlyOncePipe[_]) Close()

Close

func (OnlyOncePipe[T]) InChan added in v0.1.1

func (b OnlyOncePipe[T]) InChan() chan<- T

InChan returns the write-only input channel

func (OnlyOncePipe[T]) New added in v0.1.1

func (b OnlyOncePipe[T]) New(gc time.Duration, fr time.Duration) *OnlyOncePipe[T]

New creates a new OnlyOncePipe

func (OnlyOncePipe[T]) NewWithChannel added in v0.1.1

func (OnlyOncePipe[T]) NewWithChannel(gc time.Duration, fr time.Duration, in chan T) *OnlyOncePipe[T]

func (OnlyOncePipe[T]) NewWithPipeline added in v0.1.1

func (b OnlyOncePipe[T]) NewWithPipeline(gc time.Duration, fr time.Duration, p Pipeline[T]) *OnlyOncePipe[T]

func (OnlyOncePipe[T]) OutChan added in v0.1.1

func (b OnlyOncePipe[T]) OutChan() <-chan T

OutChan returns the read-only output channel

func (OnlyOncePipe[T]) PipelineChan added in v0.1.1

func (b OnlyOncePipe[T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

type Packet added in v0.1.0

type Packet struct {
	// Addr holds a UDP address (with port) for the packet
	// will be ignored if UDP is created in CLIENT mode
	Addr net.UDPAddr
	// Data contains the data
	DataSlice []byte
}

Packet holds a UDP address and the data from a UDP packet. The input and output channels carry values of this type.

func (Packet) Address added in v0.1.0

func (p Packet) Address() net.UDPAddr

func (Packet) Data added in v0.1.0

func (p Packet) Data() []byte

func (Packet) Size added in v0.1.0

func (p Packet) Size() int

type Packetable added in v0.1.0

type Packetable interface {
	Address() net.UDPAddr
	Data() []byte
}

type Pipeline added in v0.0.6

type Pipeline[T any] interface {
	PipelineChaner[T]
	Closer
}
Example

Check we can make a Pipelineable

package main

import "github.com/sterlingdevils/pipelines"

type Node[T any] struct {
}

func (n Node[T]) PipelineChan() chan T {
	return nil
}

func (n Node[T]) Close() {
}

// Check we can make a Pipelineable
func main() {
	var p pipelines.Pipeline[int] = Node[int]{}
	p.Close()

}
Output:

type PipelineChaner added in v0.0.6

type PipelineChaner[T any] interface {
	// PipelineChan must return the stage's output channel as a R/W chan; it is used for chaining pipelines
	PipelineChan() chan T
}
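As a rough illustration of how one stage can be chained onto another through PipelineChan, here is a minimal standard-library sketch. It is not the package's code: the interfaces are repeated locally, and source, doubler, newSource, newDoublerWithPipeline, and collect are hypothetical names. The downstream stage reads from the upstream stage's output channel, in the spirit of the NewWithPipeline constructors listed on this page.

```go
package main

import "fmt"

// Local copies of the interfaces so the sketch is self-contained.
type PipelineChaner[T any] interface {
	PipelineChan() chan T
}

type Closer interface {
	Close()
}

type Pipeline[T any] interface {
	PipelineChaner[T]
	Closer
}

// source is a hypothetical first stage that emits a fixed list of ints.
type source struct{ out chan int }

func newSource(vals ...int) *source {
	s := &source{out: make(chan int)}
	go func() {
		defer close(s.out)
		for _, v := range vals {
			s.out <- v
		}
	}()
	return s
}

func (s *source) PipelineChan() chan int { return s.out }
func (s *source) Close()                 {}

// doubler is a hypothetical second stage that doubles every value it reads.
type doubler struct {
	up  Pipeline[int] // upstream stage, closed together with this one
	out chan int
}

// newDoublerWithPipeline chains the doubler onto an existing pipeline stage.
func newDoublerWithPipeline(p Pipeline[int]) *doubler {
	d := &doubler{up: p, out: make(chan int)}
	go func() {
		defer close(d.out)
		for v := range p.PipelineChan() {
			d.out <- v * 2
		}
	}()
	return d
}

func (d *doubler) PipelineChan() chan int { return d.out }
func (d *doubler) Close()                 { d.up.Close() }

// collect drains a pipeline into a slice.
func collect(p Pipeline[int]) []int {
	var got []int
	for v := range p.PipelineChan() {
		got = append(got, v)
	}
	return got
}

func main() {
	p := newDoublerWithPipeline(newSource(1, 2, 3))
	fmt.Println(collect(p)) // [2 4 6]
	p.Close()
}
```

Because every stage exposes the same two methods, stages can be composed in any order as long as the element types line up.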

type RateLimiterPipe added in v0.1.1

type RateLimiterPipe[T DataSizer] struct {
	// contains filtered or unexported fields
}

func (*RateLimiterPipe[_]) Close added in v0.1.1

func (r *RateLimiterPipe[_]) Close()

func (RateLimiterPipe[T]) InChan added in v0.1.1

func (r RateLimiterPipe[T]) InChan() chan<- T

InChan

func (RateLimiterPipe[T]) New added in v0.1.1

func (rl RateLimiterPipe[T]) New(rLimit rate.Limit, bLimit int) *RateLimiterPipe[T]

func (RateLimiterPipe[T]) NewWithChannel added in v0.1.1

func (RateLimiterPipe[T]) NewWithChannel(rLimit rate.Limit, bLimit int, in chan T) *RateLimiterPipe[T]

func (RateLimiterPipe[T]) NewWithPipeline added in v0.1.1

func (rl RateLimiterPipe[T]) NewWithPipeline(rLimit rate.Limit, bLimit int, p Pipeline[T]) *RateLimiterPipe[T]

func (RateLimiterPipe[T]) OutChan added in v0.1.1

func (r RateLimiterPipe[T]) OutChan() <-chan T

OutChan

func (RateLimiterPipe[T]) PipelineChan added in v0.1.1

func (r RateLimiterPipe[T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

func (*RateLimiterPipe[_]) SetBurst added in v0.1.1

func (r *RateLimiterPipe[_]) SetBurst(n int)

func (*RateLimiterPipe[_]) SetLimit added in v0.1.1

func (r *RateLimiterPipe[_]) SetLimit(l rate.Limit)

type RetryPipe added in v0.1.0

type RetryPipe[K comparable, T Retryable[K]] struct {
	RetryTime  time.Duration
	ExpireTime time.Duration
	// contains filtered or unexported fields
}
Example
package main

import (
	"github.com/sterlingdevils/pipelines"
)

type rptKeyType uint64
type rptDataType []byte

type Obj struct {
	Sn   rptKeyType
	Data rptDataType
}

func (o *Obj) Key() rptKeyType {
	return o.Sn
}

func main() {
	retry := pipelines.RetryPipe[rptKeyType, *Obj]{}.New()

	retry.Close()
}
Output:

Example (Inout)
package main

import (
	"fmt"
	"time"

	"github.com/sterlingdevils/gobase"
	"github.com/sterlingdevils/pipelines"
)

type rptKeyType uint64
type rptDataType []byte

type Obj struct {
	Sn   rptKeyType
	Data rptDataType
}

func (o *Obj) Key() rptKeyType {
	return o.Sn
}

func NewObj(timeout time.Duration) (*Obj, error) {
	return &Obj{}, nil
}

func main() {
	sn := (&gobase.SerialNum{}).New()
	retry := pipelines.RetryPipe[rptKeyType, *Obj]{}.New()

	go func() {
		for o := range retry.OutChan() {
			fmt.Println(o.Key())
		}
	}()

	for i := 0; i < 10; i++ {
		o, _ := NewObj(2 * time.Second)
		o.Sn = rptKeyType(sn.Next())
		retry.InChan() <- o
	}

	time.Sleep(4 * time.Second)
	retry.Close()

}
Output:

0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9
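
Each key appears twice in the output above, which is consistent with unacknowledged items being emitted again after a retry interval. The sketch below models that bookkeeping with the standard library only. It assumes retry-until-acknowledged-or-expired semantics, which the documentation does not spell out, and retryStore, add, ack, and due are hypothetical names rather than the package's API.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// entry records when a key was first seen and when it was last emitted.
// Times are offsets from an arbitrary start so the sketch stays deterministic.
type entry struct {
	created   time.Duration
	lastRetry time.Duration
}

// retryStore is a hypothetical, single-goroutine model of retry bookkeeping.
type retryStore struct {
	retryTime  time.Duration
	expireTime time.Duration
	items      map[uint64]*entry
}

func newRetryStore(retry, expire time.Duration) *retryStore {
	return &retryStore{retryTime: retry, expireTime: expire, items: map[uint64]*entry{}}
}

// add records a key as emitted at offset now.
func (s *retryStore) add(key uint64, now time.Duration) {
	s.items[key] = &entry{created: now, lastRetry: now}
}

// ack forgets a key so that it is never emitted again.
func (s *retryStore) ack(key uint64) { delete(s.items, key) }

// due drops expired keys and returns, in ascending order, the keys whose
// retry interval has elapsed at offset now.
func (s *retryStore) due(now time.Duration) []uint64 {
	due := []uint64{}
	for k, e := range s.items {
		if now-e.created >= s.expireTime {
			delete(s.items, k)
			continue
		}
		if now-e.lastRetry >= s.retryTime {
			e.lastRetry = now
			due = append(due, k)
		}
	}
	sort.Slice(due, func(i, j int) bool { return due[i] < due[j] })
	return due
}

func main() {
	s := newRetryStore(time.Second, 3*time.Second)
	for k := uint64(0); k < 3; k++ {
		s.add(k, 0)
	}
	s.ack(1)
	fmt.Println(s.due(time.Second))             // [0 2]
	fmt.Println(s.due(1500 * time.Millisecond)) // []
	fmt.Println(s.due(3 * time.Second))         // []: remaining keys expired
}
```

In a channel-based component, a goroutine would typically call something like due on a timer and write the returned items to the output channel, while keys read from an acknowledgement channel would go to ack.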
Example (Pointercheck)
package main

import (
	"fmt"
	"time"

	"github.com/sterlingdevils/pipelines"
)

type rptKeyType uint64
type rptDataType []byte

type Obj struct {
	Sn   rptKeyType
	Data rptDataType
}

func (o *Obj) Key() rptKeyType {
	return o.Sn
}

func NewObj(timeout time.Duration) (*Obj, error) {
	return &Obj{}, nil
}

func main() {
	retry := pipelines.RetryPipe[rptKeyType, *Obj]{}.New()

	// Check that we are passing pointer
	o, _ := NewObj(5 * time.Second)
	retry.InChan() <- o
	o.Sn = 5
	go func() {
		for o := range retry.OutChan() {
			fmt.Println(o.Key())
		}
	}()

	retry.InChan() <- o

	time.Sleep(2 * time.Second)
	retry.Close()

}
Output:

5
5
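
The pointer check above prints 5 for an object that was sent while its Sn was still zero, because a channel of pointers carries the pointer and not a copy of the object. A minimal standard-library sketch of that effect follows. The obj type and sendThenMutate are hypothetical names used only for illustration.

```go
package main

import "fmt"

type obj struct{ sn uint64 }

// sendThenMutate sends a pointer on a buffered channel, changes the value
// it points to, and returns what the receiver observes.
func sendThenMutate() uint64 {
	ch := make(chan *obj, 1)
	o := &obj{}
	ch <- o  // only the pointer is copied into the channel
	o.sn = 5 // the receiver sees this write because both sides share one obj
	return (<-ch).sn
}

func main() {
	fmt.Println(sendThenMutate()) // 5
}
```

This sketch runs in a single goroutine. When sender and receiver run concurrently, changing an object after sending its pointer is a data race unless the two sides synchronize.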

func (RetryPipe[K, _]) AckIn added in v0.1.0

func (r RetryPipe[K, _]) AckIn() chan<- K

AckIn returns a write-only channel that accepts the keys (type K) being acknowledged

func (*RetryPipe[_, _]) Close added in v0.1.0

func (r *RetryPipe[_, _]) Close()

Close shuts down the RetryPipe

Example
package main

import (
	"github.com/sterlingdevils/pipelines"
)

type rptKeyType uint64
type rptDataType []byte

type Obj struct {
	Sn   rptKeyType
	Data rptDataType
}

func (o *Obj) Key() rptKeyType {
	return o.Sn
}

func main() {
	retry := pipelines.RetryPipe[rptKeyType, *Obj]{}.New()
	retry.Close()
}
Output:

func (RetryPipe[_, T]) InChan added in v0.1.0

func (r RetryPipe[_, T]) InChan() chan<- T

func (RetryPipe[K, T]) New added in v0.1.0

func (r RetryPipe[K, T]) New() *RetryPipe[K, T]

New creates a new RetryPipe

func (RetryPipe[K, T]) NewWithChannel added in v0.1.0

func (RetryPipe[K, T]) NewWithChannel(in chan T) *RetryPipe[K, T]

NewWithChannel creates a new RetryPipe that uses the given channel as its input

func (RetryPipe[K, T]) NewWithPipeline added in v0.1.0

func (r RetryPipe[K, T]) NewWithPipeline(p Pipeline[T]) *RetryPipe[K, T]

NewWithPipeline creates a new RetryPipe that takes its input from the given Pipeline

func (RetryPipe[_, T]) OutChan added in v0.1.0

func (r RetryPipe[_, T]) OutChan() <-chan T

func (RetryPipe[_, T]) PipelineChan added in v0.1.0

func (r RetryPipe[_, T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

func (*RetryPipe[K, _]) SetAckIn added in v0.1.0

func (r *RetryPipe[K, _]) SetAckIn(c chan K)

SetAckIn sets the channel on which acknowledged keys are received

type RetryThing added in v0.1.0

type RetryThing[K comparable, T Retryable[K]] struct {
	LastRetry time.Time
	// contains filtered or unexported fields
}

func (RetryThing[_, _]) Created added in v0.1.0

func (p RetryThing[_, _]) Created() time.Time

func (RetryThing[K, _]) Key added in v0.1.0

func (p RetryThing[K, _]) Key() K

func (RetryThing[K, T]) New added in v0.1.0

func (RetryThing[K, T]) New(k K, t T) *RetryThing[K, T]

func (RetryThing[_, T]) Thing added in v0.1.0

func (p RetryThing[_, T]) Thing() T

type Retryable added in v0.1.0

type Retryable[K comparable] interface {
	Keyer[K]
}

type Sizer added in v0.1.0

type Sizer interface {
	Size() int
}

type ThrottlePipe added in v0.1.1

type ThrottlePipe[T any] struct {
	// contains filtered or unexported fields
}

func (ThrottlePipe[T]) AddTok added in v0.1.1

func (b ThrottlePipe[T]) AddTok() chan<- uint64

AddTok returns a write-only channel used to add to the number of tokens

func (*ThrottlePipe[_]) Close added in v0.1.1

func (b *ThrottlePipe[_]) Close()

Close shuts down the ThrottlePipe

func (ThrottlePipe[T]) InChan added in v0.1.1

func (b ThrottlePipe[T]) InChan() chan<- T

InChan returns a write-only channel used to send items into the pipe

func (ThrottlePipe[T]) New added in v0.1.1

func (b ThrottlePipe[T]) New() *ThrottlePipe[T]

New creates a new ThrottlePipe

Example
package main

import (
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/sterlingdevils/pipelines"
)

func writePipe(p *pipelines.ThrottlePipe[int]) {
	for i := 0; i < 10; i++ {

		p.InChan() <- i
	}
}

func main() {
	log.Printf("Starting test\n")
	throt := pipelines.ThrottlePipe[int]{}.New()
	conv := pipelines.ConverterPipe[int, int]{}.NewWithPipeline(throt,
		func(i int) (int, error) {
			fmt.Printf("Reading: %v\n", i)
			return 0, errors.New("This is not an error")
		})
	nullpipe := pipelines.NullConsumePipe[int]{}.NewWithPipeline(conv)
	defer nullpipe.Close()

	go writePipe(throt)

	time.Sleep(2 * time.Second)
	throt.SetTok() <- 5
	throt.AddTok() <- 2

	time.Sleep(2 * time.Second)
	throt.SetTok() <- 2

	time.Sleep(1 * time.Second)
	throt.SetTok() <- 3

	time.Sleep(1 * time.Second)
}
Output:

Reading: 0
Reading: 1
Reading: 2
Reading: 3
Reading: 4
Reading: 5
Reading: 6
Reading: 7
Reading: 8
Reading: 9

func (ThrottlePipe[T]) NewWithChannel added in v0.1.1

func (ThrottlePipe[T]) NewWithChannel(in chan T) *ThrottlePipe[T]

func (ThrottlePipe[T]) NewWithPipeline added in v0.1.1

func (b ThrottlePipe[T]) NewWithPipeline(p Pipeline[T]) *ThrottlePipe[T]

func (ThrottlePipe[T]) OutChan added in v0.1.1

func (b ThrottlePipe[T]) OutChan() <-chan T

OutChan returns a read-only channel from which items leave the pipe

func (ThrottlePipe[T]) PipelineChan added in v0.1.1

func (b ThrottlePipe[T]) PipelineChan() chan T

PipelineChan returns a R/W channel that is used for pipelining

func (ThrottlePipe[T]) SetTok added in v0.1.1

func (b ThrottlePipe[T]) SetTok() chan<- uint64

SetTok returns a write-only channel used to set the number of tokens
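
The token channels can be pictured as gating a forwarding loop: each forwarded item spends one token, and nothing is read from the input while the count is zero. The following standard-library sketch shows one way to build that with a select loop. The throttle function is a hypothetical illustration under that assumption, not the package's implementation.

```go
package main

import "fmt"

// throttle forwards values from in to out, spending one token per value.
// A value on setTok replaces the token count; a value on addTok increases it.
// It closes out and returns once in is closed and at least one token remains.
func throttle(in <-chan int, setTok, addTok <-chan uint64, out chan<- int) {
	defer close(out)
	var tokens uint64
	for {
		// A nil channel never becomes ready in a select, so the input
		// is simply ignored while no tokens are available.
		src := in
		if tokens == 0 {
			src = nil
		}
		select {
		case n := <-setTok:
			tokens = n
		case n := <-addTok:
			tokens += n
		case v, ok := <-src:
			if !ok {
				return
			}
			out <- v
			tokens--
		}
	}
}

func main() {
	in := make(chan int, 10)
	for i := 0; i < 10; i++ {
		in <- i
	}
	setTok := make(chan uint64)
	addTok := make(chan uint64)
	out := make(chan int)
	go throttle(in, setTok, addTok, out)

	setTok <- 2
	fmt.Println(<-out, <-out) // 0 1
	addTok <- 1
	fmt.Println(<-out) // 2
}
```

Keeping the token count inside a single goroutine means it needs no mutex; the set and add channels are the only way to change it.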

type TypeConverterPipe added in v0.1.0

type TypeConverterPipe[I any, O any] struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"fmt"
	"reflect"

	"github.com/sterlingdevils/pipelines"
)

type Iface interface {
	Boo() int
}

type tctA struct{}

func (a tctA) Boo() int {
	return 2
}

func main() {
	cvt := pipelines.TypeConverterPipe[tctA, Iface]{}.New()

	cvt.InChan() <- tctA{}
	var o Iface = <-cvt.OutChan()

	fmt.Println(o.Boo(), reflect.TypeOf(o))

}
Output:

2 pipelines_test.tctA
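
The output above shows that the value keeps its dynamic type (tctA) while being delivered as the interface type. A generic stage with the same effect can be sketched with a type assertion through any. The convert function and the Shape and triangle types below are hypothetical, and dropping values that fail the assertion is an assumption of this sketch, not documented behavior of the package.

```go
package main

import "fmt"

type Shape interface{ Sides() int }

type triangle struct{}

func (triangle) Sides() int { return 3 }

// convert forwards each value from in on the returned channel as type O.
// Values that cannot be asserted to O are dropped (an assumption of this sketch).
func convert[I any, O any](in <-chan I) <-chan O {
	out := make(chan O)
	go func() {
		defer close(out)
		for v := range in {
			if o, ok := any(v).(O); ok {
				out <- o
			}
		}
	}()
	return out
}

func main() {
	in := make(chan triangle, 1)
	in <- triangle{}
	close(in)
	for s := range convert[triangle, Shape](in) {
		fmt.Printf("%d %T\n", s.Sides(), s) // 3 main.triangle
	}
}
```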

func (*TypeConverterPipe[_, _]) Close added in v0.1.0

func (c *TypeConverterPipe[_, _]) Close()

Close shuts down the TypeConverterPipe

func (TypeConverterPipe[I, O]) InChan added in v0.1.0

func (c TypeConverterPipe[I, O]) InChan() chan<- I

InChan returns a write-only channel that accepts values of the input type I

func (TypeConverterPipe[I, O]) New added in v0.1.0

func (t TypeConverterPipe[I, O]) New() *TypeConverterPipe[I, O]

func (TypeConverterPipe[I, O]) NewWithChannel added in v0.1.0

func (TypeConverterPipe[I, O]) NewWithChannel(in chan I) *TypeConverterPipe[I, O]

func (TypeConverterPipe[I, O]) NewWithPipeline added in v0.1.0

func (t TypeConverterPipe[I, O]) NewWithPipeline(p Pipeline[I]) *TypeConverterPipe[I, O]

func (TypeConverterPipe[I, O]) OutChan added in v0.1.0

func (c TypeConverterPipe[I, O]) OutChan() <-chan O

OutChan returns a read-only channel that delivers values of the output type O

func (TypeConverterPipe[_, O]) PipelineChan added in v0.1.0

func (c TypeConverterPipe[_, O]) PipelineChan() chan O

PipelineChan returns a R/W channel that is used for pipelining

type UDPPipe added in v0.1.0

type UDPPipe struct {
	// contains filtered or unexported fields
}

UDPPipe holds our private data for the component

func (*UDPPipe) Close added in v0.1.0

func (u *UDPPipe) Close()

Close will shut down the output channel and cancel the context for the listener

func (UDPPipe) InChan added in v0.1.0

func (u UDPPipe) InChan() chan<- Packetable

InChan returns a write-only channel from which the component reads the packets to send

func (UDPPipe) New added in v0.1.0

func (u UDPPipe) New(port int) (*UDPPipe, error)

New creates a UDP component with little fuss for the caller: it takes just a port. It always sets up a SERVER mode component

Example
package main

import (
	"fmt"
	"net"

	"github.com/sterlingdevils/pipelines"
)

const TESTPORT = 9092

func main() {
	udpcomp, err := pipelines.UDPPipe{}.New(TESTPORT)
	if err != nil {
		// Stop here: udpcomp is not usable when creation fails
		fmt.Println("failed to create udp component:", err)
		return
	}

	// Send a Packet
	udpcomp.InChan() <- &pipelines.Packet{Addr: net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: TESTPORT}, DataSlice: []byte("Hello from Us.")}

	// Receive a Packet and Display it
	p := <-udpcomp.OutChan()
	fmt.Printf("%v: %v\n", p.Address(), p.Data())

}
Output:

{127.0.0.1 9092 }: [72 101 108 108 111 32 102 114 111 109 32 85 115 46]

func (UDPPipe) NewWithChan added in v0.1.0

func (u UDPPipe) NewWithChan(port int, in chan Packetable) (*UDPPipe, error)

NewWithChan creates a UDP component with little fuss for the caller: it takes just a port and an input channel. It always sets up a SERVER mode component

func (UDPPipe) NewWithParams added in v0.1.0

func (u UDPPipe) NewWithParams(in1 chan Packetable, addr string, ct ConnType, outChanSize int) (*UDPPipe, error)

NewWithParams returns a UDP connection component. It can be set up as a server that listens for incoming packets, or as a client that connects out to a server. After setup, client and server modes work the same way: the component reads packets from the in channel and sends them, and it listens for incoming packets on the socket and puts them onto the output channel

This code uses a WaitGroup and adds 1 for each goroutine it starts. The Close method must be called to stop all of those goroutines

NOTE:
  The component does not close the input channel; it assumes the caller owns it
Example

Example of how to create a component and send and receive packets

This creates a UDP component, sends a packet, receives it, displays it, and checks that the displayed output is correct.

package main

import (
	"fmt"
	"log"
	"net"
	"time"

	"github.com/sterlingdevils/pipelines"
)

const TESTPORT = 9092

func main() {
	in := make(chan pipelines.Packetable, 1)

	// Must pass in the input channel, as the component does not assume it owns it
	udpcomp, err := pipelines.UDPPipe{}.NewWithParams(in, ":9092", pipelines.SERVER, 1)
	if err != nil {
		log.Fatalln("error creating UDP")
	}

	// Wait for 1 second, send a packet to ourselves, display it, then exit
loopexit:
	for {
		select {
		case <-time.After(time.Second * 1):
			in <- &pipelines.Packet{Addr: net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: TESTPORT}, DataSlice: []byte("Hello from Us.")}
		case p := <-udpcomp.OutChan():
			fmt.Printf("%v: %v\n", p.Address(), p.Data())
			break loopexit
		}
	}

	// Test that when we close we will release the waitgroup
	udpcomp.Close()

	// Close the input channel so we stop reading
	close(in)

}
Output:

{127.0.0.1 9092 }: [72 101 108 108 111 32 102 114 111 109 32 85 115 46]
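
For readers who want to see the general shape of a channel-bridged UDP socket without the package, here is a minimal sketch built on the standard net package. The packet type and the serve and echoOnce functions are hypothetical names, errors from writes are ignored for brevity, and the design is an illustration of the idea described above rather than the component's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// packet pairs a payload with a remote address (hypothetical type).
type packet struct {
	addr *net.UDPAddr
	data []byte
}

// serve binds a UDP socket and bridges it to two channels: packets received
// on in are written to the socket, and datagrams read from the socket are
// delivered on the returned channel.
func serve(addr string, in <-chan packet) (*net.UDPConn, <-chan packet, error) {
	laddr, err := net.ResolveUDPAddr("udp", addr)
	if err != nil {
		return nil, nil, err
	}
	conn, err := net.ListenUDP("udp", laddr)
	if err != nil {
		return nil, nil, err
	}
	out := make(chan packet, 1)

	// Writer: stops when the caller closes in.
	go func() {
		for p := range in {
			_, _ = conn.WriteToUDP(p.data, p.addr) // errors ignored in this sketch
		}
	}()

	// Reader: stops and closes out when the socket is closed.
	go func() {
		defer close(out)
		buf := make([]byte, 65535)
		for {
			n, from, err := conn.ReadFromUDP(buf)
			if err != nil {
				return
			}
			out <- packet{addr: from, data: append([]byte(nil), buf[:n]...)}
		}
	}()
	return conn, out, nil
}

// echoOnce sends msg to our own socket and returns what arrives.
func echoOnce(msg string) (string, error) {
	in := make(chan packet, 1)
	conn, out, err := serve("127.0.0.1:0", in) // port 0: let the OS pick a free port
	if err != nil {
		return "", err
	}
	defer conn.Close() // stops the reader goroutine
	defer close(in)    // the caller owns in, so the caller closes it

	self := conn.LocalAddr().(*net.UDPAddr)
	in <- packet{addr: self, data: []byte(msg)}
	select {
	case p := <-out:
		return string(p.data), nil
	case <-time.After(2 * time.Second):
		return "", errors.New("timed out waiting for packet")
	}
}

func main() {
	got, err := echoOnce("Hello from Us.")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```

As in the component described above, the input channel is created and closed by the caller, while closing the socket is what ends the receive loop.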

func (UDPPipe) NewWithPipeline added in v0.1.0

func (u UDPPipe) NewWithPipeline(port int, p Pipeline[Packetable]) (*UDPPipe, error)

NewWithPipeline creates a UDP component on the given port that takes its input from the given Pipeline

func (UDPPipe) OutChan added in v0.1.0

func (u UDPPipe) OutChan() <-chan Packetable

OutChan returns a read-only output channel that the incoming UDP packets will be placed onto

func (UDPPipe) PipelineChan added in v0.1.0

func (u UDPPipe) PipelineChan() chan Packetable

PipelineChan returns a R/W channel that is used for pipelining
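
The PipelineChan and NewWithPipeline pairs throughout this package suggest a chaining pattern in which each stage exposes its output as a channel that the next stage consumes. The sketch below illustrates that pattern with the standard library only. The chanSource interface is a hypothetical stand-in for Pipeline[T], whose definition is not shown in this section, and doubler is an invented stage.

```go
package main

import "fmt"

// chanSource is a hypothetical stand-in for the package's Pipeline[T]:
// anything that exposes its output as a bidirectional channel.
type chanSource[T any] interface {
	PipelineChan() chan T
}

// doubler reads ints from an upstream channel and emits each value doubled.
type doubler struct{ out chan int }

// newDoubler starts a stage that reads from in until in is closed.
func newDoubler(in chan int) *doubler {
	d := &doubler{out: make(chan int)}
	go func() {
		defer close(d.out)
		for v := range in {
			d.out <- v * 2
		}
	}()
	return d
}

// newDoublerWithPipeline chains this stage onto the output of another stage.
func newDoublerWithPipeline(p chanSource[int]) *doubler {
	return newDoubler(p.PipelineChan())
}

// PipelineChan exposes the stage's output so another stage can consume it.
func (d *doubler) PipelineChan() chan int { return d.out }

func main() {
	in := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		in <- i
	}
	close(in)

	first := newDoubler(in)
	second := newDoublerWithPipeline(first)
	for v := range second.PipelineChan() {
		fmt.Println(v) // 4, 8, 12
	}
}
```

Closing the first input lets the shutdown propagate: each stage closes its own output when its input is drained, so the final range loop ends on its own.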
