server

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetListenHost

func GetListenHost(port int) string

func Listen

func Listen(ctx context.Context, shard uint, topic string, prefixes [][]byte) chan []byte

Listen returns a channel of messages

func ListenSingle

func ListenSingle(ctx context.Context, shard uint, topic string, start []byte, prefixes [][]byte) error

ListenSingle returns nil if a matching new item is found, otherwise an error

func ReceiveNew

func ReceiveNew(shard uint, topic string, uid []byte)

Types

type Db

type Db interface {
	GetMsg(*RequestSingle) (*Msg, error)
	GetMsgs(*Request) ([]*Msg, error)
	SaveMsgs([]*Msg) error
	GetCount(*RequestCount) (uint64, error)
	GetTopicList() ([]TopicInfo, error)
	DeleteMessages(*RequestDelete) error
}

type Msg

type Msg struct {
	Topic   string
	Uid     []byte
	Message []byte
}

func (Msg) GetShard

func (m Msg) GetShard() uint

type MsgDone

type MsgDone struct {
	Msgs []*Msg
	Done chan error
}

func NewMsgDone

func NewMsgDone(msgs []*Msg) *MsgDone

type PubSub

type PubSub struct {
	Incr  int64
	Subs  map[int64]*Subscribe
	Mutex sync.Mutex
}

func (*PubSub) Close

func (s *PubSub) Close(id int64)

func (*PubSub) Publish

func (s *PubSub) Publish(shard uint, topic string, uid []byte)

func (*PubSub) Subscribe

func (s *PubSub) Subscribe(shard uint, topic string, start []byte, prefixes [][]byte) *Subscribe

type Request

type Request struct {
	Topic    string
	Prefixes [][]byte // Filter only these prefixes, e.g. prefix = "b". Ignore Start if more than 1, except overload
	Start    []byte   // Seek to this location to start, e.g. seek to "baltimore", can be overloaded to say last item
	Uids     [][]byte // When set, Start and Prefixes are ignored
	Max      uint32
	Wait     bool
	Newest   bool
	Context  context.Context
}

type RequestCount

type RequestCount struct {
	Topic  string
	Prefix []byte
}

type RequestDelete

type RequestDelete struct {
	Topic string
	Uids  [][]byte
}

type RequestOld

type RequestOld struct {
	Topic  string
	Offset uint64
	Max    uint32
	Wait   bool
}

type RequestSingle

type RequestSingle struct {
	Topic string
	Uid   []byte
}

type Server

type Server struct {
	Port    int
	Shard   uint
	Stopped bool

	MsgDoneChan chan *MsgDone
	Timeout     time.Duration
	Grpc        *grpc.Server
	queue_pb.UnimplementedQueueServer
	// contains filtered or unexported fields
}

func NewServer

func NewServer(port int, shard uint) *Server

func (*Server) DeleteMessages

func (s *Server) DeleteMessages(ctx context.Context, request *queue_pb.MessageUids) (*queue_pb.ErrorReply, error)

func (*Server) GetMessage

func (s *Server) GetMessage(_ context.Context, request *queue_pb.RequestSingle) (*queue_pb.Message, error)

func (*Server) GetMessageCount

func (s *Server) GetMessageCount(ctx context.Context, request *queue_pb.CountRequest) (*queue_pb.TopicCount, error)

func (*Server) GetMessages

func (s *Server) GetMessages(ctx context.Context, request *queue_pb.Request) (*queue_pb.Messages, error)

func (*Server) GetStreamMessages

func (s *Server) GetStreamMessages(request *queue_pb.RequestStream, server queue_pb.Queue_GetStreamMessagesServer) error

func (*Server) Run

func (s *Server) Run() error

func (*Server) SaveMessages

func (s *Server) SaveMessages(_ context.Context, messages *queue_pb.Messages) (*queue_pb.ErrorReply, error)

func (*Server) SaveMsgs

func (s *Server) SaveMsgs(msgs []*Msg) error

func (*Server) Serve

func (s *Server) Serve() error

func (*Server) Start

func (s *Server) Start() error

func (*Server) StartMessageChan

func (s *Server) StartMessageChan()

func (*Server) Stop

func (s *Server) Stop()

type Subscribe

type Subscribe struct {
	Id       int64
	Shard    uint
	Topic    string
	Start    []byte
	Prefixes [][]byte
	UidChan  chan []byte
	PubSub   *PubSub
}

func (*Subscribe) Close

func (s *Subscribe) Close()

type TopicInfo

type TopicInfo struct {
	Name  string
	Count uint64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL