Documentation ¶
Index ¶
- Constants
- Variables
- func Compare(o1, o2 Offset) (int, error)
- func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser
- func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer
- func WithUser(ctx context.Context, user User) context.Context
- type Attributes
- type AttributesCodec
- type AttributesDecoder
- type AttributesEncoder
- type DataFrame
- type DataFrameCodec
- type DataFrameDecoder
- type DataFrameEncoder
- type DataFrameReader
- type Index
- type Interface
- type Offset
- type Range
- type StreamStatus
- type Tags
- func (t Tags) Contains(t1 Tags) bool
- func (t Tags) Diff(t1 Tags) (add Tags, del Tags, update Tags)
- func (t Tags) Empty() bool
- func (t Tags) Equals(t1 Tags) bool
- func (t Tags) Get(key string) string
- func (t Tags) Has(key string) bool
- func (t Tags) Set(key string, value string)
- func (t Tags) ToJSON() string
- func (t Tags) Validate() error
- type User
- type UserAware
- type UserWithToken
- type Watcher
- type WorkerStatus
Constants ¶
View Source
const ( ContentTypeProtobuf = "application/vnd.google.protobuf" ContentTypeFlatbuffer = "application/x-flatbuffers" ContentTypeJSON = "application/json" )
View Source
const ( Backend = "meta.backend" MaxPayloadBytes = "meta.maxPayloadBytes" UserIdentifyHeader = "meta.header.userIdentifyHeader" GroupIdentifyHeader = "meta.header.groupIdentifyHeader" StreamLength = "stream.length" StreamApproximateLength = "stream.approxMaxLength" )
const attributes keys the queue service implement must provide.
Variables ¶
View Source
var MaxIndex = FromUint64(uint64(math.MaxUint64))
Functions ¶
func NewLengthDelimitedFrameReader ¶
func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser
NewLengthDelimitedFrameReader returns an io.Reader that will decode length-prefixed frames off of a stream.
The protocol is:
stream: message ... message: prefix body prefix: 4 byte uint32 in BigEndian order, denotes length of body body: bytes (0..prefix)
If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead will be returned along with the number of bytes read.
Types ¶
type Attributes ¶
type AttributesCodec ¶
type AttributesCodec interface { MediaType() string AttributesEncoder AttributesDecoder }
AttributesCodec helps to encode or decode Attributes from or to bytes.
func AttributesCodecFor ¶
func AttributesCodecFor(contentType string) AttributesCodec
type AttributesDecoder ¶
type AttributesDecoder interface {
Decode([]byte, *Attributes) error
}
type AttributesEncoder ¶
type AttributesEncoder interface {
Encode(Attributes Attributes, w io.Writer) error
}
type DataFrameCodec ¶
type DataFrameCodec interface { MediaType() string DataFrameEncoder DataFrameDecoder }
DataFrameCodec helps to encode or decode a DataFrame from or to bytes.
func DataFrameCodecFor ¶
func DataFrameCodecFor(contentType string) DataFrameCodec
type DataFrameDecoder ¶
type DataFrameEncoder ¶
type DataFrameReader ¶
type DataFrameReader interface { // FrameChan return a DataFrame channel. FrameChan() <-chan DataFrame }
type Interface ¶
type Interface interface { // End normally emits 'EOS' symbol to end up the queue asynchronously, // but if force set to true, stream ends up directly. // Undelivered data will be truncated. End(ctx context.Context, force bool) error // Truncate truncates data before the specific index. Truncate(ctx context.Context, index uint64) error // Put appends new data into stream. Put(ctx context.Context, data []byte, tags Tags) (index uint64, err error) // Get returns data frames from the index of stream in queue. // Param length specifies the expected message count. // And if timeout is set, this call will block until length got satisfied or // timeout timer fires. Get(ctx context.Context, index uint64, length int, timeout time.Duration, tags Tags) (dfs []DataFrame, err error) // Watch subscribe to queue service, when new data frame is appended through Put method, // watcher will emit it through its result channel. // Param index specifies the beginning message index of the watch. // Param window specifies the largest size the Watcher could transfer at one time. Watch(ctx context.Context, index uint64, indexOnly bool, noAck bool, window uint64) (Watcher, error) // Commit commits indices to make the corresponding messages marked as consumed. Commit(ctx context.Context, del bool, indexes ...uint64) error // Del deletes indices to make the corresponding messages deleted from stream. Del(ctx context.Context, indexes ...uint64) error // Attributes reflects self dynamic attributes by K/V pairs. Attributes() Attributes }
Interface of QueueService. Core abstraction for streaming framework.
type Range ¶
func ParseRange ¶
type StreamStatus ¶
type StreamStatus string
const ( StreamOk StreamStatus = "OK" StreamCancel StreamStatus = "Cancel" StreamEnd StreamStatus = "End" )
type User ¶
type User interface { // Uid represents the user id. Uid() string // Gid represents the group id of user. Gid() string // Token represents the access token of the queue service. Token() string }
User authenticated information.
type UserWithToken ¶
type UserWithToken interface { // Token to access the backend service. Token() string }
type Watcher ¶
type Watcher interface { // Watcher is a kind of DataFrameReader. DataFrameReader // Close stops Watcher and closes the FrameChan. Close() }
Watcher is the entity following the stream.
type WorkerStatus ¶
type WorkerStatus string
const ( WorkerRunning WorkerStatus = "Running" WorkerStopped WorkerStatus = "Stopped" WorkerError WorkerStatus = "Error" WorkerUnknown WorkerStatus = "Unknown" )
Click to show internal directories.
Click to hide internal directories.