ipc

package
v0.3.0
Published: Apr 1, 2024 License: MIT Imports: 7 Imported by: 0

README

IPC

Implements the BUS and RPC messaging patterns for inter-process and inner-process (same-process) peers.

Features:

  • BUS
    • Publisher based on Nats MQ
    • Subscriber based on Nats MQ
    • Publisher based on inner-process EventBus
    • Subscriber based on inner-process EventBus
  • RPC
    • RPC Server based on Nats MQ
    • RPC Client based on Nats MQ
    • RPC Server based on inner-process moderator
    • RPC Client based on inner-process moderator

Examples:

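package main

import (
	"fmt"
	"time"

	"github.com/zourva/pareto/ipc"
)

func main() {
	// Assumption: an inner-process bus needs no Broker address.
	bus, err := ipc.NewBus(&ipc.BusConf{Name: "demo", Type: ipc.InnerProcBus})
	if err != nil {
		fmt.Println("create bus:", err)
		return
	}
	_ = bus.Subscribe("greeting", func(data []byte) { fmt.Printf("received: %s\n", data) })
	_ = bus.Publish("greeting", []byte("hello"))
	time.Sleep(100 * time.Millisecond) // delivery may be asynchronous
}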

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus interface {
	// Publish publishes data on the given topic.
	//  This method is goroutine-safe.
	Publish(topic string, data []byte) error

	// Subscribe subscribes to the given topic by registering a callback.
	//  This method is goroutine-safe.
	Subscribe(topic string, fn Handler) error

	// SubscribeOnce subscribes to the given topic once; the handler is
	// unsubscribed after its first execution.
	//  This method is goroutine-safe.
	SubscribeOnce(topic string, fn Handler) error

	// Unsubscribe removes handler registered for a topic.
	// Returns error if there are no handlers subscribed to the topic.
	//  This method is goroutine-safe.
	Unsubscribe(topic string, fn Handler) error
}

Bus provides a message bus and exposes its API using the pub/sub pattern.
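To make the four methods of the interface concrete, here is a minimal in-memory sketch. It is not the package's EventBus or NatsBus: `memBus`, `entry`, `newMemBus` and `demo` are hypothetical names, delivery is assumed to be synchronous, and handlers are matched on Unsubscribe by function pointer, which is only one possible approach.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
)

// Handler mirrors the package's Handler type.
type Handler func([]byte)

// entry pairs a handler with its "deliver once" flag (hypothetical).
type entry struct {
	fn   Handler
	once bool
}

// memBus is a hypothetical in-memory bus guarded by a mutex.
type memBus struct {
	mu       sync.Mutex
	handlers map[string][]entry
}

func newMemBus() *memBus { return &memBus{handlers: make(map[string][]entry)} }

func (b *memBus) add(topic string, fn Handler, once bool) error {
	if fn == nil {
		return errors.New("nil handler")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], entry{fn: fn, once: once})
	return nil
}

func (b *memBus) Subscribe(topic string, fn Handler) error     { return b.add(topic, fn, false) }
func (b *memBus) SubscribeOnce(topic string, fn Handler) error { return b.add(topic, fn, true) }

// Unsubscribe removes the first entry whose function pointer matches fn.
func (b *memBus) Unsubscribe(topic string, fn Handler) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	list := b.handlers[topic]
	want := reflect.ValueOf(fn).Pointer()
	for i, e := range list {
		if reflect.ValueOf(e.fn).Pointer() == want {
			// list[:i:i] forces a copy, so earlier snapshots stay intact.
			b.handlers[topic] = append(list[:i:i], list[i+1:]...)
			return nil
		}
	}
	return fmt.Errorf("no handler subscribed to topic %q", topic)
}

// Publish snapshots the handlers, drops the once-only ones from the
// registry, then invokes the snapshot outside the lock.
func (b *memBus) Publish(topic string, data []byte) error {
	b.mu.Lock()
	snapshot := b.handlers[topic]
	kept := make([]entry, 0, len(snapshot))
	for _, e := range snapshot {
		if !e.once {
			kept = append(kept, e)
		}
	}
	b.handlers[topic] = kept
	b.mu.Unlock()

	for _, e := range snapshot {
		e.fn(data)
	}
	return nil
}

// demo publishes twice to one regular and one once-only subscriber.
func demo() []string {
	bus := newMemBus()
	var got []string
	_ = bus.Subscribe("t", func(d []byte) { got = append(got, "sub:"+string(d)) })
	_ = bus.SubscribeOnce("t", func(d []byte) { got = append(got, "once:"+string(d)) })
	_ = bus.Publish("t", []byte("a"))
	_ = bus.Publish("t", []byte("b"))
	return got
}

func main() {
	fmt.Println(demo()) // [sub:a once:a sub:b]
}
```

Calling handlers outside the lock lets a handler publish or subscribe again without deadlocking. Whether the real implementations make the same choice is not stated in this documentation.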

func NewBus

func NewBus(conf *BusConf) (Bus, error)

NewBus returns a new Bus endpoint and connects it to the given broker. If conf is nil, no bus endpoint is created.

func NewEventBus

func NewEventBus(conf *BusConf) (Bus, error)

func NewNatsBus

func NewNatsBus(conf *BusConf) (Bus, error)

NewNatsBus creates a Bus endpoint according to the conf.

Returns nil and an error on failure.

type BusConf

type BusConf struct {
	//Name of the bus, optional but recommended.
	Name string

	//Type defines broker type managing the bus.
	Type BusType

	//Broker is the address used as a mediator-pattern endpoint.
	Broker string
}

type BusType

type BusType int
const (
	InterProcBus BusType = iota + 1
	InnerProcBus
)
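Because the constants start at `iota + 1`, InterProcBus is 1 and InnerProcBus is 2, so the zero value of BusType matches neither. The sketch below shows how that could be used to detect a config whose Type was never set. The `String` method and the `valid` helper are hypothetical and not part of the package.

```go
package main

import "fmt"

// BusType mirrors the declaration above.
type BusType int

const (
	InterProcBus BusType = iota + 1 // 1
	InnerProcBus                    // 2
)

// String is a hypothetical helper for readable output.
func (t BusType) String() string {
	switch t {
	case InterProcBus:
		return "inter-process"
	case InnerProcBus:
		return "inner-process"
	default:
		return fmt.Sprintf("invalid(%d)", int(t))
	}
}

// valid reports whether t is one of the declared constants.
func valid(t BusType) bool { return t == InterProcBus || t == InnerProcBus }

func main() {
	var unset BusType // zero value, as in a BusConf{} literal
	fmt.Println(unset, valid(unset))               // invalid(0) false
	fmt.Println(InnerProcBus, valid(InnerProcBus)) // inner-process true
}
```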

type CalleeHandler

type CalleeHandler = func(data []byte) ([]byte, error)

CalleeHandler abstracts the RPC server side universal handler.
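Since a CalleeHandler only sees bytes in and bytes out, the caller and callee have to agree on an encoding. The following sketch assumes JSON. The `addRequest` and `addResponse` types and the `add` handler are hypothetical and chosen only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CalleeHandler mirrors the package's type alias.
type CalleeHandler = func(data []byte) ([]byte, error)

// addRequest and addResponse are hypothetical wire types.
type addRequest struct {
	A int `json:"a"`
	B int `json:"b"`
}

type addResponse struct {
	Sum int `json:"sum"`
}

// add decodes the request, does the work, and encodes the reply.
var add CalleeHandler = func(data []byte) ([]byte, error) {
	var req addRequest
	if err := json.Unmarshal(data, &req); err != nil {
		return nil, fmt.Errorf("decode request: %w", err)
	}
	return json.Marshal(addResponse{Sum: req.A + req.B})
}

func main() {
	out, err := add([]byte(`{"a":2,"b":3}`))
	fmt.Println(string(out), err) // {"sum":5} <nil>
}
```

A handler of this shape could be passed to ExposeV2, with the client sending the same JSON through CallV2.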

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is a container for handlers and callbacks, based on https://github.com/asaskevich/EventBus/blob/master/event_bus.go

func (*EventBus) Publish

func (bus *EventBus) Publish(topic string, data []byte) error

Publish executes the callbacks registered for a topic, passing data to each of them.

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(topic string, fn Handler) error

Subscribe subscribes to a topic. Returns error if `fn` is not a function.

func (*EventBus) SubscribeOnce

func (bus *EventBus) SubscribeOnce(topic string, fn Handler) error

SubscribeOnce subscribes to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function.

func (*EventBus) Unsubscribe

func (bus *EventBus) Unsubscribe(topic string, fn Handler) error

Unsubscribe removes callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

type Handler

type Handler func([]byte)

type InProcRPC

type InProcRPC struct {
	// contains filtered or unexported fields
}

InProcRPC implements RPC interface, including both server and client.

func (*InProcRPC) Call

func (r *InProcRPC) Call(name string, args ...interface{}) (reflect.Value, error)

Call calls a remote service identified by its name with the given args.

func (*InProcRPC) CallV2

func (r *InProcRPC) CallV2(name string, data []byte, timeout time.Duration) ([]byte, error)

func (*InProcRPC) Expose

func (r *InProcRPC) Expose(name string, fn interface{})

Expose exposes a service by associating a function handler.

func (*InProcRPC) ExposeV2

func (r *InProcRPC) ExposeV2(name string, handler CalleeHandler) error

type InProcRPCBroker

type InProcRPCBroker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

InProcRPCBroker holds handlers of rpc service.

func (*InProcRPCBroker) Resolve

func (r *InProcRPCBroker) Resolve(name string, args []reflect.Value) (reflect.Value, error)

Resolve gets the registered handler of the given name. Returns nil if not found.
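The reflect-based signatures of Expose, Call and Resolve suggest a name-to-function registry. The sketch below shows one way such a registry could work. The `registry` type and its `expose` and `call` methods are hypothetical and do not reproduce InProcRPCBroker. Argument type mismatches are not checked here and would panic inside `reflect.Value.Call`.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// registry is a hypothetical stand-in for an in-process RPC broker.
type registry struct {
	mu       sync.Mutex
	handlers map[string]reflect.Value
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]reflect.Value)}
}

// expose stores fn under name; it panics if fn is not a function.
func (r *registry) expose(name string, fn interface{}) {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func {
		panic("expose: fn must be a function")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[name] = v
}

// call looks up name, wraps args as reflect.Values and invokes the
// handler. It returns the first result, or the zero reflect.Value
// when the handler returns nothing.
func (r *registry) call(name string, args ...interface{}) (reflect.Value, error) {
	r.mu.Lock()
	fn, ok := r.handlers[name]
	r.mu.Unlock()
	if !ok {
		return reflect.Value{}, fmt.Errorf("service %q not found", name)
	}
	if n := fn.Type().NumIn(); n != len(args) {
		return reflect.Value{}, fmt.Errorf("service %q expects %d args, got %d", name, n, len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	out := fn.Call(in)
	if len(out) == 0 {
		return reflect.Value{}, nil
	}
	return out[0], nil
}

func main() {
	r := newRegistry()
	r.expose("add", func(a, b int) int { return a + b })
	v, err := r.call("add", 2, 3)
	fmt.Println(v.Int(), err) // 5 <nil>
}
```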

type Messager

type Messager struct {
	Bus
	RPC
}

Messager is a communication endpoint peer, which combines messaging patterns, namely Bus and RPC. It acts as an RPC server or client, a BUS subscriber or BUS publisher, or both.

func NewMessager

func NewMessager(conf *MessagerConf) (*Messager, error)

NewMessager creates a messager using the given config.

NOTE:
The Bus endpoint is created iff BusConf is not nil.
The RPC channel is created iff RpcConf is not nil.

Returns nil when both are nil.
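Messager embeds two interfaces, so the methods of both are promoted onto it. The sketch below illustrates that with reduced, hypothetical interfaces (one method each, and CallV2 without the timeout parameter) plus a hypothetical `newMessager` that rejects the case where both parts are missing. It is an assumption that the real constructor reports this case with an error.

```go
package main

import (
	"errors"
	"fmt"
)

// Reduced, hypothetical versions of the package's interfaces.
type Bus interface {
	Publish(topic string, data []byte) error
}

type RPC interface {
	CallV2(name string, data []byte) ([]byte, error)
}

// Messager embeds both interfaces, as in the package.
type Messager struct {
	Bus
	RPC
}

// logBus records what was published (hypothetical).
type logBus struct{ log *[]string }

func (b logBus) Publish(topic string, data []byte) error {
	*b.log = append(*b.log, topic+":"+string(data))
	return nil
}

// echoRPC returns the request unchanged (hypothetical).
type echoRPC struct{}

func (echoRPC) CallV2(name string, data []byte) ([]byte, error) { return data, nil }

// newMessager fills in only the parts it is given and refuses to
// build an endpoint that has neither.
func newMessager(bus Bus, rpc RPC) (*Messager, error) {
	if bus == nil && rpc == nil {
		return nil, errors.New("both bus and rpc are nil")
	}
	return &Messager{Bus: bus, RPC: rpc}, nil
}

// demo exercises one promoted method from each embedded interface.
func demo() ([]string, string) {
	var log []string
	m, _ := newMessager(logBus{&log}, echoRPC{})
	_ = m.Publish("greeting", []byte("hi"))    // promoted from Bus
	out, _ := m.CallV2("echo", []byte("ping")) // promoted from RPC
	return log, string(out)
}

func main() {
	fmt.Println(demo()) // [greeting:hi] ping
}
```

With this layout, calling a promoted method on a part that was left nil panics at run time, so a caller that configures only one side should use only that side.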

type MessagerConf

type MessagerConf struct {
	BusConf *BusConf
	RpcConf *RPCConf
}

type NatsBus

type NatsBus struct {
	*nats.Conn
	// contains filtered or unexported fields
}

NatsBus implements the Bus interface and can therefore be used as a publisher, a subscriber, or both. It creates a connection to a NATS broker and uses the connection object as the underlying carrier for bus messaging patterns.

func (*NatsBus) Publish

func (n *NatsBus) Publish(topic string, data []byte) error

func (*NatsBus) Subscribe

func (n *NatsBus) Subscribe(topic string, fn Handler) error

func (*NatsBus) SubscribeOnce

func (n *NatsBus) SubscribeOnce(topic string, fn Handler) error

func (*NatsBus) Unsubscribe

func (n *NatsBus) Unsubscribe(topic string, fn Handler) error

Unsubscribe removes the handler registered for a topic. This method is goroutine-safe.

type NatsRPC

type NatsRPC struct {
	*nats.Conn
	// contains filtered or unexported fields
}

NatsRPC implements the RPC interface and can therefore be used as an RPCServer, an RPCClient, or both. It creates a connection to a NATS broker and uses the connection object as the underlying carrier for request-reply (RR) messaging patterns.

func (*NatsRPC) Call

func (r *NatsRPC) Call(name string, args ...interface{}) (reflect.Value, error)

Call calls a remote service identified by its name with the given args.

func (*NatsRPC) CallV2

func (r *NatsRPC) CallV2(name string, data []byte, timeout time.Duration) ([]byte, error)

CallV2 calls a remote service identified by its name with the given args and expects response data or error, in the time limited by timeout.
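The timeout contract of CallV2 can be illustrated without a broker. The sketch below runs a handler in a goroutine and waits for either its reply or the deadline. `callWithTimeout`, `errTimeout`, `result`, `fast`, `slow` and `demo` are hypothetical names, and this is not how NatsRPC is implemented.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// CalleeHandler mirrors the package's type alias.
type CalleeHandler = func(data []byte) ([]byte, error)

var errTimeout = errors.New("rpc: call timed out")

type result struct {
	data []byte
	err  error
}

// callWithTimeout invokes h and returns its reply, or errTimeout if
// no reply arrives within timeout.
func callWithTimeout(h CalleeHandler, data []byte, timeout time.Duration) ([]byte, error) {
	done := make(chan result, 1) // buffered so a late handler never blocks
	go func() {
		out, err := h(data)
		done <- result{out, err}
	}()
	select {
	case r := <-done:
		return r.data, r.err
	case <-time.After(timeout):
		return nil, errTimeout
	}
}

// fast replies immediately; slow replies after the deadline used in demo.
func fast(d []byte) ([]byte, error) { return append([]byte("re:"), d...), nil }

func slow(d []byte) ([]byte, error) {
	time.Sleep(200 * time.Millisecond)
	return d, nil
}

// demo makes one call that succeeds and one that times out.
func demo() (string, error, error) {
	out, err1 := callWithTimeout(fast, []byte("ping"), time.Second)
	_, err2 := callWithTimeout(slow, []byte("ping"), 20*time.Millisecond)
	return string(out), err1, err2
}

func main() {
	reply, err1, err2 := demo()
	fmt.Println(reply, err1) // re:ping <nil>
	fmt.Println(err2)        // rpc: call timed out
}
```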

func (*NatsRPC) Expose

func (r *NatsRPC) Expose(name string, fn interface{})

Expose exposes a service by associating a handler.

func (*NatsRPC) ExposeV2

func (r *NatsRPC) ExposeV2(name string, handler CalleeHandler) error

ExposeV2 exposes a service by associating a handler.

type RPC

type RPC interface {
	RPCServer
	RPCClient
}

RPC combines both sides of an RPC service.

func NewInProcRPC

func NewInProcRPC(conf *RPCConf) (RPC, error)

func NewNatsRPC

func NewNatsRPC(conf *RPCConf) (RPC, error)

NewNatsRPC creates an RPC channel according to the conf.

Returns nil and an error on failure.

func NewRPC

func NewRPC(conf *RPCConf) (RPC, error)

NewRPC creates a bidirectional RPC-pattern messager.

type RPCClient

type RPCClient interface {
	//Call calls a remote method identified by its name with the given args.
	Call(name string, args ...interface{}) (reflect.Value, error)

	//CallV2 calls a remote service identified by its name with the given args
	//and expects response data or error, in the time limited by timeout.
	CallV2(name string, data []byte, timeout time.Duration) ([]byte, error)
}

RPCClient defines caller side of an RPC service.

type RPCConf

type RPCConf struct {
	//Name of the RPC stream, optional but recommended.
	Name string

	//Type defines the carrier used to exchange RPC messages.
	Type RpcType

	//Broker is the address used as a mediator-pattern endpoint.
	Broker string
}

type RPCServer

type RPCServer interface {
	//Expose registers a method with the RPC server by associating a function handler.
	Expose(name string, fn interface{})

	//ExposeV2 registers a method with the RPC server by associating a handler.
	//Serialization-based style.
	ExposeV2(name string, handler CalleeHandler) error
}

RPCServer defines callee side of an RPC service.

type RpcType

type RpcType int
const (
	InterProcRpc RpcType = iota + 1
	InnerProcRpc
)
