Documentation ¶
Functions ¶
func MarshalProtobuf ¶ added in v0.7.0
Types ¶
type Blob ¶ added in v0.7.0
type Blob struct {
Bytes []byte `protobuf:"bytes,1,opt,name=Bytes,proto3" json:"Bytes,omitempty"`
}
func (*Blob) UnmarshalProtobuf ¶ added in v0.7.0
UnmarshalProtobuf unmarshals the Blob from the protobuf message at src.
type Config ¶
type Config struct {
	// Enc is needed when DB sends/requires something other than UTF-8
	Enc encoding.Encoding
	*slog.Logger
	// RequestKeyName is the attribute name instead of NAME.
	RequestKeyName string
	// DisQPrefix is the diskqueue file's prefix.
	DisQPrefix string
	// DisQPath is the path for the diskqueue.
	DisQPath string
	// Correlation restricts dequeuing to messages with the same correlation string.
	Correlation string
	// ResponseKeyPayload is the attribute name instead of PAYLOAD
	ResponseKeyPayload string
	// ResponseKeyBlob is the attribute name instead of AUX
	ResponseKeyBlob string
	// ResponseKeyErrMsg is the attribute name instead of ERRMSG
	ResponseKeyErrMsg string
	// RequestKeyPayload is the attribute name instead of PAYLOAD
	RequestKeyPayload string
	// RequestKeyBlob is the attribute name instead of AUX
	RequestKeyBlob string
	DisQMaxFileSize, DisQSyncEvery int64
	DisQSyncTimeout time.Duration
	Timeout, PipeTimeout time.Duration
	// QueueCount is the approximate number of queues dispatched over this AQ.
	QueueCount int
	// Concurrency is the number of concurrent RPCs.
	Concurrency int
	DisQMinMsgSize, DisQMaxMsgSize int32
}
Config of the Dispatcher.
type Dispatcher ¶
Dispatcher. After creating with New, run it with Run.
Reads tasks and stores the messages in diskqueues, one for each distinct NAME. If Concurrency allows, it calls the do function given in New and sends whatever that function writes as the PAYLOAD of the response.
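The flow described above (one queue per NAME, a bounded number of concurrent do calls) can be illustrated with a minimal, self-contained sketch. It is not the package's implementation: the task type, the dispatch function, and the in-memory slices standing in for diskqueues are all hypothetical, and the database side is left out entirely.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// task is a hypothetical stand-in for the package's Task type.
type task struct {
	Name    string
	Payload string
}

// dispatch groups tasks by Name into per-name queues (in-memory slices here,
// where the Dispatcher uses diskqueues), then runs do on every task with at
// most concurrency calls in flight. It returns the responses per name.
// concurrency must be at least 1.
func dispatch(tasks []task, concurrency int, do func(task) string) map[string][]string {
	queues := make(map[string][]task)
	for _, t := range tasks {
		queues[t.Name] = append(queues[t.Name], t)
	}

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[string][]string)
		sem     = make(chan struct{}, concurrency) // counting semaphore
	)
	for name, q := range queues {
		wg.Add(1)
		go func(name string, q []task) {
			defer wg.Done()
			// Within one queue, tasks are handled in arrival order.
			for _, t := range q {
				sem <- struct{}{} // acquire a slot
				out := do(t)
				<-sem // release the slot
				mu.Lock()
				results[name] = append(results[name], out)
				mu.Unlock()
			}
		}(name, q)
	}
	wg.Wait()
	return results
}

func main() {
	tasks := []task{{"a", "1"}, {"b", "2"}, {"a", "3"}}
	res := dispatch(tasks, 2, func(t task) string { return t.Name + ":" + t.Payload })

	names := make([]string, 0, len(res))
	for n := range res {
		names = append(names, n)
	}
	sort.Strings(names)
	for _, n := range names {
		fmt.Println(n, res[n])
	}
}
```

The buffered channel acts as a counting semaphore, which is one common way to cap concurrency in Go; how the Dispatcher itself enforces Concurrency is not stated here.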
func New ¶
func New(
	db *sql.DB, conf Config,
	inQName, inQType string,
	do DoFunc,
	outQName, outQType string,
) (*Dispatcher, error)
New returns a new Dispatcher, which receives inQType typed messages on inQName.
It then calls the do function with each task and sends its output as the response on the outQName queue, as an outQType message.
When outQName and outQType are empty, no response is sent and no response queue is opened.
func (*Dispatcher) Decode ¶
func (di *Dispatcher) Decode(p []byte) string
Decode the string from DB's encoding to UTF-8.
func (*Dispatcher) Encode ¶
func (di *Dispatcher) Encode(s string) string
Encode a string using the DB's encoding.
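To make the Decode/Encode pair concrete, here is a small stdlib-only sketch that assumes the DB encoding is ISO-8859-1. The real methods use whatever Config.Enc is set to; decodeLatin1 and encodeLatin1 are hypothetical helpers written for this illustration, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// decodeLatin1 converts ISO-8859-1 bytes to a UTF-8 string.
// Every Latin-1 byte value equals its Unicode code point.
func decodeLatin1(p []byte) string {
	var b strings.Builder
	for _, c := range p {
		b.WriteRune(rune(c))
	}
	return b.String()
}

// encodeLatin1 converts a UTF-8 string to ISO-8859-1 bytes (returned as a string).
// Runes that do not exist in Latin-1 are replaced with '?'.
func encodeLatin1(s string) string {
	out := make([]byte, 0, len(s))
	for _, r := range s {
		if r > 0xFF {
			r = '?'
		}
		out = append(out, byte(r))
	}
	return string(out)
}

func main() {
	raw := []byte{0xE1, 'r', 'v', 0xED, 'z'} // "árvíz" in ISO-8859-1
	s := decodeLatin1(raw)
	fmt.Println(s)                              // árvíz
	fmt.Println(encodeLatin1(s) == string(raw)) // true
}
```

For encodings other than Latin-1 a table-driven codec is needed, which is the role Config.Enc plays.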
func (*Dispatcher) PurgeExpired ¶ added in v0.3.16
func (di *Dispatcher) PurgeExpired(ctx context.Context) error
PurgeExpired calls PurgeExpired on the underlying queues, purging expired messages.
type Task ¶
type Task struct {
	Name     string    `protobuf:"bytes,1,opt,name=Name,proto3" json:"Name,omitempty"`
	RefID    string    `protobuf:"bytes,2,opt,name=RefID,proto3" json:"RefID,omitempty"`
	Deadline time.Time `protobuf:"bytes,4,opt,name=Deadline,proto3" json:"Deadline,omitempty"`
	Payload  []byte    `protobuf:"bytes,5,opt,name=Payload,proto3" json:"Payload,omitempty"`
	Blobs    []*Blob   `protobuf:"bytes,6,rep,name=Blobs,proto3" json:"Blobs,omitempty"`
}
func (*Task) UnmarshalProtobuf ¶ added in v0.7.0
UnmarshalProtobuf unmarshals the Task from the protobuf message at src.
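The json struct tags on Task determine how it serializes with encoding/json. The sketch below uses local mirror types (task and blob are hypothetical copies carrying only the json tags) so that it runs without the package. It shows two things that follow from the tags: []byte fields are base64-encoded, and omitempty does not suppress a zero time.Time, because encoding/json never treats a struct value as empty.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// blob and task mirror the json tags of Blob and Task; they are local
// stand-ins so the example needs no external import.
type blob struct {
	Bytes []byte `json:"Bytes,omitempty"`
}

type task struct {
	Name     string    `json:"Name,omitempty"`
	RefID    string    `json:"RefID,omitempty"`
	Deadline time.Time `json:"Deadline,omitempty"`
	Payload  []byte    `json:"Payload,omitempty"`
	Blobs    []*blob   `json:"Blobs,omitempty"`
}

// toJSON marshals t and returns the result as a string.
func toJSON(t task) (string, error) {
	b, err := json.Marshal(t)
	return string(b), err
}

func main() {
	s, err := toJSON(task{Name: "calc", RefID: "42", Payload: []byte("hi")})
	if err != nil {
		panic(err)
	}
	// Blobs is omitted (nil slice); Deadline is emitted even though it is zero.
	fmt.Println(s)
	// {"Name":"calc","RefID":"42","Deadline":"0001-01-01T00:00:00Z","Payload":"aGk="}
}
```

A consumer of such JSON should therefore treat 0001-01-01T00:00:00Z as "no deadline set" rather than expect the key to be absent.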
type Timestamp ¶ added in v0.7.0
type Timestamp struct {
	// seconds int64
	// Represents seconds of UTC time since Unix epoch 1970-01-01T00:00:00Z.
	// Must be from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59Z inclusive.
	Seconds int64 `protobuf:"int64,1,opt,name=Seconds,proto3"`
	// nanos int32
	// Non-negative fractions of a second at nanosecond resolution.
	// Negative second values with fractions must still have non-negative
	// nanos values that count forward in time.
	// Must be from 0 to 999,999,999 inclusive.
	Nanos int32 `protobuf:"int32,2,opt,name=Nanos,proto3"`
}
func (*Timestamp) UnmarshalProtobuf ¶ added in v0.7.0
UnmarshalProtobuf unmarshals ts from protobuf message at src.