Documentation
¶
Index ¶
- Constants
- Variables
- func Main(port int, dbpath string) error
- type BoltMessageBucket
- type BoltStore
- type Broker
- func (b *Broker) Done()
- func (b *Broker) FinishMessage(name string, id MessageID) error
- func (b *Broker) GetMessage(name string, id MessageID) (*Message, error)
- func (b *Broker) Init() error
- func (b *Broker) NewID() MessageID
- func (b *Broker) PushMessage(name string, job *config.Job) (*Message, error)
- func (b *Broker) ReportJob(ctx context.Context, req *pb.ReportJobRequest) (res *pb.ReportJobResponse, err error)
- func (b *Broker) ReportJobDone(ctx context.Context, req *pb.ReportJobDoneRequest) (res *pb.ReportJobDoneResponse, err error)
- func (b *Broker) SubscribeJob(ctx context.Context, req *pb.SubscribeJobRequest) (*pb.SubscribeJobResponse, error)
- func (b *Broker) Topic(name string) *Topic
- type Json
- type LQueue
- type Message
- type MessageBucket
- type MessageID
- type MessageIDGenerator
- type Queue
- type Store
- type TaskResults
- type Topic
Constants ¶
View Source
const ( MSG_PENDING = iota MSG_RECEIVED MSG_SUCCESS MSG_FAILURE )
View Source
const ( MsgPendingState = "PENDING" MsgReceivedState = "RECEIVED" MsgSuccessState = "SUCCESS" MsgFailureState = "FAILURE" )
View Source
const ( MessageBucketName = "messages" MessagePendingBucketName = "pendingMessages" WorkerPerMessageBucketNamePrefix = "worker:" )
View Source
const MsgIdLen = 16
Variables ¶
View Source
var ( ErrTopicNotFound = errors.New("Topic not found") ErrMsgNotFound = errors.New("Message not found") )
View Source
var ErrNotSupportedStoreType = errors.New("this store type is not supported")
View Source
var ErrSequenceExpired = errors.New("sequence expired")
View Source
var ErrTimeBackwards = errors.New("time has gone backwards")
View Source
var (
MsgStates map[int]string
)
Functions ¶
Types ¶
type BoltMessageBucket ¶
type BoltMessageBucket struct {
// contains filtered or unexported fields
}
func (*BoltMessageBucket) Del ¶
func (b *BoltMessageBucket) Del(id MessageID) error
func (*BoltMessageBucket) DelBucket ¶
func (b *BoltMessageBucket) DelBucket() error
func (*BoltMessageBucket) Put ¶
func (b *BoltMessageBucket) Put(msg *Message) error
type BoltStore ¶
type BoltStore struct { Path string // contains filtered or unexported fields }
func NewBoltStore ¶
func (*BoltStore) MessageBucket ¶
func (bs *BoltStore) MessageBucket(name string) MessageBucket
type Broker ¶
type Broker struct { ID int64 DBPath string Topics map[string]*Topic // contains filtered or unexported fields }
func (*Broker) GetMessage ¶
func (*Broker) PushMessage ¶
func (*Broker) ReportJob ¶
func (b *Broker) ReportJob(ctx context.Context, req *pb.ReportJobRequest) (res *pb.ReportJobResponse, err error)
func (*Broker) ReportJobDone ¶
func (b *Broker) ReportJobDone(ctx context.Context, req *pb.ReportJobDoneRequest) (res *pb.ReportJobDoneResponse, err error)
func (*Broker) SubscribeJob ¶
func (b *Broker) SubscribeJob(ctx context.Context, req *pb.SubscribeJobRequest) (*pb.SubscribeJobResponse, error)
SubscribeJob is part of the Twirp RPC interface.
type Message ¶
type Message struct { ID MessageID Job *config.Job Created time.Time State int Results *TaskResults }
func DecodeMessage ¶
func (*Message) SetResults ¶
type MessageBucket ¶
type MessageID ¶
func GetMessageID ¶
type MessageIDGenerator ¶
type MessageIDGenerator interface {
NewID() MessageID
}
type Store ¶
type Store interface { Open() error Close() error MessageBucket(name string) MessageBucket }
type TaskResults ¶
type TaskResults struct { WorkerId string `json:"worker"` Tasks map[string]interface{} `json:"tasks"` }
func NewTaskResults ¶
func NewTaskResults(workerId string, tasks map[string]interface{}) *TaskResults
type Topic ¶
func (*Topic) FinishMessage ¶
func (*Topic) PopMessage ¶
func (*Topic) PushMessage ¶
Click to show internal directories.
Click to hide internal directories.