intracom

package module
v1.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2024 License: MIT Imports: 3 Imported by: 1

README

intracom

codecov

NOTE: Intracom went through a big refactor from using mutex locks to not using locks. It should still be used with caution and treated as an alpha project. A few basic examples have been provided in the examples/ folder

Small pub/sub library to perform intra-communication within a go program and its routines. It is a wrapper around go channels and uses no mutex locks.

Under the hood

Intracom starts its own routines to handle Subscribe, unsubscribe (closure), Register, and unregister (closure) calls as if they were requests. They go down a "requests" channel where a routine in charge of receiving those requests manages its own local state of the publisher/subscriber channels.

Any new registered topic is given its own routine to handle new subscribe/unsubscribe requests as well as publishing. Only NEW subscribes and first-time unsubscribes will interrupt the "broadcaster" doing the publishing only within this specific topic. This means you can register many topics and slow publishers and/or slow subscribers do not impede the broadcasts being done across other topics.

Though a drawback of this can be the subscribers within the same topic CAN impede each other from receiving messages usually due to the buffer filling up and backpressuring against the publisher. But this can be somewhat controlled by using different buffer policies. A BufferPolicy is effectively you choosing how the broadcaster of the topic will react to a given subscriber when it reaches a full buffer for that specific subscribers channel. Using DropNone or a Drop with timeout policy on any consumer of a topic can cause the next broadcast to delay. So keep this in mind when choosing a buffer policy.

Things to Know + Gotchas

Closing early

Calling .Close() on the intracom instance should ALWAYS be the last thing you do, it is your clean up call. Intracom instance will no longer be usable once you call it. This means it will likely exist at the top level, maybe even the main routine of your program. The intracom instance can be shared to many go routines for use but while keeping these in mind.

  1. Whoever creates the Intracom instance should be the who calls .Start() and the one who calls .Close()
  2. Because of #1, .Start() and .Close() are not thread-safe.
  3. If you do .Close() too early and have a late unsubscribe or unregister follow that up, this would cause a panic on a closed channel but the bound function call just performed has a panic recovery implemented that will log at the ERROR level.
  4. If you attempt to .Register() or .Subscribe() after a .Close() has been initiated, you will experience a panic. This is expected as you should NOT be trying to continue using an unusable intracom instance.
Duplicate Register calls

Calling .Register(<your-topic>) against the same topic beyond the first time will always hand you back a reference to the already existing publishing channel as well as a callable function bound to that topic that when called will unregister that topic. Unregistering a topic will close all subscriber channels to that topic.

Duplicate Subscription calls

Calling .Subscribe(<your-subscriber-config>) with the same config beyond the first time will always hand you back a reference to the already existing go channel as well as a callable function bound to that subscriber topic and consumer name that when called will unsubscribe that topic. Unsubscribing will close the specific subscriber channel for that consumer.

Early Subscribers

Calling .Subscribe(<your-subscriber-config>) before a topic is registered will trigger a register to happen and immediately attach your subscriber to that topic. This helps when launching publishers and subscribers between separate go routines. Behind the scenes when .Register(<topic>) is called late it will re-act the same way as calling Register duplicate times. You will be given the already created publishing channel that was created during the early subscribers triggered register.

Late Subscribers

Late subscribers to a topic will always be given the LAST message published to the topic (if any have been published). So when joining as a new consumer you can be sure if there were any previous publishes, you will at least get the most recent published message.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BufferPolicy added in v1.1.0

type BufferPolicy int
const (
	DropNone BufferPolicy = iota
	DropOldest
	DropOldestAfterTimeout
	DropNewest
	DropNewestAfterTimeout
)

type Intracom

type Intracom[T any] struct {
	// contains filtered or unexported fields
}

Intracom is an in-memory pub/sub wrapper to enable communication between routines.

func New

func New[T any](name string) *Intracom[T]

New returns a new Intracom instance.

func (*Intracom[T]) Close

func (i *Intracom[T]) Close() error

Close will shutdown the Intracom instance and all of its subscriptions. Intracom instance will no longer be usable after calling this function. This function is not safe to call from multiple routines and should only be called once from the parent thread designated to manage the Intracom instance.

NOTE: Calling Register() or Subscribe() after calling Close will panic.

Returns: - error: nil if successful, error if not.

func (*Intracom[T]) Register added in v1.1.0

func (i *Intracom[T]) Register(topic string) (chan<- T, func())

Register will register a topic with the Intracom instance. It is safe to call this function multiple times for the same topic. If the topic already exists, this function will return the existing publisher channel.

Parameters: - topic: name of the topic to register

Returns: - publishC: the channel used to publish messages to the topic - unregister: a function bound to this topic that can be used to unregister the topic

func (*Intracom[T]) SetLogHandler added in v1.1.0

func (i *Intracom[T]) SetLogHandler(handler slog.Handler)

SetLogHandler will set the logger used by the Intracom instance. If this function is not called, the slog Default() log handler will be used. NOTE: This function must be called before intracom.Start()

Parameters: - handler: a slog.Handler interface to use for logging

func (*Intracom[T]) Start added in v1.1.0

func (i *Intracom[T]) Start() error

Start will enable the broker to start processing requests. If the broker is already running, this function will return an error.

NOTE: This function must be called before Register, Subscribe, or Close. This function is not thread-safe and should only be called once from the parent thread designated to manage the Intracom instance.

func (*Intracom[T]) Subscribe

func (i *Intracom[T]) Subscribe(conf SubscriberConfig) (<-chan T, func())

Subscribe will be used to subscribe to a topic. It is safe to call this function multiple times from multiple routines.

* If the topic does not exist, it will be automatically registered. * If the consumer group already exists, the existing channel will be returned. * If the consumer group does not exist, it will be created.

Parameters:

  • conf: a pointer to a SubscriberConfig struct, a nil pointer will use default values (not recommended). Default values: -- Topic: "" -- ConsumerGroup: "" -- BufferSize: 1 -- BufferPolicy: DropNone

Returns: - ch: the channel used to receive messages from the topic - unsubscribe: a function bound to this subscription that can be used to unsubscribe the consumer group

type SubscriberConfig added in v1.1.0

type SubscriberConfig struct {
	Topic         string
	ConsumerGroup string
	BufferSize    int
	BufferPolicy  BufferPolicy
	DropTimeout   time.Duration
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL