Documentation ¶
Index ¶
- Variables
- type AnnotatedEventDAG
- type BaseService
- type ClockValue
- type Clonable
- type Event
- func (e *Event) Clone() Clonable
- func (e *Event) IsMessageReceive() bool
- func (e *Event) IsMessageSend() bool
- func (e *Event) IsTimeoutEnd() bool
- func (e *Event) IsTimeoutStart() bool
- func (e *Event) Lt(other *Event) bool
- func (e *Event) MessageID() (string, bool)
- func (e *Event) Timeout() (*ReplicaTimeout, bool)
- type EventDAG
- func (d *EventDAG) AddDirty(e *Event, parents []*Event)
- func (d *EventDAG) AddNode(e *Event, parents []*Event)
- func (d *EventDAG) Clean()
- func (d *EventDAG) Clone() *EventDAG
- func (d *EventDAG) GetLatestNode(e *Event) (*Event, bool)
- func (d *EventDAG) GetNode(eid uint64) (*EventNode, bool)
- func (d *EventDAG) GetReceiveNode(e *Event) (*Event, bool)
- func (d *EventDAG) GetSendNode(e *Event) (*Event, bool)
- func (d *EventDAG) GetTimeoutEnd(e *Event) (*Event, bool)
- func (d *EventDAG) GetTimeoutStart(e *Event) (*Event, bool)
- func (d *EventDAG) MarshalJSON() ([]byte, error)
- type EventNode
- func (n *EventNode) AddParents(parents []*EventNode)
- func (n *EventNode) Clone() *EventNode
- func (n *EventNode) GetNext() uint64
- func (n *EventNode) GetPrev() uint64
- func (n *EventNode) IsDirty() bool
- func (n *EventNode) MarkClean()
- func (n *EventNode) MarkDirty()
- func (n *EventNode) SetNext(next uint64)
- func (n *EventNode) SetPrev(prev uint64)
- type EventNodeSet
- type EventNodeTag
- type EventNodeTagType
- type EventQueue
- type EventType
- type GenericEventType
- type GlobalClock
- type IntW
- type Message
- type MessageParser
- type MessageQueue
- func (q *MessageQueue) Add(m *Message)
- func (q *MessageQueue) Disable()
- func (q *MessageQueue) Enable()
- func (q *MessageQueue) Flush()
- func (q *MessageQueue) Pop() (*Message, bool)
- func (q *MessageQueue) Restart() error
- func (q *MessageQueue) Start() error
- func (q *MessageQueue) Stop() error
- func (q *MessageQueue) Subscribe(label string) chan *Message
- type MessageReceiveEventType
- type MessageSendEventType
- type MessageStore
- func (s *MessageStore) Add(m *Message) *Message
- func (s *MessageStore) Exists(id string) bool
- func (s *MessageStore) Get(id string) (*Message, bool)
- func (s *MessageStore) Iter() []*Message
- func (s *MessageStore) Remove(id string) *Message
- func (s *MessageStore) RemoveAll()
- func (s *MessageStore) Size() int
- type ParsedMessage
- type Replica
- type ReplicaID
- type ReplicaLog
- type ReplicaLogQueue
- type ReplicaLogStore
- type ReplicaState
- type ReplicaStateStore
- type ReplicaStore
- type ReplicaTimeout
- type RestartableService
- type Service
- type Subscriber
- type TimeoutContext
- type TimeoutEndEventType
- type TimeoutStartEventType
- type TimeoutStore
Constants ¶
This section is empty.
Variables ¶
var ( ErrDuplicateSubs = errors.New("duplicate subscriber") ErrNoSubs = errors.New("subscriber does not exist") ErrNoData = errors.New("no data in message") ErrBadParser = errors.New("bad parser") DefaultSubsChSize = 10 )
var ( // ErrReplicaStoreFull is returned when more than the intended number of replicas register with the scheduler tool ErrReplicaStoreFull = errors.New("replica store is full") )
Functions ¶
This section is empty.
Types ¶
type AnnotatedEventDAG ¶
type AnnotatedEventDAG struct {
// contains filtered or unexported fields
}
func NewAnnotatedEventDag ¶
func NewAnnotatedEventDag(d *EventDAG) *AnnotatedEventDAG
func (*AnnotatedEventDAG) AddEvent ¶
func (a *AnnotatedEventDAG) AddEvent(e *Event, parents []*Event)
func (*AnnotatedEventDAG) Check ¶
func (a *AnnotatedEventDAG) Check() bool
func (*AnnotatedEventDAG) SetFrameOfReference ¶
func (a *AnnotatedEventDAG) SetFrameOfReference(e *Event) bool
type BaseService ¶
BaseService provides the basic nuts an bolts needed to implement a service
func NewBaseService ¶
func NewBaseService(name string, parentLogger *log.Logger) *BaseService
NewBaseService instantiates BaseService
func (*BaseService) QuitCh ¶
func (b *BaseService) QuitCh() <-chan struct{}
QuitCh returns the quit channel which will be closed when the service stops running
func (*BaseService) StartRunning ¶
func (b *BaseService) StartRunning()
StartRunning is called to set the running flag
func (*BaseService) StopRunning ¶
func (b *BaseService) StopRunning()
StopRunning is called to unset the running flag
type ClockValue ¶
type ClockValue []float64
func (ClockValue) Eq ¶
func (c ClockValue) Eq(other ClockValue) bool
func (ClockValue) Lt ¶
func (c ClockValue) Lt(other ClockValue) bool
type Clonable ¶
type Clonable interface {
Clone() Clonable
}
Clonable is any type which returns a copy of itself on Clone()
type Event ¶
type Event struct { // Replica at which the event occurs Replica ReplicaID `json:"replica"` // Type of the event Type EventType `json:"-"` // TypeS is the string representation of the event TypeS string `json:"type"` // ID unique identifier assigned for every new event ID uint64 `json:"id"` // Timestamp of the event Timestamp int64 `json:"timestamp"` // Vector clock value of the event ClockValue ClockValue }
Event is a generic event that occurs at a replica
func (*Event) IsMessageReceive ¶
func (*Event) IsMessageSend ¶
func (*Event) IsTimeoutEnd ¶
func (*Event) IsTimeoutStart ¶
func (*Event) Timeout ¶
func (e *Event) Timeout() (*ReplicaTimeout, bool)
type EventDAG ¶
type EventDAG struct {
// contains filtered or unexported fields
}
func NewEventDag ¶
func NewEventDag() *EventDAG
func (*EventDAG) MarshalJSON ¶
type EventNode ¶
type EventNode struct { Event *Event `json:"event"` Parents *EventNodeSet `json:"parents"` Children *EventNodeSet `json:"children"` // contains filtered or unexported fields }
func NewEventNode ¶
func (*EventNode) AddParents ¶
type EventNodeSet ¶
type EventNodeSet struct {
// contains filtered or unexported fields
}
func NewEventNodeSet ¶
func NewEventNodeSet() *EventNodeSet
func (*EventNodeSet) Add ¶
func (d *EventNodeSet) Add(nid uint64)
func (*EventNodeSet) Clone ¶
func (d *EventNodeSet) Clone() *EventNodeSet
func (*EventNodeSet) Exists ¶
func (d *EventNodeSet) Exists(nid uint64) bool
func (*EventNodeSet) Iter ¶
func (d *EventNodeSet) Iter() []uint64
func (*EventNodeSet) MarshalJSON ¶
func (d *EventNodeSet) MarshalJSON() ([]byte, error)
func (*EventNodeSet) Size ¶
func (d *EventNodeSet) Size() int
type EventNodeTag ¶
type EventNodeTag struct { Event *Event Type EventNodeTagType Min *IntW Max *IntW }
func NewEventNodeTag ¶
func NewEventNodeTag(e *Event) *EventNodeTag
type EventNodeTagType ¶
type EventNodeTagType = string
var ( MessageSend EventNodeTagType = "MessageSend" MessageReceive EventNodeTagType = "MessageReceive" TimeoutStart EventNodeTagType = "TimeoutStart" TimeoutEnd EventNodeTagType = "TimeoutEnd" Other EventNodeTagType = "Other" )
type EventQueue ¶
type EventQueue struct { *BaseService // contains filtered or unexported fields }
EventQueue datastructure to store the messages in a FIFO queue
func NewEventQueue ¶
func NewEventQueue(logger *log.Logger) *EventQueue
NewEventQueue returns an empty EventQueue
func (*EventQueue) Subscribe ¶
func (q *EventQueue) Subscribe(label string) chan *Event
Subscribe creates and returns a channel for the subscriber with the given label
type EventType ¶
type EventType interface { // Clone copies the event type Clone() EventType // Type is a unique key for that event type Type() string // String should return a string representation of the event type String() string }
EventType abstract type for representing different types of events
type GenericEventType ¶
type GenericEventType struct { // Marshalled parameters Params map[string]string `json:"params"` // Type of event for reference // Eg: Commit T string `json:"type"` }
GenericEventType is the event type published by a replica It can be specific to the algorithm that is implemented
func NewGenericEventType ¶
func NewGenericEventType(params map[string]string, t string) *GenericEventType
NewGenericEventType instantiates GenericEventType
func (*GenericEventType) Clone ¶
func (g *GenericEventType) Clone() EventType
Clone returns a copy of the current GenericEventType
func (*GenericEventType) String ¶
func (g *GenericEventType) String() string
String returns a string representation of the event type
func (*GenericEventType) Type ¶
func (g *GenericEventType) Type() string
Type returns a unique key for GenericEventType
type GlobalClock ¶
type GlobalClock struct {
// contains filtered or unexported fields
}
func NewGlobalClock ¶
func NewGlobalClock(dag *EventDAG, messageStore *MessageStore) *GlobalClock
type Message ¶
type Message struct { From ReplicaID `json:"from"` To ReplicaID `json:"to"` Data []byte `json:"data"` Type string `json:"type"` ID string `json:"id"` Intercept bool `json:"intercept"` ParsedMessage ParsedMessage `json:"-"` Repr string `json:"repr"` }
Message stores a message that has been interecepted between two replicas
func (*Message) Parse ¶
func (m *Message) Parse(parser MessageParser) error
type MessageParser ¶
type MessageParser interface {
Parse([]byte) (ParsedMessage, error)
}
type MessageQueue ¶
type MessageQueue struct { *BaseService // contains filtered or unexported fields }
MessageQueue datastructure to store the messages in a FIFO queue
func NewMessageQueue ¶
func NewMessageQueue(logger *log.Logger) *MessageQueue
NewMessageQueue returns an empty MessageQueue
func (*MessageQueue) Disable ¶
func (q *MessageQueue) Disable()
Disable closes the queue and drops all incoming messages Use with caution
func (*MessageQueue) Enable ¶
func (q *MessageQueue) Enable()
Enable enqueues the messages and feeds it to the subscribers if any
func (*MessageQueue) Pop ¶
func (q *MessageQueue) Pop() (*Message, bool)
func (*MessageQueue) Subscribe ¶
func (q *MessageQueue) Subscribe(label string) chan *Message
Subscribe create and returns a channel for the subscriber with the specified label
type MessageReceiveEventType ¶
type MessageReceiveEventType struct { // MessageID is the ID of the message received MessageID string }
MessageReceiveEventType is the event type when a replica receives a message
func NewMessageReceiveEventType ¶
func NewMessageReceiveEventType(messageID string) *MessageReceiveEventType
NewMessageReceiveEventType instantiates MessageReceiveEventType
func (*MessageReceiveEventType) Clone ¶
func (r *MessageReceiveEventType) Clone() EventType
Clone returns a copy of the current MessageReceiveEventType
func (*MessageReceiveEventType) String ¶
func (r *MessageReceiveEventType) String() string
String returns a string representation of the event type
func (*MessageReceiveEventType) Type ¶
func (r *MessageReceiveEventType) Type() string
Type returns a unique key for MessageReceiveEventType
type MessageSendEventType ¶
type MessageSendEventType struct { // MessageID of the message that was sent MessageID string }
MessageSendEventType is the event type where a message is sent from the replica
func NewMessageSendEventType ¶
func NewMessageSendEventType(messageID string) *MessageSendEventType
NewMessageSendEventType instantiates MessageSendEventType
func (*MessageSendEventType) Clone ¶
func (s *MessageSendEventType) Clone() EventType
Clone returns a copy of the current MessageSendEventType
func (*MessageSendEventType) String ¶
func (s *MessageSendEventType) String() string
String returns a string representation of the event type
func (*MessageSendEventType) Type ¶
func (s *MessageSendEventType) Type() string
Type returns a unique key for MessageSendEventType
type MessageStore ¶
type MessageStore struct {
// contains filtered or unexported fields
}
MessageStore to store the messages. Thread safe
func NewMessageStore ¶
func NewMessageStore() *MessageStore
NewMessageStore creates a empty MessageStore
func (*MessageStore) Add ¶
func (s *MessageStore) Add(m *Message) *Message
Add adds a message to the store Returns any old message with the same ID if it exists or nil if not
func (*MessageStore) Exists ¶
func (s *MessageStore) Exists(id string) bool
Exists returns true if the message exists
func (*MessageStore) Get ¶
func (s *MessageStore) Get(id string) (*Message, bool)
Get returns a message and bool indicating if the message exists
func (*MessageStore) Iter ¶
func (s *MessageStore) Iter() []*Message
Iter returns a list of all the messages in the store
func (*MessageStore) Remove ¶
func (s *MessageStore) Remove(id string) *Message
Remove returns and deleted the message from the store if it exists. Returns nil otherwise
func (*MessageStore) RemoveAll ¶
func (s *MessageStore) RemoveAll()
RemoveAll empties the message store
func (*MessageStore) Size ¶
func (s *MessageStore) Size() int
Size returns the size of the message store
type ParsedMessage ¶
type ParsedMessage interface { String() string Clone() ParsedMessage Marshal() ([]byte, error) }
type Replica ¶
type Replica struct { ID ReplicaID `json:"id"` Ready bool `json:"ready"` Info map[string]interface{} `json:"info"` Addr string `json:"addr"` }
Replica immutable representation of the attributes of a replica
type ReplicaID ¶
type ReplicaID string
ReplicaID is an identifier for the replica encoded as a string
type ReplicaLog ¶
type ReplicaLog struct { // Params is a marhsalled params of the log message Params map[string]string `json:"params"` // Message is the message that was logged Message string `json:"message"` // Timestamp of the log Timestamp int64 `json:"timestamp"` // Replica which posted the log Replica ReplicaID `json:"replica"` }
ReplicaLog encapsulates a log message with the necessary attributes
type ReplicaLogQueue ¶
type ReplicaLogQueue struct { *BaseService // contains filtered or unexported fields }
ReplicaLogQueue is the queue of log messages
func NewReplicaLogQueue ¶
func NewReplicaLogQueue(logger *log.Logger) *ReplicaLogQueue
NewReplicaLogQueue instantiates ReplicaLogQueue
func (*ReplicaLogQueue) Flush ¶
func (q *ReplicaLogQueue) Flush()
Flush erases the contents of the queue
func (*ReplicaLogQueue) Subscribe ¶
func (q *ReplicaLogQueue) Subscribe(label string) chan *ReplicaLog
Subscribe creates and returns a channel for the subscriber
type ReplicaLogStore ¶
type ReplicaLogStore struct {
// contains filtered or unexported fields
}
ReplicaLogStore stores the logs as a map indexed by the replica ID
func NewReplicaLogStore ¶
func NewReplicaLogStore() *ReplicaLogStore
NewReplicaLogStore instantiates a ReplicaLogStore
func (*ReplicaLogStore) Add ¶
func (store *ReplicaLogStore) Add(log *ReplicaLog)
Add adds to the log store
func (*ReplicaLogStore) GetLogs ¶
func (store *ReplicaLogStore) GetLogs(replica ReplicaID, from, to int) ([]*ReplicaLog, int)
GetLogs returns the list of logs for a replica where from <=index<to
func (*ReplicaLogStore) Reset ¶
func (store *ReplicaLogStore) Reset()
type ReplicaState ¶
type ReplicaStateStore ¶
type ReplicaStateStore struct {
// contains filtered or unexported fields
}
func NewReplicaStateStore ¶
func NewReplicaStateStore() *ReplicaStateStore
type ReplicaStore ¶
type ReplicaStore struct {
// contains filtered or unexported fields
}
ReplicaStore to store all replica information, thread safe
func NewReplicaStore ¶
func NewReplicaStore(size int) *ReplicaStore
NewReplicaStore creates an empty ReplicaStore
func (*ReplicaStore) Add ¶
func (s *ReplicaStore) Add(p *Replica)
Add adds or updates a replica to the store
func (*ReplicaStore) Cap ¶
func (s *ReplicaStore) Cap() int
Cap returns the set of replicas used for the test
func (*ReplicaStore) Count ¶
func (s *ReplicaStore) Count() int
Count returns the total number of replicas
func (*ReplicaStore) Get ¶
func (s *ReplicaStore) Get(id ReplicaID) (p *Replica, ok bool)
Get returns the replica and a bool indicating if it exists or not
func (*ReplicaStore) Iter ¶
func (s *ReplicaStore) Iter() []*Replica
Iter returns a list of the existing replicas
func (*ReplicaStore) NumReady ¶
func (s *ReplicaStore) NumReady() int
NumReady returns the number of replicas with Ready attribute set to true
func (*ReplicaStore) ResetReady ¶
func (s *ReplicaStore) ResetReady()
ResetReady sets the Ready attribute of all replicas to false
type ReplicaTimeout ¶
type ReplicaTimeout struct { Replica ReplicaID `json:"replica"` Type string `json:"type"` Duration time.Duration `json:"duration"` }
func TimeoutFromParams ¶
func TimeoutFromParams(replica ReplicaID, params map[string]string) (*ReplicaTimeout, bool)
func (*ReplicaTimeout) Eq ¶
func (t *ReplicaTimeout) Eq(other *ReplicaTimeout) bool
func (*ReplicaTimeout) MarshalJSON ¶
func (t *ReplicaTimeout) MarshalJSON() ([]byte, error)
type RestartableService ¶
RestartableService is a service which can be restarted
type Service ¶
type Service interface { // Name of the service Name() string // Start to start the service Start() error // Running to indicate if the service is running Running() bool // Stop to stop the service Stop() error // Quit returns a channel which will be closed once the service stops running QuitCh() <-chan struct{} }
Service is any entity which runs on a separate thread
type Subscriber ¶
type Subscriber struct {
Ch chan interface{}
}
Generic subscriber to maintain state of the subsciber
type TimeoutContext ¶
type TimeoutContext struct { PendingTimeouts map[string]*timeoutWrapper PendingReceives map[string]*Event // contains filtered or unexported fields }
func NewTimeoutContext ¶
func NewTimeoutContext(d *EventDAG) *TimeoutContext
func (*TimeoutContext) AddEvent ¶
func (t *TimeoutContext) AddEvent(e *Event)
func (*TimeoutContext) CanDeliverMessages ¶
func (t *TimeoutContext) CanDeliverMessages(messages []*Message)
type TimeoutEndEventType ¶
type TimeoutEndEventType struct {
Timeout *ReplicaTimeout
}
func NewTimeoutEndEventType ¶
func NewTimeoutEndEventType(timeout *ReplicaTimeout) *TimeoutEndEventType
func (*TimeoutEndEventType) Clone ¶
func (te *TimeoutEndEventType) Clone() EventType
func (*TimeoutEndEventType) String ¶
func (te *TimeoutEndEventType) String() string
func (*TimeoutEndEventType) Type ¶
func (te *TimeoutEndEventType) Type() string
type TimeoutStartEventType ¶
type TimeoutStartEventType struct {
Timeout *ReplicaTimeout
}
func NewTimeoutStartEventType ¶
func NewTimeoutStartEventType(timeout *ReplicaTimeout) *TimeoutStartEventType
func (*TimeoutStartEventType) Clone ¶
func (ts *TimeoutStartEventType) Clone() EventType
func (*TimeoutStartEventType) String ¶
func (ts *TimeoutStartEventType) String() string
func (*TimeoutStartEventType) Type ¶
func (ts *TimeoutStartEventType) Type() string
type TimeoutStore ¶
type TimeoutStore struct { *BaseService // contains filtered or unexported fields }
func NewTimeoutStore ¶
func NewTimeoutStore(logger *log.Logger) *TimeoutStore
func (*TimeoutStore) AddTimeout ¶
func (s *TimeoutStore) AddTimeout(t *ReplicaTimeout)
func (*TimeoutStore) Reset ¶
func (s *TimeoutStore) Reset()
func (*TimeoutStore) Start ¶
func (s *TimeoutStore) Start() error
func (*TimeoutStore) Stop ¶
func (s *TimeoutStore) Stop() error
func (*TimeoutStore) ToDispatch ¶
func (s *TimeoutStore) ToDispatch() []*ReplicaTimeout