notificationcenter

package module
v2.3.8
Published: Dec 9, 2024 License: Apache-2.0 Imports: 9 Imported by: 13

README

Notificationcenter Pub/Sub Library

License: Apache 2.0

The NotificationCenter library provides a unified interface for publish/subscribe (pub/sub) messaging in Go applications. It simplifies asynchronous communication between services in serverless and microservices architectures by abstracting the complexities of various message brokers.

With NotificationCenter, you can seamlessly integrate different pub/sub backends like Kafka, NATS, Redis, PostgreSQL, and more without altering your application logic. This promotes decoupled architectures, enhancing performance, reliability, and scalability.

Features

  • Unified Interface: Interact with multiple pub/sub backends using a consistent API.
  • Easy Integration: Quickly set up publishers and subscribers with minimal configuration.
  • Backend Flexibility: Swap out message brokers without changing your application code.
  • Event-Driven Architecture: Facilitate loosely coupled communication between services.
  • Scalability: Improve performance and reliability by decoupling application components.
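
To make the "Backend Flexibility" point concrete, here is a minimal, self-contained sketch. It copies the `Publisher` interface documented for this package and plugs in two hypothetical backends (`memoryPublisher` and `stdoutPublisher`, both invented for illustration) to show that application code written against the interface does not change when the backend does.

```go
package main

import (
	"context"
	"fmt"
)

// Publisher mirrors the interface documented by this package.
type Publisher interface {
	Publish(ctx context.Context, messages ...any) error
}

// memoryPublisher is a hypothetical backend that keeps messages in a slice.
type memoryPublisher struct{ sent []any }

func (p *memoryPublisher) Publish(_ context.Context, messages ...any) error {
	p.sent = append(p.sent, messages...)
	return nil
}

// stdoutPublisher is a second hypothetical backend that prints each message.
type stdoutPublisher struct{}

func (stdoutPublisher) Publish(_ context.Context, messages ...any) error {
	for _, m := range messages {
		fmt.Printf("published: %v\n", m)
	}
	return nil
}

// notifyUserCreated is application logic that depends only on the interface.
func notifyUserCreated(ctx context.Context, pub Publisher, name string) error {
	return pub.Publish(ctx, "user created: "+name)
}

// run sends the same event through both backends and reports how many
// messages the in-memory backend recorded.
func run() (int, error) {
	ctx := context.Background()
	mem := &memoryPublisher{}
	for _, pub := range []Publisher{mem, stdoutPublisher{}} {
		if err := notifyUserCreated(ctx, pub, "alice"); err != nil {
			return len(mem.sent), err
		}
	}
	return len(mem.sent), nil
}

func main() {
	n, err := run()
	fmt.Println("messages kept in memory:", n, "error:", err)
}
```

In a real application the two stand-ins would be replaced by publishers from the backend subpackages, while `notifyUserCreated` stays untouched.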

Supported Modules

Installation

Install the library using go get:

go get github.com/geniusrabbit/notificationcenter/v2

Usage Examples

Below are basic examples demonstrating how to use NotificationCenter in your Go application.

Import the Package

import (
  nc "github.com/geniusrabbit/notificationcenter/v2"
)

Create a Publisher

Create a new publisher using one of the supported backends. For example, using NATS:

import (
  "log"

  nc "github.com/geniusrabbit/notificationcenter/v2"
  "github.com/geniusrabbit/notificationcenter/v2/nats"
)

// Create a new NATS publisher
eventStream, err := nats.NewPublisher(
  nats.WithNatsURL("nats://hostname:4222"),
)
if err != nil {
  log.Fatal(err)
}

// Register the publisher with NotificationCenter
err = nc.Register("events", eventStream)
if err != nil {
  log.Fatal(err)
}

Publish Messages

You can publish messages using global functions or by obtaining a publisher interface.

Using Global Functions:

import (
  "context"
)

// Define your message structure
type Message struct {
  Title string
}

// Publish a message globally
nc.Publish(context.Background(), "events", Message{Title: "Event 1"})

Using Publisher Interface:

// Get the publisher interface registered under the name "events"
eventsPublisher := nc.PublisherByName("events")

// Publish a message
eventsPublisher.Publish(context.Background(), Message{Title: "Event 2"})

Subscribe to Messages

Create a subscriber and register it with NotificationCenter.

import (
  "context"
  "fmt"
  "time"

  nc "github.com/geniusrabbit/notificationcenter/v2"
  "github.com/geniusrabbit/notificationcenter/v2/interval"
  "github.com/geniusrabbit/notificationcenter/v2/nats"
)

func main() {
  ctx := context.Background()

  // Create a NATS subscriber
  eventsSubscriber := nats.MustNewSubscriber(
    nats.WithTopics("events"),
    nats.WithNatsURL("nats://hostname:4222"),
    nats.WithGroupName("group"),
  )
  nc.Register("events", eventsSubscriber)

  // Optional: Create a time interval subscriber (e.g., for periodic tasks)
  refreshSubscriber := interval.NewSubscriber(5 * time.Minute)
  nc.Register("refresh", refreshSubscriber)

  // Subscribe to the "events" stream
  nc.Subscribe(ctx, "events", func(msg nc.Message) error {
    // Process the received message
    fmt.Printf("Received message: %s\n", msg.Body())

    // Acknowledge the message if necessary
    return msg.Ack()
  })

  // Subscribe to the "refresh" stream for periodic tasks
  nc.Subscribe(ctx, "refresh", func(msg nc.Message) error {
    // Perform your periodic task here
    fmt.Println("Performing periodic refresh")
    return msg.Ack()
  })

  // Start listening for messages and report a listener error, if any
  if err := nc.Listen(ctx); err != nil {
    fmt.Println("listen error:", err)
  }
}

TODO

  • Add support for Amazon SQS
  • Add support for Redis queue
  • Add support for RabbitMQ
  • Add support for MySQL notifications
  • Add support for PostgreSQL notifications
  • Remove deprecated metrics from the queue
  • Add support for NATS & NATS Streaming
  • Add support for Kafka queue
  • Add support for native Go channels
  • Add support for Time Interval Execution

License

NotificationCenter is licensed under the Apache 2.0 License.


By using NotificationCenter, you can focus on building the core functionality of your application without worrying about the intricacies of different messaging infrastructures. Feel free to contribute to the project or report any issues you encounter.

Documentation

Constants

This section is empty.

Variables

var (
	ErrInvalidRegisterParameter     = errors.New(`invalid register parameter`)
	ErrUndefinedPublisherInterface  = errors.New(`undefined publisher interface`)
	ErrUndefinedSubscriberInterface = errors.New(`undefined subscriber interface`)
	ErrInterfaceAlreadySubscribed   = errors.New("[notificationcenter] interface already subscribed")
)

Error list...

var DefaultRegistry = NewRegistry()

DefaultRegistry is the global registry

Functions

func Close

func Close() error

Close notification center

func Listen

func Listen(ctx context.Context) error

Listen starts listening on the registered subscribers

func OnClose

func OnClose() <-chan bool

The OnClose event fires only after all interfaces have been closed.

Subscribing to this closing event is convenient in application code:

```go
func myDatabaseObserver() {
  <-notificationcenter.OnClose()
  // ... Do something
}
```
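
The waiting pattern above can be tried without a running broker. The sketch below uses a hypothetical `closeNotifier` type whose `OnClose` method has the same signature as the documented function. It assumes the channel is closed on shutdown; whether the package closes the channel or sends a value on it is not stated here.

```go
package main

import (
	"fmt"
	"sync"
)

// closeNotifier is a hypothetical stand-in for the registry: it exposes an
// OnClose channel with the same signature as the documented function.
type closeNotifier struct {
	once sync.Once
	done chan bool
}

func newCloseNotifier() *closeNotifier {
	return &closeNotifier{done: make(chan bool)}
}

// OnClose returns a channel that unblocks once Close has been called.
func (c *closeNotifier) OnClose() <-chan bool { return c.done }

// Close signals all waiters by closing the channel; calling it twice is safe.
func (c *closeNotifier) Close() error {
	c.once.Do(func() { close(c.done) })
	return nil
}

// run starts an observer that reports back only after Close was called.
func run() string {
	n := newCloseNotifier()
	result := make(chan string, 1)

	go func() {
		<-n.OnClose() // block until shutdown is signalled
		result <- "database connection released"
	}()

	_ = n.Close()
	return <-result
}

func main() {
	fmt.Println(run())
}
```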

func Publish

func Publish(ctx context.Context, name string, messages ...any) error

Publish one or more messages to the pub-service

func Register

func Register(params ...any) error

Register one or more Publisher or Subscriber services. Parameters must be passed as consecutive {name, interface} pairs.

Example:

```go

nc.Register(
  "events", kafka.MustNewSubscriber(),
  "notifications", nats.MustNewSubscriber(),
)

```
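
The {name, interface} pair convention can be illustrated with a small validation helper. This is a minimal sketch, not the package's implementation: `parsePairs` and `errInvalidParameter` are hypothetical names, and the exact checks `Register` performs are an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidParameter mirrors the message of ErrInvalidRegisterParameter.
var errInvalidParameter = errors.New("invalid register parameter")

// parsePairs is a hypothetical helper showing how a flat argument list of
// {name, service} pairs can be validated and grouped by name.
func parsePairs(params ...any) (map[string]any, error) {
	if len(params) == 0 || len(params)%2 != 0 {
		return nil, errInvalidParameter
	}
	services := make(map[string]any, len(params)/2)
	for i := 0; i < len(params); i += 2 {
		name, ok := params[i].(string)
		if !ok || params[i+1] == nil {
			return nil, errInvalidParameter
		}
		services[name] = params[i+1]
	}
	return services, nil
}

func main() {
	// Strings stand in for real subscriber values in this sketch.
	services, err := parsePairs("events", "kafka-subscriber", "notifications", "nats-subscriber")
	fmt.Println(len(services), err)

	_, err = parsePairs("events") // a name without a service
	fmt.Println(err)
}
```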

func Subscribe

func Subscribe(ctx context.Context, name string, receiver any) error

Subscribe registers a new handler on the subscriber interface with the given name

Types

type ErrorHandler

type ErrorHandler func(msg Message, err error)

ErrorHandler is a function type for processing error values

type FuncPublisher

type FuncPublisher func(context.Context, ...any) error

FuncPublisher wraps a custom function so that it can be used as a Publisher

func (FuncPublisher) Publish

func (f FuncPublisher) Publish(ctx context.Context, messages ...any) error

Publish calls the original custom publisher function
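
As a sketch of the adapter pattern described here, the block below redeclares `Publisher` and `FuncPublisher` locally with the documented signatures and wraps a closure as a `Publisher`. The method body is an assumption based on the description, and `run` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
)

// Publisher and FuncPublisher are local copies of the documented declarations.
type Publisher interface {
	Publish(ctx context.Context, messages ...any) error
}

type FuncPublisher func(context.Context, ...any) error

// Publish forwards the call to the wrapped function.
func (f FuncPublisher) Publish(ctx context.Context, messages ...any) error {
	return f(ctx, messages...)
}

// run wraps a closure as a Publisher and returns the lines it collected.
func run() ([]string, error) {
	var lines []string
	var pub Publisher = FuncPublisher(func(_ context.Context, messages ...any) error {
		for _, m := range messages {
			lines = append(lines, fmt.Sprint(m))
		}
		return nil
	})
	err := pub.Publish(context.Background(), "first", 2)
	return lines, err
}

func main() {
	lines, err := run()
	fmt.Println(lines, err)
}
```

This kind of wrapper is handy in tests, where a closure can capture published messages without any broker.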

type FuncReceiver

type FuncReceiver func(msg Message) error

FuncReceiver implements Receiver interface for a single function

func (FuncReceiver) Receive

func (f FuncReceiver) Receive(msg Message) error

Receive message from sub-service to process it with function

type Message

type Message interface {
	// Context of the message
	Context() context.Context

	// Unique message ID (depends on transport)
	ID() string

	// Body returns message data as bytes
	Body() []byte

	// Acknowledgment of the message processing
	Ack() error
}

Message describes the access methods to the message original object
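
For unit tests it can help to have a message that needs no transport. The following sketch implements the documented `Message` interface with a hypothetical in-memory type (`testMessage`) and passes it to a `FuncReceiver`, both redeclared locally so the example is self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local copy of the documented interface.
type Message interface {
	Context() context.Context
	ID() string
	Body() []byte
	Ack() error
}

// testMessage is a hypothetical in-memory message for unit tests.
type testMessage struct {
	id    string
	body  []byte
	acked bool
}

func (m *testMessage) Context() context.Context { return context.Background() }
func (m *testMessage) ID() string               { return m.id }
func (m *testMessage) Body() []byte             { return m.body }
func (m *testMessage) Ack() error               { m.acked = true; return nil }

// FuncReceiver mirrors the documented adapter type.
type FuncReceiver func(msg Message) error

func (f FuncReceiver) Receive(msg Message) error { return f(msg) }

// run feeds one message to a handler and reports what the handler saw and
// whether the message was acknowledged.
func run() (string, bool, error) {
	var seen string
	handler := FuncReceiver(func(msg Message) error {
		seen = msg.ID() + ":" + string(msg.Body())
		return msg.Ack()
	})
	msg := &testMessage{id: "42", body: []byte("hello")}
	err := handler.Receive(msg)
	return seen, msg.acked, err
}

func main() {
	fmt.Println(run())
}
```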

type ModelSubscriber

type ModelSubscriber struct {

	// Error handler pointer
	ErrorHandler ErrorHandler

	// Panic handler pointer
	PanicHandler PanicHandler
	// contains filtered or unexported fields
}

ModelSubscriber provides a base implementation of the subscribe functionality

func (*ModelSubscriber) Close

func (s *ModelSubscriber) Close() error

Close closes all receivers that support the io.Closer interface

func (*ModelSubscriber) ProcessMessage

func (s *ModelSubscriber) ProcessMessage(msg Message) error

ProcessMessage by all receivers

func (*ModelSubscriber) Subscribe

func (s *ModelSubscriber) Subscribe(ctx context.Context, receiver Receiver) error

Subscribe new receiver to receive messages from the subscription

type MultiPublisher

type MultiPublisher []Publisher

MultiPublisher wrapper

func (MultiPublisher) Publish

func (p MultiPublisher) Publish(ctx context.Context, messages ...any) error

Publish one or more messages to each of the wrapped pub-services
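
A fan-out publisher of this shape can be sketched in a few lines. The `fanOut` type below is hypothetical and is not the package's `MultiPublisher`. How the package treats a failing backend is not stated here, so stopping at the first error is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Publisher is a local copy of the documented interface.
type Publisher interface {
	Publish(ctx context.Context, messages ...any) error
}

// fanOut is a hypothetical slice-based publisher that forwards every call
// to each element in order and stops at the first error.
type fanOut []Publisher

func (p fanOut) Publish(ctx context.Context, messages ...any) error {
	for _, pub := range p {
		if err := pub.Publish(ctx, messages...); err != nil {
			return err
		}
	}
	return nil
}

// counter is a hypothetical backend that only counts received messages.
type counter struct{ n int }

func (c *counter) Publish(_ context.Context, messages ...any) error {
	c.n += len(messages)
	return nil
}

// run publishes two messages through a fan-out of two counters.
func run() (int, int, error) {
	a, b := &counter{}, &counter{}
	err := fanOut{a, b}.Publish(context.Background(), "x", "y")
	return a.n, b.n, err
}

func main() {
	fmt.Println(run())
}
```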

type PanicHandler

type PanicHandler func(msg Message, recoverData any)

PanicHandler is a function type for processing a recovered panic
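
To show where an error handler and a panic handler typically sit in a dispatch path, here is a minimal sketch. The `message` struct is a simplified stand-in for `Message`, and `dispatch` is a hypothetical helper; it is not the code `ModelSubscriber` uses.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a simplified stand-in for the package's Message interface.
type message struct{ id string }

// Local handler types with the same shape as ErrorHandler and PanicHandler.
type errorHandler func(msg message, err error)
type panicHandler func(msg message, recoverData any)

// dispatch is a hypothetical helper: it calls the receiver, routes a returned
// error to onErr, and routes a recovered panic to onPanic.
func dispatch(msg message, receive func(message) error, onErr errorHandler, onPanic panicHandler) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			if onPanic != nil {
				onPanic(msg, rec)
			}
			err = fmt.Errorf("receiver panicked: %v", rec)
		}
	}()
	if err = receive(msg); err != nil && onErr != nil {
		onErr(msg, err)
	}
	return err
}

// run dispatches one failing and one panicking receiver and returns what the
// two handlers recorded.
func run() []string {
	var events []string
	onErr := func(m message, err error) {
		events = append(events, "error "+m.id+": "+err.Error())
	}
	onPanic := func(m message, rec any) {
		events = append(events, fmt.Sprintf("panic %s: %v", m.id, rec))
	}

	_ = dispatch(message{"1"}, func(message) error { return errors.New("bad payload") }, onErr, onPanic)
	_ = dispatch(message{"2"}, func(message) error { panic("boom") }, onErr, onPanic)
	return events
}

func main() {
	for _, e := range run() {
		fmt.Println(e)
	}
}
```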

type Publisher

type Publisher interface {
	// Publish one or more messages to the pub-service
	Publish(ctx context.Context, messages ...any) error
}

Publisher pipeline base declaration

func PublisherByName

func PublisherByName(name string) Publisher

PublisherByName returns the pub interface by name if it exists, or nil otherwise
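
Because the lookup may return nil, callers should check the result before publishing. The sketch below models that with a hypothetical `publishers` map. Returning an error that mirrors `ErrUndefinedPublisherInterface` for an unknown name is an assumption about sensible behaviour, not a statement about what the package does.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Publisher is a local copy of the documented interface.
type Publisher interface {
	Publish(ctx context.Context, messages ...any) error
}

// errUndefinedPublisher mirrors the message of ErrUndefinedPublisherInterface.
var errUndefinedPublisher = errors.New("undefined publisher interface")

// publishers is a hypothetical name-keyed table standing in for the registry.
type publishers map[string]Publisher

// byName returns the publisher registered under name, or nil if there is none.
func (p publishers) byName(name string) Publisher { return p[name] }

// publish looks the publisher up first and fails cleanly when it is missing.
func (p publishers) publish(ctx context.Context, name string, messages ...any) error {
	pub := p.byName(name)
	if pub == nil {
		return errUndefinedPublisher
	}
	return pub.Publish(ctx, messages...)
}

// discard is a hypothetical publisher that accepts and drops every message.
type discard struct{}

func (discard) Publish(context.Context, ...any) error { return nil }

// run publishes once to a registered name and once to an unknown name.
func run() (known, unknown error) {
	table := publishers{"events": discard{}}
	ctx := context.Background()
	return table.publish(ctx, "events", "hello"), table.publish(ctx, "missing", "hello")
}

func main() {
	known, unknown := run()
	fmt.Println(known, unknown)
}
```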

type Receiver

type Receiver interface {
	Receive(msg Message) error
}

Receiver describes the message-processing interface

func ExtFuncReceiver

func ExtFuncReceiver(f any, decs ...decoder.Decoder) Receiver

ExtFuncReceiver wraps a function whose argument has an arbitrary input data type
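
The idea of wrapping a handler with an arbitrary argument type can be sketched with reflection. The helper below (`typedReceiver`, a hypothetical name) accepts any `func(T) error`, decodes a JSON body into a fresh `T`, and calls the handler. Using JSON is an assumption; the package's own `decoder.Decoder` values are not shown here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
)

// typedReceiver is a hypothetical helper: it accepts any func(T) error and
// returns a function that decodes a JSON body into T before calling it.
func typedReceiver(fn any) (func(body []byte) error, error) {
	if fn == nil {
		return nil, errors.New("handler must be a func(T) error")
	}
	fv := reflect.ValueOf(fn)
	ft := fv.Type()
	errType := reflect.TypeOf((*error)(nil)).Elem()
	if ft.Kind() != reflect.Func || ft.NumIn() != 1 || ft.NumOut() != 1 || ft.Out(0) != errType {
		return nil, errors.New("handler must be a func(T) error")
	}
	return func(body []byte) error {
		arg := reflect.New(ft.In(0)) // pointer to a zero value of T
		if err := json.Unmarshal(body, arg.Interface()); err != nil {
			return err
		}
		out := fv.Call([]reflect.Value{arg.Elem()})
		if err, ok := out[0].Interface().(error); ok {
			return err
		}
		return nil
	}, nil
}

// event is a hypothetical payload type.
type event struct {
	Title string `json:"title"`
}

// run decodes one JSON body into an event and returns the decoded title.
func run() (string, error) {
	var got string
	receive, err := typedReceiver(func(e event) error {
		got = e.Title
		return nil
	})
	if err != nil {
		return "", err
	}
	err = receive([]byte(`{"title":"Event 1"}`))
	return got, err
}

func main() {
	fmt.Println(run())
}
```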

func ReceiverFrom

func ReceiverFrom(handler any) Receiver

ReceiverFrom converts the incoming handler type to the Receiver interface

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry provides functionality of access to pub/sub interfaces by string names.

func NewRegistry

func NewRegistry() *Registry

NewRegistry initializes a new registry object

func (*Registry) Close

func (r *Registry) Close() error

Close notification center

func (*Registry) Listen

func (r *Registry) Listen(ctx context.Context) (err error)

Listen starts listening on the registered subscribers

func (*Registry) OnClose

func (r *Registry) OnClose() <-chan bool

The OnClose event fires only after all interfaces have been closed.

Subscribing to this closing event is convenient in application code:

```go
func myDatabaseObserver() {
  <-notificationcenter.OnClose()
  // ... Do something
}
```

func (*Registry) Publish

func (r *Registry) Publish(ctx context.Context, name string, messages ...any) error

Publish one or more messages to the pub-service

func (*Registry) Publisher

func (r *Registry) Publisher(name string) Publisher

Publisher returns the pub interface by name if it exists, or nil otherwise

func (*Registry) Register

func (r *Registry) Register(params ...any) error

Register one or more Publisher or Subscriber services. Parameters must be passed as consecutive {name, interface} pairs.

Example:

```go

nc.Register(
  "events", kafka.MustNewSubscriber(),
  "notifications", nats.MustNewSubscriber(),
)

```

func (*Registry) Subscribe

func (r *Registry) Subscribe(ctx context.Context, name string, receiver any) error

Subscribe registers a new handler on the subscriber interface with the given name

func (*Registry) Subscriber

func (r *Registry) Subscriber(name string) Subscriber

Subscriber returns the sub interface by name if it exists, or nil otherwise

type Subscriber

type Subscriber interface {
	io.Closer

	// Subscribe new receiver to receive messages from the subscription
	Subscribe(ctx context.Context, receiver Receiver) error

	// Start processing queue
	Listen(ctx context.Context) error
}

Subscriber provides methods of working with subscription
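
A custom backend only has to provide `Subscribe`, `Listen`, and `Close`. The sketch below is a hypothetical channel-backed subscriber (`chanSubscriber`). To stay short it uses a reduced `Message` interface with only `Body`, and it stops listening at the first receiver error; both are simplifications, not the package's behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Message is reduced to Body for brevity; the real interface has more methods.
type Message interface{ Body() []byte }

// Receiver has the same shape as the documented interface.
type Receiver interface{ Receive(msg Message) error }

type receiverFunc func(Message) error

func (f receiverFunc) Receive(m Message) error { return f(m) }

type bytesMessage []byte

func (b bytesMessage) Body() []byte { return b }

// chanSubscriber is a hypothetical subscriber fed from a Go channel.
type chanSubscriber struct {
	mu        sync.RWMutex
	receivers []Receiver
	queue     chan Message
	closeOnce sync.Once
}

func newChanSubscriber(size int) *chanSubscriber {
	return &chanSubscriber{queue: make(chan Message, size)}
}

// Subscribe adds a receiver that will get every message seen by Listen.
func (s *chanSubscriber) Subscribe(_ context.Context, r Receiver) error {
	if r == nil {
		return errors.New("nil receiver")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.receivers = append(s.receivers, r)
	return nil
}

// Listen delivers messages until ctx is cancelled or the queue is closed.
func (s *chanSubscriber) Listen(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-s.queue:
			if !ok {
				return nil // queue closed and fully drained
			}
			s.mu.RLock()
			receivers := s.receivers
			s.mu.RUnlock()
			for _, r := range receivers {
				if err := r.Receive(msg); err != nil {
					return err
				}
			}
		}
	}
}

// Close stops the queue; calling it more than once is safe.
func (s *chanSubscriber) Close() error {
	s.closeOnce.Do(func() { close(s.queue) })
	return nil
}

// run queues two messages, closes the subscriber, and drains the queue.
func run() ([]string, error) {
	sub := newChanSubscriber(2)
	var bodies []string
	_ = sub.Subscribe(context.Background(), receiverFunc(func(m Message) error {
		bodies = append(bodies, string(m.Body()))
		return nil
	}))
	sub.queue <- bytesMessage("one")
	sub.queue <- bytesMessage("two")
	_ = sub.Close()
	err := sub.Listen(context.Background())
	return bodies, err
}

func main() {
	fmt.Println(run())
}
```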

func SubscriberByName

func SubscriberByName(name string) Subscriber

SubscriberByName returns the sub interface by name if it exists, or nil otherwise

Directories

Path Synopsis
internal
Package mocks is a generated GoMock package.
Package pg provides the possibility to subscribe to internal postgres events.
wrappers
