ipc

package module
v1.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2024 License: MIT Imports: 21 Imported by: 2

README

golang-ipc

Testing codecov Go Report Card

Golang Inter-process communication library for Mac/Linux forked from james-barrow/golang-ipc with the following features added:

  • Adds the configurable ability to spawn multiple clients. In order to allow multiple client connections, multiple socket connections are dynamically allocated
  • Adds ReadTimed methods which return after the time.Duration provided
  • Adds a ConnectionPool instance to easily poll read requests from multiple clients and easily close connections
  • Adds improved logging for better visibility
  • Removes race conditions by using sync.Mutex locks
  • Improves and adds more tests
  • Makes both StartClient and StartServer blocking, omitting the need for time.Sleep between Server and Client instantiation. All tests are ran with 0 millisecond wait times using IPC_WAIT=0
  • Removes Windows support (oh wait, that's not a feature?)
Overview

A simple-to-use package that uses unix sockets on Mac/Linux to create a communication channel between two go processes.

Usage

Create a server with the default configuration and start listening for the client:

s, err := ipc.StartServer(&ServerConfig{Name:"<name of connection>"})
if err != nil {
	log.Println(err)
	return
}

Create a client and connect to the server:

c, err := ipc.StartClient(&ClientConfig{Name:"<name of connection>"})
if err != nil {
	log.Println(err)
	return
}
Read messages

Read each message sent (blocking):

for {

	//message, err := s.Read() // server
	message, err := c.Read() // client
	
	if err == nil {
	// handle error
	}
	
	// do something with the received messages
}

Read each message sent until a specific duration has surpassed.

for {

	message, err := c.ReadTimed(5*time.Second)
	
	if  message == ipc.TimeoutMessage {
		continue
    }   
	
	if err == nil && c.StatusCode() != ipc.Connecting {
	
	} 
}
MultiClient Mode

Allow polling of newly created clients on each iteration until a specific duration has surpassed.

s, err := ipc.StartServer(&ServerConfig{Name:"<name of connection>", MultiClient: true})
    if err != nil {
    log.Println(err)
    return
}

for {
    s.Connections.ReadTimed(5*time.Second, func(srv *ipc.Server, message *ipc.Message, err error) {
        if  message == ipc.TimeoutMessage {
            continue
        }
        
        if message.MsgType == -1 && message.Status == "Connected" {
        
        }
    })
}
  • Server.Connections.ReadTimed will block until the slowest ReadTimed callback completes.
  • Server.Connections.ReadTimedFastest will unblock after the first ReadTimed callback completes.

While ReadTimedFastest will result in faster iterations, it will also result in more running goroutines in scenarios where clients requests are not evenly distributed.

To get a better idea of how these work, run the following examples:

Using ReadTimed:

go run --race example/multiclient/multiclient.go

Using ReadTimedFastest:

FAST=true go run --race example/multiclient/multiclient.go

Notice that the Server receives messages faster and the process will finish faster

Message Struct

All received messages are formatted into the type Message

type Message struct {
	Err     error  // details of any error
	MsgType int    // 0 = reserved , -1 is an internal message (disconnection or error etc), all messages recieved will be > 0
	Data    []byte // message data received
	Status  string // the status of the connection
}
Write a message

//err := s.Write(1, []byte("<Message for client"))
err := c.Write(1, []byte("<Message for server"))

if err == nil {
// handle error
}

Advanced Configuration

Server options:

config := &ipc.ServerConfig{
	Name: (string),            // the name of the queue (required)
	Encryption: (bool),        // allows encryption to be switched off (bool - default is true)
	MaxMsgSize: (int) ,        // the maximum size in bytes of each message ( default is 3145728 / 3Mb)
	UnmaskPermissions: (bool), // make the socket writeable for other users (default is false)
	MultiMode: (bool),         // allow the server to connect with multiple clients
}

Client options:

config := &ipc.ClientConfig  {
	Name: (string),             // the name of the queue needs to match the name of the ServerConfig (required)
	Encryption: (bool),         // allows encryption to be switched off (bool - default is true)
	Timeout: (time.Duration),   // duration to wait while attempting to connect to the server (default is 0 no timeout)
	RetryTimer: (time.Duration),// duration to wait before iterating the dial loop or reconnecting (default is 1 second)
}

By default, the Timeout value is 0 which allows the dial loop to iterate in perpetuity until a connection to the server is established.

In scenarios where a perpetually attempting to reconnect is impractical, a Timeout value should be provided. When the connection times out, no further retries will be attempted.

When a Client is no longer used, ensure that the .Close() method is called to prevent unnecessary perpetual connection attempts.

Encryption

By default, the connection established will be encrypted, ECDH384 is used for the key exchange and AES 256 GCM is used for the cipher.

Encryption can be switched off by passing in a custom configuration to the server & client start function:

Encryption: false
Unix Socket Permissions

Under most configurations, a socket created by a user will by default not be writable by another user, making it impossible for the client and server to communicate if being run by separate users. The permission mask can be dropped during socket creation by passing a custom configuration to the server start function. This will make the socket writable for any user.

UnmaskPermissions: true	

Testing

The package has been tested on Mac and Linux and has extensive test coverage. The following commands will run all the tests and examples with race condition detection enabled.

make test run

You can change the speed of the tests by providing a value for the IPC_WAIT environment variable. A value > 5 will specify the amount of milliseconds to wait in between critical intervals whereas a value <= 5 will resolve to the amount of seconds to wait in between the same. The default value is 10 milliseconds. You can also provide the IPC_DEBUG=true environment variable to set the logrus.Loglevel to debug mode. The following command will make the tests run in debug mode while waiting 500ms in between critical intervals:

IPC_WAIT=500 IPC_DEBUG=true make test run

Documentation

Index

Constants

View Source
const (
	VERSION                = 2       // ipc package VERSION
	MAX_MSG_SIZE           = 3145728 // 3Mb  - Maximum bytes allowed for each message
	DEFAULT_WAIT           = 10
	DEFAULT_LOG_LEVEL      = logrus.ErrorLevel //
	SOCKET_NAME_BASE       = "/tmp/"
	SOCKET_NAME_EXT        = ".sock"
	CLIENT_CONNECT_MSGTYPE = 12
	ENCRYPT_BY_DEFAULT     = true
)

Variables

View Source
var TimeoutMessage = &Message{MsgType: 2, Err: errors.New("timed_out")}

Functions

func GetDefaultClientConnectWait added in v1.2.6

func GetDefaultClientConnectWait() int

func Sleep added in v1.3.0

func Sleep()

Sleep change the sleep time by using IPC_WAIT env variable (seconds)

Types

type Actor added in v1.2.6

type Actor struct {
	// contains filtered or unexported fields
}

func NewActor added in v1.2.6

func NewActor(ac *ActorConfig) Actor

func (*Actor) Close added in v1.2.6

func (a *Actor) Close()

Close - closes the connection

func (*Actor) Read added in v1.2.6

func (a *Actor) Read() (*Message, error)

Read - blocking function, reads each message received if MsgType is a negative number it's an internal message

func (*Actor) ReadTimed added in v1.3.0

func (a *Actor) ReadTimed(duration time.Duration) (*Message, error)

func (*Actor) ReadTimedTimeoutMessage added in v1.3.4

func (a *Actor) ReadTimedTimeoutMessage(duration time.Duration, onTimeoutMessage *Message) (*Message, error)

func (*Actor) Status added in v1.2.6

func (a *Actor) Status() string

Status - returns the current connection status as a string

func (*Actor) StatusCode added in v1.2.6

func (a *Actor) StatusCode() Status

StatusCode - returns the current connection status

func (*Actor) String added in v1.3.2

func (a *Actor) String() string

func (*Actor) Write added in v1.2.6

func (a *Actor) Write(msgType int, message []byte) error

Write - writes a message to the ipc connection. msgType - denotes the type of data being sent. 0 is a reserved type for internal messages and errors.

func (*Actor) WriteMessage added in v1.3.4

func (a *Actor) WriteMessage(msg *Message) error

type ActorConfig added in v1.2.6

type ActorConfig struct {
	IsServer     bool
	ServerConfig *ServerConfig
	ClientConfig *ClientConfig
}

type Client

type Client struct {
	Actor

	ClientId int
	// contains filtered or unexported fields
}

Client - holds the details of the client connection and config.

func NewClient added in v1.3.0

func NewClient(name string, config *ClientConfig) (*Client, error)

func StartClient

func StartClient(config *ClientConfig) (*Client, error)

StartClient - start the ipc client. ipcName = is the name of the unix socket or named pipe that the client will try and connect to.

func StartClientPool added in v1.3.4

func StartClientPool(config *ClientConfig) (*Client, error)

func (*Client) ByteReader added in v1.3.0

func (c *Client) ByteReader(a *Actor, buff []byte) bool

func (*Client) String added in v1.3.0

func (c *Client) String() string

getStatus - get the current status of the connection

type ClientConfig

type ClientConfig struct {
	Name        string
	Timeout     time.Duration // the duration to wait before abandoning a dial attempt
	RetryTimer  time.Duration // the duration to wait in dial loop iteration and reconnect attempts
	LogLevel    string
	MultiClient bool
	Encryption  bool
}

ClientConfig - used to pass configuration overrides to ClientStart()

type ConnectionPool added in v1.3.4

type ConnectionPool struct {
	Servers      []*Server
	ServerConfig *ServerConfig
	Logger       *logrus.Logger
	// contains filtered or unexported fields
}

func (*ConnectionPool) Close added in v1.3.4

func (sm *ConnectionPool) Close()

func (*ConnectionPool) MapExec added in v1.3.4

func (sm *ConnectionPool) MapExec(callback func(*Server), from string)

func (*ConnectionPool) Read added in v1.3.4

func (sm *ConnectionPool) Read(callback func(*Server, *Message, error))

func (*ConnectionPool) ReadTimed added in v1.3.4

func (sm *ConnectionPool) ReadTimed(duration time.Duration, callback func(*Server, *Message, error))

ReadTimed will call ReadTimed on all connections waiting for the slowest one to finish

func (*ConnectionPool) ReadTimedFastest added in v1.3.4

func (sm *ConnectionPool) ReadTimedFastest(duration time.Duration, callback func(*Server, *Message, error))

ReadTimed will call ReadTimed on all connections waiting for the fastest one to finish

type Message

type Message struct {
	Err     error  // details of any error
	MsgType int    // 0 = reserved , -1 is an internal message (disconnection or error etc), all messages received will be > 0
	Data    []byte // message data received
	Status  string // the status of the connection
}

Message - contains the received message

type Server

type Server struct {
	Actor

	Connections *ConnectionPool
	// contains filtered or unexported fields
}

Server - holds the details of the server connection & config.

func NewServer added in v1.3.0

func NewServer(name string, config *ServerConfig) (*Server, error)

func StartServer

func StartServer(config *ServerConfig) (*Server, error)

StartServer - starts the ipc server.

func StartServerPool added in v1.3.4

func StartServerPool(config *ServerConfig) (*Server, error)

func (*Server) ByteReader added in v1.3.0

func (s *Server) ByteReader(a *Actor, buff []byte) bool

func (*Server) Close

func (s *Server) Close()

Close - closes the connection

type ServerConfig

type ServerConfig struct {
	Name              string
	MaxMsgSize        int
	UnmaskPermissions bool
	LogLevel          string
	MultiClient       bool
	Encryption        bool
}

ServerConfig - used to pass configuration overrides to ServerStart()

type Status

type Status int

Status - Status of the connection

const (
	// NotConnected - 0
	NotConnected Status = iota
	// Listening - 1
	Listening
	// Connecting - 2
	Connecting
	// Connected - 3
	Connected
	// ReConnecting - 4
	ReConnecting
	// Closed - 5
	Closed
	// Closing - 6
	Closing
	// Error - 7
	Error
	// Timeout - 8
	Timeout
	// Disconnected - 9
	Disconnected
)

func (Status) String

func (status Status) String() string

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL