channelutil

package
v0.0.0-...-742bdff
Published: Jun 15, 2024 License: MIT Imports: 4 Imported by: 0

README

channelutil

channelutil provides utilities for multiplexing and demultiplexing channels, specifically cloning and joining them. Cloning sends the same data to multiple channels; joining sends data from multiple channels to a single channel.

Cloning

A simple approach to cloning a channel is to loop over the sink channels and send each value received from the source to every one of them. This is a blocking operation that depends entirely on how the data on each channel is consumed: if any one channel is blocked for some reason, the other channels are blocked too. This happens often when the number of channels is large.
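
The blocking behavior of the simple approach can be seen in a minimal sketch. The function name naiveClone is hypothetical and is not part of this package; it only illustrates the loop described above.

```go
package main

import "fmt"

// naiveClone (hypothetical name) forwards every value from src to each
// sink in turn. Each send blocks until that sink accepts the value, so
// one slow consumer stalls delivery to every sink after it.
func naiveClone[T any](src <-chan T, sinks ...chan<- T) {
	for v := range src {
		for _, sink := range sinks {
			sink <- v // blocks if this sink is full or has no reader
		}
	}
	// The source is closed: close the sinks so readers can stop.
	for _, sink := range sinks {
		close(sink)
	}
}

func main() {
	src := make(chan int)
	a := make(chan int, 3)
	b := make(chan int, 3)

	go func() {
		defer close(src)
		for i := 1; i <= 3; i++ {
			src <- i
		}
	}()

	// The buffers are large enough here, so no send blocks for long.
	naiveClone[int](src, a, b)

	for v := range a {
		fmt.Println("a:", v)
	}
	for v := range b {
		fmt.Println("b:", v)
	}
}
```

If the buffer of b were smaller than the number of values and nothing were reading from it, naiveClone would stop making progress on a as well.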

To overcome this, CloneChannels implements relay channels that receive data from a single channel and send it to multiple channels (5 by default) in a non-blocking way using a select statement. Because select picks a channel that is ready to receive data, the send does not block: when data is sent to one channel, it is put into the buffers of the other channels, and an automatic drain is triggered when the buffer of a particular channel is full.
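
One way to picture the non-blocking idea is the sketch below. It is not the package's implementation: trySend, cloneNonBlocking, the per-sink pending slice, and the way threshold triggers a drain are all assumptions made for illustration.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether the
// sink accepted the value.
func trySend[T any](sink chan<- T, v T) bool {
	select {
	case sink <- v:
		return true
	default:
		return false
	}
}

// cloneNonBlocking (hypothetical) keeps a backlog per sink. Values are
// sent without blocking while the sink is ready; once a backlog reaches
// threshold, it is drained with blocking sends.
func cloneNonBlocking[T any](src <-chan T, threshold int, sinks ...chan<- T) {
	pending := make([][]T, len(sinks))
	for v := range src {
		for i, sink := range sinks {
			pending[i] = append(pending[i], v)
			// Flush in order for as long as the sink is ready.
			for len(pending[i]) > 0 && trySend[T](sink, pending[i][0]) {
				pending[i] = pending[i][1:]
			}
			// Backlog too large: drain it, blocking on this sink only now.
			if len(pending[i]) >= threshold {
				for _, p := range pending[i] {
					sink <- p
				}
				pending[i] = nil
			}
		}
	}
	// The source is closed: deliver leftovers, then close the sinks.
	for i, sink := range sinks {
		for _, p := range pending[i] {
			sink <- p
		}
		close(sink)
	}
}

func main() {
	src := make(chan int)
	fast := make(chan int, 8)
	slow := make(chan int) // unbuffered: a send succeeds only when a reader waits

	go func() {
		defer close(src)
		for i := 1; i <= 5; i++ {
			src <- i
		}
	}()

	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range slow {
			fmt.Println("slow:", v)
		}
	}()

	cloneNonBlocking[int](src, 3, fast, slow)
	<-done
	for v := range fast {
		fmt.Println("fast:", v)
	}
}
```

The design choice shown is that a slow sink delays the others only when its backlog has to be drained, not on every value.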

		If sinks > 5
		relay channels are used that relay data from the root node to the leaf nodes (which are channels in this case)

		1. Sinks are grouped into groups of 5, with 1 relay channel for each group
		2. Each group is passed to a worker
		3. The relays are fed to Clone, i.e. recursion


		Ex:
		          $          <- Source Channel
		        /   \
		       $     $       <- Relay Channels
		      / \   / \
		     $   $ $   $     <- Leaf Channels (i.e. Sinks)

		*For simplicity, 2 children are shown for each node, but each node (except the root node) has 5 children

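
The grouping and recursion steps above can be sketched as follows. This is a simplified illustration, not the package's code: cloneTree, buildClone, fanOut, and groupSize are hypothetical names, and the workers use plain blocking sends to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

const groupSize = 5 // assumed group size, matching the "5 by default" above

// fanOut is the worker: it copies src to every sink, then closes them.
func fanOut[T any](src <-chan T, sinks []chan<- T) {
	for v := range src {
		for _, s := range sinks {
			s <- v
		}
	}
	for _, s := range sinks {
		close(s)
	}
}

// buildClone starts one worker per group of at most groupSize sinks and
// recurses on the relay channels that feed those workers.
func buildClone[T any](wg *sync.WaitGroup, src <-chan T, sinks []chan<- T) {
	if len(sinks) <= groupSize {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fanOut[T](src, sinks)
		}()
		return
	}
	var relays []chan<- T
	for start := 0; start < len(sinks); start += groupSize {
		end := start + groupSize
		if end > len(sinks) {
			end = len(sinks)
		}
		relay := make(chan T, 1)
		relays = append(relays, relay)
		buildClone[T](wg, relay, sinks[start:end]) // one worker per group
	}
	buildClone[T](wg, src, relays) // the relays become the next level's sinks
}

// cloneTree builds the tree and returns a WaitGroup that is done once
// the source is closed and every worker has finished.
func cloneTree[T any](src <-chan T, sinks ...chan<- T) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	buildClone[T](wg, src, sinks)
	return wg
}

func main() {
	const n = 12
	src := make(chan int)
	outs := make([]chan int, n)
	sinks := make([]chan<- int, n)
	for i := range outs {
		outs[i] = make(chan int, 3)
		sinks[i] = outs[i]
	}

	wg := cloneTree[int](src, sinks...)
	for i := 1; i <= 3; i++ {
		src <- i
	}
	close(src)
	wg.Wait()

	for i, out := range outs {
		fmt.Println("sink", i, "received", len(out), "values")
	}
}
```

With 12 sinks, this builds three groups (5, 5, and 2 sinks) behind three relay channels, and one root worker feeds the relays.
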
Joining

Joining is the opposite of cloning: it receives data from multiple channels and sends it to a single channel. Again, the simple approach is to loop over the source channels and send their data to the single channel, but this blocks: if the single channel is blocked for some reason, all the source channels are blocked too, and this happens often when the number of channels is large. If the channels are known in advance, a select statement is the better and easier approach. If the number of channels is not known in advance, or is dynamic, JoinChannels is the way to go.
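
When the sources are known in advance, the select-based approach might look like the sketch below. joinTwo is a hypothetical helper for exactly two sources and is not part of this package.

```go
package main

import "fmt"

// joinTwo (hypothetical) forwards values from two known sources to sink.
// A closed source is set to nil; receiving from a nil channel blocks
// forever, so select never picks that case again.
func joinTwo[T any](sink chan<- T, a, b <-chan T) {
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil
				continue
			}
			sink <- v
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			sink <- v
		}
	}
	close(sink)
}

func main() {
	a := make(chan string)
	b := make(chan string)
	sink := make(chan string)

	go func() {
		defer close(a)
		a <- "a1"
		a <- "a2"
	}()
	go func() {
		defer close(b)
		b <- "b1"
	}()
	go joinTwo[string](sink, a, b)

	// The relative order of values from a and b is not deterministic.
	for v := range sink {
		fmt.Println(v)
	}
}
```

This only works when the number of cases can be written out in the source code, which is why a different mechanism is needed for a dynamic number of channels.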

The Go standard library provides reflect.Select (with reflect.SelectCase) for this purpose, but it is known to be very slow and inefficient. JoinChannels implements a relay channel that receives data from multiple channels and sends it to a single channel in a non-blocking way using a select statement. When a source channel is completely drained, it is set to nil, and the select statement will not pick that case again.
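
For comparison, a dynamic join built on the standard library's reflect.Select could be sketched as below. joinReflect is a hypothetical name; the reflect calls are the standard ones, and a drained source is disabled by resetting its case to the zero reflect.Value, which reflect.Select ignores.

```go
package main

import (
	"fmt"
	"reflect"
)

// joinReflect (hypothetical) joins any number of sources into sink by
// building one receive case per source at run time.
func joinReflect[T any](sink chan<- T, sources ...<-chan T) {
	cases := make([]reflect.SelectCase, len(sources))
	for i, src := range sources {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(src),
		}
	}
	open := len(cases)
	for open > 0 {
		chosen, recv, ok := reflect.Select(cases)
		if !ok {
			// The source is closed: a zero Chan makes Select skip this case.
			cases[chosen].Chan = reflect.Value{}
			open--
			continue
		}
		sink <- recv.Interface().(T)
	}
	close(sink)
}

func main() {
	sources := make([]<-chan int, 7)
	for i := range sources {
		ch := make(chan int, 1)
		ch <- i
		close(ch)
		sources[i] = ch
	}
	sink := make(chan int)
	go joinReflect[int](sink, sources...)

	total := 0
	for v := range sink {
		total += v
	}
	fmt.Println("total:", total)
}
```

Every receive here goes through reflection and an interface conversion, which is the overhead the paragraph above refers to.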

	
		If sources > 5
		relay channels are used that relay data from the leaf nodes to the root node (which are channels in this case)

		1. Sources are grouped into groups of 5, with 1 relay channel for each group
		2. Each group is passed to a worker
		3. The relays are fed to Join, i.e. recursion


		Ex:
		     $   $ $   $     <- Leaf Channels (i.e. Sources)
		      \ /   \ /
		       $     $       <- Relay Channels
		        \   /
		          $          <- Sink Channel

		*For simplicity, 2 children are shown for each node, but each node has 5 children
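
The steps above can be sketched with a fixed five-case select per worker. This is an illustration under assumptions, not the package's code: joinTree, joinGroup, and groupSize are hypothetical names, and all sources are assumed to be non-nil.

```go
package main

import "fmt"

const groupSize = 5 // assumed group size

// joinGroup is the worker for at most groupSize sources. Unused slots
// stay nil and drained sources are set to nil, so select skips them.
func joinGroup[T any](sink chan<- T, group []<-chan T) {
	var c [groupSize]<-chan T
	open := copy(c[:], group)
	for open > 0 {
		var (
			v   T
			ok  bool
			idx int
		)
		select {
		case v, ok = <-c[0]:
			idx = 0
		case v, ok = <-c[1]:
			idx = 1
		case v, ok = <-c[2]:
			idx = 2
		case v, ok = <-c[3]:
			idx = 3
		case v, ok = <-c[4]:
			idx = 4
		}
		if !ok {
			c[idx] = nil // drained: never selected again
			open--
			continue
		}
		sink <- v
	}
	close(sink)
}

// joinTree groups the sources, gives each group a relay channel and a
// worker, then recurses on the relays until one worker feeds sink.
func joinTree[T any](sink chan<- T, sources ...<-chan T) {
	if len(sources) <= groupSize {
		go joinGroup[T](sink, sources)
		return
	}
	var relays []<-chan T
	for start := 0; start < len(sources); start += groupSize {
		end := start + groupSize
		if end > len(sources) {
			end = len(sources)
		}
		relay := make(chan T)
		relays = append(relays, relay)
		go joinGroup[T](relay, sources[start:end])
	}
	joinTree[T](sink, relays...)
}

func main() {
	sources := make([]<-chan int, 12)
	for i := range sources {
		ch := make(chan int, 2)
		ch <- 1
		ch <- 1
		close(ch)
		sources[i] = ch
	}
	sink := make(chan int)
	joinTree[int](sink, sources...)

	count := 0
	for range sink {
		count++
	}
	fmt.Println("values received:", count)
}
```

Because every worker uses a compiled select rather than reflection, the cost per value stays small, at the price of one extra hop per tree level.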
	
Documentation

Functions

func CreateNChannels

func CreateNChannels[T any](count int, bufflen int) map[int]chan T

CreateNChannels creates count channels, each with buffer length bufflen, and returns them in a map keyed by int.
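
Since the import path is not shown here, the sketch below uses a local stand-in with the same signature instead of the real function. The stand-in's body, and in particular the key numbering from 0 to count-1, is an assumption.

```go
package main

import "fmt"

// createNChannels is a hypothetical stand-in that mirrors the documented
// signature. The key numbering (0..count-1) is assumed, not documented.
func createNChannels[T any](count int, bufflen int) map[int]chan T {
	chans := make(map[int]chan T, count)
	for i := 0; i < count; i++ {
		chans[i] = make(chan T, bufflen)
	}
	return chans
}

func main() {
	chans := createNChannels[string](3, 2)
	for id, ch := range chans {
		// Each channel is buffered, so this send does not block.
		ch <- fmt.Sprintf("hello from %d", id)
	}
	for id, ch := range chans {
		fmt.Println(id, <-ch)
	}
}
```

A map of bidirectional channels like this must be converted to send-only or receive-only slices before being passed to the variadic sinks or sources parameters.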

Types

type CloneChannels

type CloneChannels[T any] struct {
	Log *log.Logger
	// contains filtered or unexported fields
}

CloneChannels provides a method to clone channels.

func NewCloneChannels

func NewCloneChannels[T any](opts *CloneOptions) *CloneChannels[T]

NewCloneChannels returns a new instance of CloneChannels.

func (*CloneChannels[T]) Clone

func (s *CloneChannels[T]) Clone(ctx context.Context, src chan T, sinks ...chan<- T) error

Clone takes data from the source channel (src) and sends it to the sinks (send-only channels) without being totally unfair.

func (*CloneChannels[T]) Wait

func (s *CloneChannels[T]) Wait()

Wait waits until cloning is finished.

type CloneOptions

type CloneOptions struct {
	MaxDrain  int // Max buffers to drain at once (default 3)
	Threshold int // Buffer length at which drains are activated (default 5)
}

CloneOptions provides options for cloning channels.

type JoinChannels

type JoinChannels[T any] struct {
	Log *log.Logger
	// contains filtered or unexported fields
}

JoinChannels provides a method to join channels: data obtained from multiple channels is sent to one channel. This is useful when you have multiple sources and want to send their data to one channel.

func (*JoinChannels[T]) Join

func (j *JoinChannels[T]) Join(ctx context.Context, sink chan T, sources ...<-chan T) error

Join joins many channels to create one.

func (*JoinChannels[T]) Wait

func (j *JoinChannels[T]) Wait()

Wait waits until joining is finished.
