Documentation ¶
Index ¶
- Constants
- Variables
- func NewReader(conn redis.Conn, name string, options ...func(*RedReader)) io.ReadCloser
- func NewSyncReader(parent context.Context, r io.Reader, subs redis.Conn, ...) (io.ReadCloser, error)
- func NewWriter(conn redis.Conn, name string, options ...func(*RedWriter) error) (io.WriteCloser, error)
- func ReadLookahead(peek uint8) func(*RedReader)
- func SyncStarve(dur time.Duration) func(*SyncReader) error
- func SyncStdSub() func(*SyncReader) error
- func SyncSub(pubSubChan interface{}) func(*SyncReader) error
- func WriteExpire(sec uint16) func(*RedWriter) error
- func WriteMaxChunk(max uint16) func(*RedWriter) error
- func WriteMinChunk(min uint16) func(*RedWriter) error
- func WriteStdPub() func(*RedWriter) error
- func WriteTrailer(c CmdBuilder) func(*RedWriter) error
- type CmdBuilder
- type CmdBuilderFunc
- type RedReader
- type RedWriter
- type SyncReader
Constants ¶
const DefaultMaxChunkSize uint16 = 1 << 10 // 1024
DefaultMaxChunkSize is 1 KiB (1024 bytes)
Variables ¶
var ErrStarveEOF = errors.New("data not yet available")
ErrStarveEOF marks when we are returning a zero-size buffer, in the cases where either we do not yet know the full size of the stream, or we do, and have not yet received that data.
Functions ¶
func NewReader ¶
func NewReader(conn redis.Conn, name string, options ...func(*RedReader)) io.ReadCloser
NewReader creates an io.ReadCloser that assembles a byte stream as stored in complete chunks on a Redis hash. The underlying implementation is a *RedReader, and is the intended counterpart to the *RedWriter.
The name parameter is used as the key for the Redis hash.
Configuration is by functional options, as inspired by this blog post: http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
See: ReadLookahead
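The functional-options pattern these option funcs follow can be sketched in a few self-contained lines (the reader struct and field names below are illustrative, not the package's actual internals):

```go
package main

import "fmt"

// reader stands in for *RedReader; the lookahead field is illustrative.
type reader struct{ lookahead uint8 }

// withLookahead mimics the shape of ReadLookahead: it returns a closure
// that mutates the reader under construction.
func withLookahead(peek uint8) func(*reader) {
	return func(r *reader) { r.lookahead = peek }
}

// newReader applies defaults first, then each caller-supplied option.
func newReader(options ...func(*reader)) *reader {
	r := &reader{}
	for _, opt := range options {
		opt(r)
	}
	return r
}

func main() {
	r := newReader(withLookahead(4))
	fmt.Println(r.lookahead) // 4
}
```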
func NewSyncReader ¶
func NewSyncReader(
	parent context.Context,
	r io.Reader,
	subs redis.Conn,
	options ...func(*SyncReader) error,
) (io.ReadCloser, error)
NewSyncReader creates an io.ReadCloser that reads bytes from an underlying io.Reader. However, if the underlying io.Reader returns ErrStarveEOF, this implementation will block until receiving a Redis Pub/Sub stimulus, whereupon it will attempt to read from the underlying io.Reader again.
The intended usage for SyncReader is alongside a RedReader (or any io.Reader that returns the same ErrStarveEOF), but it functions as a simple pass-through for any other io.Reader.
This accepts a Context object, so a client can set a timeout, or cancel reading mid-stream.
The passed-in Redis connection *MUST* be different from the one used in the underlying RedReader, as this pub/sub subscription cannot also handle concurrent transactional needs.
The client is expected to provide at least one pub/sub channel upon which this implementation will listen for synchronization events.
Configuration is by functional options, as inspired by this blog post: http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
See: SyncSub, SyncStdSub, SyncStarve
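The block-and-retry behavior can be sketched with a fake starving reader; everything below except the ErrStarveEOF semantics is illustrative (the real SyncReader waits on a Redis pub/sub subscription, modeled here as a Go channel):

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// ErrStarveEOF mirrors the package's sentinel (assumed identical semantics).
var ErrStarveEOF = errors.New("data not yet available")

// starveReader fakes a RedReader: it starves once, then yields data, then EOF.
type starveReader struct{ calls int }

func (s *starveReader) Read(p []byte) (int, error) {
	s.calls++
	switch s.calls {
	case 1:
		return 0, ErrStarveEOF
	case 2:
		return copy(p, "hello"), nil
	default:
		return 0, io.EOF
	}
}

// syncRead sketches the retry loop: on ErrStarveEOF, wait for a stimulus
// (a pub/sub message in the real implementation), then try again.
func syncRead(r io.Reader, stimulus <-chan struct{}, p []byte) (int, error) {
	for {
		n, err := r.Read(p)
		if err != ErrStarveEOF {
			return n, err
		}
		<-stimulus // block until the writer signals new bytes
	}
}

func main() {
	stim := make(chan struct{}, 1)
	stim <- struct{}{} // pretend a PUBLISH already arrived
	buf := make([]byte, 16)
	n, err := syncRead(&starveReader{}, stim, buf)
	fmt.Println(string(buf[:n]), err) // hello <nil>
}
```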
func NewWriter ¶
func NewWriter(conn redis.Conn, name string, options ...func(*RedWriter) error) (io.WriteCloser, error)
NewWriter creates an io.WriteCloser that will write byte chunks to Redis. The underlying implementation is a *RedWriter.
The intended usage for this is a growing stream of bytes where the full length is not known until the stream is closed. (Though this does not preclude use with streams of known/fixed length.)
A single Redis hash is the sole underlying data structure. The name parameter is used as the Redis key, and each chunk of bytes is stored as a field+value pair on the hash. This implementation reserves all field names starting with "c:" (for chunk), but is open to clients using any arbitrary other field names on this hash.
Network hits are minimized by using Redis pipelining. Per Write invocation, the buffer is apportioned into configurable-sized chunks, any trailers (arbitrary Redis command configured by the client) are assembled, and all of it is written out to Redis at once.
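The apportioning step can be sketched as plain slicing (a sketch of the idea, not the library's actual code):

```go
package main

import "fmt"

// chunk apportions p into pieces of at most max bytes, the way a Write
// buffer is split before being pipelined out to the Redis hash.
func chunk(p []byte, max int) [][]byte {
	var out [][]byte
	for len(p) > 0 {
		n := max
		if len(p) < n {
			n = len(p)
		}
		out = append(out, p[:n])
		p = p[n:]
	}
	return out
}

func main() {
	for _, c := range chunk([]byte("abcdefgh"), 3) {
		fmt.Printf("%s ", c) // abc def gh
	}
}
```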
Upon Close, if >0 bytes had been written, a last pipeline is constructed, writing out any buffered data, an end-of-stream marker, and any configured trailers.
The default configuration has a 1024-byte maximum chunk size, a 0-byte minimum chunk size, and no trailers. Clients will want to adjust these parameters to their use cases. (Some factors that may be considered in this tuning: desired ingress/egress data rates; Redis tuning; etc.)
Configuration is by functional options, as inspired by this blog post: http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
See: WriteExpire, WriteMaxChunk, WriteMinChunk, WriteStdPub, WriteTrailer
func ReadLookahead ¶
func ReadLookahead(peek uint8) func(*RedReader)
ReadLookahead sets the number of extra chunks to optimistically ask for in a single fetch from Redis
func SyncStarve ¶
func SyncStarve(dur time.Duration) func(*SyncReader) error
SyncStarve sets the maximum amount of time to wait between stimulus/Read attempts. If this timeout is tripped, the reader will return io.ErrUnexpectedEOF
func SyncStdSub ¶
func SyncStdSub() func(*SyncReader) error
SyncStdSub sets up the "standard" channel subscription. This is the Read-side counterpart to WriteStdPub
func SyncSub ¶
func SyncSub(pubSubChan interface{}) func(*SyncReader) error
SyncSub allows a client to add an arbitrary Redis PubSub channel to listen to for stimulus
func WriteExpire ¶
func WriteExpire(sec uint16) func(*RedWriter) error
WriteExpire sends a Redis EXPIRE command for the underlying hash upon each pipeline flush
func WriteMaxChunk ¶
func WriteMaxChunk(max uint16) func(*RedWriter) error
WriteMaxChunk overrides the default maximum size per chunk written to Redis
func WriteMinChunk ¶
func WriteMinChunk(min uint16) func(*RedWriter) error
WriteMinChunk sets the minimum size per chunk written to Redis
func WriteStdPub ¶
func WriteStdPub() func(*RedWriter) error
WriteStdPub sends a standard Redis PUBLISH message for this hash on every pipeline flush. This is intended to be used in concert with a standard read-side subscription for synchronizing when there are new bytes to be read.
func WriteTrailer ¶
func WriteTrailer(c CmdBuilder) func(*RedWriter) error
WriteTrailer provides clients a way to configure arbitrary Redis commands to be included per pipeline flush
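For illustration, a trailer might be supplied as a CmdBuilderFunc; the CmdBuilder types are restated locally so the sketch is self-contained, and the key and field names below are hypothetical:

```go
package main

import "fmt"

// CmdBuilder and CmdBuilderFunc restate the documented shapes locally.
type CmdBuilder interface {
	Build() (string, []interface{}, error)
}

type CmdBuilderFunc func() (string, []interface{}, error)

func (f CmdBuilderFunc) Build() (string, []interface{}, error) { return f() }

// doneTrailer is a hypothetical trailer that flags the hash as complete;
// it would be registered with the writer via WriteTrailer(doneTrailer).
var doneTrailer = CmdBuilderFunc(func() (string, []interface{}, error) {
	return "HSET", []interface{}{"mystream", "done", 1}, nil
})

func main() {
	cmd, args, err := doneTrailer.Build()
	fmt.Println(cmd, args, err) // HSET [mystream done 1] <nil>
}
```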
Types ¶
type CmdBuilder ¶
type CmdBuilder interface {
	Build() (string, []interface{}, error)
}
A CmdBuilder produces the parameters used to invoke a Redis command
type CmdBuilderFunc ¶
type CmdBuilderFunc func() (string, []interface{}, error)
CmdBuilderFunc is a function adapter for the CmdBuilder interface
func (CmdBuilderFunc) Build ¶
func (f CmdBuilderFunc) Build() (string, []interface{}, error)
Build conforms to the CmdBuilder interface
type RedReader ¶
type RedReader struct {
// contains filtered or unexported fields
}
RedReader is the implementation that retrieves a variable-length stream of bytes from a Redis hash, as written by its RedWriter counterpart
func (*RedReader) Close ¶
func (r *RedReader) Close() error
Close conforms to the io.Closer interface
This will also close the underlying Redis connection
func (*RedReader) Read ¶
func (r *RedReader) Read(p []byte) (n int, err error)
Read conforms to the io.Reader interface
It is recommended that the buffer p is at least as big as ((lookahead + 1) X maxChunkSize), otherwise you run the risk of receiving an io.ErrShortBuffer error.
(Receiving an io.ErrShortBuffer is not necessarily a fatal condition: the Read will try to "keep its place" in the stream and deliver what bytes it can per invocation.)
This implementation has one very specific behavior: if the end-of-stream marker has not yet been written, but there are currently no bytes to deliver, the error ErrStarveEOF is returned.
If the byte stream is already fully written, or becomes fully written by the time the Read reaches the end-of-stream marker, this acts just like any other io.Reader
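The sizing guideline above works out to a simple product; the lookahead value below is illustrative, while 1024 matches DefaultMaxChunkSize:

```go
package main

import "fmt"

const (
	lookahead    = 2       // illustrative; set via ReadLookahead
	maxChunkSize = 1 << 10 // matches DefaultMaxChunkSize (1024)
)

func main() {
	// Recommended minimum Read buffer: (lookahead + 1) * maxChunkSize.
	buf := make([]byte, (lookahead+1)*maxChunkSize)
	fmt.Println(len(buf)) // 3072
}
```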
type RedWriter ¶
type RedWriter struct {
// contains filtered or unexported fields
}
A RedWriter is the implementation that stores a variable-length stream of bytes into a Redis hash.
func (*RedWriter) Close ¶
func (w *RedWriter) Close() error
Close conforms to the io.Closer interface.
Calling Close is necessary to write out any buffered (leftover) bytes, and to add the end-of-stream marker
This finishes by calling Close on the underlying Redis connection.
func (*RedWriter) Write ¶
func (w *RedWriter) Write(p []byte) (n int, err error)
Write conforms to the io.Writer interface.
This is where the incoming buffer is apportioned into chunks, and written out to Redis via a pipeline.
There is at most one hit to Redis per Write invocation. (There may be none if the buffered bytes amount to less than the configured minimum chunk size.)
type SyncReader ¶
type SyncReader struct {
// contains filtered or unexported fields
}
A SyncReader is the implementation that reads from an underlying io.Reader, and will block then retry upon stimulus if the underlying Reader returned ErrStarveEOF.
func (*SyncReader) Close ¶
func (sr *SyncReader) Close() error
Close conforms to the io.Closer interface
This will close the passed-in Redis connection AND the passed-in Reader, if that Reader also implements io.Closer
func (*SyncReader) Read ¶
func (sr *SyncReader) Read(p []byte) (n int, err error)
Read conforms to the io.Reader interface
This will pass through any results from the underlying Read, unless we received an ErrStarveEOF, whereupon it will block and re-attempt the Read when it receives pub/sub stimulus.