Documentation
¶
Index ¶
- Constants
- func GetRedisTxBackoff(ii int) time.Duration
- func IsServerReady(ctx context.Context, client *redis.Client, timeout time.Duration) error
- func NewClient(ctx context.Context, cfg *RedisConfig) (*redis.Client, error)
- func SendMessage(ctx context.Context, client *redis.Client, msg Message) error
- type APIMessageReply
- type APIMessageRequest
- type DummyRedis
- type GetRequestBuf
- type Message
- type MessageHandler
- type RedisConfig
- type ReplyStatus
- type RequestHandler
- type StreamReplyCb
- type StreamRequestHandler
- type UnaryAPI
- func (s *UnaryAPI) DoRequest(ctx context.Context, methodName string, request, replyBuf interface{}) error
- func (s *UnaryAPI) DoStreamRequest(ctx context.Context, methodName string, request, replyBuf interface{}, ...) (reterr error)
- func (s *UnaryAPI) HandleRequests(ctx context.Context, methodName string, getReqBuf GetRequestBuf, ...)
- func (s *UnaryAPI) HandleStreamRequests(ctx context.Context, methodName string, getReqBuf GetRequestBuf, ...)
Constants ¶
const ( MaxRedisWait = time.Second * 30 RedisTxMaxRetries = 10 // Special IDs in the streams API RedisSmallestId = "-" RedisGreatestId = "+" RedisLastId = "$" DefaultCfgRedisOptional int = iota DefaultCfgRedisStandalone DefaultCfgRedisHA RedisStandalonePort = "6379" DefaultRedisStandaloneAddr = "127.0.0.1:" + RedisStandalonePort RedisHeadlessService = "redis-headless" RedisCloudletStandaloneAddr = RedisHeadlessService + ":" + RedisStandalonePort // for redis running in a cloudlet DefaultRedisMasterName = "redismaster" DefaultRedisSentinelAddrs = "127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381" )
Variables ¶
This section is empty.
Functions ¶
func GetRedisTxBackoff ¶
func IsServerReady ¶
Types ¶
type APIMessageReply ¶
type APIMessageReply struct { Msg interface{} Error string Status ReplyStatus }
type APIMessageRequest ¶
type APIMessageRequest struct { ID string Msg interface{} }
type DummyRedis ¶
type DummyRedis struct {
// contains filtered or unexported fields
}
func NewMockRedisServer ¶
func NewMockRedisServer() (*DummyRedis, error)
func (*DummyRedis) Close ¶
func (r *DummyRedis) Close()
func (*DummyRedis) FastForward ¶
func (r *DummyRedis) FastForward(d time.Duration)
func (*DummyRedis) GetSentinelAddr ¶
func (r *DummyRedis) GetSentinelAddr() string
func (*DummyRedis) GetStandaloneAddr ¶
func (r *DummyRedis) GetStandaloneAddr() string
type GetRequestBuf ¶
type GetRequestBuf func() interface{}
GetRequestBuf should return an empty Message of the expected underlying type to unmarshal the message data into.
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
MessageHandler is used to wait for a specific message.
func Subscribe ¶
func Subscribe(ctx context.Context, client *redis.Client, desired Message) (*MessageHandler, error)
Subscribe shall be done before the code that will trigger the message being sent. After that code, call WaitForMessage(). The desired Message parameter must have its key value set. An example of this would be:
desired := &edgeproto.Info{ Key: someKey, }
h := Subscribe(ctx, client, desired) defer h.Close() <code to trigger message send>
err := h.WaitForMessage(10*time.Second, func() bool { if desired.State == TARGET_STATE { return true } return false })
func (*MessageHandler) WaitForMessage ¶
WaitForMessage waits for the desired message to be received. The message when received is copied into the desired Message parameter passed to Subscribe. The isDone callback should check the state of desired and return true if we're done. Context may have timeout set.
type RedisConfig ¶
func (*RedisConfig) AddrSpecified ¶
func (r *RedisConfig) AddrSpecified() bool
func (*RedisConfig) InitFlags ¶
func (r *RedisConfig) InitFlags(defaultCfgType int)
type ReplyStatus ¶
type ReplyStatus int
const ( ReplyStatusStreaming ReplyStatus = iota ReplyStatusSuccess ReplyStatusFailure )
type RequestHandler ¶
RequestHandler shall handle the request and return the reply. It can return nil, nil to avoid sending a reply.
type StreamReplyCb ¶
type StreamReplyCb func(reply interface{}) error
type StreamRequestHandler ¶
type StreamRequestHandler func(ctx context.Context, req interface{}, sendReply StreamReplyCb) error
type UnaryAPI ¶
type UnaryAPI struct {
// contains filtered or unexported fields
}
UnaryAPI allows for request-reply over Redis.
func NewUnaryAPI ¶
func NewUnaryAPI(client *redis.Client) *UnaryAPI
NewUnaryAPI creates a new UnaryAPI for request-reply over Redis.
func (*UnaryAPI) DoRequest ¶
func (s *UnaryAPI) DoRequest(ctx context.Context, methodName string, request, replyBuf interface{}) error
DoRequest is a blocking call that sends the request and waits for the reply. The reply is written into replyBuf. The methodName identifies the API, it is analogous to the URL in an HTTP request or the method name in a GRPC service.
func (*UnaryAPI) DoStreamRequest ¶
func (s *UnaryAPI) DoStreamRequest(ctx context.Context, methodName string, request, replyBuf interface{}, cb func() error) (reterr error)
DoStreamRequest is a blocking call that sends the request and waits for multiple replies. Each reply is handled serially and written to the replyBuf parameter. The callback function is called and must return before it processes the next reply.
func (*UnaryAPI) HandleRequests ¶
func (s *UnaryAPI) HandleRequests(ctx context.Context, methodName string, getReqBuf GetRequestBuf, requestHandler RequestHandler)
HandleRequests waits for requests, then calls the handler function to generate a reply. This is a blocking function, which spawns a go thread for each incoming request message.
func (*UnaryAPI) HandleStreamRequests ¶
func (s *UnaryAPI) HandleStreamRequests(ctx context.Context, methodName string, getReqBuf GetRequestBuf, requestHandler StreamRequestHandler)