Documentation ¶
Index ¶
- Constants
- Variables
- func NewGrpcServer(options *GrpcServerOptions) *grpc.Server
- func UnaryIPInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, ...) (interface{}, error)
- type Broker
- type GrpcServer
- func (gRPC *GrpcServer) CreateChannel(ctx context.Context, req *pb.CreateChannelRequest) (*pb.CreateChannelResponse, error)
- func (gRPC *GrpcServer) Publish(ctx context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error)
- func (gRPC *GrpcServer) Subscribe(req *pb.SubscribeRequest, stream pb.MQService_SubscribeServer) error
- type GrpcServerOptions
- type Server
- func NewServer(logger *zap.Logger, options *ServerOptions) *Server
- func (s *Server) CreateChannel(ctx context.Context, req *pb.CreateChannelRequest) (*pb.CreateChannelResponse, error)
- func (s *Server) Publish(ctx context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error)
- func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.MQService_SubscribeServer) error
- type ServerOptions
- type Service
- func NewService(logger *zap.Logger, options *ServiceOptions) *Service
- type ServiceOptions
Constants ¶
const IPContextKey contextKey = "ip"
Variables ¶
var (
	// ErrFailedToSaveMessage is returned when the broker fails to save a message
	ErrFailedToSaveMessage = errors.New("error: failed to save message")
	// ErrUnableToCreateChannel is returned when the broker fails to create a channel
	ErrUnableToCreateChannel = errors.New("error: unable to create channel")
	// ErrChannelDoesNotExist is returned when the broker tries to publish a message to a non-existent channel
	ErrChannelDoesNotExist = errors.New("error: channel does not exist")
	// ErrChannelAlreadyExists is returned when the broker tries to create a channel that already exists
	ErrChannelAlreadyExists = errors.New("error: channel already exists")
	// ErrSubscriberDoesNotExist is returned when the broker tries to unsubscribe a subscriber that is not subscribed to the channel
	ErrSubscriberDoesNotExist = errors.New("error: subscriber is not subscribed to the channel")
)
Functions ¶
func NewGrpcServer ¶
func NewGrpcServer(options *GrpcServerOptions) *grpc.Server
NewGrpcServer returns a new gRPC server
func UnaryIPInterceptor ¶
func UnaryIPInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)
UnaryIPInterceptor is a gRPC unary server interceptor that adds the client IP and request ID to the context
Types ¶
type Broker ¶
type Broker interface {
// contains filtered or unexported methods
}
Broker defines the interface for the message broker
type GrpcServer ¶
type GrpcServer struct {
	pb.UnimplementedMQServiceServer
	// contains filtered or unexported fields
}
GrpcServer is the gRPC server
func (*GrpcServer) CreateChannel ¶
func (gRPC *GrpcServer) CreateChannel(ctx context.Context, req *pb.CreateChannelRequest) (*pb.CreateChannelResponse, error)
CreateChannel is the CreateChannel gRPC endpoint
func (*GrpcServer) Publish ¶
func (gRPC *GrpcServer) Publish(ctx context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error)
Publish is the Publish gRPC endpoint
func (*GrpcServer) Subscribe ¶
func (gRPC *GrpcServer) Subscribe(req *pb.SubscribeRequest, stream pb.MQService_SubscribeServer) error
Subscribe is the Subscribe gRPC endpoint
type GrpcServerOptions ¶
GrpcServerOptions contains the options for the gRPC server
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the broker service implementation for gRPC
func NewServer ¶
func NewServer(logger *zap.Logger, options *ServerOptions) *Server
NewServer returns a new broker server
func (*Server) CreateChannel ¶
func (s *Server) CreateChannel(ctx context.Context, req *pb.CreateChannelRequest) (*pb.CreateChannelResponse, error)
CreateChannel is the gRPC implementation of the CreateChannel method
func (*Server) Publish ¶
func (s *Server) Publish(ctx context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error)
Publish is the gRPC implementation of the Publish method
func (*Server) Subscribe ¶
func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.MQService_SubscribeServer) error
Subscribe is the gRPC implementation of the Subscribe method
type ServerOptions ¶
ServerOptions represents the options for the broker server
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is the implementation of the Broker interface
func NewService ¶
func NewService(logger *zap.Logger, options *ServiceOptions) *Service
NewService returns a new broker service
type ServiceOptions ¶
ServiceOptions represents the options for the broker service