Documentation ¶
Index ¶
- Variables
- type FetchDirection
- type FetchRequest
- func (fr *FetchRequest) Done()
- func (fr *FetchRequest) Error(err error)
- func (fr *FetchRequest) Errors() <-chan error
- func (fr *FetchRequest) Init()
- func (fr *FetchRequest) IsDone() bool
- func (fr *FetchRequest) Messages() <-chan *FetchedMessage
- func (fr *FetchRequest) Push(id uint64, message []byte)
- func (fr *FetchRequest) PushError(err error)
- func (fr *FetchRequest) PushFetchMessage(fm *FetchedMessage)
- func (fr *FetchRequest) Ready() int
- type FetchedMessage
- type MessagePartition
- type MessageStore
Constants ¶
This section is empty.
Variables ¶
var ErrRequestDone = errors.New("Fetch request is done")
Functions ¶
This section is empty.
Types ¶
type FetchDirection ¶
type FetchDirection int
const (
	DirectionOneMessage FetchDirection = 0
	DirectionForward    FetchDirection = 1
	DirectionBackwards  FetchDirection = -1

	// TODO Bogdan decide the channel size and if should be customizable
	FetchBufferSize = 10
)
type FetchRequest ¶
type FetchRequest struct {
	sync.RWMutex

	// Partition is the Store name to search for messages
	Partition string

	// StartID is the message sequence id to start
	StartID uint64

	// EndID is the message sequence id to finish. If will not be used.
	EndID uint64

	// Direction has 3 possible values:
	// Direction == 0: Only the Message with StartID
	// Direction == 1: Fetch also the next Count Messages with a higher MessageID
	// Direction == -1: Fetch also the next Count Messages with a lower MessageID
	Direction FetchDirection

	// Count is the maximum number of messages to return
	Count int

	// MessageC is the channel to send the message back to the receiver
	MessageC chan *FetchedMessage

	// ErrorC is a channel if an error occurs
	ErrorC chan error

	// StartC: through this channel, the total number of results
	// is returned before sending the first message.
	// The Fetch() method blocks on putting the number to the start channel.
	StartC chan int
	// contains filtered or unexported fields
}
FetchRequest is used for fetching messages in a MessageStore.
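The Direction and Count fields can be read as a rule for selecting message IDs. The following is a minimal sketch of that rule, not the package's implementation. `selectIDs` is a hypothetical helper, and it assumes that IDs are dense in the range 1..maxID and that Count includes the message at StartID. Neither assumption is confirmed by this page.

```go
package main

import "fmt"

// FetchDirection and its constants are redeclared here so the sketch
// compiles on its own; they mirror the documented values.
type FetchDirection int

const (
	DirectionOneMessage FetchDirection = 0
	DirectionForward    FetchDirection = 1
	DirectionBackwards  FetchDirection = -1
)

// selectIDs is a hypothetical helper (not part of the package).
// Assumptions: IDs are dense in 1..maxID, and count is the total number
// of messages returned, including the one at startID.
func selectIDs(startID, maxID uint64, dir FetchDirection, count int) []uint64 {
	ids := []uint64{}
	if count <= 0 || startID == 0 || startID > maxID {
		return ids
	}
	switch dir {
	case DirectionOneMessage:
		// Only the message with startID.
		ids = append(ids, startID)
	case DirectionForward:
		// startID, then messages with higher IDs, up to count.
		for id := startID; id <= maxID && len(ids) < count; id++ {
			ids = append(ids, id)
		}
	case DirectionBackwards:
		// startID, then messages with lower IDs, up to count.
		for id := startID; id >= 1 && len(ids) < count; id-- {
			ids = append(ids, id)
		}
	}
	return ids
}

func main() {
	fmt.Println(selectIDs(5, 10, DirectionForward, 3))    // [5 6 7]
	fmt.Println(selectIDs(5, 10, DirectionBackwards, 3))  // [5 4 3]
	fmt.Println(selectIDs(5, 10, DirectionOneMessage, 3)) // [5]
}
```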
func NewFetchRequest ¶
func NewFetchRequest(partition string, start, end uint64, direction FetchDirection, count int) *FetchRequest
NewFetchRequest creates a new FetchRequest pointer initialized with the provided values. If `count` is negative, it is set to MaxInt32.
func (*FetchRequest) Done ¶
func (fr *FetchRequest) Done()
func (*FetchRequest) Error ¶
func (fr *FetchRequest) Error(err error)
func (*FetchRequest) Errors ¶
func (fr *FetchRequest) Errors() <-chan error
func (*FetchRequest) Init ¶
func (fr *FetchRequest) Init()
func (*FetchRequest) IsDone ¶
func (fr *FetchRequest) IsDone() bool
func (*FetchRequest) Messages ¶
func (fr *FetchRequest) Messages() <-chan *FetchedMessage
func (*FetchRequest) Push ¶
func (fr *FetchRequest) Push(id uint64, message []byte)
func (*FetchRequest) PushError ¶
func (fr *FetchRequest) PushError(err error)
func (*FetchRequest) PushFetchMessage ¶
func (fr *FetchRequest) PushFetchMessage(fm *FetchedMessage)
func (*FetchRequest) Ready ¶
func (fr *FetchRequest) Ready() int
Ready returns the number of messages that will be returned, which signals that the fetch is starting. It reads the number from the StartC channel.
type FetchedMessage ¶
FetchedMessage is a struct containing a pair: guble Message and its ID.
type MessagePartition ¶
type MessageStore ¶
type MessageStore interface {
	// Store a message within a partition.
	// The message id must be equal to MaxMessageID + 1.
	// So the caller has to maintain the consistency between
	// fetching an id and storing the message.
	Store(partition string, messageID uint64, data []byte) error

	// Generates a new ID for the message if it's new and stores it.
	// Returns the size of the new message or an error.
	// Takes the message and cluster node ID as parameters.
	StoreMessage(*protocol.Message, uint8) (int, error)

	// Fetch fetches a set of messages.
	// The results, as well as errors, are communicated asynchronously using
	// the channels supplied by the FetchRequest.
	Fetch(*FetchRequest)

	// MaxMessageID returns the highest message id for a particular partition
	MaxMessageID(partition string) (uint64, error)

	// DoInTx executes the supplied function within the locking context of the message partition.
	// This ensures that, while the code is executed, no change to the supplied maxMessageId can occur.
	// The error result of fnToExecute, or an error while locking, will be returned by DoInTx.
	DoInTx(partition string, fnToExecute func(uint64) error) error

	// GenerateNextMsgID generates a new message ID based on a timestamp in a strictly monotonic order
	GenerateNextMsgID(partition string, nodeID uint8) (uint64, int64, error)

	Partition(string) (MessagePartition, error)

	// Partitions returns a slice of `MessagePartition` available in the store
	Partitions() ([]MessagePartition, error)
}
MessageStore is an interface for a persistence backend storing topics.
Directories ¶
Path | Synopsis |
---|---|
Package filestore is a filesystem-based implementation of the MessageStore interface.