pole_rpc

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2021 License: BSD-3-Clause Imports: 33 Imported by: 1

README

pole-rpc

Go Report Card

A very simple RPC framework that uses a declarative calling strategy.

非常简单的RPC远程调用框架,采用声明式的调用策略

TODO

  • 监听TCP链接的Connect以及DisConnect事件
  • net.Conn的管理
  • RSocketRPC调用实现
  • 使用Option设计模式初始化各个组件
  • logger、基本数据结构、scheduler调度实现
  • 更易于使用的体验
  • metrics指标设置
  • 基于websocket实现RPC调用
  • 基于http2实现RPC调用

Example

func main() {
    ctx, cancelF := context.WithCancel(context.Background())
    defer cancelF()
    
    TestPort = 8000 + rand.Int31n(1000)
    server := createTestServer(ctx)
    // 等待 Server 的启动
    <-server.IsReady
    
    if err := <-server.ErrChan; err != nil {
        t.Error(err)
        return
    }
    
    client, err := createTestClient()
    if err != nil {
        t.Error(err)
        return
    }
    server.RegisterChannelRequestHandler(TestHelloWorld, func (cxt context.Context, rpcCtx RpcServerContext) {
        fmt.Printf("receive client requst : %#v\n", rpcCtx.GetReq())
        for i := 0; i < 10; i++ {
            _ = rpcCtx.Send(&ServerResponse{
                Code: int32(i),
            })
        }
    })
    waitG := sync.WaitGroup{}
    waitG.Add(10)
    
    uuidHolder := atomic.Value{}
    
    call := func (resp *ServerResponse, err error) {
        waitG.Done()
        RpcLog.Info("response %#v", resp)
        assert.Equalf(t, uuidHolder.Load().(string), resp.RequestId, "req-id must equal")
    }
    
    rpcCtx, err := client.RequestChannel(ctx, createTestEndpoint(), call)
    if err != nil {
        t.Error(err)
        return
    }
    
    reqId := uuid.New().String()
    uuidHolder.Store(reqId)
    rpcCtx.Send(&ServerRequest{
        FunName:   TestHelloWorld,
        RequestId: reqId,
    })
    waitG.Wait()
    
    waitG.Add(10)
    reqId = uuid.New().String()
    uuidHolder.Store(reqId)
    rpcCtx.Send(&ServerRequest{
        FunName:   TestHelloWorld,
        RequestId: reqId,
    })
    waitG.Wait()
}

Documentation

Index

Constants

View Source
const (
	SegmentShift = 7
	SegmentSize  = 2 << (SegmentShift - 1)
)
View Source
const (
	ErrorLevel = "[error]"
	WarnLevel  = "[warn]"
	InfoLevel  = "[info]"
	DebugLevel = "[debug]"

	TimeFormatStr = "2006-01-02 15:04:05"
)
View Source
const LogPrefix = "%s %s %s=>%d : "
View Source
const (
	RequestID = "Pole-Request-ID"
)

Variables

View Source
var EmptyBytes []byte
View Source
var ErrorCannotResponse = fmt.Errorf("cann't send reponse to client")
View Source
var (
	ErrorNotImplement = errors.New("not implement")
)

Functions

func ArrayCopy

func ArrayCopy(src []interface{}, srcPos int32, target []interface{}, targetPos int32, length int32)

func FillTargetElement

func FillTargetElement(array []interface{}, e interface{})

func GetCaller

func GetCaller(depth int) (string, int)

GetCaller 获取调用的代码行

func GetPanicStack

func GetPanicStack() string

GetPanicStack 获取堆栈信息

func Go

func Go(arg interface{}, work func(arg interface{}))

Go 将一个方法异步放入协程池中执行

func InitLogger

func InitLogger(baseDir string) (err error)

InitLogger 初始化一个默认的 Logger

func ResetGlobalLogger

func ResetGlobalLogger(logger Logger)

ResetGlobalLogger 重新设置 Logger

func ToJson

func ToJson(v interface{}) string

ToJson JSON 格式化打印

Types

type BaseTransportClient

type BaseTransportClient struct {
	EventChan chan ConnectEvent
	Watchers  []ConnectEventListener
	Filters   []func(req *ServerRequest)
	CancelFs  []context.CancelFunc
	// contains filtered or unexported fields
}

func (*BaseTransportClient) AddChain

func (btc *BaseTransportClient) AddChain(filter func(req *ServerRequest))

func (*BaseTransportClient) AddWatcher

func (btc *BaseTransportClient) AddWatcher(watcher ConnectEventListener)

func (*BaseTransportClient) DoFilter

func (btc *BaseTransportClient) DoFilter(req *ServerRequest)

type BinarySearchTree

type BinarySearchTree struct {
	// contains filtered or unexported fields
}

func NewBinarySearchTree

func NewBinarySearchTree(compare func(a, b interface{}) int) *BinarySearchTree

func (*BinarySearchTree) Delete

func (bTree *BinarySearchTree) Delete(v interface{})

func (*BinarySearchTree) Find

func (bTree *BinarySearchTree) Find(v interface{}) *node

func (*BinarySearchTree) FindMax

func (bTree *BinarySearchTree) FindMax() *node

func (*BinarySearchTree) FindMin

func (bTree *BinarySearchTree) FindMin() *node

func (*BinarySearchTree) FindNearbyLeft

func (bTree *BinarySearchTree) FindNearbyLeft(v interface{}) *node
        5
      /   \
     3     8
    / \   / \
   1   4 7   9

If you look up 5, it returns 3; if you look up 1, it returns nil.

func (*BinarySearchTree) FindNearbyRight

func (bTree *BinarySearchTree) FindNearbyRight(v string) *node
        5
      /   \
     3     8
    / \   / \
   1   4 7   9

If you look up 5, it returns 7; if you look up 8, it returns 9.

func (*BinarySearchTree) Insert

func (bTree *BinarySearchTree) Insert(v interface{}, replaceOld bool)

func (*BinarySearchTree) Range

func (bTree *BinarySearchTree) Range(call func(n *node))

func (*BinarySearchTree) SeekLevel

func (bTree *BinarySearchTree) SeekLevel() [][]*node

type CSliceOption

type CSliceOption func(opts *CSliceOptions)

type CSliceOptions

type CSliceOptions struct {
	// contains filtered or unexported fields
}

type ClientOption

type ClientOption struct {
	ConnectType ConnectType
	OpenTSL     bool
}

type ClientOptions

type ClientOptions func(opt *ClientOption)

type ClientRpcContext

type ClientRpcContext struct {
	// contains filtered or unexported fields
}

func (*ClientRpcContext) Close

func (rpc *ClientRpcContext) Close() error

func (*ClientRpcContext) Send

func (rpc *ClientRpcContext) Send(resp *ServerRequest)

type ConcurrentMap

type ConcurrentMap struct {
	// contains filtered or unexported fields
}

func NewConcurrentMap

func NewConcurrentMap() *ConcurrentMap

NewConcurrentMap 创建一个新的 ConcurrentMap

func (*ConcurrentMap) Clear

func (cm *ConcurrentMap) Clear()

Clear 清空 map

func (*ConcurrentMap) ComputeIfAbsent

func (cm *ConcurrentMap) ComputeIfAbsent(key interface{}, function func(key interface{}) interface{}) interface{}

ComputeIfAbsent 懒Put操作,通过 key 计算是否存在该 key,如果存在,直接返回,否则执行 function 方法计算对应的 value

func (*ConcurrentMap) Contains

func (cm *ConcurrentMap) Contains(k interface{}) bool

Contains 判断是否包含某个 key

func (*ConcurrentMap) ForEach

func (cm *ConcurrentMap) ForEach(consumer func(k, v interface{}))

ForEach 遍历所有的 key-value

func (*ConcurrentMap) Get

func (cm *ConcurrentMap) Get(k interface{}) interface{}

Get 根据 key 获取一个数据

func (*ConcurrentMap) Keys

func (cm *ConcurrentMap) Keys() []interface{}

Keys 获取所有的 key 数组

func (*ConcurrentMap) Put

func (cm *ConcurrentMap) Put(k, v interface{})

Put 存入一个键值对

func (*ConcurrentMap) Remove

func (cm *ConcurrentMap) Remove(k interface{})

Remove 根据 key 删除一个 key-value

func (*ConcurrentMap) Size

func (cm *ConcurrentMap) Size() int

Size 返回map的元素个数

func (*ConcurrentMap) Values

func (cm *ConcurrentMap) Values() []interface{}

Values 获取所有的 value 数组

type ConcurrentSet

type ConcurrentSet struct {
	// contains filtered or unexported fields
}

func NewConcurrentSet

func NewConcurrentSet() *ConcurrentSet

NewConcurrentSet 创建一个并发的 Set

func (*ConcurrentSet) Add

func (s *ConcurrentSet) Add(value interface{})

func (*ConcurrentSet) Range

func (s *ConcurrentSet) Range(f func(value interface{}))

func (*ConcurrentSet) Remove

func (s *ConcurrentSet) Remove(value interface{})

type ConcurrentSlice

type ConcurrentSlice struct {
	// contains filtered or unexported fields
}

func NewConcurrentSlice

func NewConcurrentSlice(opts ...CSliceOption) *ConcurrentSlice

NewConcurrentSlice 创建并发的 slice

func (*ConcurrentSlice) Add

func (cs *ConcurrentSlice) Add(v interface{})

Add 添加一个元素

func (*ConcurrentSlice) ForEach

func (cs *ConcurrentSlice) ForEach(consumer func(index int32, v interface{}))

ForEach 遍历所有的元素

func (*ConcurrentSlice) Get

func (cs *ConcurrentSlice) Get(index int32) (interface{}, error)

Get 获取某个 index 对应的元素

func (*ConcurrentSlice) GetFirst

func (cs *ConcurrentSlice) GetFirst() interface{}

GetFirst 获取第一个元素

func (*ConcurrentSlice) GetLast

func (cs *ConcurrentSlice) GetLast() interface{}

GetLast 获取最后一个元素

func (*ConcurrentSlice) Remove

func (cs *ConcurrentSlice) Remove(v interface{})

Remove 移除一个元素

func (*ConcurrentSlice) Size

func (cs *ConcurrentSlice) Size() int32

Size 当前slice的大小

type ConnManager

type ConnManager struct {
	// contains filtered or unexported fields
}

func (*ConnManager) AddConnectEventListener

func (cm *ConnManager) AddConnectEventListener(listener ConnectEventListener)

AddConnectEventListener

func (*ConnManager) PutConn

func (cm *ConnManager) PutConn(conn *transport.TCPConn)

PutConn 添加一个 transport.TCPConn

func (*ConnManager) RemoveConn

func (cm *ConnManager) RemoveConn(conn *transport.TCPConn)

RemoveConn 移除某一个 transport.TCPConn

func (*ConnManager) RemoveConnectEventListener

func (cm *ConnManager) RemoveConnectEventListener(listener ConnectEventListener)

RemoveConnectEventListener

type ConnectEvent

type ConnectEvent struct {
	EventType ConnectEventType
	Conn      net.Conn
}

type ConnectEventListener

type ConnectEventListener func(eventType ConnectEventType, con net.Conn)

type ConnectEventType

type ConnectEventType int8
const (
	ConnectEventForConnected ConnectEventType = iota
	ConnectEventForDisConnected
)

func (ConnectEventType) String

func (i ConnectEventType) String() string

type ConnectType

type ConnectType string
const (
	ConnectTypeRSocket ConnectType = "RSocket"
	ConnectWebSocket   ConnectType = "WebSocket"
)

type ConsoleLogSink

type ConsoleLogSink struct {
}

func (*ConsoleLogSink) OnEvent

func (fl *ConsoleLogSink) OnEvent(level LogLevel, format string, args ...interface{})

func (*ConsoleLogSink) Start

func (fl *ConsoleLogSink) Start(opt LogOption)

type CtxFuture

type CtxFuture struct {
	// contains filtered or unexported fields
}

func NewCtxFuture

func NewCtxFuture(ctx context.Context, cancel context.CancelFunc) *CtxFuture

func (*CtxFuture) Cancel

func (f *CtxFuture) Cancel()

type DefaultEndpointRepository

type DefaultEndpointRepository struct {
	// contains filtered or unexported fields
}

DefaultEndpointRepository 管理实例的仓库(EndpointRepository 的默认实现)

func (*DefaultEndpointRepository) Put

func (erp *DefaultEndpointRepository) Put(name string, endpoint Endpoint)

Put 为某个服务添加一个服务实例

func (*DefaultEndpointRepository) Remove

func (erp *DefaultEndpointRepository) Remove(name string, endpoint Endpoint)

Remove 从某个服务中移除实例

func (*DefaultEndpointRepository) SelectOne

func (erp *DefaultEndpointRepository) SelectOne(name string) (bool, Endpoint)

SelectOne 选择一个服务的实例进行随机访问

type Endpoint

type Endpoint struct {
	Key  string
	Host string
	Port int32
	// contains filtered or unexported fields
}

Endpoint 实例的链接信息

func (Endpoint) GetKey

func (e Endpoint) GetKey() string

type EndpointRepository

type EndpointRepository interface {
	//SelectOne 选择一个服务的实例进行随机访问
	SelectOne(name string) (bool, Endpoint)
	//Put 为某个服务添加一个服务实例
	Put(name string, endpoint Endpoint)
	//Remove 从某个服务中移除实例
	Remove(name string, endpoint Endpoint)
}

func NewDefaultEndpointRepository

func NewDefaultEndpointRepository() EndpointRepository

NewDefaultEndpointRepository 创建一个默认的 EndpointRepository

type FileLogSink

type FileLogSink struct {
	// contains filtered or unexported fields
}

func (*FileLogSink) OnEvent

func (fl *FileLogSink) OnEvent(level LogLevel, format string, args ...interface{})

func (*FileLogSink) Start

func (fl *FileLogSink) Start(opt LogOption)

type FluxFuture

type FluxFuture struct {
	// contains filtered or unexported fields
}

func NewFluxFuture

func NewFluxFuture(origin flux.Flux) *FluxFuture

func (*FluxFuture) Cancel

func (f *FluxFuture) Cancel()

type Future

type Future interface {
	Cancel()
	// contains filtered or unexported methods
}

Future 持有一个异步任务,可以通过它取消该异步任务的运行

func DelaySchedule

func DelaySchedule(work func(), delay time.Duration) Future

DelaySchedule 利用 time.After 实现的延迟执行

func DoTickerSchedule

func DoTickerSchedule(work func(), delay time.Duration) Future

DoTickerSchedule 利用 time.Ticker 实现的固定周期的执行

func DoTimerSchedule

func DoTimerSchedule(work func(), delay time.Duration, supplier func() time.Duration) Future

DoTimerSchedule 利用 time.Timer 实现的周期执行,其中每次任务执行的间隔可以通过 supplier func() time.Duration 函数动态调整。如果在 work 方法运行期间出现 panic,则无法保证任务可以继续正常执行,因此需要 work 自行 defer 去 recover 住 panic 的 Error 信息并进行处理。

type HashTimeWheel

type HashTimeWheel struct {
	// contains filtered or unexported fields
}

func NewTimeWheel

func NewTimeWheel(opt ...Option) *HashTimeWheel

func (*HashTimeWheel) DelayExec

func (htw *HashTimeWheel) DelayExec(f TimeFunction, delay time.Duration) Future

DelayExec 延迟执行一个函数。f TimeFunction : 任务函数;delay time.Duration : 执行延迟多久

func (*HashTimeWheel) ScheduleExec

func (htw *HashTimeWheel) ScheduleExec(f TimeFunction, delay, period time.Duration) Future

ScheduleExec 定时调度执行,返回一个 Future,如果不想继续执行任务的话,就直接调用 Future.Cancel()。f TimeFunction : 任务函数;delay time.Duration : 首次执行延迟多久;period time.Duration : 每次任务间隔多久执行

func (*HashTimeWheel) Start

func (htw *HashTimeWheel) Start()

Start 必须显式地执行此方法来开启时间轮

func (*HashTimeWheel) Stop

func (htw *HashTimeWheel) Stop()

Stop 关闭一个时间轮,所有的任务都不会再处理

type Job

type Job struct {
	F   func(arg interface{})
	Arg interface{}
}

type LogEvent

type LogEvent struct {
	Level  LogLevel
	Format string
	Args   []interface{}
}

type LogLevel

type LogLevel int32
const (
	Debug LogLevel = iota
	Info
	Warn
	Error
)

type LogOption

type LogOption struct {
	Name         string
	RollingCycle time.Duration
}

type LogOptions

type LogOptions func(opt *LogOption)

type LogSink

type LogSink interface {
	//Start 开启 LogSink
	Start(opt LogOption)

	//OnEvent 处理所有的 LogEvent 事件
	OnEvent(level LogLevel, format string, args ...interface{})
}

type Logger

type Logger interface {
	//SetLevel 设置日志等级
	SetLevel(level LogLevel)

	//Debug debug 级别日志打印
	Debug(format string, args ...interface{})

	//Info info 级别日志打印
	Info(format string, args ...interface{})

	//Warn warn 级别日志打印
	Warn(format string, args ...interface{})

	//Error error 级别日志打印
	Error(format string, args ...interface{})

	//Close 关闭一个 Logger
	Close()

	//Sink 获取日志的 LogSink 对象,Sink 是实际日志输出的接口
	Sink() LogSink
}
var RpcLog Logger = NewTestLogger("pole-rpc-test")

func NewLogger

func NewLogger(options ...LogOptions) (Logger, error)

NewLogger 创建一个 Logger,并设置日志的文件名为

func NewLoggerWithSink

func NewLoggerWithSink(sink LogSink, options ...LogOptions) Logger

NewLoggerWithSink 构建一个 Logger, 但是日志的真实输出的 LogSink 可以自定义实现

func NewTestLogger

func NewTestLogger(name string) Logger

NewTestLogger 构建测试用的 Logger, 打印在控制台上

type MonoFuture

type MonoFuture struct {
	// contains filtered or unexported fields
}

func NewMonoFuture

func NewMonoFuture(origin mono.Mono) *MonoFuture

func (*MonoFuture) Cancel

func (f *MonoFuture) Cancel()

func (*MonoFuture) IsDone

func (f *MonoFuture) IsDone() bool

type Option

type Option func(opts *Options)

type Options

type Options struct {
	Tick        int64
	SlotNum     int32
	Interval    time.Duration
	MaxDealTask int32
}

type RSocketClient

type RSocketClient struct {
	// contains filtered or unexported fields
}

func (*RSocketClient) AddChain

func (c *RSocketClient) AddChain(filter func(req *ServerRequest))

func (*RSocketClient) CheckConnection

func (c *RSocketClient) CheckConnection(endpoint Endpoint) (bool, error)

func (*RSocketClient) Close

func (c *RSocketClient) Close() error

func (*RSocketClient) RegisterConnectEventWatcher

func (c *RSocketClient) RegisterConnectEventWatcher(watcher func(eventType ConnectEventType, conn net.Conn))

func (*RSocketClient) Request

func (c *RSocketClient) Request(ctx context.Context, endpoint Endpoint, req *ServerRequest) (*ServerResponse, error)

func (*RSocketClient) RequestChannel

func (c *RSocketClient) RequestChannel(ctx context.Context, endpoint Endpoint, call UserCall) (RpcClientContext, error)

type RSocketDispatcher

type RSocketDispatcher struct {
	// contains filtered or unexported fields
}

func (*RSocketDispatcher) FindReqChannelHandler

func (r *RSocketDispatcher) FindReqChannelHandler(key string) RequestChannelHandler

func (*RSocketDispatcher) FindReqRespHandler

func (r *RSocketDispatcher) FindReqRespHandler(key string) RequestResponseHandler

type RSocketServer

type RSocketServer struct {
	IsReady chan int8

	ConnMgr *ConnManager
	ErrChan chan error
	// contains filtered or unexported fields
}

func (*RSocketServer) AddConnectEventListener

func (rs *RSocketServer) AddConnectEventListener(listener ConnectEventListener)

AddConnectEventListener

func (*RSocketServer) RegisterChannelRequestHandler

func (rs *RSocketServer) RegisterChannelRequestHandler(funName string, handler RequestChannelHandler)

func (*RSocketServer) RegisterRequestHandler

func (rs *RSocketServer) RegisterRequestHandler(funName string, handler RequestResponseHandler)

func (*RSocketServer) RemoveConnectEventListener

func (rs *RSocketServer) RemoveConnectEventListener(listener ConnectEventListener)

RemoveConnectEventListener

type RequestChannelHandler

type RequestChannelHandler func(cxt context.Context, rpcCtx RpcServerContext)

type RequestResponseHandler

type RequestResponseHandler func(cxt context.Context, rpcCtx RpcServerContext)

type RoutinePool

type RoutinePool struct {
	// contains filtered or unexported fields
}
var DefaultScheduler *RoutinePool = NewRoutinePool(16, 128)

func NewRoutinePool

func NewRoutinePool(size, cacheSize int32) *RoutinePool

NewRoutinePool 构建一个新的协程池。size int32 : 协程数量;cacheSize int32 : 任务缓存通道大小

func (*RoutinePool) Close

func (rp *RoutinePool) Close()

Close 关闭协程池

func (*RoutinePool) Resize

func (rp *RoutinePool) Resize(newSize int32)

Resize 重新调整协程池的大小

func (*RoutinePool) SetPanicHandler

func (rp *RoutinePool) SetPanicHandler(panicHandler func(err interface{}))

SetPanicHandler 设置出现panic时的处理函数

func (*RoutinePool) Submit

func (rp *RoutinePool) Submit(task Job)

Submit 提交一个 Job 任务

type RpcClientContext

type RpcClientContext interface {
	Send(resp *ServerRequest)

	Close() error
}

type RpcServerContext

type RpcServerContext interface {
	GetReq() *ServerRequest

	Send(resp *ServerResponse) error

	Complete()
}

type Segment

type Segment struct {
	// contains filtered or unexported fields
}

func NewSegment

func NewSegment(owner *SegmentList) *Segment

func (*Segment) Add

func (s *Segment) Add(e interface{})

func (*Segment) AddAll

func (s *Segment) AddAll(src []interface{}, srcPos, size int32)

func (*Segment) Cap

func (s *Segment) Cap() int32

func (*Segment) Clear

func (s *Segment) Clear()

func (*Segment) Get

func (s *Segment) Get(index int32) (interface{}, error)

func (*Segment) IsEmpty

func (s *Segment) IsEmpty() bool

func (*Segment) IsReachEnd

func (s *Segment) IsReachEnd() bool

func (*Segment) PeekFirst

func (s *Segment) PeekFirst() interface{}

func (*Segment) PeekLast

func (s *Segment) PeekLast() interface{}

func (*Segment) RemoveFromFirst

func (s *Segment) RemoveFromFirst(toIndex int32) int32

func (*Segment) RemoveFromFirstWhen

func (s *Segment) RemoveFromFirstWhen(predicate func(v interface{}) bool) int32

func (*Segment) RemoveFromLastWhen

func (s *Segment) RemoveFromLastWhen(predicate func(v interface{}) bool) int32

func (*Segment) Size

func (s *Segment) Size() int32

type SegmentList

type SegmentList struct {
	// contains filtered or unexported fields
}

func NewSegmentList

func NewSegmentList() *SegmentList

NewSegmentList

func (*SegmentList) Add

func (sl *SegmentList) Add(e interface{})

func (*SegmentList) AddAll

func (sl *SegmentList) AddAll(arr []interface{})

func (*SegmentList) Clear

func (sl *SegmentList) Clear()

func (*SegmentList) Get

func (sl *SegmentList) Get(index int32) (interface{}, error)

func (*SegmentList) GetFirst

func (sl *SegmentList) GetFirst() *Segment

func (*SegmentList) GetLast

func (sl *SegmentList) GetLast() *Segment

func (*SegmentList) IsEmpty

func (sl *SegmentList) IsEmpty() bool

func (*SegmentList) PeekFirst

func (sl *SegmentList) PeekFirst() interface{}

func (*SegmentList) PeekLast

func (sl *SegmentList) PeekLast() interface{}

func (*SegmentList) RemoveFromFirst

func (sl *SegmentList) RemoveFromFirst(toIndex int32)

func (*SegmentList) RemoveFromFirstWhen

func (sl *SegmentList) RemoveFromFirstWhen(predicate func(v interface{}) bool)

func (*SegmentList) RemoveFromLastWhen

func (sl *SegmentList) RemoveFromLastWhen(predicate func(v interface{}) bool)

func (*SegmentList) SegmentsSize

func (sl *SegmentList) SegmentsSize() int32

func (*SegmentList) Size

func (sl *SegmentList) Size() int32

type ServerOption

type ServerOption struct {
	ConnectType ConnectType
	Label       string
	Port        int32
	OpenTSL     bool
}

type ServerOptions

type ServerOptions func(opt *ServerOption)

type ServerRequest

type ServerRequest struct {
	FunName   string            `protobuf:"bytes,1,opt,name=FunName,proto3" json:"FunName,omitempty"`
	Body      *any.Any          `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"`
	RequestId string            `protobuf:"bytes,3,opt,name=requestId,proto3" json:"requestId,omitempty"`
	Header    map[string]string `` /* 153-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*ServerRequest) Descriptor deprecated

func (*ServerRequest) Descriptor() ([]byte, []int)

Deprecated: Use ServerRequest.ProtoReflect.Descriptor instead.

func (*ServerRequest) GetBody

func (x *ServerRequest) GetBody() *any.Any

func (*ServerRequest) GetFunName

func (x *ServerRequest) GetFunName() string

func (*ServerRequest) GetHeader

func (x *ServerRequest) GetHeader() map[string]string

func (*ServerRequest) GetRequestId

func (x *ServerRequest) GetRequestId() string

func (*ServerRequest) ProtoMessage

func (*ServerRequest) ProtoMessage()

func (*ServerRequest) ProtoReflect

func (x *ServerRequest) ProtoReflect() protoreflect.Message

func (*ServerRequest) Reset

func (x *ServerRequest) Reset()

func (*ServerRequest) String

func (x *ServerRequest) String() string

type ServerResponse

type ServerResponse struct {
	FunName   string            `protobuf:"bytes,1,opt,name=FunName,proto3" json:"FunName,omitempty"`
	RequestId string            `protobuf:"bytes,2,opt,name=requestId,proto3" json:"requestId,omitempty"`
	Header    map[string]string `` /* 153-byte string literal not displayed */
	Code      int32             `protobuf:"varint,3,opt,name=code,proto3" json:"code,omitempty"`
	Body      *any.Any          `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"`
	Msg       string            `protobuf:"bytes,6,opt,name=msg,proto3" json:"msg,omitempty"`
	// contains filtered or unexported fields
}

func (*ServerResponse) Descriptor deprecated

func (*ServerResponse) Descriptor() ([]byte, []int)

Deprecated: Use ServerResponse.ProtoReflect.Descriptor instead.

func (*ServerResponse) GetBody

func (x *ServerResponse) GetBody() *any.Any

func (*ServerResponse) GetCode

func (x *ServerResponse) GetCode() int32

func (*ServerResponse) GetFunName

func (x *ServerResponse) GetFunName() string

func (*ServerResponse) GetHeader

func (x *ServerResponse) GetHeader() map[string]string

func (*ServerResponse) GetMsg

func (x *ServerResponse) GetMsg() string

func (*ServerResponse) GetRequestId

func (x *ServerResponse) GetRequestId() string

func (*ServerResponse) ProtoMessage

func (*ServerResponse) ProtoMessage()

func (*ServerResponse) ProtoReflect

func (x *ServerResponse) ProtoReflect() protoreflect.Message

func (*ServerResponse) Reset

func (x *ServerResponse) Reset()

func (*ServerResponse) String

func (x *ServerResponse) String() string

type Set

type Set struct {
	// contains filtered or unexported fields
}

func NewSet

func NewSet() *Set

NewSet 创建一个非协程安全的Set

func NewSetWithValues

func NewSetWithValues(arr ...interface{}) *Set

NewSetWithValues 根据给的数据创建一个 Set

func (*Set) Add

func (s *Set) Add(value interface{})

Add 添加元素

func (*Set) AddAll

func (s *Set) AddAll(values ...interface{})

AddAll 批量添加元素

func (*Set) AddAllWithSet

func (s *Set) AddAllWithSet(set *Set)

AddAllWithSet

func (*Set) Contain

func (s *Set) Contain(value interface{}) bool

Contain

func (*Set) IsEmpty

func (s *Set) IsEmpty() bool

IsEmpty 判断 set 是否为空

func (*Set) Range

func (s *Set) Range(f func(value interface{}))

Range 遍历 Set 里面的所有元素

func (*Set) Remove

func (s *Set) Remove(value interface{})

Remove

func (*Set) RemoveAll

func (s *Set) RemoveAll(arr []interface{})

RemoveAll

func (*Set) RemoveAllWithSet

func (s *Set) RemoveAllWithSet(set *Set)

RemoveAllWithSet 移除某些 set 里面的数据

func (*Set) RetainAll

func (s *Set) RetainAll(arr ...interface{})

RetainAll

func (*Set) RetainAllWithSet

func (s *Set) RetainAllWithSet(set *Set)

RetainAllWithSet

func (*Set) Size

func (s *Set) Size() int

Size

func (*Set) ToSlice

func (s *Set) ToSlice() []interface{}

ToSlice 从 Set 转为 Slice

type TimeFunction

type TimeFunction interface {
	Run()
}

type TransportClient

type TransportClient interface {
	//RegisterConnectEventWatcher 客户端监听和服务端的会话的状态
	RegisterConnectEventWatcher(watcher func(eventType ConnectEventType, conn net.Conn))
	//CheckConnection 检查链接
	CheckConnection(endpoint Endpoint) (bool, error)
	//AddChain 添加请求处理链
	AddChain(filter func(req *ServerRequest))
	//Request 发起 request-response 请求
	Request(ctx context.Context, endpoint Endpoint, req *ServerRequest) (*ServerResponse, error)
	//RequestChannel 发起 request-channel 请求
	RequestChannel(ctx context.Context, endpoint Endpoint, call UserCall) (RpcClientContext, error)
	//Close 关闭客户端
	Close() error
}

func NewTransportClient

func NewTransportClient(options ...ClientOptions) (TransportClient, error)

NewTransportClient 构建一个 TransportClient

type TransportServer

type TransportServer interface {
	//AddConnectEventListener 服务端的监听和客户端的会话的状态
	AddConnectEventListener(listener ConnectEventListener)
	//RemoveConnectEventListener
	RemoveConnectEventListener(listener ConnectEventListener)
	//RegisterRequestHandler 注册一个 Request-Response的Server端处理者,名称为name
	RegisterRequestHandler(funName string, handler RequestResponseHandler)
	//RegisterChannelRequestHandler 注册一个 Request-Channel的Server端处理者,名称为name
	RegisterChannelRequestHandler(funName string, handler RequestChannelHandler)
}

func NewTransportServer

func NewTransportServer(ctx context.Context, options ...ServerOptions) (TransportServer, error)

NewTransportServer 构建一个 TransportServer

type TreeMap

type TreeMap struct {
	BinarySearchTree
	// contains filtered or unexported fields
}

TreeMap 后面要优化成为红黑树,或者较为简单的 AVL 树

func NewTreeMap

func NewTreeMap(compare func(a, b interface{}) int) *TreeMap

NewTreeMap 创建一个 TreeMap

func (*TreeMap) Clear

func (tMap *TreeMap) Clear()

func (*TreeMap) ComputeIfAbsent

func (tMap *TreeMap) ComputeIfAbsent(key interface{}, supplier func() interface{}) interface{}

func (*TreeMap) Get

func (tMap *TreeMap) Get(key interface{}) interface{}

func (*TreeMap) IsEmpty

func (tMap *TreeMap) IsEmpty() bool

func (*TreeMap) Put

func (tMap *TreeMap) Put(key, val interface{})

Put 添加一个 key-value 键值对

func (*TreeMap) RangeEntry

func (tMap *TreeMap) RangeEntry(consumer func(k, v interface{}))

func (*TreeMap) RangeLessThan

func (tMap *TreeMap) RangeLessThan(key interface{}, consumer func(k, v interface{}))

func (*TreeMap) RemoveKey

func (tMap *TreeMap) RemoveKey(key interface{})

func (*TreeMap) Size

func (tMap *TreeMap) Size() int64

type UserCall

type UserCall func(resp *ServerResponse, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL