subscriptions

package
v0.0.51-rc23 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2019 License: MPL-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
// Event name constants.
// NOTE(review): presumably these are the values accepted as the `event`
// argument of Store.On — confirm against the store implementation.
const (
	// SubscriptionCreated is the event name "subscription_created".
	SubscriptionCreated string = "subscription_created"
	// SubscriptionDeleted is the event name "subscription_deleted".
	SubscriptionDeleted string = "subscription_deleted"
)

Variables

View Source
var (
	// ErrSubscriptionNotFound is a sentinel error carrying the message
	// "subscription not found".
	// NOTE(review): which Store methods return it is not visible here —
	// confirm before relying on it; compare with errors.Is.
	ErrSubscriptionNotFound = errors.New("subscription not found")
)
View Source
var (
	// SEP is the byte '/'.
	// NOTE(review): presumably the topic level separator used when splitting
	// a Topic (see Topic.Chop) — confirm. It is declared as a variable, not a
	// constant, so it is mutable at package level.
	SEP = byte('/')
)

Functions

func TenantTopicIndexer

func TenantTopicIndexer() *topicIndexer

Types

type Channel added in v0.0.39

// Channel is implemented by anything that can broadcast a raw byte payload.
type Channel interface {
	// Broadcast takes a byte slice and returns nothing, so failures cannot be
	// reported to the caller through this method.
	Broadcast([]byte)
}

type INode

type INode struct {
	// contains filtered or unexported fields
}

func NewINode

func NewINode() *INode

func (*INode) AddNode

func (d *INode) AddNode(node *Node)

func (*INode) Insert

func (d *INode) Insert(topic Topic, tenant string, subscription Subscription)

func (*INode) Remove

func (d *INode) Remove(tenant, id string, topic Topic) error

func (*INode) Select

func (d *INode) Select(tenant string, set SubscriptionSet, topic Topic) SubscriptionSet

type Metadata added in v0.0.39

// Metadata is a proto3 message (see the protobuf struct tags) holding the
// serializable attributes of a subscription. It is embedded in Subscription.
type Metadata struct {
	// ID is protobuf field 1.
	ID                   string   `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"`
	// SessionID is protobuf field 2.
	// NOTE(review): presumably the session owning the subscription — confirm.
	SessionID            string   `protobuf:"bytes,2,opt,name=SessionID,proto3" json:"SessionID,omitempty"`
	// Tenant is protobuf field 3.
	Tenant               string   `protobuf:"bytes,3,opt,name=Tenant,proto3" json:"Tenant,omitempty"`
	// Pattern is protobuf field 4, stored as raw bytes.
	// NOTE(review): presumably the topic pattern subscribed to — confirm.
	Pattern              []byte   `protobuf:"bytes,4,opt,name=Pattern,proto3" json:"Pattern,omitempty"`
	// Qos is protobuf field 5.
	// NOTE(review): presumably the MQTT QoS level — confirm valid range.
	Qos                  int32    `protobuf:"varint,5,opt,name=Qos,proto3" json:"Qos,omitempty"`
	// Peer is protobuf field 6.
	// NOTE(review): presumably identifies the cluster peer — confirm.
	Peer                 string   `protobuf:"bytes,6,opt,name=Peer,proto3" json:"Peer,omitempty"`
	// LastAdded and LastDeleted are protobuf fields 7 and 8.
	// NOTE(review): presumably timestamps; unit and epoch are not visible
	// here — confirm.
	LastAdded            int64    `protobuf:"varint,7,opt,name=LastAdded,proto3" json:"LastAdded,omitempty"`
	LastDeleted          int64    `protobuf:"varint,8,opt,name=LastDeleted,proto3" json:"LastDeleted,omitempty"`
	// The XXX_ fields below are excluded from JSON output (`json:"-"`).
	// NOTE(review): they look like protoc-gen-go bookkeeping fields — do not
	// set them by hand.
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*Metadata) Descriptor added in v0.0.39

func (*Metadata) Descriptor() ([]byte, []int)

func (*Metadata) GetID added in v0.0.39

func (m *Metadata) GetID() string

func (*Metadata) GetLastAdded added in v0.0.39

func (m *Metadata) GetLastAdded() int64

func (*Metadata) GetLastDeleted added in v0.0.39

func (m *Metadata) GetLastDeleted() int64

func (*Metadata) GetPattern added in v0.0.39

func (m *Metadata) GetPattern() []byte

func (*Metadata) GetPeer added in v0.0.39

func (m *Metadata) GetPeer() string

func (*Metadata) GetQos added in v0.0.39

func (m *Metadata) GetQos() int32

func (*Metadata) GetSessionID added in v0.0.39

func (m *Metadata) GetSessionID() string

func (*Metadata) GetTenant added in v0.0.39

func (m *Metadata) GetTenant() string

func (*Metadata) ProtoMessage added in v0.0.39

func (*Metadata) ProtoMessage()

func (*Metadata) Reset added in v0.0.39

func (m *Metadata) Reset()

func (*Metadata) String added in v0.0.39

func (m *Metadata) String() string

func (*Metadata) XXX_DiscardUnknown added in v0.0.51

func (m *Metadata) XXX_DiscardUnknown()

func (*Metadata) XXX_Marshal added in v0.0.51

func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Metadata) XXX_Merge added in v0.0.51

func (m *Metadata) XXX_Merge(src proto.Message)

func (*Metadata) XXX_Size added in v0.0.51

func (m *Metadata) XXX_Size() int

func (*Metadata) XXX_Unmarshal added in v0.0.51

func (m *Metadata) XXX_Unmarshal(b []byte) error

type Node

type Node struct {
	// contains filtered or unexported fields
}

func NewNode

func NewNode(tenant string, pattern []byte) *Node

func (*Node) AddSubscription

func (n *Node) AddSubscription(tenant string, subscription Subscription) *Node

func (*Node) DelSubscription

func (n *Node) DelSubscription(id string) *Node

type RemoteSender added in v0.0.39

// RemoteSender is a function taking a host, a session and a packet.Publish
// and returning an error. NewMemDBStore accepts one as its sender argument.
// NOTE(review): presumably delivers the publish to a session hosted on
// another node — confirm.
type RemoteSender func(host string, session string, publish packet.Publish) error

type Store

// Store is the subscription storage interface. It offers lookups by topic,
// ID, peer and session, creation and deletion, and event handler
// registration. NewMemDBStore returns a Store.
type Store interface {
	// ByTopic returns a SubscriptionSet for the given tenant and pattern.
	// NOTE(review): whether pattern is matched exactly or with wildcards is
	// not visible here — confirm.
	ByTopic(tenant string, pattern []byte) (SubscriptionSet, error)
	// ByID returns the Subscription for the given id.
	// NOTE(review): presumably ErrSubscriptionNotFound when absent — confirm.
	ByID(id string) (Subscription, error)
	// All returns a SubscriptionSet; no filter arguments are taken.
	All() (SubscriptionSet, error)
	// ByPeer returns a SubscriptionSet for the given peer.
	ByPeer(peer string) (SubscriptionSet, error)
	// BySession returns a SubscriptionSet for the given session id.
	BySession(id string) (SubscriptionSet, error)
	// Sessions returns a list of strings.
	// NOTE(review): presumably the known session IDs — confirm.
	Sessions() ([]string, error)
	// Create takes a Subscription and a sender callback with the same
	// signature as Subscription.Sender.
	Create(message Subscription, sender func(context.Context, packet.Publish) error) error
	// Delete takes a subscription id.
	Delete(id string) error
	// On registers handler for the named event and returns a func().
	// NOTE(review): the returned function presumably unregisters the
	// handler; see SubscriptionCreated / SubscriptionDeleted for likely
	// event names — confirm.
	On(event string, handler func(Subscription)) func()
}

func NewMemDBStore

func NewMemDBStore(mesh cluster.ServiceLayer, sender RemoteSender) (Store, error)

type Subscription

// Subscription embeds Metadata and adds a Sender callback.
type Subscription struct {
	// Metadata is embedded, so its fields and methods are promoted onto
	// Subscription.
	Metadata
	// Sender is a callback taking a context and a packet.Publish and
	// returning an error. It is a plain Go func and therefore not part of
	// the protobuf-encoded Metadata.
	// NOTE(review): presumably invoked to deliver a publish to the
	// subscriber — confirm; it may be nil for zero-value Subscriptions.
	Sender func(context.Context, packet.Publish) error
}

type SubscriptionFilter

// SubscriptionFilter is a predicate over a Subscription. SubscriptionSet.Filter
// accepts a variadic list of them.
// NOTE(review): whether a true result keeps or drops the subscription is not
// visible here — confirm.
type SubscriptionFilter func(Subscription) bool

type SubscriptionMetadataList added in v0.0.39

// SubscriptionMetadataList is a proto3 message (see the protobuf struct tags)
// wrapping a repeated list of Metadata messages.
type SubscriptionMetadataList struct {
	// Metadatas is protobuf field 1, a repeated field of *Metadata.
	Metadatas            []*Metadata `protobuf:"bytes,1,rep,name=Metadatas,proto3" json:"Metadatas,omitempty"`
	// The XXX_ fields below are excluded from JSON output (`json:"-"`).
	// NOTE(review): they look like protoc-gen-go bookkeeping fields — do not
	// set them by hand.
	XXX_NoUnkeyedLiteral struct{}    `json:"-"`
	XXX_unrecognized     []byte      `json:"-"`
	XXX_sizecache        int32       `json:"-"`
}

func (*SubscriptionMetadataList) Descriptor added in v0.0.39

func (*SubscriptionMetadataList) Descriptor() ([]byte, []int)

func (*SubscriptionMetadataList) GetMetadatas added in v0.0.39

func (m *SubscriptionMetadataList) GetMetadatas() []*Metadata

func (*SubscriptionMetadataList) ProtoMessage added in v0.0.39

func (*SubscriptionMetadataList) ProtoMessage()

func (*SubscriptionMetadataList) Reset added in v0.0.39

func (m *SubscriptionMetadataList) Reset()

func (*SubscriptionMetadataList) String added in v0.0.39

func (m *SubscriptionMetadataList) String() string

func (*SubscriptionMetadataList) XXX_DiscardUnknown added in v0.0.51

func (m *SubscriptionMetadataList) XXX_DiscardUnknown()

func (*SubscriptionMetadataList) XXX_Marshal added in v0.0.51

func (m *SubscriptionMetadataList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SubscriptionMetadataList) XXX_Merge added in v0.0.51

func (m *SubscriptionMetadataList) XXX_Merge(src proto.Message)

func (*SubscriptionMetadataList) XXX_Size added in v0.0.51

func (m *SubscriptionMetadataList) XXX_Size() int

func (*SubscriptionMetadataList) XXX_Unmarshal added in v0.0.51

func (m *SubscriptionMetadataList) XXX_Unmarshal(b []byte) error

type SubscriptionSet added in v0.0.39

// SubscriptionSet is a slice of Subscription values. Its methods (Apply,
// ApplyE, ApplyIdx, Filter) use value receivers.
type SubscriptionSet []Subscription

func (SubscriptionSet) Apply added in v0.0.39

func (set SubscriptionSet) Apply(f func(Subscription))

func (SubscriptionSet) ApplyE added in v0.0.39

func (set SubscriptionSet) ApplyE(f func(Subscription) error) (err error)

func (SubscriptionSet) ApplyIdx added in v0.0.39

func (set SubscriptionSet) ApplyIdx(f func(idx int, s Subscription))

func (SubscriptionSet) Filter added in v0.0.39

func (set SubscriptionSet) Filter(filters ...SubscriptionFilter) SubscriptionSet

type Topic

// Topic is a topic represented as a byte slice.
// NOTE(review): presumably '/'-separated levels (see SEP and Topic.Chop) —
// confirm.
type Topic []byte

func (Topic) Chop

func (t Topic) Chop() (Topic, []byte, bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL