buffer

package
v0.1.16
Published: Sep 24, 2024 License: MIT Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DiscardAnyMessage

func DiscardAnyMessage[T any](receiver Receiver[T])

DiscardAnyMessage is a function that discards all messages from a receiver.

Parameters:

  • receiver: The receiver of messages.

Behaviors:

  • Use go DiscardAnyMessage(receiver) to discard all messages from the receiver.
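The draining behavior can be pictured with a small, self-contained sketch. The Receiver interface below is copied from this package's documentation; sliceReceiver and discardAll are hypothetical names introduced only for illustration, and the loop is an assumption about how such a function might work, not the package's actual implementation.

```go
package main

import "fmt"

// Receiver mirrors the interface documented in this package.
type Receiver[T any] interface {
	Receive() (T, bool)
}

// sliceReceiver is a hypothetical Receiver backed by a slice. It reports
// false once every element has been handed out.
type sliceReceiver[T any] struct {
	items []T
	reads int // number of successful Receive calls
}

func (r *sliceReceiver[T]) Receive() (T, bool) {
	if len(r.items) == 0 {
		var zero T
		return zero, false
	}
	msg := r.items[0]
	r.items = r.items[1:]
	r.reads++
	return msg, true
}

// discardAll is an assumed equivalent of DiscardAnyMessage: it receives and
// drops messages until the receiver reports that it is closed.
func discardAll[T any](receiver Receiver[T]) {
	if receiver == nil {
		return
	}
	for {
		if _, ok := receiver.Receive(); !ok {
			return
		}
	}
}

func main() {
	r := &sliceReceiver[string]{items: []string{"a", "b", "c"}}
	discardAll[string](r)
	fmt.Println(r.reads, len(r.items)) // 3 0
}
```

Under this reading the call blocks until the receiver closes, which would explain why the documentation suggests running it with go.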

Types

type Buffer

type Buffer[T any] struct {
	// contains filtered or unexported fields
}

Buffer is a thread-safe, generic data structure that allows multiple goroutines to produce and consume elements in a synchronized manner. It is implemented as a queue and uses channels to synchronize the goroutines. The Buffer should be started with the Start method before use.

func NewBuffer

func NewBuffer[T any]() *Buffer[T]

NewBuffer creates a new Buffer instance.

Returns:

  • *Buffer: A pointer to the newly created Buffer instance.

Information: To close the buffer, close the send-only channel. A cascade of events then follows:

  • The goroutine that listens for incoming messages will stop listening and exit.
  • The goroutine that sends messages from the Buffer to the receive channel will stop sending messages once the Buffer is empty, and then exit.
  • The Buffer will be cleaned up.

A Close method is also provided to close the Buffer manually, but calling it is not necessary if the send-only channel is closed.
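The close cascade can be sketched with plain channels. The relay function below is a hypothetical stand-in, not the Buffer's real internals: it uses a single goroutine and an unbounded slice, whereas the text above describes two goroutines. It only illustrates the ordering: the input closes, the queue drains, and then the output closes.

```go
package main

import "fmt"

// relay is a hypothetical model of the cascade: messages arrive on in, wait
// in an unbounded slice, and leave on the returned channel.
func relay[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out) // last step of the cascade
		var queue []T
		for in != nil || len(queue) > 0 {
			// Offer a message only when one is queued. A nil channel
			// disables its select case.
			var send chan<- T
			var next T
			if len(queue) > 0 {
				send = out
				next = queue[0]
			}
			select {
			case msg, ok := <-in:
				if !ok {
					in = nil // input closed: stop listening, keep draining
					continue
				}
				queue = append(queue, msg)
			case send <- next:
				queue = queue[1:]
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	out := relay[int](in)
	for i := 1; i <= 3; i++ {
		in <- i
	}
	close(in) // closing the input is all the producer has to do
	for v := range out {
		fmt.Println(v) // 1, 2, 3 in order
	}
}
```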

func (*Buffer[T]) CleanBuffer

func (b *Buffer[T]) CleanBuffer()

CleanBuffer removes all elements from the Buffer, resetting it to an empty state. Elements that have already been moved to the receive channel are kept, because they are no longer in the buffer. The method locks firstMutex for the duration of the operation.

This method is safe for concurrent use by multiple goroutines.

func (*Buffer[T]) Close

func (b *Buffer[T]) Close()

Close implements the Runner interface.

func (*Buffer[T]) IsClosed

func (b *Buffer[T]) IsClosed() bool

IsClosed implements the Runner interface.

func (*Buffer[T]) Receive

func (b *Buffer[T]) Receive() (T, bool)

Receive implements the Receiver interface.

func (*Buffer[T]) Send

func (b *Buffer[T]) Send(msg T) bool

Send implements the Sender interface.

func (*Buffer[T]) Start

func (b *Buffer[T]) Start()

Start implements the Runner interface.

type BufferCondition

type BufferCondition int

BufferCondition is an enumeration of the possible conditions of the Buffer.

const (
	// IsEmpty indicates that the Buffer is empty.
	IsEmpty BufferCondition = iota

	// IsRunning indicates that the Buffer is running.
	IsRunning
)

func (BufferCondition) String

func (bc BufferCondition) String() string

String implements the Common.Enum interface.
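A minimal sketch of the iota-plus-String pattern used by this enumeration follows. The constants are copied from the declaration above; the strings returned are assumptions, since the documentation does not state what the real String method produces.

```go
package main

import "fmt"

type BufferCondition int

const (
	// IsEmpty indicates that the Buffer is empty.
	IsEmpty BufferCondition = iota

	// IsRunning indicates that the Buffer is running.
	IsRunning
)

// String returns a label for the condition. The labels are assumed for
// illustration and may differ from the real package.
func (bc BufferCondition) String() string {
	switch bc {
	case IsEmpty:
		return "IsEmpty"
	case IsRunning:
		return "IsRunning"
	default:
		// Convert to int first so that fmt does not call String again.
		return fmt.Sprintf("BufferCondition(%d)", int(bc))
	}
}

func main() {
	fmt.Println(IsEmpty, IsRunning, BufferCondition(7))
	// IsEmpty IsRunning BufferCondition(7)
}
```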

type ChannelThrough

type ChannelThrough[T any] struct {
	// contains filtered or unexported fields
}

ChannelThrough is a type of buffer that sends messages from multiple receivers to a single sender.

func NewChannelThrough

func NewChannelThrough[T any](sender olru.SenderRunner[T], receivers ...Receiver[T]) *ChannelThrough[T]

NewChannelThrough creates a new channel through buffer.

Parameters:

  • sender: The sender of messages.
  • receivers: The receivers of messages.

Returns:

  • *ChannelThrough: The new channel through buffer.

Behaviors:

  • It ignores nil receivers.
  • If the sender is nil, it will discard all messages from the receivers.
  • If no receivers are provided, the buffer will close immediately.
  • There is no Close() method because the handler closes automatically: once all receivers are closed, it closes the sender in a cascading manner.
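The behaviors listed above amount to a fan-in. The sketch below is one way to picture it and makes several assumptions: Sender is a hypothetical stand-in for olru.SenderRunner (whose method set is not shown here), and chanReceiver, collector and fanIn are illustrative names, not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// Receiver mirrors the interface documented in this package.
type Receiver[T any] interface {
	Receive() (T, bool)
}

// Sender is a hypothetical stand-in for olru.SenderRunner.
type Sender[T any] interface {
	Send(msg T) bool
	Close()
}

// chanReceiver adapts a channel to Receiver.
type chanReceiver[T any] struct{ ch <-chan T }

func (r chanReceiver[T]) Receive() (T, bool) {
	msg, ok := <-r.ch
	return msg, ok
}

// collector is a Sender that records every message it is given.
type collector[T any] struct {
	mu     sync.Mutex
	got    []T
	closed bool
}

func (c *collector[T]) Send(msg T) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return false
	}
	c.got = append(c.got, msg)
	return true
}

func (c *collector[T]) Close() {
	c.mu.Lock()
	c.closed = true
	c.mu.Unlock()
}

// fanIn forwards messages from all non-nil receivers to sender and closes
// sender once every receiver has closed. A nil sender discards messages.
func fanIn[T any](sender Sender[T], receivers ...Receiver[T]) {
	var wg sync.WaitGroup
	for _, r := range receivers {
		if r == nil {
			continue // nil receivers are ignored
		}
		wg.Add(1)
		go func(r Receiver[T]) {
			defer wg.Done()
			for {
				msg, ok := r.Receive()
				if !ok {
					return
				}
				if sender != nil {
					sender.Send(msg)
				}
			}
		}(r)
	}
	wg.Wait() // returns immediately when there are no receivers
	if sender != nil {
		sender.Close()
	}
}

func main() {
	a, b := make(chan int, 2), make(chan int, 2)
	a <- 1
	a <- 2
	b <- 3
	close(a)
	close(b)
	sink := &collector[int]{}
	fanIn[int](sink, chanReceiver[int]{a}, nil, chanReceiver[int]{b})
	fmt.Println(len(sink.got), sink.closed) // 3 true
}
```

Messages from different receivers may interleave in any order; only the order within a single receiver is preserved in this sketch.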

func (*ChannelThrough[T]) IsRunning

func (ct *ChannelThrough[T]) IsRunning() bool

IsRunning is a method that returns true if the handler is running.

Returns:

  • bool: True if the handler is running, false otherwise.

func (*ChannelThrough[T]) Run

func (ct *ChannelThrough[T]) Run()

Run is a method that runs the handler.

type Debugger

type Debugger struct {
	// contains filtered or unexported fields
}

Debugger is a struct that provides a way to print debug messages.

func NewDebugger

func NewDebugger(logger *log.Logger) *Debugger

NewDebugger is a function that creates a new debugger.

Parameters:

  • logger: The logger to use.

Returns:

  • *Debugger: The new debugger.

func (*Debugger) Close

func (d *Debugger) Close()

Close implements the Runner interface.

func (*Debugger) GetDebugMode

func (d *Debugger) GetDebugMode() bool

GetDebugMode is a function that returns the debug mode.

Returns:

  • bool: The debug mode.

func (*Debugger) IsClosed

func (d *Debugger) IsClosed() bool

IsClosed returns true if the runner is closed, false otherwise.

func (*Debugger) Printf

func (d *Debugger) Printf(format string, v ...interface{})

Printf is a function that prints formatted text.

'\n' is always appended to the end of the format string.

Parameters:

  • format: The format string.
  • v: The values to print.

func (*Debugger) Println

func (d *Debugger) Println(v ...interface{})

Println is a function that prints a line.

Parameters:

  • v: The values to print.

func (*Debugger) Start

func (d *Debugger) Start()

Start implements the Runner interface.

func (*Debugger) ToggleDebugMode

func (d *Debugger) ToggleDebugMode(active bool)

ToggleDebugMode sets the debug mode to the given value.

Parameters:

  • active: The flag to set the debug mode.

func (*Debugger) Write

func (d *Debugger) Write(p []byte) (n int, err error)

Write is a function that writes to the debugger.

'\n' is always appended to the end of the bytes.

Parameters:

  • p: The bytes to write.

Returns:

  • int: Always the length of the bytes.
  • error: Always nil.

type IsEmptyObserver

type IsEmptyObserver struct {
	// contains filtered or unexported fields
}

IsEmptyObserver is an observer for the size of the queue.

func NewIsEmptyObserver

func NewIsEmptyObserver(action func(int)) (*IsEmptyObserver, error)

NewIsEmptyObserver creates a new IsEmptyObserver instance.

Parameters:

  • action: The function to be called when the size changes.

Returns:

  • *IsEmptyObserver: A pointer to the newly created IsEmptyObserver.
  • error: An error of type *uc.ErrInvalidParameter if action is nil.

func (*IsEmptyObserver) Notify

func (o *IsEmptyObserver) Notify(size int)

Notify implements the rws.Observer interface.
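The constructor-plus-Notify shape can be sketched as below. sizeObserver and newSizeObserver are hypothetical names; the real constructor is documented to return *uc.ErrInvalidParameter, which is replaced here by a plain error because that type is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// sizeObserver is a hypothetical stand-in for IsEmptyObserver.
type sizeObserver struct {
	action func(int)
}

// newSizeObserver rejects a nil action, mirroring the documented contract.
func newSizeObserver(action func(int)) (*sizeObserver, error) {
	if action == nil {
		return nil, errors.New("action must not be nil")
	}
	return &sizeObserver{action: action}, nil
}

// Notify forwards the new size to the stored action.
func (o *sizeObserver) Notify(size int) {
	o.action(size)
}

func main() {
	obs, err := newSizeObserver(func(size int) {
		fmt.Println("empty:", size == 0)
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	obs.Notify(2) // empty: false
	obs.Notify(0) // empty: true
}
```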

type Receiver

type Receiver[T any] interface {
	// Receive receives a message from the Buffer.
	//
	// Returns:
	//   - T: The message received.
	//   - bool: False if the Buffer is closed, true otherwise.
	Receive() (T, bool)
}

Receiver is the interface that wraps the Receive method.

type Redirect

type Redirect[T any] struct {
	// contains filtered or unexported fields
}

Redirect is a handler that redirects messages from a receiver to multiple senders.

func NewRedirect

func NewRedirect[T any](receiver Receiver[T], senders ...olru.SenderRunner[T]) *Redirect[T]

NewRedirect creates a new redirect handler.

Parameters:

  • receiver: The receiver of messages.
  • senders: The senders of messages.

Returns:

  • *Redirect: The new redirect handler.

Behaviors:

  • It ignores nil senders.
  • There is no Close() method because the handler closes automatically: once the receiver is closed, it closes all senders in a cascading manner.
  • If no senders are provided, the handler will discard all messages from the receiver.
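The behaviors listed above describe a fan-out. The following sketch shows one possible reading of them; Sender is a hypothetical stand-in for olru.SenderRunner, and sliceReceiver, recorder and fanOut are illustrative names that do not exist in this package.

```go
package main

import "fmt"

// Receiver mirrors the interface documented in this package.
type Receiver[T any] interface {
	Receive() (T, bool)
}

// Sender is a hypothetical stand-in for olru.SenderRunner.
type Sender[T any] interface {
	Send(msg T) bool
	Close()
}

// sliceReceiver hands out its items one by one, then reports closure.
type sliceReceiver[T any] struct{ items []T }

func (r *sliceReceiver[T]) Receive() (T, bool) {
	if len(r.items) == 0 {
		var zero T
		return zero, false
	}
	msg := r.items[0]
	r.items = r.items[1:]
	return msg, true
}

// recorder is a Sender that stores what it is sent.
type recorder[T any] struct {
	got    []T
	closed bool
}

func (s *recorder[T]) Send(msg T) bool {
	if s.closed {
		return false
	}
	s.got = append(s.got, msg)
	return true
}

func (s *recorder[T]) Close() { s.closed = true }

// fanOut copies every message from receiver to each non-nil sender and
// closes the senders once the receiver is closed. With no senders, the
// messages are simply discarded.
func fanOut[T any](receiver Receiver[T], senders ...Sender[T]) {
	active := make([]Sender[T], 0, len(senders))
	for _, s := range senders {
		if s != nil {
			active = append(active, s) // nil senders are ignored
		}
	}
	for {
		msg, ok := receiver.Receive()
		if !ok {
			break
		}
		for _, s := range active {
			s.Send(msg)
		}
	}
	for _, s := range active {
		s.Close()
	}
}

func main() {
	src := &sliceReceiver[string]{items: []string{"a", "b"}}
	x, y := &recorder[string]{}, &recorder[string]{}
	fanOut[string](src, x, nil, y)
	fmt.Println(len(x.got), len(y.got), x.closed, y.closed) // 2 2 true true
}
```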

func (*Redirect[T]) IsRunning

func (r *Redirect[T]) IsRunning() bool

IsRunning is a method that returns true if the handler is running.

Returns:

  • bool: True if the handler is running, false otherwise.

func (*Redirect[T]) Run

func (r *Redirect[T]) Run()

Run is a method that runs the handler.

type SafeQueue

type SafeQueue[T any] struct {
	// contains filtered or unexported fields
}

SafeQueue is a generic, thread-safe queue implemented using a linked list. It has no capacity limit: Capacity always returns -1 and IsFull always returns false.

func NewSafeQueue

func NewSafeQueue[T any]() *SafeQueue[T]

NewSafeQueue is a function that creates and returns a new instance of a SafeQueue.

Returns:

  • *SafeQueue[T]: A pointer to the newly created SafeQueue. Never returns nil.
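For orientation, a mutex-guarded linked-list queue can be sketched as follows. safeQueue and node are hypothetical names, and the locking strategy is an assumption; the real SafeQueue may synchronize differently. Dequeue returning false on an empty queue is likewise assumed.

```go
package main

import (
	"fmt"
	"sync"
)

// node is one link in the list.
type node[T any] struct {
	value T
	next  *node[T]
}

// safeQueue is a hypothetical, simplified model of SafeQueue.
type safeQueue[T any] struct {
	mu         sync.Mutex
	head, tail *node[T]
	size       int
}

// Enqueue appends value at the tail. The queue is unbounded, so it always
// returns true.
func (q *safeQueue[T]) Enqueue(value T) bool {
	n := &node[T]{value: value}
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.tail == nil {
		q.head = n
	} else {
		q.tail.next = n
	}
	q.tail = n
	q.size++
	return true
}

// Dequeue removes the head element. It returns false when the queue is
// empty (assumed semantics).
func (q *safeQueue[T]) Dequeue() (T, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.head == nil {
		var zero T
		return zero, false
	}
	n := q.head
	q.head = n.next
	if q.head == nil {
		q.tail = nil
	}
	q.size--
	return n.value, true
}

// Size reports the number of queued elements.
func (q *safeQueue[T]) Size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.size
}

func main() {
	q := &safeQueue[int]{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			q.Enqueue(v)
		}(i)
	}
	wg.Wait()
	fmt.Println(q.Size()) // 100
}
```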

func (*SafeQueue[T]) Capacity

func (queue *SafeQueue[T]) Capacity() int

Capacity implements the Queuer interface.

Always returns -1.

func (*SafeQueue[T]) Clear

func (queue *SafeQueue[T]) Clear()

Clear implements the Queuer interface.

func (*SafeQueue[T]) Copy

func (queue *SafeQueue[T]) Copy() *SafeQueue[T]

Copy implements the Queuer interface.

func (*SafeQueue[T]) Dequeue

func (queue *SafeQueue[T]) Dequeue() (T, bool)

Dequeue implements the Queuer interface.

func (*SafeQueue[T]) Enqueue

func (queue *SafeQueue[T]) Enqueue(value T) bool

Enqueue implements the Queuer interface.

Always returns true.

func (*SafeQueue[T]) EnqueueMany

func (queue *SafeQueue[T]) EnqueueMany(values []T) int

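EnqueueMany implements the Queuer interface.

Returns an int rather than a bool, presumably the number of values enqueued; because the queue is unbounded, that would be len(values).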

func (*SafeQueue[T]) GoString

func (queue *SafeQueue[T]) GoString() string

GoString implements the fmt.GoStringer interface.

func (*SafeQueue[T]) IsEmpty

func (queue *SafeQueue[T]) IsEmpty() bool

IsEmpty implements the Queuer interface.

func (*SafeQueue[T]) IsFull

func (queue *SafeQueue[T]) IsFull() bool

IsFull implements the Queuer interface.

Always returns false.

func (*SafeQueue[T]) ObserveSize

func (queue *SafeQueue[T]) ObserveSize(f func(size int))

ObserveSize adds an observer to the size of the queue.

Parameters:

  • f: The function to be called when the size changes.

If f is nil, the observer is removed.
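The set-or-remove contract of ObserveSize can be sketched with a small counter. observedSize and its add method are hypothetical, and invoking the callback outside the lock is a design choice made for this sketch, not something the documentation states.

```go
package main

import (
	"fmt"
	"sync"
)

// observedSize is a hypothetical model of a size value with one observer.
type observedSize struct {
	mu       sync.Mutex
	size     int
	observer func(size int)
}

// ObserveSize installs f as the observer. Passing nil removes it.
func (o *observedSize) ObserveSize(f func(size int)) {
	o.mu.Lock()
	o.observer = f
	o.mu.Unlock()
}

// add changes the size and notifies the observer, if one is set.
func (o *observedSize) add(delta int) {
	o.mu.Lock()
	o.size += delta
	size, f := o.size, o.observer
	o.mu.Unlock()
	// Call the observer after unlocking so that it may safely call back
	// into this type without deadlocking.
	if f != nil {
		f(size)
	}
}

func main() {
	var seen []int
	o := &observedSize{}
	o.ObserveSize(func(size int) { seen = append(seen, size) })
	o.add(1)
	o.add(1)
	o.ObserveSize(nil) // remove the observer
	o.add(-1)
	fmt.Println(seen) // [1 2]
}
```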

func (*SafeQueue[T]) Peek

func (queue *SafeQueue[T]) Peek() (T, bool)

Peek implements the Queuer interface.

func (*SafeQueue[T]) Size

func (queue *SafeQueue[T]) Size() int

Size implements the Queuer interface.

func (*SafeQueue[T]) Slice

func (queue *SafeQueue[T]) Slice() []T

Slice implements the Queuer interface.
