whisper

package module
v0.0.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2023 | License: Apache-2.0 | Imports: 8 | Imported by: 0

README

whisper

Instant Messaging Engine

Documentation

Index

Constants

View Source
// Default route templates for the two group lookups. The names mirror the
// GroupResolverHttpAPIs fields GetMemberIDs and IsMutedMember, so these are
// presumably the fallbacks used when those fields are left empty — TODO
// confirm against NewGroupResolverHTTP. The ":group_id" and ":user_id"
// segments look like path placeholders substituted per request — confirm.
const (
	DefaultAPI_GetMemberIDs  = "/apis/v1/group/:group_id/members/id"
	DefaultAPI_IsMutedMember = "/apis/v1/group/:group_id/muted/:user_id"
)
View Source
// fmt-style templates for member-side names. Every template takes the domain
// as its first argument; the trailing comment on each line shows the
// expanded shape.
//
// NOTE(review): the subject template formats the hash with %v while the
// stream template uses %d. Both print an integer identically, but %d would
// render "%!d(...)" for a non-integer argument — confirm both call sites pass
// the same integer type.
const (
	MESSAGE_DISPATCHER_DURABLE_FORMAT = "%s_message_dispatcher"  // <domain>_message_dispatcher
	MEMBER_SUBJECT_FORMAT             = "%s.member.bucket.%v.%s" // <domain>.member.bucket.<hash>.<mid>
	MEMBER_STREAM_FORMAT              = "%s_MEMBER_%d_MSG"       // <domain>_MEMBER_<hash>_MSG
)
View Source
// fmt-style templates for group-side names, parallel to the MEMBER_* pair:
// domain first, then the bucket hash, and (for the subject) the group ID.
//
// NOTE(review): as with the member templates, the subject uses %v for the
// hash and the stream uses %d — confirm both receive the same integer type.
const (
	GROUP_SUBJECT_FORMAT = "%s.group.bucket.%v.%s" // <domain>.group.bucket.<hash>.<gid>
	GROUP_STREAM_FORMAT  = "%s_GROUP_%d_MSG"       // <domain>_GROUP_<hash>_MSG
)
View Source
// fmt-style template taking two string arguments: the domain and a durable
// name. Presumably the durable argument is the one passed to
// MemberAgent.ChanSubscribe — TODO confirm against the MemberAgent
// implementation.
const (
	MEMBER_AGENT_DURABLE_FORMAT = "%s_member_agent_%s" // <domain>_member_agent_<durable>
)

Variables

View Source
// Sentinel errors carrying the "group_resolver:" message prefix. Being
// package-level values created with errors.New, they can be matched by
// callers with errors.Is. Which resolver methods return each one is not
// visible here — presumably the GroupResolver implementations; confirm.
var (
	ErrGroupNotFound    = errors.New("group_resolver: group not found")
	ErrOperationFailure = errors.New("group_resolver: operation failure")
)
View Source
// Sentinel error for a malformed message. Presumably returned by
// ParseMessage when its input cannot be decoded or fails validation — TODO
// confirm against the implementation.
var (
	ErrInvalidMessageFormat = errors.New("whisper: invalid message format")
)

Functions

This section is empty.

Types

type Event

// Event pairs a subject with a raw payload. Exchange.SendBatchEvents takes a
// slice of *Event, making this the unit of a batched send.
type Event struct {
	Subject string
	Payload []byte
}

type Exchange

// Exchange is the messaging transport abstraction used by Whisper;
// NewNATSExchange returns one built from a *nats.Conn. The method set covers
// stream management (AssertStream, DeleteStream), emitting payloads on a
// subject (EmitSignal, EmitEvent, SendBatchEvents), and channel-based
// subscriptions that take a caller-supplied chan Msg and return a
// Subscription handle.
//
// NOTE(review): EmitSignal and EmitEvent share a signature; how their
// delivery guarantees differ is not visible from the interface — confirm
// against the implementation before relying on either.
type Exchange interface {
	Init(w Whisper) error
	AssertStream(streamName string, subjects []string) error
	DeleteStream(streamName string) error
	EmitSignal(subject string, payload []byte) error
	EmitEvent(subject string, payload []byte) error
	SendBatchEvents(events []*Event) error
	// Note the argument order: the durable name comes last here, whereas
	// MemberAgent.ChanSubscribe takes it first.
	ChanSubscribe(subject string, channels chan Msg, durable string) (Subscription, error)
	ChanQueueSubscribe(subject string, queue string, channels chan Msg) (Subscription, error)
	ChanWatch(subject string, channels chan Msg) (Subscription, error)
}

func NewNATSExchange

func NewNATSExchange(conn *nats.Conn) Exchange

type GetGroupResponse

// GetGroupResponse is a JSON body with a single "members" array of strings.
// Presumably the decoded response of the member-ID endpoint used by
// GroupResolverHTTP.GetMemberIDs — TODO confirm.
type GetGroupResponse struct {
	Members []string `json:"members"`
}

type GroupMember added in v0.0.4

// GroupMember associates a Member with a join timestamp.
//
// The `validate:"required"` tag marks Member as mandatory; which validator
// consumes the tag is not visible here. JoinedAt is an int64 whose unit
// (seconds vs. milliseconds) is not stated — TODO confirm.
type GroupMember struct {
	Member   *Member `json:"member" validate:"required"`
	JoinedAt int64   `json:"joined_at"`
}

type GroupResolver

// GroupResolver answers group-membership questions. Init receives the owning
// Whisper; GetMemberIDs returns a group's member IDs as strings;
// IsMutedMember returns a bool for a (group, member) pair. Both
// GroupResolverHTTP and GroupResolverMemory declare this method set.
//
// NOTE(review): both implementations name IsMutedMember's second parameter
// userID while the interface calls it memberID. Presumably the same
// identifier — confirm and align the naming.
type GroupResolver interface {
	Init(w Whisper) error
	GetMemberIDs(groupID string) ([]string, error)
	IsMutedMember(groupID string, memberID string) (bool, error)
}

func NewGroupResolverHTTP

func NewGroupResolverHTTP(url string, apis GroupResolverHttpAPIs) GroupResolver

type GroupResolverHTTP

// GroupResolverHTTP declares the GroupResolver method set (Init,
// GetMemberIDs, IsMutedMember) on its pointer receiver. NewGroupResolverHTTP
// takes a url string and a GroupResolverHttpAPIs value and returns a
// GroupResolver, presumably backed by this type — confirm.
type GroupResolverHTTP struct {
	// contains filtered or unexported fields
}

func (*GroupResolverHTTP) GetMemberIDs added in v0.0.4

func (gs *GroupResolverHTTP) GetMemberIDs(groupID string) ([]string, error)

func (*GroupResolverHTTP) Init

func (gs *GroupResolverHTTP) Init(w Whisper) error

func (*GroupResolverHTTP) IsMutedMember added in v0.0.4

func (gs *GroupResolverHTTP) IsMutedMember(groupID string, userID string) (bool, error)

type GroupResolverHttpAPIs added in v0.0.4

// GroupResolverHttpAPIs is the second argument of NewGroupResolverHTTP and
// has one string field per lookup method. The fields presumably hold route
// templates, with DefaultAPI_GetMemberIDs and DefaultAPI_IsMutedMember as
// the matching defaults — TODO confirm how empty fields are handled.
type GroupResolverHttpAPIs struct {
	GetMemberIDs  string
	IsMutedMember string
}

type GroupResolverMemory

// GroupResolverMemory declares the GroupResolver method set (Init,
// GetMemberIDs, IsMutedMember) on its pointer receiver. Unlike
// NewGroupResolverHTTP, its constructor NewGroupResolverMemory returns the
// concrete *GroupResolverMemory rather than the interface. How groups are
// populated is not visible from the exported API shown here.
type GroupResolverMemory struct {
	// contains filtered or unexported fields
}

func NewGroupResolverMemory

func NewGroupResolverMemory() *GroupResolverMemory

func (*GroupResolverMemory) GetMemberIDs added in v0.0.4

func (gs *GroupResolverMemory) GetMemberIDs(groupID string) ([]string, error)

func (*GroupResolverMemory) Init

func (gs *GroupResolverMemory) Init(w Whisper) error

func (*GroupResolverMemory) IsMutedMember added in v0.0.4

func (gs *GroupResolverMemory) IsMutedMember(groupID string, userID string) (bool, error)

type IsMutedMemberResponse added in v0.0.4

// IsMutedMemberResponse is a JSON body with a single boolean "is_muted"
// field. Presumably the decoded response of the muted-member endpoint used
// by GroupResolverHTTP.IsMutedMember — TODO confirm.
type IsMutedMemberResponse struct {
	IsMuted bool `json:"is_muted"`
}

type Member

// Member is the JSON representation of a participant. ID and DisplayName
// carry `validate:"required"`; Avatar has no validation tag and is therefore
// optional as far as the tags are concerned. Member is referenced by
// Meta.Sender and GroupMember.Member.
type Member struct {
	ID          string `json:"id" validate:"required"`
	DisplayName string `json:"display_name" validate:"required"`
	Avatar      string `json:"avatar"`
}

type MemberAgent

// MemberAgent is the per-member handle obtained from NewMemberAgent or
// Whisper.NewMemberAgent, both of which take a memberID. ChanWatch and
// ChanSubscribe each take a chan Msg and return only an error, so the
// underlying subscription is not exposed to the caller; Close is the only
// teardown method on the interface.
//
// NOTE(review): the difference between watching and durable subscribing is
// not visible from the signatures — confirm against the implementation.
type MemberAgent interface {
	Close() error
	ChanWatch(ch chan Msg) error
	ChanSubscribe(durable string, ch chan Msg) error
}

func NewMemberAgent

func NewMemberAgent(w Whisper, memberID string) MemberAgent

type Message

// Message is the JSON envelope exchanged by the engine. All four fields
// carry `validate:"required"`. Payload is an untyped interface{}, so its
// concrete shape after decoding depends on the JSON it was parsed from.
// ParseMessage produces a *Message from bytes and ToJSON returns bytes.
type Message struct {
	ID      string      `json:"id" validate:"required"`
	Type    string      `json:"type" validate:"required"` // normal, event
	Meta    Meta        `json:"meta" validate:"required"`
	Payload interface{} `json:"payload" validate:"required"`
}

func ParseMessage

func ParseMessage(data []byte) (*Message, error)

func (*Message) ToJSON

func (m *Message) ToJSON() []byte

type MessageDispatcher

// MessageDispatcher is obtained from NewMessageDispatcher or
// Whisper.NewMessageDispatcher. Dispatch targets a single memberID and
// DispatchAll a slice of member IDs, both with a raw payload. Subscribe
// takes a list of int32 bucket numbers; presumably these relate to
// Whisper.BucketSize and the <hash> segment of the subject templates —
// TODO confirm.
//
// NOTE(review): Close returns nothing here, whereas MemberAgent.Close
// returns an error; any failure during dispatcher shutdown cannot be
// reported to the caller.
type MessageDispatcher interface {
	Init() error
	ClearStream() error
	Subscribe(buckets []int32) error
	Dispatch(memberID string, payload []byte) error
	DispatchAll(members []string, payload []byte) error
	Close()
}

func NewMessageDispatcher

func NewMessageDispatcher(w Whisper) MessageDispatcher

type MessageReceiver

// MessageReceiver is obtained from NewMessageReceiver or
// Whisper.NewMessageReceiver. Receive accepts a raw payload; presumably the
// bytes are a JSON-encoded Message — TODO confirm. The interface has no
// Close method, unlike MessageDispatcher.
type MessageReceiver interface {
	Init() error
	ClearStream() error
	Receive(payload []byte) error
}

func NewMessageReceiver

func NewMessageReceiver(w Whisper) MessageReceiver

type Meta

// Meta is the metadata embedded in every Message under the "meta" key.
// Sender, Group and CreatedAt carry `validate:"required"`; ContentType and
// Reference do not. Reference points at another Message and is serialized
// as "ref". CreatedAt is an int64 whose unit is not stated here — TODO
// confirm.
type Meta struct {
	Sender      *Member  `json:"sender" validate:"required"`
	Group       string   `json:"group" validate:"required"`
	ContentType string   `json:"content_type"` // plain, image
	CreatedAt   int64    `json:"created_at" validate:"required"`
	Reference   *Message `json:"ref"`
}

type Msg

// Msg is the element type of the channels passed to the Exchange and
// MemberAgent subscription methods. Payload returns the raw bytes; Ack and
// Nak each return an error. NATSMsg declares this method set.
type Msg interface {
	Payload() []byte
	Ack() error
	Nak() error
}

type NATSExchange

// NATSExchange declares every method of the Exchange interface on its
// pointer receiver. NewNATSExchange takes a *nats.Conn and returns an
// Exchange, presumably backed by this type — confirm.
type NATSExchange struct {
	// contains filtered or unexported fields
}

func (*NATSExchange) AssertStream

func (ex *NATSExchange) AssertStream(streamName string, subjects []string) error

func (*NATSExchange) ChanQueueSubscribe added in v0.0.6

func (ex *NATSExchange) ChanQueueSubscribe(subject string, queue string, channels chan Msg) (Subscription, error)

func (*NATSExchange) ChanSubscribe

func (ex *NATSExchange) ChanSubscribe(subject string, channels chan Msg, durable string) (Subscription, error)

func (*NATSExchange) ChanWatch

func (ex *NATSExchange) ChanWatch(subject string, channels chan Msg) (Subscription, error)

func (*NATSExchange) DeleteStream

func (ex *NATSExchange) DeleteStream(streamName string) error

func (*NATSExchange) EmitEvent

func (ex *NATSExchange) EmitEvent(subject string, payload []byte) error

func (*NATSExchange) EmitSignal

func (ex *NATSExchange) EmitSignal(subject string, payload []byte) error

func (*NATSExchange) Init

func (ex *NATSExchange) Init(w Whisper) error

func (*NATSExchange) SendBatchEvents

func (ex *NATSExchange) SendBatchEvents(events []*Event) error

type NATSMsg

// NATSMsg declares the Msg method set (Payload, Ack, Nak) on its pointer
// receiver. No exported constructor is shown; presumably values are created
// by NATSExchange when delivering to subscriber channels — confirm.
type NATSMsg struct {
	// contains filtered or unexported fields
}

func (*NATSMsg) Ack

func (m *NATSMsg) Ack() error

func (*NATSMsg) Nak

func (m *NATSMsg) Nak() error

func (*NATSMsg) Payload

func (m *NATSMsg) Payload() []byte

type NATSSubscription

// NATSSubscription declares Unsubscribe() error on its pointer receiver, the
// single method required by Subscription. It also declares Close(), which is
// not part of the Subscription interface and so is only reachable through
// the concrete type.
type NATSSubscription struct {
	// contains filtered or unexported fields
}

func (*NATSSubscription) Close

func (s *NATSSubscription) Close()

func (*NATSSubscription) Unsubscribe

func (s *NATSSubscription) Unsubscribe() error

type Opt

// Opt is a functional option: a function that receives the unexported
// *whisper value. NewWhisper accepts a variadic list of Opt, and
// WithBucketSize, WithDomain, WithExchange and WithGroupResolver each return
// one. Because the parameter type is unexported, code outside this package
// cannot define its own Opt functions.
type Opt func(*whisper)

func WithBucketSize

func WithBucketSize(size int32) Opt

func WithDomain

func WithDomain(domain string) Opt

func WithExchange

func WithExchange(ex Exchange) Opt

func WithGroupResolver

func WithGroupResolver(gs GroupResolver) Opt

type Subscription

// Subscription is the handle returned by the Exchange subscription methods
// (ChanSubscribe, ChanQueueSubscribe, ChanWatch). Its only operation is
// Unsubscribe, which returns an error.
type Subscription interface {
	Unsubscribe() error
}

type Whisper

// Whisper is the engine's root object, built by NewWhisper from a list of
// Opt. It exposes the configured Exchange, GroupResolver, Domain and
// BucketSize (whose With* options are declared alongside Opt), and acts as a
// factory for MessageReceiver, MessageDispatcher and MemberAgent.
//
// NOTE(review): the factory methods here return (value, error), whereas the
// package-level NewMessageReceiver, NewMessageDispatcher and NewMemberAgent
// take a Whisper and return only the value. Whether those functions call
// Init themselves or discard an error is not visible here — confirm.
type Whisper interface {
	Init() error
	Exchange() Exchange
	GroupResolver() GroupResolver
	Domain() string
	BucketSize() int32
	NewMessageReceiver() (MessageReceiver, error)
	NewMessageDispatcher() (MessageDispatcher, error)
	NewMemberAgent(memberID string) (MemberAgent, error)
}

func NewWhisper

func NewWhisper(opts ...Opt) Whisper

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL