Documentation
¶
Index ¶
- Constants
- Variables
- type Event
- type Exchange
- type GetGroupResponse
- type GroupMember
- type GroupResolver
- type GroupResolverHTTP
- type GroupResolverHttpAPIs
- type GroupResolverMemory
- type IsMutedMemberResponse
- type Member
- type MemberAgent
- type Message
- type MessageDispatcher
- type MessageReceiver
- type Meta
- type Msg
- type NATSExchange
- func (ex *NATSExchange) AssertStream(streamName string, subjects []string) error
- func (ex *NATSExchange) ChanQueueSubscribe(subject string, queue string, channels chan Msg) (Subscription, error)
- func (ex *NATSExchange) ChanSubscribe(subject string, channels chan Msg, durable string) (Subscription, error)
- func (ex *NATSExchange) ChanWatch(subject string, channels chan Msg) (Subscription, error)
- func (ex *NATSExchange) DeleteStream(streamName string) error
- func (ex *NATSExchange) EmitEvent(subject string, payload []byte) error
- func (ex *NATSExchange) EmitSignal(subject string, payload []byte) error
- func (ex *NATSExchange) Init(w Whisper) error
- func (ex *NATSExchange) SendBatchEvents(events []*Event) error
- type NATSMsg
- type NATSSubscription
- type Opt
- type Subscription
- type Whisper
Constants ¶
View Source
const (
	DefaultAPI_GetMemberIDs   = "/apis/v1/group/:group_id/members/id"
	DefaultAPI_IsMutedMember = "/apis/v1/group/:group_id/muted/:user_id"
)
View Source
const (
	MESSAGE_DISPATCHER_DURABLE_FORMAT = "%s_message_dispatcher"   // <domain>_message_dispatcher
	MEMBER_SUBJECT_FORMAT             = "%s.member.bucket.%v.%s" // <domain>.member.bucket.<hash>.<mid>
	MEMBER_STREAM_FORMAT              = "%s_MEMBER_%d_MSG"        // <domain>_MEMBER_<hash>_MSG
)
View Source
const (
	GROUP_SUBJECT_FORMAT = "%s.group.bucket.%v.%s" // <domain>.group.bucket.<hash>.<gid>
	GROUP_STREAM_FORMAT  = "%s_GROUP_%d_MSG"        // <domain>_GROUP_<hash>_MSG
)
View Source
const (
MEMBER_AGENT_DURABLE_FORMAT = "%s_member_agent_%s" // <domain>_member_agent_<durable>
)
Variables ¶
View Source
var (
	ErrGroupNotFound    = errors.New("group_resolver: group not found")
	ErrOperationFailure = errors.New("group_resolver: operation failure")
)
View Source
var (
ErrInvalidMessageFormat = errors.New("whisper: invalid message format")
)
Functions ¶
This section is empty.
Types ¶
type Exchange ¶
type Exchange interface {
	Init(w Whisper) error
	AssertStream(streamName string, subjects []string) error
	DeleteStream(streamName string) error
	EmitSignal(subject string, payload []byte) error
	EmitEvent(subject string, payload []byte) error
	SendBatchEvents(events []*Event) error
	ChanSubscribe(subject string, channels chan Msg, durable string) (Subscription, error)
	ChanQueueSubscribe(subject string, queue string, channels chan Msg) (Subscription, error)
	ChanWatch(subject string, channels chan Msg) (Subscription, error)
}
func NewNATSExchange ¶
func NewNATSExchange(conn *nats.Conn) Exchange
type GetGroupResponse ¶
type GetGroupResponse struct {
Members []string `json:"members"`
}
type GroupMember ¶ added in v0.0.4
type GroupResolver ¶
type GroupResolver interface {
	Init(w Whisper) error
	GetMemberIDs(groupID string) ([]string, error)
	IsMutedMember(groupID string, memberID string) (bool, error)
}
func NewGroupResolverHTTP ¶
func NewGroupResolverHTTP(url string, apis GroupResolverHttpAPIs) GroupResolver
type GroupResolverHTTP ¶
type GroupResolverHTTP struct {
// contains filtered or unexported fields
}
func (*GroupResolverHTTP) GetMemberIDs ¶ added in v0.0.4
func (gs *GroupResolverHTTP) GetMemberIDs(groupID string) ([]string, error)
func (*GroupResolverHTTP) Init ¶
func (gs *GroupResolverHTTP) Init(w Whisper) error
func (*GroupResolverHTTP) IsMutedMember ¶ added in v0.0.4
func (gs *GroupResolverHTTP) IsMutedMember(groupID string, userID string) (bool, error)
type GroupResolverHttpAPIs ¶ added in v0.0.4
type GroupResolverMemory ¶
type GroupResolverMemory struct {
// contains filtered or unexported fields
}
func NewGroupResolverMemory ¶
func NewGroupResolverMemory() *GroupResolverMemory
func (*GroupResolverMemory) GetMemberIDs ¶ added in v0.0.4
func (gs *GroupResolverMemory) GetMemberIDs(groupID string) ([]string, error)
func (*GroupResolverMemory) Init ¶
func (gs *GroupResolverMemory) Init(w Whisper) error
func (*GroupResolverMemory) IsMutedMember ¶ added in v0.0.4
func (gs *GroupResolverMemory) IsMutedMember(groupID string, userID string) (bool, error)
type IsMutedMemberResponse ¶ added in v0.0.4
type IsMutedMemberResponse struct {
IsMuted bool `json:"is_muted"`
}
type MemberAgent ¶
type MemberAgent interface {
	Close() error
	ChanWatch(ch chan Msg) error
	ChanSubscribe(durable string, ch chan Msg) error
}
func NewMemberAgent ¶
func NewMemberAgent(w Whisper, memberID string) MemberAgent
type Message ¶
type Message struct {
	ID      string      `json:"id" validate:"required"`
	Type    string      `json:"type" validate:"required"` // normal, event
	Meta    Meta        `json:"meta" validate:"required"`
	Payload interface{} `json:"payload" validate:"required"`
}
func ParseMessage ¶
type MessageDispatcher ¶
type MessageDispatcher interface {
	Init() error
	ClearStream() error
	Subscribe(buckets []int32) error
	Dispatch(memberID string, payload []byte) error
	DispatchAll(members []string, payload []byte) error
	Close()
}
func NewMessageDispatcher ¶
func NewMessageDispatcher(w Whisper) MessageDispatcher
type MessageReceiver ¶
func NewMessageReceiver ¶
func NewMessageReceiver(w Whisper) MessageReceiver
type NATSExchange ¶
type NATSExchange struct {
// contains filtered or unexported fields
}
func (*NATSExchange) AssertStream ¶
func (ex *NATSExchange) AssertStream(streamName string, subjects []string) error
func (*NATSExchange) ChanQueueSubscribe ¶ added in v0.0.6
func (ex *NATSExchange) ChanQueueSubscribe(subject string, queue string, channels chan Msg) (Subscription, error)
func (*NATSExchange) ChanSubscribe ¶
func (ex *NATSExchange) ChanSubscribe(subject string, channels chan Msg, durable string) (Subscription, error)
func (*NATSExchange) ChanWatch ¶
func (ex *NATSExchange) ChanWatch(subject string, channels chan Msg) (Subscription, error)
func (*NATSExchange) DeleteStream ¶
func (ex *NATSExchange) DeleteStream(streamName string) error
func (*NATSExchange) EmitEvent ¶
func (ex *NATSExchange) EmitEvent(subject string, payload []byte) error
func (*NATSExchange) EmitSignal ¶
func (ex *NATSExchange) EmitSignal(subject string, payload []byte) error
func (*NATSExchange) Init ¶
func (ex *NATSExchange) Init(w Whisper) error
func (*NATSExchange) SendBatchEvents ¶
func (ex *NATSExchange) SendBatchEvents(events []*Event) error
type NATSSubscription ¶
type NATSSubscription struct {
// contains filtered or unexported fields
}
func (*NATSSubscription) Close ¶
func (s *NATSSubscription) Close()
func (*NATSSubscription) Unsubscribe ¶
func (s *NATSSubscription) Unsubscribe() error
type Opt ¶
type Opt func(*whisper)
func WithBucketSize ¶
func WithDomain ¶
func WithExchange ¶
func WithGroupResolver ¶
func WithGroupResolver(gs GroupResolver) Opt
type Subscription ¶
type Subscription interface {
Unsubscribe() error
}
type Whisper ¶
type Whisper interface {
	Init() error
	Exchange() Exchange
	GroupResolver() GroupResolver
	Domain() string
	BucketSize() int32
	NewMessageReceiver() (MessageReceiver, error)
	NewMessageDispatcher() (MessageDispatcher, error)
	NewMemberAgent(memberID string) (MemberAgent, error)
}
func NewWhisper ¶
Click to show internal directories.
Click to hide internal directories.