prc

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 17, 2024 License: MIT Imports: 23 Imported by: 1

Documentation

Index

Constants

View Source
const (
	LocalhostPhysicalAddress = "localhost" // Local-only physical address (no network).
)

Variables

View Source
var File_process_id_proto protoreflect.FileDescriptor
View Source
var File_shared_proto protoreflect.FileDescriptor
View Source
// Shared_ServiceDesc describes the "prc.Shared" gRPC service: it declares no
// unary methods and a single stream, StreamHandler, that streams in both
// directions (ServerStreams and ClientStreams are both true).
var Shared_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "prc.Shared",
	HandlerType: (*SharedServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "StreamHandler",
			Handler:       _Shared_StreamHandler_Handler,
			ServerStreams: true,
			ClientStreams: true,
		},
	},
	Metadata: "shared.proto",
}

Shared_ServiceDesc is the grpc.ServiceDesc for Shared service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy)

Functions

func RegisterSharedServer

func RegisterSharedServer(s grpc.ServiceRegistrar, srv SharedServer)

Types

type BatchDeliveryMessage

// BatchDeliveryMessage groups several DeliveryMessage values into one message
// (repeated protobuf field 1).
type BatchDeliveryMessage struct {
	Messages []*DeliveryMessage `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
	// contains filtered or unexported fields
}

func (*BatchDeliveryMessage) Descriptor deprecated

func (*BatchDeliveryMessage) Descriptor() ([]byte, []int)

Deprecated: Use BatchDeliveryMessage.ProtoReflect.Descriptor instead.

func (*BatchDeliveryMessage) GetMessages

func (x *BatchDeliveryMessage) GetMessages() []*DeliveryMessage

func (*BatchDeliveryMessage) ProtoMessage

func (*BatchDeliveryMessage) ProtoMessage()

func (*BatchDeliveryMessage) ProtoReflect

func (x *BatchDeliveryMessage) ProtoReflect() protoreflect.Message

func (*BatchDeliveryMessage) Reset

func (x *BatchDeliveryMessage) Reset()

func (*BatchDeliveryMessage) String

func (x *BatchDeliveryMessage) String() string

type DeliveryMessage

// DeliveryMessage is a single message addressed from one ProcessId to another,
// carrying a type name and a payload as bytes.
type DeliveryMessage struct {
	Sender      *ProcessId `protobuf:"bytes,1,opt,name=sender,proto3" json:"sender,omitempty"`                              // Sender of the message.
	Receiver    *ProcessId `protobuf:"bytes,2,opt,name=receiver,proto3" json:"receiver,omitempty"`                          // Receiver of the message.
	MessageType string     `protobuf:"bytes,3,opt,name=message_type,json=messageType,proto3" json:"message_type,omitempty"` // Name of the message type.
	MessageData []byte     `protobuf:"bytes,4,opt,name=message_data,json=messageData,proto3" json:"message_data,omitempty"` // Message payload.
	System      bool       `protobuf:"varint,5,opt,name=system,proto3" json:"system,omitempty"`                             // Whether this is a system message.
	// contains filtered or unexported fields
}

func (*DeliveryMessage) Descriptor deprecated

func (*DeliveryMessage) Descriptor() ([]byte, []int)

Deprecated: Use DeliveryMessage.ProtoReflect.Descriptor instead.

func (*DeliveryMessage) GetMessageData

func (x *DeliveryMessage) GetMessageData() []byte

func (*DeliveryMessage) GetMessageType

func (x *DeliveryMessage) GetMessageType() string

func (*DeliveryMessage) GetReceiver

func (x *DeliveryMessage) GetReceiver() *ProcessId

func (*DeliveryMessage) GetSender

func (x *DeliveryMessage) GetSender() *ProcessId

func (*DeliveryMessage) GetSystem

func (x *DeliveryMessage) GetSystem() bool

func (*DeliveryMessage) ProtoMessage

func (*DeliveryMessage) ProtoMessage()

func (*DeliveryMessage) ProtoReflect

func (x *DeliveryMessage) ProtoReflect() protoreflect.Message

func (*DeliveryMessage) Reset

func (x *DeliveryMessage) Reset()

func (*DeliveryMessage) String

func (x *DeliveryMessage) String() string

type Farewell

// Farewell is a protobuf message carrying a single address string.
// NOTE(review): presumably the physical address of the side that is leaving — confirm.
type Farewell struct {
	Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
	// contains filtered or unexported fields
}

func (*Farewell) Descriptor deprecated

func (*Farewell) Descriptor() ([]byte, []int)

Deprecated: Use Farewell.ProtoReflect.Descriptor instead.

func (*Farewell) GetAddress

func (x *Farewell) GetAddress() string

func (*Farewell) ProtoMessage

func (*Farewell) ProtoMessage()

func (*Farewell) ProtoReflect

func (x *Farewell) ProtoReflect() protoreflect.Message

func (*Farewell) Reset

func (x *Farewell) Reset()

func (*Farewell) String

func (x *Farewell) String() string

type FunctionalGRPCLaunchBeforeHook

type FunctionalGRPCLaunchBeforeHook func(server *grpc.Server)

FunctionalGRPCLaunchBeforeHook 是一个函数式的GRPC启动前的钩子

func (FunctionalGRPCLaunchBeforeHook) OnGRPCLaunchBefore

func (f FunctionalGRPCLaunchBeforeHook) OnGRPCLaunchBefore(server *grpc.Server)

OnGRPCLaunchBefore GRPC启动前

type FunctionalPhysicalAddressResolver

type FunctionalPhysicalAddressResolver func(id *ProcessId) Process

FunctionalPhysicalAddressResolver 是一个函数类型的物理地址解析器,它可以通过一个函数来实现 Resolve 方法。

func (FunctionalPhysicalAddressResolver) Resolve

type FunctionalResourceControllerConfigurator

type FunctionalResourceControllerConfigurator func(config *ResourceControllerConfiguration)

FunctionalResourceControllerConfigurator 是用于配置 ResourceController 的配置器

func (FunctionalResourceControllerConfigurator) Configure

Configure 配置 ResourceController

type FunctionalShareOpenedHook

type FunctionalShareOpenedHook func(target PhysicalAddress)

FunctionalShareOpenedHook 是一个函数式的共享打开后的钩子

func (FunctionalShareOpenedHook) OnShareOpened

func (f FunctionalShareOpenedHook) OnShareOpened(target PhysicalAddress)

OnShareOpened 共享打开后

type FunctionalSharedClosedHook

type FunctionalSharedClosedHook func(target PhysicalAddress)

FunctionalSharedClosedHook 是一个函数式的共享关闭后的钩子

func (FunctionalSharedClosedHook) OnShareClosed

func (f FunctionalSharedClosedHook) OnShareClosed(target PhysicalAddress)

OnShareClosed 共享关闭后

type FunctionalSharedConfigurator

type FunctionalSharedConfigurator func(config *SharedConfiguration)

FunctionalSharedConfigurator 函数式共享配置器

func (FunctionalSharedConfigurator) Configure

Configure 配置

type FunctionalSharedStartHook

type FunctionalSharedStartHook func()

FunctionalSharedStartHook 是一个函数式的共享启动时的钩子

func (FunctionalSharedStartHook) OnSharedStart

func (f FunctionalSharedStartHook) OnSharedStart()

OnSharedStart 共享启动

type GRPCLaunchBeforeHook

// GRPCLaunchBeforeHook is a hook invoked before the gRPC server is launched.
type GRPCLaunchBeforeHook interface {
	// OnGRPCLaunchBefore is called before gRPC launch and receives the server.
	OnGRPCLaunchBefore(server *grpc.Server)
}

GRPCLaunchBeforeHook GRPC启动前的钩子

type Handshake

// Handshake is a protobuf message carrying a single address string.
// NOTE(review): presumably the physical address of the side opening the connection — confirm.
type Handshake struct {
	Address string `protobuf:"bytes,1,opt,name=address,proto3" json:"address,omitempty"`
	// contains filtered or unexported fields
}

func (*Handshake) Descriptor deprecated

func (*Handshake) Descriptor() ([]byte, []int)

Deprecated: Use Handshake.ProtoReflect.Descriptor instead.

func (*Handshake) GetAddress

func (x *Handshake) GetAddress() string

func (*Handshake) ProtoMessage

func (*Handshake) ProtoMessage()

func (*Handshake) ProtoReflect

func (x *Handshake) ProtoReflect() protoreflect.Message

func (*Handshake) Reset

func (x *Handshake) Reset()

func (*Handshake) String

func (x *Handshake) String() string

type LogicalAddress

type LogicalAddress = string

LogicalAddress 逻辑地址是用于标识内容的本地内部地址

type Message

type Message = any

Message 消息是用于传递的数据

type PhysicalAddress

type PhysicalAddress = string

PhysicalAddress 物理地址是用于标识内容的网络地址

type PhysicalAddressResolver

// PhysicalAddressResolver resolves non-local addresses. It should return a
// Process that is connected to the remote side and can interact with it.
type PhysicalAddressResolver interface {
	// Resolve returns the Process to use for the given process id.
	Resolve(id *ProcessId) Process
}

PhysicalAddressResolver 物理地址解析器是用来解析非本地地址的接口,它应返回一个与远程建立连接并能够与之交互的进程。

type Process

// Process is the unit managed by a ResourceController; messages are delivered
// to it and it can be terminated.
type Process interface {
	// Initialize initializes the process.
	Initialize(rc *ResourceController, id *ProcessId)

	// DeliveryUserMessage delivers a user message. User messages should have lower priority than
	// system messages; the exact behavior depends on the process implementation.
	//   - Normally sender is the originator of the message. In some special cases the sender has to be
	//     redirected so that replies reach the intended process; forward then denotes the redirected sender.
	//   - receiver usually denotes the process itself; when forwarding across nodes it may be another process.
	DeliveryUserMessage(receiver, sender, forward *ProcessId, message Message)

	// DeliverySystemMessage delivers a system message, which has the highest priority; the exact
	// behavior depends on the process implementation.
	//   - Normally sender is the originator of the message. In some special cases the sender has to be
	//     redirected so that replies reach the intended process; forward then denotes the redirected sender.
	//   - receiver usually denotes the process itself; when forwarding across nodes it may be another process.
	DeliverySystemMessage(receiver, sender, forward *ProcessId, message Message)

	// IsTerminated reports whether the process has terminated. If it has, references to the
	// process will try to update their cache to a live process.
	IsTerminated() bool

	// Terminate terminates the process. Normally this delivers a termination message that carries
	// the initiator to the process; the exact behavior depends on the process implementation.
	//  - It is called when the process is unregistered from the resource controller.
	Terminate(source *ProcessId)
}

type ProcessId

// ProcessId identifies a process by a logical address and a physical address.
type ProcessId struct {
	LogicalAddress  string `protobuf:"bytes,1,opt,name=logical_address,json=logicalAddress,proto3" json:"logical_address,omitempty"`
	PhysicalAddress string `protobuf:"bytes,2,opt,name=physical_address,json=physicalAddress,proto3" json:"physical_address,omitempty"`
	// contains filtered or unexported fields
}

func NewProcessId

func NewProcessId(physicalAddress PhysicalAddress, logicalAddress LogicalAddress) *ProcessId

func (*ProcessId) Clone

func (pid *ProcessId) Clone() *ProcessId

Clone 克隆进程 ID

func (*ProcessId) Derivation

func (pid *ProcessId) Derivation(name string) *ProcessId

Derivation 衍生一个新的进程 Id

func (*ProcessId) Descriptor deprecated

func (*ProcessId) Descriptor() ([]byte, []int)

Deprecated: Use ProcessId.ProtoReflect.Descriptor instead.

func (*ProcessId) Equal

func (pid *ProcessId) Equal(id *ProcessId) bool

Equal 比较两个进程 ID 是否相同

func (*ProcessId) GetLogicalAddress

func (pid *ProcessId) GetLogicalAddress() LogicalAddress

GetLogicalAddress 加载进程 ID 的逻辑地址,在任何时候都应该通过该函数获取逻辑地址

func (*ProcessId) GetPhysicalAddress

func (pid *ProcessId) GetPhysicalAddress() PhysicalAddress

GetPhysicalAddress 加载进程 ID 的物理地址,在任何时候都应该通过该函数获取物理地址

func (*ProcessId) ProtoMessage

func (*ProcessId) ProtoMessage()

func (*ProcessId) ProtoReflect

func (x *ProcessId) ProtoReflect() protoreflect.Message

func (*ProcessId) Reset

func (x *ProcessId) Reset()

func (*ProcessId) String

func (x *ProcessId) String() string

func (*ProcessId) URL

func (pid *ProcessId) URL() *url.URL

URL 获取进程 Id 的 URL

type ResourceController

// ResourceController is a resource controller that supports distributed and
// clustered architectures. It treats every resource as a Process, and
// processes communicate through ProcessId.
type ResourceController struct {
	// contains filtered or unexported fields
}

ResourceController 是一个支持分布式、集群架构的资源控制器,它将所有资源视为进程(Process),进程之间通过 ProcessId 进行通信。

func NewResourceController

func NewResourceController(configurator ...ResourceControllerConfigurator) *ResourceController

NewResourceController 创建一个新的资源控制器

func (*ResourceController) Belong

func (rc *ResourceController) Belong(id *ProcessId) bool

Belong 检查 id 是否属于该资源控制器。该函数并不检查进程是否存在,只检查进程的归属关系。

func (*ResourceController) GetPhysicalAddress

func (rc *ResourceController) GetPhysicalAddress() PhysicalAddress

GetPhysicalAddress 获取资源控制器的物理地址

func (*ResourceController) GetProcess

func (rc *ResourceController) GetProcess(id *ProcessId) (process Process)

GetProcess 获取一个进程

func (*ResourceController) Register

func (rc *ResourceController) Register(id *ProcessId, process Process) (pid *ProcessId, exist bool)

Register 向资源控制器注册一个进程,如果进程已存在,将会返回已有的 ProcessId 和一个标识是否已存在的状态信息,这对于进程的重复注册检测是非常有用的

func (*ResourceController) RegisterResolver

func (rc *ResourceController) RegisterResolver(resolver ...PhysicalAddressResolver)

RegisterResolver 注册用于物理地址解析的解析器,解析器应返回一个可用进程。

  • 解析器需要依赖于外部的进程管理,本身不会涉及进程的注册与反注册

func (*ResourceController) Unregister

func (rc *ResourceController) Unregister(killer *ProcessId, target *ProcessId)

Unregister 从资源控制器注销一个进程

type ResourceControllerConfiguration

// ResourceControllerConfiguration holds the settings handed to a
// ResourceControllerConfigurator's Configure method.
type ResourceControllerConfiguration struct {
	// contains filtered or unexported fields
}

ResourceControllerConfiguration 是 ResourceController 的配置

func (*ResourceControllerConfiguration) WithLoggerProvider

WithLoggerProvider 设置日志提供者

func (*ResourceControllerConfiguration) WithNotFoundSubstitute

func (c *ResourceControllerConfiguration) WithNotFoundSubstitute(substitute Process) *ResourceControllerConfiguration

WithNotFoundSubstitute 设置未找到处理器的替代处理器

func (*ResourceControllerConfiguration) WithPhysicalAddress

WithPhysicalAddress 设置物理地址

type ResourceControllerConfigurator

// ResourceControllerConfigurator is a configurator used to configure a ResourceController.
type ResourceControllerConfigurator interface {
	// Configure applies settings to the given ResourceControllerConfiguration.
	Configure(config *ResourceControllerConfiguration)
}

ResourceControllerConfigurator 是用于配置 ResourceController 的配置器

type ShareOpenedHook

// ShareOpenedHook is a hook invoked after a share has been opened.
type ShareOpenedHook interface {
	// OnShareOpened is called after a share is opened, with the target physical address.
	OnShareOpened(target PhysicalAddress)
}

ShareOpenedHook 共享打开后的钩子

type Shared

// Shared is the data structure used to share a resource controller over the network.
type Shared struct {
	// contains filtered or unexported fields
}

Shared 是用于对资源控制器进行网络共享的数据结构,它仍需要主动地向特定已知的资源管理器发起交互。

func NewShared

func NewShared(rc *ResourceController, configurator ...SharedConfigurator) *Shared

NewShared 创建一个资源控制器的共享

func (*Shared) Close

func (s *Shared) Close(err ...error)

Close 关闭共享,可指定错误进行关闭;如果指定了错误并且配置了运行时错误处理器,那么将执行该处理器

func (*Shared) Dead

func (s *Shared) Dead()

Dead 设置共享彻底关闭,将无法再继续重启

func (*Shared) GetCodec

func (s *Shared) GetCodec() codec.Codec

GetCodec 获取网络共享数据的编解码器

func (*Shared) GetResourceController

func (s *Shared) GetResourceController() *ResourceController

GetResourceController 获取资源控制器

func (*Shared) Share

func (s *Shared) Share() error

Share 共享资源控制器,开始监听网络活动

type SharedClient

// SharedClient is the client API for the Shared service.
type SharedClient interface {
	// StreamHandler starts the StreamHandler streaming RPC and returns the
	// client-side stream, or an error.
	StreamHandler(ctx context.Context, opts ...grpc.CallOption) (Shared_StreamHandlerClient, error)
}

SharedClient is the client API for Shared service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.

func NewSharedClient

func NewSharedClient(cc grpc.ClientConnInterface) SharedClient

type SharedClosedHook

// SharedClosedHook is a hook invoked after a share has been closed.
type SharedClosedHook interface {
	// OnShareClosed is called after a share is closed, with the target physical address.
	OnShareClosed(target PhysicalAddress)
}

SharedClosedHook 共享关闭后的钩子

type SharedConfiguration

// SharedConfiguration holds the settings handed to a SharedConfigurator's
// Configure method.
type SharedConfiguration struct {
	// contains filtered or unexported fields
}

SharedConfiguration 共享配置

func (*SharedConfiguration) WithCodec

func (c *SharedConfiguration) WithCodec(codec codec.Codec) *SharedConfiguration

WithCodec 设置编解码器

func (*SharedConfiguration) WithConsecutiveRestartLimit

func (c *SharedConfiguration) WithConsecutiveRestartLimit(limit int) *SharedConfiguration

WithConsecutiveRestartLimit 设置连续重启限制,当 limit > 0 且连续重启失败到达 limit 时,将进行停止,而非继续重启。

  • 如果需要控制重启间隔可使用 WithRestartInterval 或 WithFixedRestartInterval 方法。

func (*SharedConfiguration) WithFixedRestartInterval

func (c *SharedConfiguration) WithFixedRestartInterval(interval time.Duration) *SharedConfiguration

WithFixedRestartInterval 使用固定间隔设置重启间隔。

  • 该配置将会覆盖 WithRestartInterval 方法的设置。

func (*SharedConfiguration) WithGRPCLaunchBeforeHooks

func (c *SharedConfiguration) WithGRPCLaunchBeforeHooks(hooks ...GRPCLaunchBeforeHook) *SharedConfiguration

WithGRPCLaunchBeforeHooks 设置 GRPC 服务器钩子,该方法将在创建 GRPC 服务器后调用

func (*SharedConfiguration) WithRestartInterval

func (c *SharedConfiguration) WithRestartInterval(baseDelay, maxDelay time.Duration) *SharedConfiguration

WithRestartInterval 使用指数退避设置重启间隔,baseDelay 为基础延迟,maxDelay 为最大延迟(连续重启次数的上限请通过 WithConsecutiveRestartLimit 设置)。

  • 该配置将会覆盖 WithFixedRestartInterval 方法的设置。

func (*SharedConfiguration) WithRuntimeErrorHandler

func (c *SharedConfiguration) WithRuntimeErrorHandler(handler ErrorPolicyDecisionHandler) *SharedConfiguration

WithRuntimeErrorHandler 设置运行时错误处理器

func (*SharedConfiguration) WithShareClosedHooks

func (c *SharedConfiguration) WithShareClosedHooks(hooks ...SharedClosedHook) *SharedConfiguration

WithShareClosedHooks 设置共享连接关闭时钩子

func (*SharedConfiguration) WithShareOpenedHooks

func (c *SharedConfiguration) WithShareOpenedHooks(hooks ...ShareOpenedHook) *SharedConfiguration

WithShareOpenedHooks 设置共享连接打开时钩子

func (*SharedConfiguration) WithSharedHook

func (c *SharedConfiguration) WithSharedHook(hook SharedStartHook) *SharedConfiguration

WithSharedHook 设置共享钩子

func (*SharedConfiguration) WithUnknownReceiverRedirect

func (c *SharedConfiguration) WithUnknownReceiverRedirect(redirect func(message Message) *ProcessId) *SharedConfiguration

WithUnknownReceiverRedirect 设置未知接收者重定向

type SharedConfigurator

// SharedConfigurator is a configurator for a share.
type SharedConfigurator interface {
	// Configure applies settings to the given SharedConfiguration.
	Configure(config *SharedConfiguration)
}

SharedConfigurator 共享配置器

type SharedErrorMessage

// SharedErrorMessage is a protobuf message carrying a single message string.
// NOTE(review): presumably an error description exchanged between shares — confirm.
type SharedErrorMessage struct {
	Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
	// contains filtered or unexported fields
}

func (*SharedErrorMessage) Descriptor deprecated

func (*SharedErrorMessage) Descriptor() ([]byte, []int)

Deprecated: Use SharedErrorMessage.ProtoReflect.Descriptor instead.

func (*SharedErrorMessage) GetMessage

func (x *SharedErrorMessage) GetMessage() string

func (*SharedErrorMessage) ProtoMessage

func (*SharedErrorMessage) ProtoMessage()

func (*SharedErrorMessage) ProtoReflect

func (x *SharedErrorMessage) ProtoReflect() protoreflect.Message

func (*SharedErrorMessage) Reset

func (x *SharedErrorMessage) Reset()

func (*SharedErrorMessage) String

func (x *SharedErrorMessage) String() string

type SharedMessage

// SharedMessage is the message type sent and received on the StreamHandler
// stream. Its payload is the "message_type" oneof, which holds exactly one of
// the wrapper types listed below.
type SharedMessage struct {

	// Types that are assignable to MessageType:
	//
	//	*SharedMessage_Handshake
	//	*SharedMessage_Farewell
	//	*SharedMessage_DeliveryMessage
	//	*SharedMessage_BatchDeliveryMessage
	MessageType isSharedMessage_MessageType `protobuf_oneof:"message_type"`
	// contains filtered or unexported fields
}

func (*SharedMessage) Descriptor deprecated

func (*SharedMessage) Descriptor() ([]byte, []int)

Deprecated: Use SharedMessage.ProtoReflect.Descriptor instead.

func (*SharedMessage) GetBatchDeliveryMessage

func (x *SharedMessage) GetBatchDeliveryMessage() *BatchDeliveryMessage

func (*SharedMessage) GetDeliveryMessage

func (x *SharedMessage) GetDeliveryMessage() *DeliveryMessage

func (*SharedMessage) GetFarewell

func (x *SharedMessage) GetFarewell() *Farewell

func (*SharedMessage) GetHandshake

func (x *SharedMessage) GetHandshake() *Handshake

func (*SharedMessage) GetMessageType

func (m *SharedMessage) GetMessageType() isSharedMessage_MessageType

func (*SharedMessage) ProtoMessage

func (*SharedMessage) ProtoMessage()

func (*SharedMessage) ProtoReflect

func (x *SharedMessage) ProtoReflect() protoreflect.Message

func (*SharedMessage) Reset

func (x *SharedMessage) Reset()

func (*SharedMessage) String

func (x *SharedMessage) String() string

type SharedMessage_BatchDeliveryMessage

// SharedMessage_BatchDeliveryMessage is the oneof wrapper for a BatchDeliveryMessage (field 4).
type SharedMessage_BatchDeliveryMessage struct {
	BatchDeliveryMessage *BatchDeliveryMessage `protobuf:"bytes,4,opt,name=batch_delivery_message,json=batchDeliveryMessage,proto3,oneof"` // Delivers multiple messages.
}

type SharedMessage_DeliveryMessage

// SharedMessage_DeliveryMessage is the oneof wrapper for a DeliveryMessage (field 3).
type SharedMessage_DeliveryMessage struct {
	DeliveryMessage *DeliveryMessage `protobuf:"bytes,3,opt,name=delivery_message,json=deliveryMessage,proto3,oneof"` // Delivers a single message.
}

type SharedMessage_Farewell

// SharedMessage_Farewell is the oneof wrapper for a Farewell (field 2).
type SharedMessage_Farewell struct {
	Farewell *Farewell `protobuf:"bytes,2,opt,name=farewell,proto3,oneof"` // Farewell.
}

type SharedMessage_Handshake

// SharedMessage_Handshake is the oneof wrapper for a Handshake (field 1).
type SharedMessage_Handshake struct {
	Handshake *Handshake `protobuf:"bytes,1,opt,name=handshake,proto3,oneof"` // Handshake.
}

type SharedPolicyDecision

// SharedPolicyDecision is a uint8 enumeration with two values, Stop and Restart.
// NOTE(review): presumably the decision produced by the runtime error handler
// set via WithRuntimeErrorHandler — confirm against ErrorPolicyDecisionHandler.
type SharedPolicyDecision uint8
const (
	SharedPolicyDecisionStop SharedPolicyDecision = iota // 0, the zero value
	SharedPolicyDecisionRestart                          // 1
)

type SharedServer

// SharedServer is the server API for the Shared service.
type SharedServer interface {
	// StreamHandler receives the server-side stream of one StreamHandler call
	// and returns an error.
	StreamHandler(Shared_StreamHandlerServer) error
	// contains filtered or unexported methods
}

SharedServer is the server API for Shared service. All implementations must embed UnimplementedSharedServer for forward compatibility

type SharedStartHook

// SharedStartHook is a hook invoked when a share starts.
type SharedStartHook interface {
	// OnSharedStart is called when the share starts.
	OnSharedStart()
}

SharedStartHook 共享启动时的钩子

type Shared_StreamHandlerClient

// Shared_StreamHandlerClient is the client-side stream of the StreamHandler
// RPC; it embeds grpc.ClientStream.
type Shared_StreamHandlerClient interface {
	// Send sends a SharedMessage on the stream.
	Send(*SharedMessage) error
	// Recv receives a SharedMessage from the stream.
	Recv() (*SharedMessage, error)
	grpc.ClientStream
}

type Shared_StreamHandlerServer

// Shared_StreamHandlerServer is the server-side stream of the StreamHandler
// RPC; it embeds grpc.ServerStream.
type Shared_StreamHandlerServer interface {
	// Send sends a SharedMessage on the stream.
	Send(*SharedMessage) error
	// Recv receives a SharedMessage from the stream.
	Recv() (*SharedMessage, error)
	grpc.ServerStream
}

type UnimplementedSharedServer

// UnimplementedSharedServer is an empty struct that SharedServer
// implementations must embed for forward compatibility.
type UnimplementedSharedServer struct {
}

UnimplementedSharedServer must be embedded to have forward compatible implementations.

func (UnimplementedSharedServer) StreamHandler

type UnsafeSharedServer

// UnsafeSharedServer may be embedded to opt out of forward compatibility for
// the Shared service; its use is not recommended.
type UnsafeSharedServer interface {
	// contains filtered or unexported methods
}

UnsafeSharedServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to SharedServer will result in compilation errors.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL