streams

package module
v0.0.0-...-58d6456 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2024 License: MIT Imports: 10 Imported by: 0

README

Moonwalker Streams Helper

Documentation

Index

Constants

View Source
const (
	FETCH_NO_WAIT   = 100000
	MAX_ACK_PENDING = -1
	MAX_DELIVERY    = -1
	MAX_BYTES       = 1000000000 // 1 GiB
)

Variables

This section is empty.

Functions

func NewJetstreamConsumer

func NewJetstreamConsumer(jStream *Stream, streamName string) *jetstreamConsumer

Types

type KeyHistory

type KeyHistory struct {
	Created  time.Time `json:"created"`
	Revision uint64    `json:"revision"`
	Value    string    `json:"value"`
}

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(url, streamName, entriesPrefix, schemasPrefix string) *Stream

func (*Stream) AddKeyValue

func (this *Stream) AddKeyValue(bucket string, key string, value []byte) error

func (*Stream) CreateBucket

func (this *Stream) CreateBucket(bucket string, history int) error

func (*Stream) CreateConsumer

func (this *Stream) CreateConsumer(stream string, durable string) (jetstream.Consumer, error)

func (*Stream) CreateStream

func (this *Stream) CreateStream(subjects []string) (jetstream.Stream, error)

func (*Stream) DeleteKeyValue

func (this *Stream) DeleteKeyValue(bucket string, key string) error

func (*Stream) FetchAllMessages

func (this *Stream) FetchAllMessages(stream string, filters []string, startTime *time.Time) ([]string, error)

func (*Stream) FetchLastMessageBySubject

func (this *Stream) FetchLastMessageBySubject(filters []string) ([]byte, error)

func (*Stream) FetchLastMessagePerSubject

func (this *Stream) FetchLastMessagePerSubject(filters []string) (map[string][][]byte, error)

func (*Stream) GetKVs

func (this *Stream) GetKVs(nc *nats.Conn, bucket string) (map[string][]byte, error)

func (*Stream) GetKeyHistory

func (this *Stream) GetKeyHistory(bucket string, key string) error

func (*Stream) GetKeys

func (this *Stream) GetKeys(bucket string) ([]string, error)

func (*Stream) GetMessageByID

func (this *Stream) GetMessageByID(s jetstream.Stream, sequence uint64) ([]byte, error)

func (*Stream) GetMessageBySequence

func (this *Stream) GetMessageBySequence(stream string, sequence uint64) ([]byte, error)

func (*Stream) GetStream

func (this *Stream) GetStream(name string) (*nats.Conn, jetstream.Stream, error)

func (*Stream) GetValueByKey

func (this *Stream) GetValueByKey(bucket string, key string) ([]byte, error)

func (*Stream) Publish

func (this *Stream) Publish(subject string, payload []byte) (*jetstream.PubAck, error)

func (*Stream) PublishMsg

func (this *Stream) PublishMsg(subject string, payload []byte, publisher string) (*jetstream.PubAck, error)

func (*Stream) PutKeyValue

func (this *Stream) PutKeyValue(bucket string, key string, value []byte) error

func (*Stream) SetCredentialsPath

func (this *Stream) SetCredentialsPath(path string)

func (*Stream) SetNKeys

func (this *Stream) SetNKeys(user, seed string)

func (*Stream) UpdateKeyValue

func (this *Stream) UpdateKeyValue(bucket string, key string, value []byte, revision uint64) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL