topic

package
v0.0.0-...-703b195 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2023 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Attachment

type Attachment interface {
	Send(ctx context.Context, m Message)
}

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker manages the set of topics active on this node.

func NewBroker

func NewBroker(options Options) *Broker

func (*Broker) GetTopic

func (b *Broker) GetTopic(name string) *Topic

GetTopic returns the topic with the given name. If the topic is not active it is activated and returned.

type Message

type Message struct {
	Topic   string
	Message []byte
	Offset  uint64
}

type Options

type Options struct {
	// Persisted indicates the commit log segments should be persisted to disk.
	Persisted bool

	// Dir is the directory to store the commit log segments if persisted.
	Dir string

	// SegmentSize is the size of the commit log segments to use.
	SegmentSize uint64
}

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription reads messages from the topic and sends to the connection.

func NewSubscription

func NewSubscription(attachment Attachment, topic *Topic) (*Subscription, uint64)

NewSubscription creates a subscription to the given topic starting from the next message in the topic.

func NewSubscriptionFromOffset

func NewSubscriptionFromOffset(attachment Attachment, topic *Topic, offset uint64) (*Subscription, uint64)

NewSubscriptionFromOffset creates a subscription to the given topic, starting at the next message after the given offset. If the offset is less than the earliest message retained by the topic, will subscribe from that earliest retained message.

func (*Subscription) Notify

func (s *Subscription) Notify(m Message)

Notify notifys the subscriber about a new message.

func (*Subscription) Shutdown

func (s *Subscription) Shutdown()

Shutdown unsubscribes and stops the send loop.

type Subscriptions

type Subscriptions struct {
	// contains filtered or unexported fields
}

func NewSubscriptions

func NewSubscriptions(broker *Broker, attachment Attachment) *Subscriptions

func (*Subscriptions) AddSubscription

func (s *Subscriptions) AddSubscription(topicName string) uint64

func (*Subscriptions) AddSubscriptionFromOffset

func (s *Subscriptions) AddSubscriptionFromOffset(topicName string, lastOffset uint64) uint64

func (*Subscriptions) UnsubscribeAll

func (s *Subscriptions) UnsubscribeAll()

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(name string, options Options) *Topic

func (*Topic) GetMessage

func (t *Topic) GetMessage(offset uint64) ([]byte, error)

GetMessage returns the message with the given offset. If the offset is less than the earliest message, will round up to the next message.

func (*Topic) Name

func (t *Topic) Name() string

func (*Topic) Offset

func (t *Topic) Offset() uint64

Offset returns the offset of the last message processed.

func (*Topic) Publish

func (t *Topic) Publish(b []byte)

func (*Topic) Subscribe

func (t *Topic) Subscribe(s *Subscription)

func (*Topic) SubscribeIfLatest

func (t *Topic) SubscribeIfLatest(offset uint64, s *Subscription) bool

func (*Topic) Unsubscribe

func (t *Topic) Unsubscribe(s *Subscription)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL