wirenet

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2020 License: Apache-2.0 Imports: 19 Imported by: 5

README

go-wirenet

Simple bidirectional TCP stream server. Useful for NAT traversal.

Coverage Status Go Documentation Go Report Card

Design

Design

Client-Server
// client <-> server
client1 join  to the server  ---------NAT------> server
client2 join to the server   ---------NAT------> server
client3 join to the server   ---------NAT------> server
client4 join to the server   ---------NAT------> server

// call from the server
call client1 from the server ---------NAT------> client1
call client2 from the server ---------NAT------> client2
call client3 from the server ---------NAT------> client3
call client 4 from the server --------NAT------> client4

// call from the client
call server from the client1 ---------NAT------> server
call server from the client2 ---------NAT------> server  
call server from the client3  --------NAT------> server  
call server from the client4  --------NAT------> server
Client-Hub
// clients <-> hub
client1 join to the hub  ---------NAT------> hub
client2 join to the hub  ---------NAT------> hub
client3 join to the hub  ---------NAT------> hub
client4 join to the hub  ---------NAT------> hub

// call from the client
call client2 from the client1 ---------NAT------> client2
call client1 from the client2 ---------NAT------> client1 
call client2 from the client3  --------NAT------> client2  
call client1 from the client4  --------NAT------> client1

Table of contents

Installation
go get github.com/mediabuyerbot/go-wirenet
Examples
Creating connection
import "github.com/mediabuyerbot/go-wirenet"

// make server side
wire, err := wirenet.Mount(":8989", nil)
if err != nil {
    handleError(err)
}
if err := wire.Connect(); err != nil {
    handleError(err)
}

// OR make client side 
wire, err := wirenet.Join(":8989", nil)
if err != nil {
    handleError(err)
}

// connection
if err := wire.Connect(); err != nil {
    handleError(err)
}
Stream handling
import "github.com/mediabuyerbot/go-wirenet"

// server side
wire, err := wirenet.Mount(":8989", nil)
if err != nil {
    handleError(err)
}
// or client side
wire, err := wirenet.Join(":8989", nil)
if err != nil {
    handleError(err)
}

backupStream := func(ctx context.Context, stream wirenet.Stream) {
    file, err := os.Open("/backup.log")
    ...
    // write to stream
    n, err := stream.ReadFrom(file)
    ...
    stream.Close()
}

openChromeStream := func(ctx context.Context, stream wirenet.Stream) {
      // read from stream
      n, err := stream.WriteTo(os.Stdout)
} 

wire.Stream("backup", backupStream)
wire.Stream("openChrome", openChromeStream)

if err := wire.Connect(); err != nil {
    handleError(err)
}
Stream opening
// make options
opts := []wirenet.Option{
   wirenet.WithSessionOpenHook(func(session wirenet.Session) {
   		     hub.registerSession(session)	
   		}),
   		wirenet.WithSessionCloseHook(func(session wirenet.Session) {
   			 hub.unregisterSession(session)
        }),
}
// make client side
wire, err := wirenet.Join(":8989", opts...)
// OR make server side
wire, err := wirenet.Mount(":8989", opts...)

...

// find an open session in some repository
sess := hub.findSession("sessionID")
stream, err := sess.OpenStream("backup")
if err != nil {
   handleError(err)
}
defer stream.Close()
 
backup, err := os.Open("/backup.log")
if err != nil {
   handleError(err)
}
defer backup.Close()

// write to stream
n, err := stream.ReadFrom(backup)
...
Writing to stream
wire.Stream("account.set", func(ctx context.Context, stream wirenet.Stream) {
   // write to stream using writer 
   writer := stream.Writer()
   for {
      n, err := fileOne.Read(buf)
      if err != nil {
          handleError(err)
          break
      }
   	  n, err := writer.Write(buf[:n])
      ...
   }
   // EOF frame
   writer.Close()
   
   for {
         n, err := fileTwo.Read(buf)
         if err != nil {
             handleError(err)
             break
         }
      	  n, err := writer.Write(buf[:n])
         ...
      }
      // EOF frame
      writer.Close() 
   ...

   // or write to stream (recommended) 
   n, err := stream.ReadFrom(fileOne)
   ...
   n, err := stream.ReadFrom(fileTwo)
})
Reading from stream
wire.Stream("account.set", func(ctx context.Context, stream wirenet.Stream) {
   // reading from stream using reader 
   reader := stream.Reader()
   buf := make([]byte, wirenet.BufSize)
   n, err := reader.Read(buf)
   // EOF frame
   reader.Close()
   ...

   // or read from stream (recommended)
   n, err := stream.WriteTo(file)
   ...
})
Using authentication

server

tokenValidator := func(streamName string, id wirenet.Identification, token wirenet.Token) error {
   if streamName == "public" {
      return nil 
   }
   return validate(token)
}

wire, err := wirenet.Mount(":8989", wirenet.WithTokenValidator(tokenValidator))
go func() {
	if err := wire.Connect(); err != nil {
	   handleError(err)
    }
}()

<-terminate()
wire.Close()

client

 token := wirenet.Token("token")
 identification := wirenet.Identification("uuid")
 wire, err := wirenet.Join(":8989",
 		wirenet.WithIdentification(identification, token),
 )
 if err := wire.Connect(); err != nil {
    handleError(err)
 }
Using SSL/TLS certs

server

// make keys 
// ./certs/server.key
// ./certs/server.pem
tlsConf, err := wirenet.LoadCertificates("server", "./certs")
if err != nil {
	handleError(err)
}
wire, err := wirenet.Mount(":8989", wirenet.WithTLS(tlsConf))
go func() {
	if err := wire.Connect(); err != nil {
	   handleError(err)
    }
}()

<-terminate()
wire.Close()

client

// make keys 
// ./certs/client.key
// ./certs/client.pem
tlsConf, err := wirenet.LoadCertificates("client", "./certs")
if err != nil {
	handleError(err)
}
wire, err := wirenet.Join(":8989", wirenet.WithTLS(tlsConf))
if err := wire.Connect(); err != nil {
    handleError(err)
}
Shutdown
timeout := 120*time.Second 
wire, err := wirenet.Mount(":8989",
    // Waiting time for completion of all streams
    wirenet.WithSessionCloseTimeout(timeout),
)
go func() {
	if err := wire.Connect(); err != nil {
	   handleError(err)
    }
}()

<-terminate()

wire.Close()
KeepAlive
// server side
wire, err := wirenet.Mount(":8989",
     WithKeepAlive(true),
     WithKeepAliveInterval(30 * time.Second), 
)

// OR client side
wire, err := wirenet.Join(":8989",
     WithKeepAlive(true),
     WithKeepAliveInterval(30 * time.Second), 
)

go func() {
	if err := wire.Connect(); err != nil {
	   handleError(err)
    }
}()

<-terminate()

wire.Close()
Hub mode
Attention! The name of the stream for each client must be unique!

hub

 hub, err := wirenet.Hub(":8989")
 hub.Connect()

client1

client1, err := wirenet.Join(":8989")
client1.Stream("client1:readBalance", func(ctx context.Context, s Stream) {})
go func() {
   client1.Connect()
}()
...
sess, err := client1.Session("uuid")
stream, err := sess.OpenStream("client2:readBalance")
<-terminate()
client1.Close()

client2

client2, err := wirenet.Join(":8989")
client2.Stream("client2:readBalance", func(ctx context.Context, s Stream) {})
go func() {
   client2.Connect()
}()
...
sess, err := client2.Session("uuid")
stream, err := sess.OpenStream("client1:readBalance")
<-terminate()
client2.Close()
Options
wirenet.WithConnectHook(hook func(io.Closer)) Option
wirenet.WithSessionOpenHook(hook wirenet.SessionHook) Option
wirenet.WithSessionCloseHook(hook wirenet.SessionHook) Option
wirenet.WithIdentification(id wirenet.Identification, token wirenet.Token) Option
wirenet.WithTokenValidator(v wirenet.TokenValidator) Option                   // server side
wirenet.WithTLS(conf *tls.Config) Option
wirenet.WithRetryWait(min, max time.Duration) Option
wirenet.WithRetryMax(n int) Option
wirenet.WithReadWriteTimeouts(read, write time.Duration) Option
wirenet.WithSessionCloseTimeout(dur time.Duration) Option

Documentation

Overview

Package wirenet provides a bidirectional streaming TCP server. Useful for NAT traversal.

Index

Constants

View Source
const (
	DefaultKeepAliveInterval   = 15 * time.Second
	DefaultEnableKeepAlive     = true
	DefaultReadTimeout         = 10 * time.Second
	DefaultWriteTimeout        = 10 * time.Second
	DefaultAcceptBacklog       = 256
	DefaultSessionCloseTimeout = 5 * time.Second
	DefaultRetryMax            = 100
	DefaultRetryWaitMax        = 60 * time.Second
	DefaultRetryWaitMin        = 5 * time.Second
)
View Source
const (
	BufSize = 1 << 10
)

Variables

View Source
var (
	// ErrStreamClosed is returned when named stream is closed.
	ErrStreamClosed = errors.New("wirenet: read/write on closed stream")

	// ErrStreamHandlerNotFound is returned when stream handler not found.
	ErrStreamHandlerNotFound = errors.New("wirenet: stream handler not found")

	// ErrSessionNotFound is returned when session not found.
	ErrSessionNotFound = errors.New("wirenet: session not found")

	// ErrWireClosed is returned when the Wire is closed on the client or server side.
	ErrWireClosed = errors.New("wirenet: wire closed")

	// ErrAddrEmpty is returned when addr is empty. See Mount(), Join(), Hub().
	ErrAddrEmpty = errors.New("wirenet: listener address is empty")

	// ErrSessionClosed is returned when session is closed.
	ErrSessionClosed = errors.New("wirenet: session closed")

	// ErrUnknownCertificateName is returned when certificate name is empty. See LoadCertificates().
	ErrUnknownCertificateName = errors.New("wirenet: unknown certificate name")
)

Functions

func DefaultRetryPolicy

func DefaultRetryPolicy(min, max time.Duration, attemptNum int) time.Duration

func LoadCertificates

func LoadCertificates(name string, certPath string) (*tls.Config, error)

Types

type ErrorHandler

type ErrorHandler func(context.Context, error)

ErrorHandler is used for error logging.

type Handler

type Handler func(context.Context, Stream)

Handler is used to handle payload in a named stream.

type Identification

type Identification []byte

type OpError

type OpError struct {
	Op             string
	SessionID      uuid.UUID
	LocalAddr      net.Addr
	RemoteAddr     net.Addr
	Identification Identification
	Err            error
}

func (*OpError) Error

func (e *OpError) Error() string

type Option

type Option func(*wire)

func WithConnectHook

func WithConnectHook(hook func(io.Closer)) Option

func WithErrorHandler

func WithErrorHandler(h ErrorHandler) Option

func WithIdentification

func WithIdentification(id Identification, token Token) Option

func WithKeepAlive

func WithKeepAlive(flag bool) Option

func WithKeepAliveInterval

func WithKeepAliveInterval(interval time.Duration) Option

func WithLogWriter

func WithLogWriter(w io.Writer) Option

func WithReadWriteTimeouts

func WithReadWriteTimeouts(read, write time.Duration) Option

func WithRetryMax

func WithRetryMax(n int) Option

func WithRetryPolicy

func WithRetryPolicy(rp RetryPolicy) Option

func WithRetryWait

func WithRetryWait(min, max time.Duration) Option

func WithSessionCloseHook

func WithSessionCloseHook(hook SessionHook) Option

func WithSessionCloseTimeout

func WithSessionCloseTimeout(dur time.Duration) Option

func WithSessionOpenHook

func WithSessionOpenHook(hook SessionHook) Option

func WithTLS

func WithTLS(conf *tls.Config) Option

func WithTokenValidator

func WithTokenValidator(v TokenValidator) Option

type RetryPolicy

type RetryPolicy func(min, max time.Duration, attemptNum int) time.Duration

RetryPolicy is the retry policy used when there is no connection to the server. It is used only on the client side. The default is DefaultRetryPolicy, but you can write your own policy.

type Session

type Session interface {

	// ID returns a unique session id.
	ID() uuid.UUID

	// IsClosed returns a true flag if the session is closed, otherwise returns a false flag.
	IsClosed() bool

	// Close gracefully shuts down all active streams.
	Close() error

	// StreamNames returns a list of open stream names.
	StreamNames() []string

	// OpenStream opens a named stream and returns it.
	// After the named stream is successfully opened, an authentication frame is sent.
	OpenStream(name string) (Stream, error)

	// Identification returns some information specified by the user on the client side using WithIdentification().
	Identification() Identification

	// CloseWire gracefully shuts down the server without interrupting any active connections.
	CloseWire() error
}

Session represents a connection between a client and a server. Each session can have from one to N named streams. Each named stream is a payload processing (file transfer, video transfer, etc.).

type SessionHook

type SessionHook func(Session)

SessionHook is used when opening or closing a session, and each session interception is performed in a separate goroutine.

type Sessions

type Sessions map[uuid.UUID]Session

Sessions represents a map of active sessions. The key is the session ID and the value is the session.

type ShutdownError

type ShutdownError struct {
	Errors []error
}

ShutdownError is the error type that collects the errors that occurred when closing sessions. The error is used only when closing a wired connection.

func NewShutdownError

func NewShutdownError() *ShutdownError

NewShutdownError constructs a new ShutdownError.

func (*ShutdownError) Add

func (e *ShutdownError) Add(er error)

Add adds an error to the container.

func (*ShutdownError) Error

func (e *ShutdownError) Error() string

Error returns a list of all errors in a string representation. Errors are separated by a newline character (\n).

func (*ShutdownError) IsFilled

func (e *ShutdownError) IsFilled() bool

IsFilled returns a true flag if the container is full, otherwise returns a false flag.

type Stream

type Stream interface {

	// ID returns a unique stream id.
	ID() uuid.UUID

	// Session returns the instance of the session to which the stream belongs.
	Session() Session

	// Name returns a name of stream.
	Name() string

	// IsClosed returns a true flag if the stream is closed, otherwise returns a false flag.
	IsClosed() bool

	// Reader returns a reader.
	Reader() io.ReadCloser

	// Writer returns a writer.
	Writer() io.WriteCloser

	io.Closer
	io.ReaderFrom
	io.WriterTo
}

Stream is a named stream for streaming data. The size of the transmitted data is unlimited since the transfer is carried out in chunks. Default chunk size is 1024 bytes.

type Token

type Token []byte

type TokenValidator

type TokenValidator func(streamName string, id Identification, token Token) error

TokenValidator is used to validate the token if the token is set on the client side with WithIdentification(). Token and identifier can be any sequence of bytes.

type Wire

type Wire interface {

	// Sessions returns a list of active sessions.
	Sessions() Sessions

	// Session returns the session by UUID.
	Session(sessionID uuid.UUID) (Session, error)

	// Stream registers the handler for the given name.
	// If a named stream already exists, it is overwritten.
	Stream(name string, h Handler)

	// Close gracefully shuts down the server without interrupting any active connections.
	Close() error

	// Connect creates a new connection.
	// If the wire is on the client side, dial() is used.
	// If the wire is on the server side, listener() is used.
	Connect() error
}

Wire is used to initialize a server or client side connection.

func Hub

func Hub(addr string, opts ...Option) (Wire, error)

Hub constructs a new connection point with the given addr and Options as the server side.

func Join

func Join(addr string, opts ...Option) (Wire, error)

Join constructs a new connection point with the given addr and Options as the client side.

func Mount

func Mount(addr string, opts ...Option) (Wire, error)

Mount constructs a new connection point with the given addr and Options as the server side.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL