Documentation ¶
Overview ¶
Package chanbasedcontainer implements an ordered container with only channels as the API.
The input channel is used to add things to the container. The output channel will contain the head of the container when read. The delete channel is used to remove things from the container before they are read out of the output channel.
This uses Go 1.18 generics. Things must implement the Indexable interface (the Keyer constraint on ContainerPipe listed below), which has a method that returns a comparable key.
Things come in and go out of the channels in order. Things can be removed while in the container by passing their key to the delete channel.
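The three channels described above can be sketched with a single goroutine and a select loop. This is a minimal illustration using only the standard library, not the package's implementation: runContainer, item, and demo are hypothetical names, and behaviours such as dropping duplicate keys are left out.

```go
package main

import "fmt"

// Keyer mirrors the constraint described above: a thing exposes a comparable key.
type Keyer[K comparable] interface {
	Key() K
}

// item is a hypothetical thing used only for this sketch.
type item struct{ id int }

func (i item) Key() int { return i.id }

// runContainer is a hypothetical helper, not the package's implementation.
// It queues things from in, removes queued things whose key arrives on del,
// and offers the head of the queue on out. It closes out once in is closed
// and the queue has drained.
func runContainer[K comparable, T Keyer[K]](in <-chan T, del <-chan K, out chan<- T) {
	defer close(out)
	var queue []T
	for in != nil || len(queue) > 0 {
		// A nil channel blocks forever, so the send case is disabled
		// whenever the queue is empty.
		var send chan<- T
		var head T
		if len(queue) > 0 {
			send, head = out, queue[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed: stop receiving, keep draining
				continue
			}
			queue = append(queue, v)
		case k, ok := <-del:
			if !ok {
				del = nil // delete channel closed: stop listening on it
				continue
			}
			for i, v := range queue {
				if v.Key() == k {
					queue = append(queue[:i], queue[i+1:]...)
					break
				}
			}
		case send <- head:
			queue = queue[1:]
		}
	}
}

// demo adds three things, deletes the second by key, and returns the keys
// that come out of the container.
func demo() []int {
	in, del, out := make(chan item), make(chan int), make(chan item)
	go runContainer[int, item](in, del, out)
	in <- item{1}
	in <- item{2}
	in <- item{3}
	del <- 2
	close(in)
	var keys []int
	for it := range out {
		keys = append(keys, it.Key())
	}
	return keys
}

func main() {
	fmt.Println(demo()) // prints [1 3]
}
```

Because every channel here is unbuffered and nothing reads the output until after the delete has been accepted, the order of the result is deterministic in this sketch.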
Package udp implements a UDP socket component that uses Go channels for sending and receiving UDP packets.
The main interface is two channels, one input channel and one output channel. Any Packet put onto the input channel will be sent out a UDP network socket. Any received UDP packets from the network socket will be placed onto the output channel.
The input channel is passed into this component so the caller can control the lifetime of the channel. Closing it causes the input channel processing routine to finish.
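The ownership rule above (the caller creates the input channel and closes it to stop the processing routine) might look like the following sketch. It uses only the standard library; startWorker and run are hypothetical names and do not appear in this package.

```go
package main

import (
	"fmt"
	"sync"
)

// startWorker is a hypothetical stand-in for the component's input routine.
// It drains in until the caller closes it, then signals the WaitGroup.
func startWorker(in <-chan string, wg *sync.WaitGroup, handle func(string)) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range in {
			handle(msg)
		}
	}()
}

// run sends two messages, closes the channel, and waits for the routine to finish.
func run() int {
	in := make(chan string)
	var wg sync.WaitGroup
	handled := 0
	startWorker(in, &wg, func(string) { handled++ })
	in <- "first"
	in <- "second"
	close(in) // the caller owns the channel, so the caller closes it
	wg.Wait() // returns once the input routine has exited
	return handled
}

func main() {
	fmt.Println(run()) // prints 2
}
```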
For the SERVER mode, the component will listen on the passed-in port. The underlying socket does not contain a destination address, so the address needs to be set in the Packet that is put onto the input channel.
For the CLIENT mode, the component will Dial the address passed in. The underlying socket contains that address, and the send will ignore any address set in the Packet. All outgoing Packets will be sent to the address passed in the New call.
New(port) will handle creating the waitgroup and input channel. NewWithParams(...) gives the caller more options.
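The difference between the two modes can be seen with the standard net package alone. The sketch below does not use UDPPipe: roundTrip is a hypothetical helper that opens a listening socket and a dialed socket on the loopback interface, and it assumes loopback UDP is available on the machine running it.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// roundTrip sends msg from a connected (Dial) socket to a listening socket
// on the loopback interface and returns what the listener received.
func roundTrip(msg string) (string, error) {
	// SERVER style: listen on a local port chosen by the OS (port 0).
	srv, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
	if err != nil {
		return "", err
	}
	defer srv.Close()

	// CLIENT style: Dial fixes the destination on the socket itself.
	cli, err := net.DialUDP("udp", nil, srv.LocalAddr().(*net.UDPAddr))
	if err != nil {
		return "", err
	}
	defer cli.Close()

	// A connected socket needs no per-packet address.
	if _, err := cli.Write([]byte(msg)); err != nil {
		return "", err
	}

	// Bound the wait so a lost packet cannot hang the example.
	srv.SetReadDeadline(time.Now().Add(2 * time.Second))
	buf := make([]byte, 65507)
	n, from, err := srv.ReadFromUDP(buf)
	if err != nil {
		return "", err
	}

	// A listening socket has no fixed peer, so a reply must name its destination.
	if _, err := srv.WriteToUDP([]byte("ack"), from); err != nil {
		return "", err
	}
	return string(buf[:n]), nil
}

func main() {
	got, err := roundTrip("hello")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(got)
}
```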
Example ¶
package main

import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/sterlingdevils/pipelines"
)

type DataHold struct {
	data []byte
}

func (d DataHold) Data() []byte {
	return d.data
}

func readOut(wg *sync.WaitGroup, o chan string) {
	defer wg.Done()
	count := 0
	for n := range o {
		count++
		fmt.Println(count)
		_ = n
	}
}

func main() {
	os.Chdir("/tmp")
	fd := pipelines.FileDump{}.New()
	ochan := make(chan string)
	fd.Outchan = &ochan

	var wg sync.WaitGroup
	wg.Add(1)
	go readOut(&wg, ochan)

	// Send a Packet
	fd.InChan() <- pipelines.Packet{DataSlice: []byte("Hello, World!")}
	fd.InChan() <- pipelines.Packet{DataSlice: []byte("Gimme Jimmy")}
	fd.InChan() <- pipelines.Packet{DataSlice: []byte("See what happens with special characters\nOn this line")}
	fd.InChan() <- DataHold{data: []byte("This is another type of input")}

	time.Sleep(1 * time.Second)
	fd.Close()
	close(ochan)
	wg.Wait()
}
Output:
1
2
3
4
Example (Testsend) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

type DataType string

type StringNode struct {
	data DataType
}

func (n StringNode) Size() int {
	return len(n.data)
}

func (n StringNode) Data() []byte {
	return []byte(n.data)
}

func main() {
	n := StringNode{data: "potatoes"}
	r := pipelines.RateLimiterPipe[StringNode]{}.New(4, n.Size())

	r.InChan() <- n
	t := <-r.OutChan()
	fmt.Println(t.data)
}
Output: potatoes
Example (Testsend2) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

type DataType string

type StringNode struct {
	data DataType
}

func (n StringNode) Size() int {
	return len(n.data)
}

func (n StringNode) Data() []byte {
	return []byte(n.data)
}

func main() {
	n := StringNode{data: "potatoes"}
	r := pipelines.RateLimiterPipe[StringNode]{}.New(1, n.Size())

	r.InChan() <- n
	t := <-r.OutChan()
	r.InChan() <- n
	t = <-r.OutChan()
	fmt.Println(t.data)
}
Output: potatoes
Index ¶
- Constants
- type AsyncSkipPipe
- func (b *AsyncSkipPipe[_]) Close()
- func (b AsyncSkipPipe[T]) InChan() chan<- T
- func (b AsyncSkipPipe[T]) New() *AsyncSkipPipe[T]
- func (AsyncSkipPipe[T]) NewWithChannel(in chan T) *AsyncSkipPipe[T]
- func (b AsyncSkipPipe[T]) NewWithPipeline(p Pipeline[T]) *AsyncSkipPipe[T]
- func (b AsyncSkipPipe[T]) OutChan() <-chan T
- func (b AsyncSkipPipe[T]) PipelineChan() chan T
- type BufferPipe
- func (b *BufferPipe[_]) Close()
- func (b BufferPipe[T]) InChan() chan<- T
- func (BufferPipe[T]) New(size int) (*BufferPipe[T], error)
- func (BufferPipe[T]) NewWithChannel(size int, in chan T) (*BufferPipe[T], error)
- func (b BufferPipe[T]) NewWithPipeline(size int, p Pipeline[T]) (*BufferPipe[T], error)
- func (b BufferPipe[T]) OutChan() <-chan T
- func (b BufferPipe[T]) PipelineChan() chan T
- type Closer
- type ConnType
- type ContainerPipe
- func (c ContainerPipe[_, _]) ApproxSize() int32
- func (c *ContainerPipe[_, _]) Close()
- func (c ContainerPipe[K, _]) DelChan() chan<- K
- func (c ContainerPipe[_, T]) InChan() chan<- T
- func (c ContainerPipe[K, T]) New() *ContainerPipe[K, T]
- func (ContainerPipe[K, T]) NewWithChan(in chan T) *ContainerPipe[K, T]
- func (c ContainerPipe[K, T]) NewWithPipeline(p Pipeline[T]) *ContainerPipe[K, T]
- func (c ContainerPipe[_, T]) OutChan() <-chan T
- func (c ContainerPipe[_, T]) PipelineChan() chan T
- type ConverterPipe
- func (c *ConverterPipe[_, _]) Close()
- func (c ConverterPipe[I, O]) InChan() chan<- I
- func (c ConverterPipe[I, O]) New(fun func(I) (O, error)) *ConverterPipe[I, O]
- func (ConverterPipe[I, O]) NewWithChannel(in chan I, fun func(I) (O, error)) *ConverterPipe[I, O]
- func (c ConverterPipe[I, O]) NewWithPipeline(p Pipeline[I], fun func(I) (O, error)) *ConverterPipe[I, O]
- func (c ConverterPipe[I, O]) OutChan() <-chan O
- func (c ConverterPipe[_, O]) PipelineChan() chan O
- type DataSizer
- type Dataer
- type DirScan
- type File
- type FileDump
- type FileNamer
- type FileNamerDataer
- type FileReadPipe
- func (f *FileReadPipe) Close()
- func (f *FileReadPipe) InChan() chan<- string
- func (f FileReadPipe) New() *FileReadPipe
- func (FileReadPipe) NewWithChannel(in chan string) *FileReadPipe
- func (f FileReadPipe) NewWithPipeline(p Pipeline[string]) *FileReadPipe
- func (f *FileReadPipe) OutChan() <-chan Dataer
- func (f *FileReadPipe) PipelineChan() chan Dataer
- type FileWriterPipe
- type GeneratorPipe
- type KeyablePacket
- type Keyer
- type LogPipe
- func (b *LogPipe[_]) Close()
- func (b LogPipe[T]) InChan() chan<- T
- func (b LogPipe[T]) New(name string) *LogPipe[T]
- func (LogPipe[T]) NewWithChannel(name string, in chan T) *LogPipe[T]
- func (b LogPipe[T]) NewWithPipeline(name string, p Pipeline[T]) *LogPipe[T]
- func (b LogPipe[T]) OutChan() <-chan T
- func (b LogPipe[T]) PipelineChan() chan T
- type NullConsumePipe
- type OnlyOncePipe
- func (b *OnlyOncePipe[_]) Close()
- func (b OnlyOncePipe[T]) InChan() chan<- T
- func (b OnlyOncePipe[T]) New(gc time.Duration, fr time.Duration) *OnlyOncePipe[T]
- func (OnlyOncePipe[T]) NewWithChannel(gc time.Duration, fr time.Duration, in chan T) *OnlyOncePipe[T]
- func (b OnlyOncePipe[T]) NewWithPipeline(gc time.Duration, fr time.Duration, p Pipeline[T]) *OnlyOncePipe[T]
- func (b OnlyOncePipe[T]) OutChan() <-chan T
- func (b OnlyOncePipe[T]) PipelineChan() chan T
- type Packet
- type Packetable
- type Pipeline
- type PipelineChaner
- type RateLimiterPipe
- func (r *RateLimiterPipe[_]) Close()
- func (r RateLimiterPipe[T]) InChan() chan<- T
- func (rl RateLimiterPipe[T]) New(rLimit rate.Limit, bLimit int) *RateLimiterPipe[T]
- func (RateLimiterPipe[T]) NewWithChannel(rLimit rate.Limit, bLimit int, in chan T) *RateLimiterPipe[T]
- func (rl RateLimiterPipe[T]) NewWithPipeline(rLimit rate.Limit, bLimit int, p Pipeline[T]) *RateLimiterPipe[T]
- func (r RateLimiterPipe[T]) OutChan() <-chan T
- func (r RateLimiterPipe[T]) PipelineChan() chan T
- func (r *RateLimiterPipe[_]) SetBurst(n int)
- func (r *RateLimiterPipe[_]) SetLimit(l rate.Limit)
- type RetryPipe
- func (r RetryPipe[K, _]) AckIn() chan<- K
- func (r *RetryPipe[_, _]) Close()
- func (r RetryPipe[_, T]) InChan() chan<- T
- func (r RetryPipe[K, T]) New() *RetryPipe[K, T]
- func (RetryPipe[K, T]) NewWithChannel(in chan T) *RetryPipe[K, T]
- func (r RetryPipe[K, T]) NewWithPipeline(p Pipeline[T]) *RetryPipe[K, T]
- func (r RetryPipe[_, T]) OutChan() <-chan T
- func (r RetryPipe[_, T]) PipelineChan() chan T
- func (r *RetryPipe[K, _]) SetAckIn(c chan K)
- type RetryThing
- type Retryable
- type Sizer
- type ThrottlePipe
- func (b ThrottlePipe[T]) AddTok() chan<- uint64
- func (b *ThrottlePipe[_]) Close()
- func (b ThrottlePipe[T]) InChan() chan<- T
- func (b ThrottlePipe[T]) New() *ThrottlePipe[T]
- func (ThrottlePipe[T]) NewWithChannel(in chan T) *ThrottlePipe[T]
- func (b ThrottlePipe[T]) NewWithPipeline(p Pipeline[T]) *ThrottlePipe[T]
- func (b ThrottlePipe[T]) OutChan() <-chan T
- func (b ThrottlePipe[T]) PipelineChan() chan T
- func (b ThrottlePipe[T]) SetTok() chan<- uint64
- type TypeConverterPipe
- func (c *TypeConverterPipe[_, _]) Close()
- func (c TypeConverterPipe[I, O]) InChan() chan<- I
- func (t TypeConverterPipe[I, O]) New() *TypeConverterPipe[I, O]
- func (TypeConverterPipe[I, O]) NewWithChannel(in chan I) *TypeConverterPipe[I, O]
- func (t TypeConverterPipe[I, O]) NewWithPipeline(p Pipeline[I]) *TypeConverterPipe[I, O]
- func (c TypeConverterPipe[I, O]) OutChan() <-chan O
- func (c TypeConverterPipe[_, O]) PipelineChan() chan O
- type UDPPipe
- func (u *UDPPipe) Close()
- func (u UDPPipe) InChan() chan<- Packetable
- func (u UDPPipe) New(port int) (*UDPPipe, error)
- func (u UDPPipe) NewWithChan(port int, in chan Packetable) (*UDPPipe, error)
- func (u UDPPipe) NewWithParams(in1 chan Packetable, addr string, ct ConnType, outChanSize int) (*UDPPipe, error)
- func (u UDPPipe) NewWithPipeline(port int, p Pipeline[Packetable]) (*UDPPipe, error)
- func (u UDPPipe) OutChan() <-chan Packetable
- func (u UDPPipe) PipelineChan() chan Packetable
Examples ¶
- Package
- Package (Testsend)
- Package (Testsend2)
- ContainerPipe (Duptest)
- ContainerPipe (Fullnopointers)
- ContainerPipe (Fullpointers)
- ContainerPipe (Testdeloffirst)
- ContainerPipe (Testdelofsecond)
- ContainerPipe (TestdelonNotThere)
- ContainerPipe (Testtwoitemsinorder)
- ContainerPipe.ApproxSize
- ContainerPipe.Close
- ContainerPipe.InChan
- ContainerPipe.New
- ConverterPipe (Fixedoutput)
- ConverterPipe (Numchar)
- ConverterPipe.New
- FileReadPipe (Fileconsume)
- FileWriterPipe
- GeneratorPipe (Increment)
- LogPipe
- NullConsumePipe
- Pipeline
- RetryPipe
- RetryPipe (Inout)
- RetryPipe (Pointercheck)
- RetryPipe.Close
- ThrottlePipe.New
- TypeConverterPipe
- UDPPipe.New
- UDPPipe.NewWithParams
Constants ¶
const (
	RETRYTIME  = 13 * time.Second
	EXPIRETIME = 35 * time.Second
)
const (
	// SERVER is used to create a listen socket
	SERVER = ConnType(1)
	// CLIENT is used to create a connected socket (using Dial)
	CLIENT = ConnType(2)
)
Socket Connection type
const (
CHANSIZE = 0
)
const (
// Max size of a Packet Data slice
MaxPacketSize = 65507
)
udp constants for the protocol
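The value 65507 matches the largest UDP payload that fits in a single IPv4 datagram with no IP options. Whether the authors derived it this way is an assumption; the arithmetic itself can be checked with a short sketch (the constant and function names below are illustrative only):

```go
package main

import "fmt"

const (
	maxIPv4Datagram = 65535 // largest value of the 16-bit IPv4 total-length field
	ipv4Header      = 20    // minimum IPv4 header, no options
	udpHeader       = 8     // fixed-size UDP header
)

// maxUDPPayload returns the largest UDP payload that fits in one IPv4 datagram.
func maxUDPPayload() int {
	return maxIPv4Datagram - ipv4Header - udpHeader
}

func main() {
	fmt.Println(maxUDPPayload()) // prints 65507
}
```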
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncSkipPipe ¶ added in v0.1.1
type AsyncSkipPipe[T any] struct {
	// contains filtered or unexported fields
}
func (AsyncSkipPipe[T]) InChan ¶ added in v0.1.1
func (b AsyncSkipPipe[T]) InChan() chan<- T
InChan returns a write-only channel used to send items into the pipe
func (AsyncSkipPipe[T]) New ¶ added in v0.1.1
func (b AsyncSkipPipe[T]) New() *AsyncSkipPipe[T]
New creates a new AsyncSkipPipe
func (AsyncSkipPipe[T]) NewWithChannel ¶ added in v0.1.1
func (AsyncSkipPipe[T]) NewWithChannel(in chan T) *AsyncSkipPipe[T]
func (AsyncSkipPipe[T]) NewWithPipeline ¶ added in v0.1.1
func (b AsyncSkipPipe[T]) NewWithPipeline(p Pipeline[T]) *AsyncSkipPipe[T]
func (AsyncSkipPipe[T]) OutChan ¶ added in v0.1.1
func (b AsyncSkipPipe[T]) OutChan() <-chan T
OutChan returns a read-only channel that delivers the output of the pipe
func (AsyncSkipPipe[T]) PipelineChan ¶ added in v0.1.1
func (b AsyncSkipPipe[T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
type BufferPipe ¶ added in v0.1.0
type BufferPipe[T any] struct {
	// contains filtered or unexported fields
}
func (BufferPipe[T]) New ¶ added in v0.1.0
func (BufferPipe[T]) New(size int) (*BufferPipe[T], error)
func (BufferPipe[T]) NewWithChannel ¶ added in v0.1.0
func (BufferPipe[T]) NewWithChannel(size int, in chan T) (*BufferPipe[T], error)
func (BufferPipe[T]) NewWithPipeline ¶ added in v0.1.0
func (b BufferPipe[T]) NewWithPipeline(size int, p Pipeline[T]) (*BufferPipe[T], error)
func (BufferPipe[T]) PipelineChan ¶ added in v0.1.0
func (b BufferPipe[T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
type Closer ¶ added in v0.0.6
type Closer interface {
// Close is used to clear up any resources made by the component
Close()
}
type ContainerPipe ¶ added in v0.1.0
type ContainerPipe[K comparable, T Keyer[K]] struct {
	// contains filtered or unexported fields
}
Example (Duptest) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key, this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()

	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	// This should be dropped as a dup
	r.InChan() <- &noder{key: 1, data: "This is a test"}

	readAndPrint(1, r.OutChan())
	r.Close()
}
Output: 1, I don't care what it is
Example (Fullnopointers) ¶
This example tests what happens when values, rather than pointers, are passed through the container
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// node2 does not use a pointer receiver on Key, this is to test non-pointer things
type node2 struct {
	key int
}

func (n node2) Key() int {
	return n.key
}

func main() {
	// Notice the small difference in T, we are no longer a pointer to Node
	r := pipelines.ContainerPipe[int, node2]{}.New()

	ni := node2{key: 1}
	r.InChan() <- ni
	no := <-r.OutChan()

	// Ok, we have our input node and the output after it went thru the container
	// lets change the key on the input node and make sure the output is not changed
	ni.key = 2
	fmt.Println(no.key)
	r.Close()
}
Output: 1
Example (Fullpointers) ¶
This example tests that pointers are passed all the way through the container
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key, this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

func main() {
	// Notice that T is a pointer to a Node
	r := pipelines.ContainerPipe[int, *noder]{}.New()

	ni := &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- ni
	no := <-r.OutChan()

	// Ok, we have our input node and the output after it went thru the container
	// lets change the key on the input node and make sure it changed on the output
	ni.key = 2
	fmt.Println(no.key)
	r.Close()
}
Output: 2
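The outputs of the two examples above (1 for values, 2 for pointers) follow from ordinary Go channel semantics: sending a struct copies it, while sending a pointer shares the underlying struct. The sketch below shows the same effect with a plain buffered channel instead of ContainerPipe; node, viaValue, and viaPointer are hypothetical names.

```go
package main

import "fmt"

type node struct{ key int }

// viaValue sends a copy of n through a channel, then mutates the original.
// The received copy keeps the old key.
func viaValue() int {
	c := make(chan node, 1)
	n := node{key: 1}
	c <- n
	out := <-c
	n.key = 2
	return out.key
}

// viaPointer sends a pointer through a channel, then mutates the original.
// The receiver sees the change because both sides share one struct.
func viaPointer() int {
	c := make(chan *node, 1)
	n := &node{key: 1}
	c <- n
	out := <-c
	n.key = 2
	return out.key
}

func main() {
	fmt.Println(viaValue(), viaPointer()) // prints 1 2
}
```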
Example (Testdeloffirst) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key, this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()

	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- &noder{key: 2, data: "This is a test"}
	r.InChan() <- &noder{key: 3, data: "This is a test again"}
	r.DelChan() <- 1

	readAndPrint(2, r.OutChan())
	r.Close()
}
Output:
2, This is a test
3, This is a test again
Example (Testdelofsecond) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key, this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()

	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- &noder{key: 2, data: "This is a test"}
	r.InChan() <- &noder{key: 3, data: "This is a test again"}
	r.DelChan() <- 2

	readAndPrint(2, r.OutChan())
	r.Close()
}
Output:
1, I don't care what it is
3, This is a test again
Example (TestdelonNotThere) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key, this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()

	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- &noder{key: 2, data: "This is a test"}
	r.DelChan() <- 3

	readAndPrint(2, r.OutChan())
	r.Close()
}
Output:
1, I don't care what it is
2, This is a test
Example (Testtwoitemsinorder) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key, this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

// readAndPrint is used to get num items from the output channel and display them
func readAndPrint(num int, c <-chan *noder) {
	for i := 0; i < num; i++ {
		n := <-c
		fmt.Printf("%v, %v\n", n.key, n.data)
	}
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()

	r.InChan() <- &noder{key: 1, data: "I don't care what it is"}
	r.InChan() <- &noder{key: 2, data: "This is a test"}

	readAndPrint(2, r.OutChan())
	r.Close()
}
Output:
1, I don't care what it is
2, This is a test
func (ContainerPipe[_, _]) ApproxSize ¶ added in v0.1.0
func (c ContainerPipe[_, _]) ApproxSize() int32
ApproxSize returns approximately the number of items in the container. The value is only updated at the start of each main loop, so it can lag behind recent sends and receives.
Example ¶
Checks that ApproxSize returns something close to the actual count
package main

import (
	"fmt"
	"time"

	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key, this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

func main() {
	numwrite := 100

	r := pipelines.ContainerPipe[int, *noder]{}.New()
	s1 := r.ApproxSize()

	r.InChan() <- &noder{key: 1}
	s2 := r.ApproxSize()

	<-r.OutChan()
	time.Sleep(10 * time.Millisecond) // Have to wait a little for mainloop to cycle
	s3 := r.ApproxSize()

	for i := 0; i < numwrite; i++ {
		r.InChan() <- &noder{key: i}
	}
	time.Sleep(10 * time.Millisecond) // Have to wait a little for mainloop to cycle
	s4 := r.ApproxSize()

	fmt.Println(s1, s2, s3, s4)
	r.Close()
}
Output: 0 1 0 100
func (*ContainerPipe[_, _]) Close ¶ added in v0.1.0
func (c *ContainerPipe[_, _]) Close()
Close the ChanBasedContainer
Example ¶
ExampleContainerPipe_Close
package main

import (
	"github.com/sterlingdevils/pipelines"
)

// node2 does not use a pointer receiver on Key, this is to test non-pointer things
type node2 struct {
	key int
}

func (n node2) Key() int {
	return n.key
}

func main() {
	r := pipelines.ContainerPipe[int, node2]{}.New()
	r.Close()
}
Output:
func (ContainerPipe[K, _]) DelChan ¶ added in v0.1.0
func (c ContainerPipe[K, _]) DelChan() chan<- K
DelChan returns a write-only channel; sending a key on it removes the matching item from the container
func (ContainerPipe[_, T]) InChan ¶ added in v0.1.0
func (c ContainerPipe[_, T]) InChan() chan<- T
InChan returns a write-only channel used to add items to the container
Example ¶
ExampleContainerPipe_InChan
package main

import (
	"github.com/sterlingdevils/pipelines"
)

// noder uses a pointer receiver on Key, this is to test pointer things
type noder struct {
	key  int
	data string
}

func (n *noder) Key() int {
	return n.key
}

func main() {
	r := pipelines.ContainerPipe[int, *noder]{}.New()
	r.InChan() <- &noder{key: 7, data: "This is a test"}
	r.Close()
}
Output:
func (ContainerPipe[K, T]) New ¶ added in v0.1.0
func (c ContainerPipe[K, T]) New() *ContainerPipe[K, T]
New returns a reference to a container. For performance, T should be a pointer.
Example ¶
package main

import (
	"github.com/sterlingdevils/pipelines"
)

// node2 does not use a pointer receiver on Key, this is to test non-pointer things
type node2 struct {
	key int
}

func (n node2) Key() int {
	return n.key
}

func main() {
	_ = pipelines.ContainerPipe[int, node2]{}.New()
}
Output:
func (ContainerPipe[K, T]) NewWithChan ¶ added in v0.1.0
func (ContainerPipe[K, T]) NewWithChan(in chan T) *ContainerPipe[K, T]
func (ContainerPipe[K, T]) NewWithPipeline ¶ added in v0.1.0
func (c ContainerPipe[K, T]) NewWithPipeline(p Pipeline[T]) *ContainerPipe[K, T]
func (ContainerPipe[_, T]) OutChan ¶ added in v0.1.0
func (c ContainerPipe[_, T]) OutChan() <-chan T
OutChan returns a read-only channel that delivers the head of the container
func (ContainerPipe[_, T]) PipelineChan ¶ added in v0.1.0
func (c ContainerPipe[_, T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
type ConverterPipe ¶ added in v0.1.0
Example (Fixedoutput) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

// ignore the input and just return hello
func returnHello(i int) (string, error) {
	return "Hello", nil
}

func main() {
	cvt := pipelines.ConverterPipe[int, string]{}.New(returnHello)
	cvt.InChan() <- 5
	o := <-cvt.OutChan()
	fmt.Println(o)
}
Output: Hello
Example (Numchar) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	cvt := pipelines.ConverterPipe[int, string]{}.New(
		func(i int) (string, error) {
			return fmt.Sprintf("%010d", i), nil
		})
	cvt.InChan() <- 5
	o := <-cvt.OutChan()
	fmt.Println(o)
}
Output: 0000000005
func (ConverterPipe[I, O]) InChan ¶ added in v0.1.0
func (c ConverterPipe[I, O]) InChan() chan<- I
InChan
func (ConverterPipe[I, O]) New ¶ added in v0.1.0
func (c ConverterPipe[I, O]) New(fun func(I) (O, error)) *ConverterPipe[I, O]
Example ¶
package main

import (
	"fmt"
	"reflect"
	"strconv"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	cvt := pipelines.ConverterPipe[int, string]{}.New(
		func(i int) (string, error) {
			return strconv.Itoa(i), nil
		})
	cvt.InChan() <- 5
	o := <-cvt.OutChan()
	fmt.Println(o, reflect.TypeOf(o))
}
Output: 5 string
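A converter stage of this shape, one function of type func(I) (O, error) applied to every item, can be sketched with plain channels. The sketch below is not the package's ConverterPipe: convert and run are hypothetical names, and dropping items whose conversion fails is an assumption, since this page does not say what ConverterPipe does with errors.

```go
package main

import (
	"fmt"
	"strconv"
)

// convert is a hypothetical stage, not the package's ConverterPipe.
// It applies fun to every value read from in and forwards the results.
func convert[I, O any](in <-chan I, fun func(I) (O, error)) <-chan O {
	out := make(chan O)
	go func() {
		defer close(out)
		for v := range in {
			o, err := fun(v)
			if err != nil {
				continue // assumption: a failed conversion drops the item
			}
			out <- o
		}
	}()
	return out
}

// run feeds three strings through a string-to-int converter.
func run() []int {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, s := range []string{"1", "x", "3"} {
			in <- s
		}
	}()
	var got []int
	for n := range convert[string, int](in, strconv.Atoi) {
		got = append(got, n)
	}
	return got
}

func main() {
	fmt.Println(run()) // prints [1 3]
}
```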
func (ConverterPipe[I, O]) NewWithChannel ¶ added in v0.1.0
func (ConverterPipe[I, O]) NewWithChannel(in chan I, fun func(I) (O, error)) *ConverterPipe[I, O]
func (ConverterPipe[I, O]) NewWithPipeline ¶ added in v0.1.0
func (c ConverterPipe[I, O]) NewWithPipeline(p Pipeline[I], fun func(I) (O, error)) *ConverterPipe[I, O]
func (ConverterPipe[I, O]) OutChan ¶ added in v0.1.0
func (c ConverterPipe[I, O]) OutChan() <-chan O
OutChan
func (ConverterPipe[_, O]) PipelineChan ¶ added in v0.1.0
func (c ConverterPipe[_, O]) PipelineChan() chan O
PipelineChan returns a R/W channel that is used for pipelining
type Dataer ¶ added in v0.0.6
type Dataer interface {
	// Data returns a byte slice to the data it holds
	Data() []byte
}
type DirScan ¶ added in v0.1.0
DirScan holds a directory to scan and a channel to put the filenames onto
func (DirScan) New ¶ added in v0.1.0
New creates a new dir scanner and starts a scanning loop to send filenames to a channel. A WaitGroup must be passed in, as we create a goroutine for the scanner. As the writer, we assume we own the channel we return; we will close it when our Close() is called.
func (DirScan) OutChan ¶ added in v0.1.0
OutChan returns the output channel as read-only
func (DirScan) PipelineChan ¶ added in v0.1.0
PipelineChan returns a R/W channel that is used for pipelining
type File ¶ added in v0.1.0
type File struct {
	Reference string
	// contains filtered or unexported fields
}
type FileDump ¶ added in v0.1.0
type FileDump struct {
	Outchan    *chan string
	Metricfunc func(name string, val int)
	// contains filtered or unexported fields
}
func (FileDump) InChan ¶ added in v0.1.0
InChan returns a write-only channel that the incoming packets will be read from
func (FileDump) NewWithChannel ¶ added in v0.1.0
func (FileDump) NewWithPipeline ¶ added in v0.1.0
type FileNamerDataer ¶ added in v0.1.0
type FileReadPipe ¶ added in v0.1.0
type FileReadPipe struct {
// contains filtered or unexported fields
}
Example (Fileconsume) ¶
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	err := os.WriteFile("/tmp/dat1", []byte("This is a test"), 0666)
	if err != nil {
		log.Fatalln(err)
	}

	fp := pipelines.FileReadPipe{}.New()
	fp.InChan() <- "/tmp/dat1"
	fmt.Println(string((<-fp.OutChan()).Data()))
	fp.Close()
}
Output: This is a test
func (*FileReadPipe) InChan ¶ added in v0.1.0
func (f *FileReadPipe) InChan() chan<- string
InChan returns a write-only channel that accepts the names of the files to read
func (FileReadPipe) New ¶ added in v0.1.0
func (f FileReadPipe) New() *FileReadPipe
New creates a new filereadpipe
func (FileReadPipe) NewWithChannel ¶ added in v0.1.0
func (FileReadPipe) NewWithChannel(in chan string) *FileReadPipe
func (FileReadPipe) NewWithPipeline ¶ added in v0.1.0
func (f FileReadPipe) NewWithPipeline(p Pipeline[string]) *FileReadPipe
func (*FileReadPipe) OutChan ¶ added in v0.1.0
func (f *FileReadPipe) OutChan() <-chan Dataer
OutChan returns a read-only channel that delivers the contents of each file as a Dataer
func (*FileReadPipe) PipelineChan ¶ added in v0.1.0
func (f *FileReadPipe) PipelineChan() chan Dataer
PipelineChan returns a R/W channel that is used for pipelining
type FileWriterPipe ¶ added in v0.1.0
type FileWriterPipe struct {
// contains filtered or unexported fields
}
Example ¶
package main

import (
	"os"

	"github.com/sterlingdevils/pipelines"
)

type FileHolder struct {
	filename string
	data     []byte
}

func (f FileHolder) FileName() string {
	return f.filename
}

func (f FileHolder) Data() []byte {
	return f.data
}

type FileHolder2 struct {
	filename  string
	data      []byte
	Arbitrary []int
}

func (f FileHolder2) FileName() string {
	return f.filename
}

func (f FileHolder2) Data() []byte {
	return f.data
}

func main() {
	os.Chdir("/tmp")
	fd := pipelines.FileWriterPipe{}.New()

	// Send a file
	fd.InChan() <- FileHolder{filename: "a.out", data: []byte("This is one type of input")}
	// Send a file as another type
	fd.InChan() <- FileHolder2{filename: "b.out", data: []byte("This is another type of input")}

	fd.Close()
}
Output:
func (FileWriterPipe) InChan ¶ added in v0.1.0
func (b FileWriterPipe) InChan() chan<- FileNamerDataer
InChan returns a write-only channel that the incoming packets will be read from
func (FileWriterPipe) New ¶ added in v0.1.0
func (f FileWriterPipe) New() *FileWriterPipe
New creates a new FileWriterPipe
func (FileWriterPipe) NewWithChannel ¶ added in v0.1.0
func (FileWriterPipe) NewWithChannel(in chan FileNamerDataer) *FileWriterPipe
func (FileWriterPipe) NewWithPipeline ¶ added in v0.1.0
func (f FileWriterPipe) NewWithPipeline(p Pipeline[FileNamerDataer]) *FileWriterPipe
type GeneratorPipe ¶ added in v0.1.0
type GeneratorPipe[T any] struct {
	Metricfunc func(gobase.MetricsProto)
	// contains filtered or unexported fields
}
Example (Increment) ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	i := 0
	gen := pipelines.GeneratorPipe[int]{}.New(
		func() int {
			i++
			return i
		})

	for j := 0; j < 10; j++ {
		o := <-gen.OutChan()
		fmt.Println(o)
	}
	gen.Close()
}
Output:
1
2
3
4
5
6
7
8
9
10
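The generator pattern used here, a func() T called repeatedly with each result offered on an output channel, can be sketched without the package. generate and firstN below are hypothetical names, and the done channel is an assumed stand-in for whatever GeneratorPipe's Close does internally.

```go
package main

import "fmt"

// generate is a hypothetical helper, not the package's GeneratorPipe.
// It calls fun repeatedly and offers each result on the returned channel
// until done is closed.
func generate[T any](fun func() T, done <-chan struct{}) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			v := fun()
			select {
			case out <- v:
			case <-done:
				return
			}
		}
	}()
	return out
}

// firstN reads n values from a counter-backed generator.
func firstN(n int) []int {
	i := 0
	done := make(chan struct{})
	defer close(done) // stops the generator goroutine when we are finished
	gen := generate(func() int { i++; return i }, done)
	got := make([]int, 0, n)
	for j := 0; j < n; j++ {
		got = append(got, <-gen)
	}
	return got
}

func main() {
	fmt.Println(firstN(3)) // prints [1 2 3]
}
```

Note that this sketch computes the next value before a reader asks for it, so fun may be called one more time than the number of values actually consumed.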
func (GeneratorPipe[T]) New ¶ added in v0.1.0
func (GeneratorPipe[T]) New(fun func() T) *GeneratorPipe[T]
func (GeneratorPipe[T]) OutChan ¶ added in v0.1.0
func (g GeneratorPipe[T]) OutChan() <-chan T
OutChan
func (GeneratorPipe[T]) PipelineChan ¶ added in v0.1.0
func (g GeneratorPipe[T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
type KeyablePacket ¶ added in v0.1.0
type KeyablePacket struct {
	// Addr holds a UDP address (with port) for the packet
	// will be ignored if UDP is created in CLIENT mode
	Addr net.UDPAddr

	// Data contains the data
	DataSlice []byte
}
KeyablePacket holds a UDP address and the data from a UDP packet. The channel types for the input and output channels are of this type.
func (KeyablePacket) Address ¶ added in v0.1.0
func (p KeyablePacket) Address() net.UDPAddr
func (KeyablePacket) Data ¶ added in v0.1.0
func (p KeyablePacket) Data() []byte
func (KeyablePacket) Key ¶ added in v0.1.0
func (p KeyablePacket) Key() uint64
func (KeyablePacket) Size ¶ added in v0.1.0
func (p KeyablePacket) Size() int
type Keyer ¶ added in v0.0.6
type Keyer[K comparable] interface { Key() K }
type LogPipe ¶ added in v0.1.0
type LogPipe[T any] struct {
	// contains filtered or unexported fields
}
Example ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	lg := pipelines.LogPipe[int]{}.New("example")
	lg.InChan() <- 5
	fmt.Println(<-lg.OutChan())
}
Output: 5
func (LogPipe[T]) InChan ¶ added in v0.1.0
func (b LogPipe[T]) InChan() chan<- T
InChan returns a write-only channel used to send items into the pipe
func (LogPipe[T]) New ¶ added in v0.1.0
New creates a new logger. name is used to put a unique label on each log.
func (LogPipe[T]) NewWithChannel ¶ added in v0.1.0
func (LogPipe[T]) NewWithPipeline ¶ added in v0.1.0
func (LogPipe[T]) OutChan ¶ added in v0.1.0
func (b LogPipe[T]) OutChan() <-chan T
OutChan returns a read-only channel that delivers the output of the pipe
func (LogPipe[T]) PipelineChan ¶ added in v0.1.0
func (b LogPipe[T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
type NullConsumePipe ¶ added in v0.1.0
type NullConsumePipe[T any] struct {
	// contains filtered or unexported fields
}
Example ¶
package main

import (
	"fmt"

	"github.com/sterlingdevils/pipelines"
)

func main() {
	nc := pipelines.NullConsumePipe[any]{}.New()
	inchan := nc.InChan()

	inchan <- 1
	inchan <- 2
	inchan <- 3
	inchan <- 4
	inchan <- 5

	nc.Close()
	fmt.Println("reached end")
}
Output: reached end
func (NullConsumePipe[T]) InChan ¶ added in v0.1.0
func (b NullConsumePipe[T]) InChan() chan<- T
InChan returns a write-only channel used to send items into the pipe
func (NullConsumePipe[T]) New ¶ added in v0.1.0
func (n NullConsumePipe[T]) New() *NullConsumePipe[T]
New creates a new NullConsumePipe
func (NullConsumePipe[T]) NewWithChannel ¶ added in v0.1.0
func (NullConsumePipe[T]) NewWithChannel(in chan T) *NullConsumePipe[T]
func (NullConsumePipe[T]) NewWithPipeline ¶ added in v0.1.0
func (n NullConsumePipe[T]) NewWithPipeline(p Pipeline[T]) *NullConsumePipe[T]
type OnlyOncePipe ¶ added in v0.1.1
type OnlyOncePipe[T comparable] struct {
	// contains filtered or unexported fields
}
func (OnlyOncePipe[T]) InChan ¶ added in v0.1.1
func (b OnlyOncePipe[T]) InChan() chan<- T
InChan returns a write-only channel used to send items into the pipe
func (OnlyOncePipe[T]) New ¶ added in v0.1.1
func (b OnlyOncePipe[T]) New(gc time.Duration, fr time.Duration) *OnlyOncePipe[T]
New creates a new OnlyOncePipe
func (OnlyOncePipe[T]) NewWithChannel ¶ added in v0.1.1
func (OnlyOncePipe[T]) NewWithChannel(gc time.Duration, fr time.Duration, in chan T) *OnlyOncePipe[T]
func (OnlyOncePipe[T]) NewWithPipeline ¶ added in v0.1.1
func (b OnlyOncePipe[T]) NewWithPipeline(gc time.Duration, fr time.Duration, p Pipeline[T]) *OnlyOncePipe[T]
func (OnlyOncePipe[T]) OutChan ¶ added in v0.1.1
func (b OnlyOncePipe[T]) OutChan() <-chan T
OutChan returns a read-only channel that delivers the output of the pipe
func (OnlyOncePipe[T]) PipelineChan ¶ added in v0.1.1
func (b OnlyOncePipe[T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
type Packet ¶ added in v0.1.0
type Packet struct {
	// Addr holds a UDP address (with port) for the packet
	// will be ignored if UDP is created in CLIENT mode
	Addr net.UDPAddr

	// Data contains the data
	DataSlice []byte
}
Packet holds a UDP address and the data from a UDP packet. The channel types for the input and output channels are of this type.
type Packetable ¶ added in v0.1.0
type Pipeline ¶ added in v0.0.6
type Pipeline[T any] interface {
	PipelineChaner[T]
	Closer
}
Example ¶
Check we can make a Pipelineable
package main

import "github.com/sterlingdevils/pipelines"

type Node[T any] struct {
}

func (n Node[T]) PipelineChan() chan T {
	return nil
}

func (n Node[T]) Close() {
}

// Check we can make a Pipelineable
func main() {
	var p pipelines.Pipeline[int] = Node[int]{}
	p.Close()
}
Output:
type PipelineChaner ¶ added in v0.0.6
type PipelineChaner[T any] interface {
	// PipelineChan must return a R/W chan that serves as the output channel.
	// It is used for chaining the pipeline.
	PipelineChan() chan T
}
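The chaining idea can be sketched without the package itself. The block below is a minimal illustration, not the library's implementation: the interfaces are redeclared locally so the code compiles on its own, Closer is assumed to require only Close() (as the Node example suggests), and source, newSource and drain are hypothetical names.

```go
package main

import "fmt"

// Local copies of the interfaces so this sketch is self-contained.
// Closer is assumed to need only Close(), as the Node example implies.
type PipelineChaner[T any] interface{ PipelineChan() chan T }
type Closer interface{ Close() }
type Pipeline[T any] interface {
	PipelineChaner[T]
	Closer
}

// source is a hypothetical upstream stage that emits a fixed set of values.
type source[T any] struct{ out chan T }

func newSource[T any](vals ...T) *source[T] {
	s := &source[T]{out: make(chan T, len(vals))}
	for _, v := range vals {
		s.out <- v
	}
	close(s.out) // nothing more will be produced
	return s
}

func (s *source[T]) PipelineChan() chan T { return s.out }
func (s *source[T]) Close()               {}

// drain plays the role of a downstream stage: it accepts any Pipeline
// and reads from the channel the upstream stage exposes for chaining.
func drain[T any](p Pipeline[T]) []T {
	defer p.Close()
	var got []T
	for v := range p.PipelineChan() {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain[int](newSource(1, 2, 3))) // prints [1 2 3]
}
```

A downstream stage only needs the Pipeline interface, which is presumably why each NewWithPipeline constructor accepts a Pipeline rather than a concrete type.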
type RateLimiterPipe ¶ added in v0.1.1
type RateLimiterPipe[T DataSizer] struct {
	// contains filtered or unexported fields
}
func (*RateLimiterPipe[_]) Close ¶ added in v0.1.1
func (r *RateLimiterPipe[_]) Close()
func (RateLimiterPipe[T]) InChan ¶ added in v0.1.1
func (r RateLimiterPipe[T]) InChan() chan<- T
InChan returns a write-only channel used to send items into the rate limiter
func (RateLimiterPipe[T]) New ¶ added in v0.1.1
func (rl RateLimiterPipe[T]) New(rLimit rate.Limit, bLimit int) *RateLimiterPipe[T]
func (RateLimiterPipe[T]) NewWithChannel ¶ added in v0.1.1
func (RateLimiterPipe[T]) NewWithChannel(rLimit rate.Limit, bLimit int, in chan T) *RateLimiterPipe[T]
func (RateLimiterPipe[T]) NewWithPipeline ¶ added in v0.1.1
func (rl RateLimiterPipe[T]) NewWithPipeline(rLimit rate.Limit, bLimit int, p Pipeline[T]) *RateLimiterPipe[T]
func (RateLimiterPipe[T]) OutChan ¶ added in v0.1.1
func (r RateLimiterPipe[T]) OutChan() <-chan T
OutChan returns a read-only channel from which rate-limited items are read
func (RateLimiterPipe[T]) PipelineChan ¶ added in v0.1.1
func (r RateLimiterPipe[T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
func (*RateLimiterPipe[_]) SetBurst ¶ added in v0.1.1
func (r *RateLimiterPipe[_]) SetBurst(n int)
func (*RateLimiterPipe[_]) SetLimit ¶ added in v0.1.1
func (r *RateLimiterPipe[_]) SetLimit(l rate.Limit)
type RetryPipe ¶ added in v0.1.0
type RetryPipe[K comparable, T Retryable[K]] struct {
	RetryTime  time.Duration
	ExpireTime time.Duration
	// contains filtered or unexported fields
}
Example ¶
package main

import (
	"github.com/sterlingdevils/pipelines"
)

type rptKeyType uint64
type rptDataType []byte

type Obj struct {
	Sn   rptKeyType
	Data rptDataType
}

func (o *Obj) Key() rptKeyType {
	return o.Sn
}

func main() {
	retry := pipelines.RetryPipe[rptKeyType, *Obj]{}.New()
	retry.Close()
}
Output:
Example (Inout) ¶
package main

import (
	"fmt"
	"time"

	"github.com/sterlingdevils/gobase"
	"github.com/sterlingdevils/pipelines"
)

type rptKeyType uint64
type rptDataType []byte

type Obj struct {
	Sn   rptKeyType
	Data rptDataType
}

func (o *Obj) Key() rptKeyType {
	return o.Sn
}

func NewObj(timeout time.Duration) (*Obj, error) {
	return &Obj{}, nil
}

func main() {
	sn := (&gobase.SerialNum{}).New()
	retry := pipelines.RetryPipe[rptKeyType, *Obj]{}.New()

	go func() {
		for o := range retry.OutChan() {
			fmt.Println(o.Key())
		}
	}()

	for i := 0; i < 10; i++ {
		o, _ := NewObj(2 * time.Second)
		o.Sn = rptKeyType(sn.Next())
		retry.InChan() <- o
	}

	time.Sleep(4 * time.Second)
	retry.Close()
}
Output:
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9
Example (Pointercheck) ¶
package main

import (
	"fmt"
	"time"

	"github.com/sterlingdevils/pipelines"
)

type rptKeyType uint64
type rptDataType []byte

type Obj struct {
	Sn   rptKeyType
	Data rptDataType
}

func (o *Obj) Key() rptKeyType {
	return o.Sn
}

func NewObj(timeout time.Duration) (*Obj, error) {
	return &Obj{}, nil
}

func main() {
	retry := pipelines.RetryPipe[rptKeyType, *Obj]{}.New()

	// Check that we are passing pointer
	o, _ := NewObj(5 * time.Second)
	retry.InChan() <- o
	o.Sn = 5

	go func() {
		for o := range retry.OutChan() {
			fmt.Println(o.Key())
		}
	}()

	retry.InChan() <- o
	time.Sleep(2 * time.Second)
	retry.Close()
}
Output:
5
5
func (*RetryPipe[_, _]) Close ¶ added in v0.1.0
func (r *RetryPipe[_, _]) Close()
Close shuts down the RetryPipe
Example ¶
package main

import (
	"github.com/sterlingdevils/pipelines"
)

type rptKeyType uint64
type rptDataType []byte

type Obj struct {
	Sn   rptKeyType
	Data rptDataType
}

func (o *Obj) Key() rptKeyType {
	return o.Sn
}

func main() {
	retry := pipelines.RetryPipe[rptKeyType, *Obj]{}.New()
	retry.Close()
}
Output:
func (RetryPipe[K, T]) NewWithChannel ¶ added in v0.1.0
NewWithChannel creates a RetryPipe that reads from the given input channel
func (RetryPipe[K, T]) NewWithPipeline ¶ added in v0.1.0
NewWithPipeline creates a RetryPipe that takes its input from the given pipeline
func (RetryPipe[_, T]) PipelineChan ¶ added in v0.1.0
func (r RetryPipe[_, T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
type RetryThing ¶ added in v0.1.0
type RetryThing[K comparable, T Retryable[K]] struct {
	LastRetry time.Time
	// contains filtered or unexported fields
}
func (RetryThing[_, _]) Created ¶ added in v0.1.0
func (p RetryThing[_, _]) Created() time.Time
func (RetryThing[K, _]) Key ¶ added in v0.1.0
func (p RetryThing[K, _]) Key() K
func (RetryThing[K, T]) New ¶ added in v0.1.0
func (RetryThing[K, T]) New(k K, t T) *RetryThing[K, T]
func (RetryThing[_, T]) Thing ¶ added in v0.1.0
func (p RetryThing[_, T]) Thing() T
type Retryable ¶ added in v0.1.0
type Retryable[K comparable] interface { Keyer[K] }
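As a rough illustration of what satisfying Retryable involves, the sketch below redeclares the interfaces locally. It assumes Keyer[K] requires a single Key() K method, which is what the Obj type in the RetryPipe examples provides. The msg and pending types are hypothetical and only show how a generic container can track items by key; they do not reproduce RetryPipe's retry or expiry logic.

```go
package main

import "fmt"

// Keyer is assumed to require one method, matching the Obj examples.
// This local definition is not copied from the package.
type Keyer[K comparable] interface{ Key() K }

// Retryable mirrors the declaration in the documentation.
type Retryable[K comparable] interface{ Keyer[K] }

// msg is a hypothetical payload keyed by a serial number.
type msg struct {
	sn   uint64
	body string
}

func (m *msg) Key() uint64 { return m.sn }

// pending is a hypothetical store that tracks unacknowledged items by key.
type pending[K comparable, T Retryable[K]] struct {
	items map[K]T
}

func newPending[K comparable, T Retryable[K]]() *pending[K, T] {
	return &pending[K, T]{items: make(map[K]T)}
}

// add stores an item under the key it reports.
func (p *pending[K, T]) add(t T) { p.items[t.Key()] = t }

// ack removes the item with the given key, if present.
func (p *pending[K, T]) ack(k K) { delete(p.items, k) }

// size reports how many items are still tracked.
func (p *pending[K, T]) size() int { return len(p.items) }

func main() {
	p := newPending[uint64, *msg]()
	p.add(&msg{sn: 1, body: "one"})
	p.add(&msg{sn: 2, body: "two"})
	p.ack(1)
	fmt.Println(p.size()) // prints 1
}
```

Because the key type K must be comparable, it can be used directly as a map key, which is likely why the constraint appears on RetryPipe, RetryThing and Retryable alike.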
type ThrottlePipe ¶ added in v0.1.1
type ThrottlePipe[T any] struct {
	// contains filtered or unexported fields
}
func (ThrottlePipe[T]) AddTok ¶ added in v0.1.1
func (b ThrottlePipe[T]) AddTok() chan<- uint64
AddTok returns a write-only channel used to add to the number of tokens
func (ThrottlePipe[T]) InChan ¶ added in v0.1.1
func (b ThrottlePipe[T]) InChan() chan<- T
InChan returns a write-only channel used to send items into the pipe
func (ThrottlePipe[T]) New ¶ added in v0.1.1
func (b ThrottlePipe[T]) New() *ThrottlePipe[T]
New creates a new ThrottlePipe
Example ¶
package main

import (
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/sterlingdevils/pipelines"
)

func writePipe(p *pipelines.ThrottlePipe[int]) {
	for i := 0; i < 10; i++ {
		p.InChan() <- i
	}
}

func main() {
	log.Printf("Starting test\n")
	throt := pipelines.ThrottlePipe[int]{}.New()
	conv := pipelines.ConverterPipe[int, int]{}.NewWithPipeline(throt,
		func(i int) (int, error) {
			fmt.Printf("Reading: %v\n", i)
			return 0, errors.New("This is not an error")
		})
	nullpipe := pipelines.NullConsumePipe[int]{}.NewWithPipeline(conv)
	defer nullpipe.Close()

	go writePipe(throt)
	time.Sleep(2 * time.Second)
	throt.SetTok() <- 5
	throt.AddTok() <- 2
	time.Sleep(2 * time.Second)
	throt.SetTok() <- 2
	time.Sleep(1 * time.Second)
	throt.SetTok() <- 3
	time.Sleep(1 * time.Second)
}
Output:
Reading: 0
Reading: 1
Reading: 2
Reading: 3
Reading: 4
Reading: 5
Reading: 6
Reading: 7
Reading: 8
Reading: 9
func (ThrottlePipe[T]) NewWithChannel ¶ added in v0.1.1
func (ThrottlePipe[T]) NewWithChannel(in chan T) *ThrottlePipe[T]
func (ThrottlePipe[T]) NewWithPipeline ¶ added in v0.1.1
func (b ThrottlePipe[T]) NewWithPipeline(p Pipeline[T]) *ThrottlePipe[T]
func (ThrottlePipe[T]) OutChan ¶ added in v0.1.1
func (b ThrottlePipe[T]) OutChan() <-chan T
OutChan returns a read-only channel from which items leaving the pipe are read
func (ThrottlePipe[T]) PipelineChan ¶ added in v0.1.1
func (b ThrottlePipe[T]) PipelineChan() chan T
PipelineChan returns a R/W channel that is used for pipelining
func (ThrottlePipe[T]) SetTok ¶ added in v0.1.1
func (b ThrottlePipe[T]) SetTok() chan<- uint64
SetTok returns a write-only channel used to set the number of tokens
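One way to picture token-gated forwarding is the sketch below. It is an assumption about the general technique, not the package's code: the throttle type, its fields and the demo function are hypothetical, and it assumes that one token is spent per forwarded item and that nothing passes while the count is zero.

```go
package main

import "fmt"

// throttle is a hypothetical token-gated stage built only on channels.
type throttle[T any] struct {
	in     chan T
	out    chan T
	setTok chan uint64 // replaces the token count
	addTok chan uint64 // adds to the token count
	done   chan struct{}
}

func newThrottle[T any]() *throttle[T] {
	t := &throttle[T]{
		in:     make(chan T),
		out:    make(chan T),
		setTok: make(chan uint64),
		addTok: make(chan uint64),
		done:   make(chan struct{}),
	}
	go t.run()
	return t
}

// run forwards one item per token until done is closed.
func (t *throttle[T]) run() {
	var tokens uint64
	for {
		// A nil channel is never ready in a select, so input is
		// simply not read while no tokens remain.
		in := t.in
		if tokens == 0 {
			in = nil
		}
		select {
		case n := <-t.setTok:
			tokens = n
		case n := <-t.addTok:
			tokens += n
		case v := <-in:
			select {
			case t.out <- v:
				tokens--
			case <-t.done:
				return
			}
		case <-t.done:
			return
		}
	}
}

// demo grants two tokens, reads two items, then adds one more token.
func demo() []int {
	t := newThrottle[int]()
	defer close(t.done)

	go func() {
		for i := 0; i < 3; i++ {
			select {
			case t.in <- i:
			case <-t.done:
				return
			}
		}
	}()

	t.setTok <- 2
	got := []int{<-t.out, <-t.out}
	t.addTok <- 1
	got = append(got, <-t.out)
	return got
}

func main() {
	fmt.Println(demo()) // prints [0 1 2]
}
```

Keeping the token count inside a single goroutine and adjusting it only through channels avoids a mutex, which fits the channel-only style of the rest of the API.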
type TypeConverterPipe ¶ added in v0.1.0
Example ¶
package main

import (
	"fmt"
	"reflect"

	"github.com/sterlingdevils/pipelines"
)

type Iface interface {
	Boo() int
}

type tctA struct{}

func (a tctA) Boo() int {
	return 2
}

func main() {
	cvt := pipelines.TypeConverterPipe[tctA, Iface]{}.New()
	cvt.InChan() <- tctA{}
	var o Iface = <-cvt.OutChan()
	fmt.Println(o.Boo(), reflect.TypeOf(o))
}
Output: 2 pipelines_test.tctA
func (*TypeConverterPipe[_, _]) Close ¶ added in v0.1.0
func (c *TypeConverterPipe[_, _]) Close()
Close shuts down the TypeConverterPipe
func (TypeConverterPipe[I, O]) InChan ¶ added in v0.1.0
func (c TypeConverterPipe[I, O]) InChan() chan<- I
InChan returns a write-only channel that accepts values of the input type I
func (TypeConverterPipe[I, O]) New ¶ added in v0.1.0
func (t TypeConverterPipe[I, O]) New() *TypeConverterPipe[I, O]
func (TypeConverterPipe[I, O]) NewWithChannel ¶ added in v0.1.0
func (TypeConverterPipe[I, O]) NewWithChannel(in chan I) *TypeConverterPipe[I, O]
func (TypeConverterPipe[I, O]) NewWithPipeline ¶ added in v0.1.0
func (t TypeConverterPipe[I, O]) NewWithPipeline(p Pipeline[I]) *TypeConverterPipe[I, O]
func (TypeConverterPipe[I, O]) OutChan ¶ added in v0.1.0
func (c TypeConverterPipe[I, O]) OutChan() <-chan O
OutChan returns a read-only channel that delivers values of the output type O
func (TypeConverterPipe[_, O]) PipelineChan ¶ added in v0.1.0
func (c TypeConverterPipe[_, O]) PipelineChan() chan O
PipelineChan returns a R/W channel that is used for pipelining
type UDPPipe ¶ added in v0.1.0
type UDPPipe struct {
// contains filtered or unexported fields
}
UDPPipe holds our private data for the component
func (*UDPPipe) Close ¶ added in v0.1.0
func (u *UDPPipe) Close()
Close shuts down the output channel and cancels the context used by the listener
func (UDPPipe) InChan ¶ added in v0.1.0
func (u UDPPipe) InChan() chan<- Packetable
InChan returns a write-only channel. Packets written to it are sent out the UDP socket
func (UDPPipe) New ¶ added in v0.1.0
New creates a UDP component with little fuss for the caller: it takes just a port. It always sets up a SERVER mode component
Example ¶
package main

import (
	"fmt"
	"net"

	"github.com/sterlingdevils/pipelines"
)

const TESTPORT = 9092

func main() {
	udpcomp, err := pipelines.UDPPipe{}.New(TESTPORT)
	if err != nil {
		// Stop here: udpcomp is not usable if creation failed
		fmt.Printf("failed to create udp component\n")
		return
	}

	// Send a Packet
	udpcomp.InChan() <- &pipelines.Packet{Addr: net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: TESTPORT}, DataSlice: []byte("Hello from Us.")}

	// Receive a Packet and Display it
	p := <-udpcomp.OutChan()
	fmt.Printf("%v: %v\n", p.Address(), p.Data())
}
Output: {127.0.0.1 9092 }: [72 101 108 108 111 32 102 114 111 109 32 85 115 46]
func (UDPPipe) NewWithChan ¶ added in v0.1.0
func (u UDPPipe) NewWithChan(port int, in chan Packetable) (*UDPPipe, error)
NewWithChan creates a UDP component with little fuss for the caller: it takes just a port and an input channel. It always sets up a SERVER mode component
func (UDPPipe) NewWithParams ¶ added in v0.1.0
func (u UDPPipe) NewWithParams(in1 chan Packetable, addr string, ct ConnType, outChanSize int) (*UDPPipe, error)
NewWithParams returns a UDP connection component. It can be set up as a server to listen for incoming connections, or as a client to connect out to a server. After that, client and server modes work the same: the component reads from the in channel and sends each packet, and it listens for incoming packets on the socket and puts them onto the output channel
This code uses a WaitGroup and adds 1 for each goroutine it starts. The Close method must be called so that all of those goroutines stop
NOTE: The component does not close the input channel, because the channel is assumed to belong to the caller
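The ownership rule in the note can be sketched with a small stand-in component. This is only an illustration under assumptions: component, start and run are hypothetical names, a done channel stands in for the context cancellation the real component uses, and no socket is involved. The point is the division of responsibility: the component closes only the channel it created, and the caller closes the input channel it passed in.

```go
package main

import (
	"fmt"
	"sync"
)

// component is a hypothetical stage that borrows its input channel.
type component struct {
	out  chan string
	done chan struct{}
	wg   sync.WaitGroup
}

// start launches one goroutine and registers it with the WaitGroup.
// The goroutine never closes in, because the caller owns it.
func start(in <-chan string) *component {
	c := &component{out: make(chan string), done: make(chan struct{})}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for {
			select {
			case s, ok := <-in:
				if !ok {
					return // the caller closed its channel
				}
				select {
				case c.out <- s:
				case <-c.done:
					return
				}
			case <-c.done:
				return
			}
		}
	}()
	return c
}

// Close stops the goroutine, waits for it, then closes the output
// channel, which this component does own.
func (c *component) Close() {
	close(c.done)
	c.wg.Wait()
	close(c.out)
}

// run shows the caller's side: create the channel, use it, close it.
func run() []string {
	in := make(chan string, 2)
	c := start(in)
	in <- "a"
	in <- "b"
	got := []string{<-c.out, <-c.out}
	c.Close()
	close(in)
	return got
}

func main() {
	fmt.Println(run()) // prints [a b]
}
```

With this split the same input channel can be handed to another component after Close, since nothing but the caller ever closes it.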
Example ¶
Example of how to create a component and send and receive packets
This creates a UDP component, sends a packet, receives it, displays it, and checks that the displayed output is correct.
package main

import (
	"fmt"
	"log"
	"net"
	"time"

	"github.com/sterlingdevils/pipelines"
)

const TESTPORT = 9092

func main() {
	in := make(chan pipelines.Packetable, 1)

	// Must pass in the input channel as we don't assume we own it
	udpcomp, err := pipelines.UDPPipe{}.NewWithParams(in, ":9092", pipelines.SERVER, 1)
	if err != nil {
		log.Fatalln("error creating UDP")
	}

	// Wait for 1 second, then send a packet to ourselves, display it, and exit
loopexit:
	for {
		select {
		case <-time.After(time.Second * 1):
			in <- &pipelines.Packet{Addr: net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: TESTPORT}, DataSlice: []byte("Hello from Us.")}
		case p := <-udpcomp.OutChan():
			fmt.Printf("%v: %v\n", p.Address(), p.Data())
			break loopexit
		}
	}

	// Test that when we close we will release the waitgroup
	udpcomp.Close()

	// Close the input channel so we stop reading
	close(in)
}
Output: {127.0.0.1 9092 }: [72 101 108 108 111 32 102 114 111 109 32 85 115 46]
func (UDPPipe) NewWithPipeline ¶ added in v0.1.0
NewWithPipeline creates a UDP component that takes its input from an existing pipeline
func (UDPPipe) OutChan ¶ added in v0.1.0
func (u UDPPipe) OutChan() <-chan Packetable
OutChan returns a read-only output channel onto which incoming UDP packets are placed
func (UDPPipe) PipelineChan ¶ added in v0.1.0
func (u UDPPipe) PipelineChan() chan Packetable
PipelineChan returns a R/W channel that is used for pipelining