aqdispatch

v0.7.1
Published: Jun 3, 2024 | License: Apache-2.0 | Imports: 17 | Imported by: 0

README

AQDispatch

Dispatch RPC over The Big Red's (Oracle Database) Advanced Queuing (DBMS_AQ).

Pass the request type (which needs a Name and a Payload) and the response type (which needs an Errmsg and a Payload) to New. The Dispatcher then receives the messages and calls the given do function for each message.

The output of that function is sent back as the answer.

This is a dispatcher: it splits the input by the NAME of the task and manages a separate disk queue for each NAME.
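The per-NAME split described above can be illustrated with a minimal in-memory sketch. It is not the package's implementation: the `message` type and the `splitByName` helper are hypothetical names, and plain slices stand in for the on-disk queues.

```go
package main

import (
	"fmt"
	"sort"
)

// message is a hypothetical stand-in for a dequeued request; only the
// Name and Payload attributes mentioned in the README are modelled.
type message struct {
	Name    string
	Payload string
}

// splitByName groups messages into one FIFO queue per distinct Name,
// preserving arrival order within each queue. The real package keeps
// these queues on disk; slices are used here only for illustration.
func splitByName(msgs []message) map[string][]message {
	queues := make(map[string][]message)
	for _, m := range msgs {
		queues[m.Name] = append(queues[m.Name], m)
	}
	return queues
}

func main() {
	queues := splitByName([]message{
		{Name: "mail", Payload: "a"},
		{Name: "print", Payload: "b"},
		{Name: "mail", Payload: "c"},
	})

	// Sort the names so the printed order is deterministic.
	names := make([]string, 0, len(queues))
	for name := range queues {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Println(name, len(queues[name]))
	}
}
```

Keeping one queue per NAME means a slow or failing task type does not block the messages of other task types.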

Documentation

Constants

This section is empty.

Variables

var (
	ErrUnknownCommand = errors.New("unknown command")
	ErrSkipResponse   = errors.New("skip response")
	ErrEmpty          = errors.New("empty")
	ErrExit           = errors.New("exit")
	ErrAnswer         = errors.New("answer send error")
)

Functions

func MarshalProtobuf added in v0.7.0

func MarshalProtobuf(dst []byte, ms protobufMarshaler) []byte

Types

type Blob added in v0.7.0

type Blob struct {
	Bytes []byte `protobuf:"bytes,1,opt,name=Bytes,proto3" json:"Bytes,omitempty"`
}

func (*Blob) UnmarshalProtobuf added in v0.7.0

func (B *Blob) UnmarshalProtobuf(src []byte) (err error)

UnmarshalProtobuf unmarshals B from protobuf message at src.

type Config

type Config struct {

	// Enc is needed when DB sends/requires something other than UTF-8
	Enc encoding.Encoding

	*slog.Logger

	// RequestKeyName is the attribute name instead of NAME.
	RequestKeyName string

	// DisQPrefix is the diskqueue file's prefix.
	DisQPrefix string
	// DisQPath is the path for the diskqueue.
	DisQPath string

	// Correlation restricts dequeuing to messages with the same correlation string.
	Correlation string
	// ResponseKeyPayload is the attribute name instead of PAYLOAD
	ResponseKeyPayload string
	// ResponseKeyBlob is the attribute name instead of AUX
	ResponseKeyBlob string

	// ResponseKeyErrMsg is the attribute name instead of ERRMSG
	ResponseKeyErrMsg string
	// RequestKeyPayload is the attribute name instead of PAYLOAD
	RequestKeyPayload string
	// RequestKeyBlob is the attribute name instead of AUX
	RequestKeyBlob string

	DisQMaxFileSize, DisQSyncEvery int64
	DisQSyncTimeout                time.Duration

	Timeout, PipeTimeout time.Duration
	// QueueCount is the approximate number of queues dispatched over this AQ.
	QueueCount int
	// Concurrency is the number of concurrent RPCs.
	Concurrency int

	DisQMinMsgSize, DisQMaxMsgSize int32
}

Config of the Dispatcher.

type Dispatcher

type Dispatcher struct {
	Timezone *time.Location
	// contains filtered or unexported fields
}

After creating a Dispatcher with New, run it with Run.

It reads tasks and stores the messages in diskqueues, one for each distinct NAME. If Concurrency allows, it calls the do function given to New and sends whatever that function writes as the PAYLOAD of the response.

func New

func New(
	db *sql.DB,
	conf Config,
	inQName, inQType string,
	do DoFunc,
	outQName, outQType string,
) (*Dispatcher, error)

New returns a new Dispatcher, which receives inQType typed messages on inQName.

It then calls the "do" function with the task and sends its output as the response on the outQName queue in an outQType message.

When outQName and outQType are empty, no response is sent and no response queue is opened.

func (*Dispatcher) Close

func (di *Dispatcher) Close() error

Close the dispatcher.

func (*Dispatcher) Decode

func (di *Dispatcher) Decode(p []byte) string

Decode the string from DB's encoding to UTF-8.

func (*Dispatcher) Encode

func (di *Dispatcher) Encode(s string) string

Encode a string using the DB's encoding.

func (*Dispatcher) PurgeExpired added in v0.3.16

func (di *Dispatcher) PurgeExpired(ctx context.Context) error

PurgeExpired calls PurgeExpired on the underlying queues, purging expired messages.

func (*Dispatcher) Run added in v0.3.0

func (di *Dispatcher) Run(ctx context.Context, taskNames []string) error

Run the dispatcher, accepting tasks with names in taskNames.

type DoFunc added in v0.6.1

type DoFunc func(context.Context, io.Writer, *Task) (io.Reader, error)

DoFunc is the type of the function that processes the Task.

type Task

type Task struct {
	Name     string    `protobuf:"bytes,1,opt,name=Name,proto3" json:"Name,omitempty"`
	RefID    string    `protobuf:"bytes,2,opt,name=RefID,proto3" json:"RefID,omitempty"`
	Deadline time.Time `protobuf:"bytes,4,opt,name=Deadline,proto3" json:"Deadline,omitempty"`
	Payload  []byte    `protobuf:"bytes,5,opt,name=Payload,proto3" json:"Payload,omitempty"`
	Blobs    []*Blob   `protobuf:"bytes,6,rep,name=Blobs,proto3" json:"Blobs,omitempty"`
}

func (*Task) UnmarshalProtobuf added in v0.7.0

func (ts *Task) UnmarshalProtobuf(src []byte) (err error)

UnmarshalProtobuf unmarshals ts from protobuf message at src.

type Timestamp added in v0.7.0

type Timestamp struct {
	// Seconds represents seconds of UTC time since Unix epoch 1970-01-01T00:00:00Z.
	// Must be from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59Z inclusive.
	Seconds int64 `protobuf:"int64,1,opt,name=Seconds,proto3"`
	// Nanos holds non-negative fractions of a second at nanosecond resolution.
	// Negative second values with fractions must still have non-negative nanos
	// values that count forward in time. Must be from 0 to 999,999,999 inclusive.
	Nanos int32 `protobuf:"int32,2,opt,name=Nanos,proto3"`
}

func (Timestamp) Time added in v0.7.0

func (ts Timestamp) Time() time.Time

func (*Timestamp) UnmarshalProtobuf added in v0.7.0

func (ts *Timestamp) UnmarshalProtobuf(src []byte) (err error)

UnmarshalProtobuf unmarshals ts from protobuf message at src.
