Documentation ¶
Index ¶
- Constants
- Variables
- func GetNodePartitionRange(cfg *Config, list *memberlist.Memberlist) (int, int)
- func InitMemberList(name string, port int, seedServers []string, seedPort int) (*memberlist.Memberlist, int, error)
- type Config
- func (cfg *Config) GetCompressedMessages(queueName string) (bool, error)
- func (cfg *Config) GetMaxPartitionAge(queueName string) (float64, error)
- func (cfg *Config) GetMaxPartitions(queueName string) (int, error)
- func (cfg *Config) GetMinPartitions(queueName string) (int, error)
- func (cfg *Config) GetVisibilityTimeout(queueName string) (float64, error)
- func (cfg *Config) InitializeQueue(queueName string) error
- func (cfg *Config) RiakConnection() *riak.Client
- func (cfg *Config) SetCompressedMessages(queueName string, compressedMessages bool) error
- func (cfg *Config) SetMaxPartitionAge(queueName string, age float64) error
- func (cfg *Config) SetMaxPartitions(queueName string, timeout int) error
- func (cfg *Config) SetMinPartitions(queueName string, timeout int) error
- func (cfg *Config) SetVisibilityTimeout(queueName string, timeout float64) error
- type ConfigRequest
- type Core
- type HTTPApiV1
- type Partition
- type Partitions
- type Queue
- func (queue *Queue) BatchDelete(cfg *Config, ids []string) (int, error)
- func (queue *Queue) Delete(cfg *Config, id string) bool
- func (queue *Queue) Get(cfg *Config, list *memberlist.Memberlist, batchsize int64) ([]riak.RObject, error)
- func (queue *Queue) Put(cfg *Config, message string) string
- func (queue *Queue) RetrieveMessages(ids []string, cfg *Config) []riak.RObject
- type Queues
- type Stats
- type Topic
- type Topics
Constants ¶
const CompressedMessages = "compressed_messages"
CompressedMessages is the name of the config setting that controls whether the queue uses compression
const ConfigurationBucket = "config"
ConfigurationBucket is the name of the riak bucket holding the config
const MaxPartitionAge = "max_partition_age"
MaxPartitionAge is the name of the config setting that controls how long an unused partition should exist
const MaxPartitions = "max_partitions"
MaxPartitions is the name of the config setting that controls the maximum number of partitions per node
const MinPartitions = "min_partitions"
MinPartitions is the name of the config setting that controls the minimum number of partitions per queue
const NoPartitions string = "no available partitions"
NoPartitions represents the message that there were no available partitions
const PartitionCount = "partition_count"
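PartitionCount is the name of the config setting holding the partition count (see DefaultSettings for its default value)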
const QueueConfigName = "queue_config"
QueueConfigName is the key in the riak bucket holding the config
const QueueDeletedStatsSuffix = "deleted.count"
QueueDeletedStatsSuffix is
const QueueDepthAprStatsSuffix = "approximate_depth.count"
QueueDepthAprStatsSuffix is
const QueueDepthStatsSuffix = "depth.count"
QueueDepthStatsSuffix is
const QueueFillDeltaStatsSuffix = "fill.count"
QueueFillDeltaStatsSuffix
const QueueReceivedStatsSuffix = "received.count"
QueueReceivedStatsSuffix is
const QueueSentStatsSuffix = "sent.count"
QueueSentStatsSuffix is
const QueueSetName = "queues"
QueueSetName is the crdt key holding the set of all queues
const VisibilityTimeout = "visibility_timeout"
VisibilityTimeout is the name of the config setting that controls how long a message is "inflight"
Variables ¶
var DefaultSettings = map[string]string{VisibilityTimeout: "30", PartitionCount: "5", MinPartitions: "1", MaxPartitions: "10", MaxPartitionAge: "432000", CompressedMessages: "false"}
DefaultSettings is
var ( // ErrConfigurationOptionNotFound represents the condition that occurs if an invalid // location is specified for the config file ErrConfigurationOptionNotFound = errors.New("Configuration Value Not Found") )
var MaxIDSize = *big.NewInt(math.MaxInt64)
MaxIDSize is
var Settings = [...]string{VisibilityTimeout, PartitionCount, MinPartitions, MaxPartitions, MaxPartitionAge, CompressedMessages}
Settings is the list of all config setting names. Note: arrays and maps cannot be made immutable in Go.
Functions ¶
func GetNodePartitionRange ¶
func GetNodePartitionRange(cfg *Config, list *memberlist.Memberlist) (int, int)
GetNodePartitionRange returns the range of partitions active for this node
func InitMemberList ¶
func InitMemberList(name string, port int, seedServers []string, seedPort int) (*memberlist.Memberlist, int, error)
InitMemberList creates a memberlist and joins it to the network. TODO: clean this up, since we only really need the one port.
Types ¶
type Config ¶
type Config struct { Core Core Stats Stats Compressor compressor.Compressor Queues *Queues RiakPool *riak.Client Topics *Topics }
Config is
func (*Config) GetCompressedMessages ¶
GetCompressedMessages is
func (*Config) GetMaxPartitionAge ¶
GetMaxPartitionAge is
func (*Config) GetMaxPartitions ¶
GetMaxPartitions is
func (*Config) GetMinPartitions ¶
GetMinPartitions is
func (*Config) GetVisibilityTimeout ¶
GetVisibilityTimeout is
func (*Config) InitializeQueue ¶
InitializeQueue is
func (*Config) RiakConnection ¶
func (cfg *Config) RiakConnection() *riak.Client
RiakConnection returns a pointer to the current pool of riak connections, which is abstracted inside of the riak.Client object
func (*Config) SetCompressedMessages ¶
SetCompressedMessages is
func (*Config) SetMaxPartitionAge ¶
SetMaxPartitionAge is
func (*Config) SetMaxPartitions ¶
SetMaxPartitions is
func (*Config) SetMinPartitions ¶
SetMinPartitions is
type ConfigRequest ¶
type ConfigRequest struct { VisibilityTimeout *float64 `json:"visibility_timeout,omitempty"` MinPartitions *int `json:"min_partitions,omitempty"` MaxPartitions *int `json:"max_partitions,omitempty"` MaxPartitionAge *float64 `json:"max_partition_age,omitempty"` CompressedMessages *bool `json:"compressed_messages,omitempty"` }
ConfigRequest is
type Core ¶
type Core struct { Name string Port int SeedServer string SeedPort int SeedServers []string HTTPPort int RiakNodes string BackendConnectionPool int SyncConfigInterval time.Duration LogLevel logrus.Level LogLevelString string }
Core is
type HTTPApiV1 ¶
type HTTPApiV1 struct { }
HTTPApiV1 is
func (HTTPApiV1) InitWebserver ¶
func (h HTTPApiV1) InitWebserver(list *memberlist.Memberlist, cfg *Config)
InitWebserver is
type Partitions ¶
Partitions represents a collection of Partition objects
func InitPartitions ¶
func InitPartitions(cfg *Config, queueName string) *Partitions
InitPartitions creates a series of partitions based on the provided config and queue
func (*Partitions) GetPartition ¶
func (part *Partitions) GetPartition(cfg *Config, queueName string, list *memberlist.Memberlist) (int, int, *Partition, error)
GetPartition pops a partition off of the queue for the specified queue
func (*Partitions) PartitionCount ¶
func (part *Partitions) PartitionCount() int
PartitionCount returns the count of known partitions
func (*Partitions) PushPartition ¶
func (part *Partitions) PushPartition(cfg *Config, queueName string, partition *Partition, lock bool)
PushPartition pushes a partition back onto the queue for the given queue
type Queue ¶
type Queue struct { // the definition of a queue // name of the queue Name string // the partitions of the queue Parts *Partitions // Individual settings for the queue Config *riak.RDtMap // Mutex for protecting rw access to the Config object sync.RWMutex }
Queue represents a single named queue, together with its partitions and its individual settings
func (*Queue) BatchDelete ¶
BatchDelete deletes multiple messages at once
func (*Queue) Get ¶
func (queue *Queue) Get(cfg *Config, list *memberlist.Memberlist, batchsize int64) ([]riak.RObject, error)
Get gets a message from the queue
func (*Queue) RetrieveMessages ¶
RetrieveMessages takes a list of message ids and pulls the actual data from Riak
type Queues ¶
type Queues struct { // a container for all queues QueueMap map[string]*Queue // Settings for Queues in general, ie queue list Config *riak.RDtMap // Mutex for protecting rw access to the Config object sync.RWMutex // contains filtered or unexported fields }
Queues represents a container for all queues, together with the settings for queues in general
func (*Queues) DeleteQueue ¶
DeleteQueue deletes the given queue
type Stats ¶
type Stats struct { Type string FlushInterval int Address string Prefix string Client stats.Client }
Stats is
type Topic ¶
type Topic struct { // store a CRDT in riak for the topic configuration including subscribers Name string Config *riak.RDtMap // Mutex for protecting rw access to the Config object sync.RWMutex // contains filtered or unexported fields }
Topic represents a topic
func (*Topic) Broadcast ¶
Broadcast will send the message to all listening queues and return the acked writes
func (*Topic) Delete ¶
Delete will delete the given topic, which removes any queues from its subscription list
func (*Topic) DeleteQueue ¶
DeleteQueue will remove a queue from the list of topic subscribers
func (*Topic) ListQueues ¶
ListQueues will return a list of all known queues for a topic
type Topics ¶
type Topics struct { // global topic configuration, should contain list of all active topics Config *riak.RDtMap // topic map TopicMap map[string]*Topic // Mutex for protecting rw access to the Config object sync.RWMutex // contains filtered or unexported fields }
Topics represents a collection of topics
func InitTopics ¶
InitTopics initializes the set of known topics in the system
func (*Topics) DeleteTopic ¶
DeleteTopic will delete the topic from the collection of all topics, which removes any queues from its subscription list