drpc

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2019 License: Apache-2.0 Imports: 11 Imported by: 0

README

dRPC

dRPC is a means of communication between processes local to the same physical system, via a Unix Domain Socket. At any given time a process may act as a client, a server, or both, though each listening dRPC server needs its own Unix Domain Socket.

The server will fail to create the socket if something already exists at that location in the filesystem, even if it is an older incarnation of the socket. Optionally, your application may wish to unlink that filesystem location before creating the socket.

dRPC calls are defined by module and method identifiers. The dRPC module can be thought of as a package of related functions. The dRPC method indicates a specific function to be executed by the server. If the method requires an input, it should be marshalled in the body of the dRPC call. The server will respond with a dRPC response structure, which may include a method-specific response in the body.

The DAOS dRPC implementation is dependent on Protocol Buffers to define the structures passed over the dRPC channel. Any structure to be sent via dRPC as part of a call or response must be defined in a .proto file.

Go API

In Go, the drpc package includes both client and server functionality, which is outlined below. For documentation of the C API, see here.

The dRPC call and response are represented by the Protobuf-generated drpc.Call and drpc.Response structures.

Go Client

The dRPC client is represented by the drpc.ClientConnection object.

Basic Client Workflow
  1. Create a new client connection with the path to the dRPC server's Unix Domain Socket:
    conn := drpc.NewClientConnection("/var/run/my_socket.sock")
    
  2. Connect to the dRPC server:
    err := conn.Connect()
    
  3. Create your drpc.Call and send it to the server:
    call := drpc.Call{}
    // Set up the Call with module, method, and body
    resp, err := drpc.SendMsg(call)
    
    An error indicates that the drpc.Call couldn't be sent, or an invalid drpc.Response was received. If there is no error returned, the content of the drpc.Response should still be checked for errors reported by the server.
  4. Send as many calls as desired.
  5. Close the connection when finished:
    conn.Close()
    
Go Server

The dRPC server is represented by the drpc.DomainSocketServer object.

Individual dRPC modules must be registered with the server in order to handle incoming dRPC calls for that module. To create a dRPC module, create an object that implements the drpc.Module interface. The module ID must be unique.

Basic Server Workflow
  1. Create the new DomainSocketServer with the server's Unix Domain Socket:
    drpcServer, err := drpc.NewDomainSocketServer("/var/run/my_socket.sock")
    
  2. Register the dRPC modules that the server needs to handle:
    drpcServer.RegisterRPCModule(&MyExampleModule{})
    drpcServer.RegisterRPCModule(&AnotherExampleModule{})
    
  3. Start the server to kick off the Goroutine to start listening for and handling incoming connections:
    err = drpc.Start()
    
  4. When it is time to shut down the server, close down the listening Goroutine:
    drpcServer.Shutdown()
    

Documentation

Index

Constants

View Source
const (
	// ModuleSecurityAgent is the dRPC module for security tasks in the
	// DAOS agent
	ModuleSecurityAgent = C.DRPC_MODULE_SEC_AGENT
	// ModuleMgmt is the dRPC module for management service tasks
	ModuleMgmt = C.DRPC_MODULE_MGMT
	// ModuleSrv is the dRPC module for tasks relating to server setup
	ModuleSrv = C.DRPC_MODULE_SRV
	// ModuleSecurity is the dRPC module for security tasks in the DAOS
	// server
	ModuleSecurity = C.DRPC_MODULE_SEC
)
View Source
const (
	// MethodKillRank is a ModuleMgmt method
	MethodKillRank = C.DRPC_METHOD_MGMT_KILL_RANK
	// MethodSetRank is a ModuleMgmt method
	MethodSetRank = C.DRPC_METHOD_MGMT_SET_RANK
	// MethodCreateMS is a ModuleMgmt method
	MethodCreateMS = C.DRPC_METHOD_MGMT_CREATE_MS
	// MethodStartMS is a ModuleMgmt method
	MethodStartMS = C.DRPC_METHOD_MGMT_START_MS
	// MethodJoin is a ModuleMgmt method
	MethodJoin = C.DRPC_METHOD_MGMT_JOIN
	// MethodGetAttachInfo is a ModuleMgmt method
	MethodGetAttachInfo = C.DRPC_METHOD_MGMT_GET_ATTACH_INFO
	// MethodPoolCreate is a ModuleMgmt method
	MethodPoolCreate = C.DRPC_METHOD_MGMT_POOL_CREATE
	// MethodPoolDestroy is a ModuleMgmt method
	MethodPoolDestroy = C.DRPC_METHOD_MGMT_POOL_DESTROY
	// MethodBioHealth is a ModuleMgmt method
	MethodBioHealth = C.DRPC_METHOD_MGMT_BIO_HEALTH_QUERY
	// MethodSetUp is a ModuleMgmt method
	MethodSetUp = C.DRPC_METHOD_MGMT_SET_UP
	// MethodSmdDevs is a ModuleMgmt method
	MethodSmdDevs = C.DRPC_METHOD_MGMT_SMD_LIST_DEVS
	// MethodSmdPools is a ModuleMgmt method
	MethodSmdPools = C.DRPC_METHOD_MGMT_SMD_LIST_POOLS
	// MethodPoolGetACL is a ModuleMgmt method
	MethodPoolGetACL = C.DRPC_METHOD_MGMT_POOL_GET_ACL
	// MethodListPools is a ModuleMgmt method
	MethodListPools = C.DRPC_METHOD_MGMT_LIST_POOLS
	// MethodPoolOverwriteACL is a ModuleMgmt method
	MethodPoolOverwriteACL = C.DRPC_METHOD_MGMT_POOL_OVERWRITE_ACL
	// MethodPoolUpdateACL is a ModuleMgmt method
	MethodPoolUpdateACL = C.DRPC_METHOD_MGMT_POOL_UPDATE_ACL
	// MethodPoolDeleteACL is a ModuleMgmt method
	MethodPoolDeleteACL = C.DRPC_METHOD_MGMT_POOL_DELETE_ACL
)
View Source
const MaxMsgSize = 16384

MaxMsgSize is the maximum drpc message size that may be sent. Using a packetsocket over the unix domain socket means that we receive a whole message at a time without knowing its size. So for this reason we need to restrict the maximum message size so we can preallocate a buffer to put all of the information in. Corresponding C definition is found in include/daos/drpc.h

View Source
const (
	// MethodNotifyReady is a ModuleSrv method
	MethodNotifyReady = C.DRPC_METHOD_SRV_NOTIFY_READY
)
View Source
const (
	// MethodRequestCredentials is a ModuleSecurityAgent method
	MethodRequestCredentials = C.DRPC_METHOD_SEC_AGENT_REQUEST_CREDS
)
View Source
const (
	// MethodValidateCredentials is a ModuleSecurity method
	MethodValidateCredentials = C.DRPC_METHOD_SEC_VALIDATE_CREDS
)

Variables

View Source
var Status_name = map[int32]string{
	0: "SUCCESS",
	1: "SUBMITTED",
	2: "FAILURE",
	3: "UNKNOWN_MODULE",
	4: "UNKNOWN_METHOD",
}
View Source
var Status_value = map[string]int32{
	"SUCCESS":        0,
	"SUBMITTED":      1,
	"FAILURE":        2,
	"UNKNOWN_MODULE": 3,
	"UNKNOWN_METHOD": 4,
}

Functions

This section is empty.

Types

type Call

type Call struct {
	Module               int32    `protobuf:"varint,1,opt,name=module,proto3" json:"module,omitempty"`
	Method               int32    `protobuf:"varint,2,opt,name=method,proto3" json:"method,omitempty"`
	Sequence             int64    `protobuf:"varint,3,opt,name=sequence,proto3" json:"sequence,omitempty"`
	Body                 []byte   `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

* Call is a structure outlining a function call to be executed over a drpc channel.

module is the numerical identifier for the drpc module to process the call method is the specific method within the module sequence is the internal sequence counter for matching calls to responses body is the opaque data of the function call arguments

func (*Call) Descriptor

func (*Call) Descriptor() ([]byte, []int)

func (*Call) GetBody

func (m *Call) GetBody() []byte

func (*Call) GetMethod

func (m *Call) GetMethod() int32

func (*Call) GetModule

func (m *Call) GetModule() int32

func (*Call) GetSequence

func (m *Call) GetSequence() int64

func (*Call) ProtoMessage

func (*Call) ProtoMessage()

func (*Call) Reset

func (m *Call) Reset()

func (*Call) String

func (m *Call) String() string

func (*Call) XXX_DiscardUnknown

func (m *Call) XXX_DiscardUnknown()

func (*Call) XXX_Marshal

func (m *Call) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Call) XXX_Merge

func (dst *Call) XXX_Merge(src proto.Message)

func (*Call) XXX_Size

func (m *Call) XXX_Size() int

func (*Call) XXX_Unmarshal

func (m *Call) XXX_Unmarshal(b []byte) error

type ClientConnection

type ClientConnection struct {
	sync.Mutex
	// contains filtered or unexported fields
}

ClientConnection represents a client connection to a dRPC server

func NewClientConnection

func NewClientConnection(socket string) *ClientConnection

NewClientConnection creates a new dRPC client

func (*ClientConnection) Close

func (c *ClientConnection) Close() error

Close shuts down the connection to the Unix Domain Socket

func (*ClientConnection) Connect

func (c *ClientConnection) Connect() error

Connect opens a connection to the internal Unix Domain Socket path

func (*ClientConnection) IsConnected

func (c *ClientConnection) IsConnected() bool

IsConnected indicates whether the client connection is currently active

func (*ClientConnection) SendMsg

func (c *ClientConnection) SendMsg(msg *Call) (*Response, error)

SendMsg sends a message to the connected dRPC server, and returns the response to the caller.

type DomainSocketClient

type DomainSocketClient interface {
	sync.Locker
	IsConnected() bool
	Connect() error
	Close() error
	SendMsg(call *Call) (*Response, error)
}

DomainSocketClient is the interface to a dRPC client communicating over a Unix Domain Socket

type DomainSocketServer

type DomainSocketServer struct {
	// contains filtered or unexported fields
}

DomainSocketServer is the object that listens for incoming dRPC connections, maintains the connections for sessions, and manages the message processing.

func NewDomainSocketServer

func NewDomainSocketServer(ctx context.Context, log logging.Logger, sock string) (*DomainSocketServer, error)

NewDomainSocketServer returns a new unstarted instance of a DomainSocketServer for the specified unix domain socket path.

func (*DomainSocketServer) Listen

func (d *DomainSocketServer) Listen()

Listen listens for incoming connections on the UNIX domain socket and creates individual sessions for each one.

func (*DomainSocketServer) RegisterRPCModule

func (d *DomainSocketServer) RegisterRPCModule(mod Module)

RegisterRPCModule takes a Module and associates it with the given DomainSocketServer so it can be used to process incoming dRPC calls.

func (*DomainSocketServer) Shutdown

func (d *DomainSocketServer) Shutdown()

Shutdown places the state of the server to shutdown which terminates the Listen go routine and starts the cleanup of all open connections.

func (*DomainSocketServer) Start

func (d *DomainSocketServer) Start() error

Start sets up the dRPC server socket and kicks off the listener goroutine.

type Module

type Module interface {
	HandleCall(*Session, int32, []byte) ([]byte, error)
	ID() int32
}

Module is an interface that a type must implement to provide the functionality needed by the ModuleService to process dRPC requests.

type ModuleService

type ModuleService struct {
	// contains filtered or unexported fields
}

ModuleService is the collection of Modules used by DomainSocketServer to be used to process messages.

func NewModuleService

func NewModuleService(log logging.Logger) *ModuleService

NewModuleService creates an initialized ModuleService instance

func (*ModuleService) GetModule

func (r *ModuleService) GetModule(id int32) (Module, bool)

GetModule fetches the module for the given ID. Returns true if found, false otherwise.

func (*ModuleService) ProcessMessage

func (r *ModuleService) ProcessMessage(session *Session, msgBytes []byte) ([]byte, error)

ProcessMessage is the main entry point into the ModuleService. It accepts a marshaled drpc.Call instance, processes it, calls the handler in the appropriate Module, and marshals the result into the body of a drpc.Response.

func (*ModuleService) RegisterModule

func (r *ModuleService) RegisterModule(mod Module) error

RegisterModule will take in a type that implements the Module interface and ensure that no other module is already registered with that module identifier.

type Response

type Response struct {
	Sequence             int64    `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`
	Status               Status   `protobuf:"varint,2,opt,name=status,proto3,enum=drpc.Status" json:"status,omitempty"`
	Body                 []byte   `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

* Response is the data to the Call with a given sequence number.

sequence is the sequence number of the drpc call for the response. status represents the return/faulure value of the call body represents the returned data if a call returns more than just a status.

func (*Response) Descriptor

func (*Response) Descriptor() ([]byte, []int)

func (*Response) GetBody

func (m *Response) GetBody() []byte

func (*Response) GetSequence

func (m *Response) GetSequence() int64

func (*Response) GetStatus

func (m *Response) GetStatus() Status

func (*Response) ProtoMessage

func (*Response) ProtoMessage()

func (*Response) Reset

func (m *Response) Reset()

func (*Response) String

func (m *Response) String() string

func (*Response) XXX_DiscardUnknown

func (m *Response) XXX_DiscardUnknown()

func (*Response) XXX_Marshal

func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Response) XXX_Merge

func (dst *Response) XXX_Merge(src proto.Message)

func (*Response) XXX_Size

func (m *Response) XXX_Size() int

func (*Response) XXX_Unmarshal

func (m *Response) XXX_Unmarshal(b []byte) error

type Session

type Session struct {
	Conn net.Conn
	// contains filtered or unexported fields
}

Session represents an individual client connection to the Domain Socket Server.

func NewSession

func NewSession(conn net.Conn, svc *ModuleService) *Session

NewSession creates a new dRPC Session object

func (*Session) Close

func (s *Session) Close()

Close closes the session

func (*Session) ProcessIncomingMessage

func (s *Session) ProcessIncomingMessage() error

ProcessIncomingMessage listens for an incoming message on the session, calls its handler, and sends the response.

type Status

type Status int32

* Status represents the valid values for a response status.

const (
	Status_SUCCESS        Status = 0
	Status_SUBMITTED      Status = 1
	Status_FAILURE        Status = 2
	Status_UNKNOWN_MODULE Status = 3
	Status_UNKNOWN_METHOD Status = 4
)

func (Status) EnumDescriptor

func (Status) EnumDescriptor() ([]byte, []int)

func (Status) String

func (x Status) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL