Documentation ¶
Index ¶
- Constants
- Variables
- func NewLogger(handlerOpts slog.HandlerOptions, w io.Writer, system string) *slog.Logger
- func RegisterMonitoringServiceServer(s grpc.ServiceRegistrar, srv MonitoringServiceServer)
- func SetWithHandlerOptions(handlerOpts slog.HandlerOptions, writers ...io.Writer)
- type CurrentWorkingsRequest
- type CurrentWorkingsResponse
- func (*CurrentWorkingsResponse) Descriptor() ([]byte, []int)deprecated
- func (x *CurrentWorkingsResponse) GetTasks() []*Task
- func (*CurrentWorkingsResponse) ProtoMessage()
- func (x *CurrentWorkingsResponse) ProtoReflect() protoreflect.Message
- func (x *CurrentWorkingsResponse) Reset()
- func (x *CurrentWorkingsResponse) String() string
- type Gateway
- type GatewayParameter
- func FetchInterval(d time.Duration) GatewayParameter
- func FetchParallel(n int) GatewayParameter
- func FetcherMaxMessages(n int32) GatewayParameter
- func FetcherQueueLocker(l locker.QueueLocker) GatewayParameter
- func FetcherVisibilityTimeout(d time.Duration) GatewayParameter
- func FetcherWaitTime(d time.Duration) GatewayParameter
- type HTTPInvoker
- type Invoker
- type Message
- type MonitoringService
- type MonitoringServiceClient
- type MonitoringServiceServer
- type System
- type SystemBuilder
- type Task
- func (*Task) Descriptor() ([]byte, []int)deprecated
- func (x *Task) GetId() string
- func (x *Task) GetReceipt() string
- func (x *Task) GetStartedAt() *timestamppb.Timestamp
- func (*Task) ProtoMessage()
- func (x *Task) ProtoReflect() protoreflect.Message
- func (x *Task) Reset()
- func (x *Task) String() string
- type UnimplementedMonitoringServiceServer
- type UnsafeMonitoringServiceServer
Constants ¶
const DisableMonitoring = -1
DisableMonitoring prevents the gRPC server from running.
Variables ¶
var ErrRetainMessage = errors.New("this message should be retained")
ErrRetainMessage indicates that the message should be kept in the queue. When this error is returned, the worker must not remove the message.
var File_sqsd_proto protoreflect.FileDescriptor
var MonitoringService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "sqsd.MonitoringService", HandlerType: (*MonitoringServiceServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "CurrentWorkings", Handler: _MonitoringService_CurrentWorkings_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "sqsd.proto", }
MonitoringService_ServiceDesc is the grpc.ServiceDesc for MonitoringService service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy)
Functions ¶
func RegisterMonitoringServiceServer ¶
func RegisterMonitoringServiceServer(s grpc.ServiceRegistrar, srv MonitoringServiceServer)
func SetWithHandlerOptions ¶
func SetWithHandlerOptions(handlerOpts slog.HandlerOptions, writers ...io.Writer)
SetWithHandlerOptions sets the default logger with the given slog handler options. If no io.Writer is supplied, os.Stderr is used.
Types ¶
type CurrentWorkingsRequest ¶
type CurrentWorkingsRequest struct {
// contains filtered or unexported fields
}
func (*CurrentWorkingsRequest) Descriptor
deprecated
func (*CurrentWorkingsRequest) Descriptor() ([]byte, []int)
Deprecated: Use CurrentWorkingsRequest.ProtoReflect.Descriptor instead.
func (*CurrentWorkingsRequest) ProtoMessage ¶
func (*CurrentWorkingsRequest) ProtoMessage()
func (*CurrentWorkingsRequest) ProtoReflect ¶
func (x *CurrentWorkingsRequest) ProtoReflect() protoreflect.Message
func (*CurrentWorkingsRequest) Reset ¶
func (x *CurrentWorkingsRequest) Reset()
func (*CurrentWorkingsRequest) String ¶
func (x *CurrentWorkingsRequest) String() string
type CurrentWorkingsResponse ¶
type CurrentWorkingsResponse struct { Tasks []*Task `protobuf:"bytes,1,rep,name=tasks,proto3" json:"tasks,omitempty"` // contains filtered or unexported fields }
func (*CurrentWorkingsResponse) Descriptor
deprecated
func (*CurrentWorkingsResponse) Descriptor() ([]byte, []int)
Deprecated: Use CurrentWorkingsResponse.ProtoReflect.Descriptor instead.
func (*CurrentWorkingsResponse) GetTasks ¶
func (x *CurrentWorkingsResponse) GetTasks() []*Task
func (*CurrentWorkingsResponse) ProtoMessage ¶
func (*CurrentWorkingsResponse) ProtoMessage()
func (*CurrentWorkingsResponse) ProtoReflect ¶
func (x *CurrentWorkingsResponse) ProtoReflect() protoreflect.Message
func (*CurrentWorkingsResponse) Reset ¶
func (x *CurrentWorkingsResponse) Reset()
func (*CurrentWorkingsResponse) String ¶
func (x *CurrentWorkingsResponse) String() string
type Gateway ¶
type Gateway struct {
// contains filtered or unexported fields
}
Gateway fetches and removes jobs from SQS.
func NewGateway ¶
func NewGateway(queue *sqs.Client, queueURL string, params ...GatewayParameter) *Gateway
NewGateway returns Gateway object.
type GatewayParameter ¶
type GatewayParameter func(*gatewayParams)
GatewayParameter sets a parameter on the fetcher using the functional options pattern.
func FetchInterval ¶
func FetchInterval(d time.Duration) GatewayParameter
FetchInterval sets the interval between the fetcher's receive-message requests to the queue.
func FetchParallel ¶
func FetchParallel(n int) GatewayParameter
FetchParallel sets the number of parallel fetching processes against SQS.
func FetcherMaxMessages ¶
func FetcherMaxMessages(n int32) GatewayParameter
FetcherMaxMessages sets MaxNumberOfMessages of SQS between 1 and 10. Fetcher's default value is 10. If the supplied value is out of range, it is clamped: values less than 1 become 1, and values greater than 10 become 10.
func FetcherQueueLocker ¶
func FetcherQueueLocker(l locker.QueueLocker) GatewayParameter
FetcherQueueLocker sets the locker.QueueLocker that the Gateway uses to block duplicated queue messages.
func FetcherVisibilityTimeout ¶
func FetcherVisibilityTimeout(d time.Duration) GatewayParameter
FetcherVisibilityTimeout sets the VisibilityTimeout of the receive-message request.
func FetcherWaitTime ¶
func FetcherWaitTime(d time.Duration) GatewayParameter
FetcherWaitTime sets the WaitTimeSeconds of the receive-message request.
type HTTPInvoker ¶
type HTTPInvoker struct {
// contains filtered or unexported fields
}
HTTPInvoker invokes worker process by HTTP POST request.
func NewHTTPInvoker ¶
func NewHTTPInvoker(rawurl string, dur time.Duration) (*HTTPInvoker, error)
NewHTTPInvoker returns HTTPInvoker instance.
type MonitoringService ¶
type MonitoringService struct { UnimplementedMonitoringServiceServer // contains filtered or unexported fields }
MonitoringService provides grpc handler for MonitoringService.
func NewMonitoringService ¶
func NewMonitoringService(consumer *worker) *MonitoringService
NewMonitoringService returns new MonitoringService object.
func (*MonitoringService) CurrentWorkings ¶
func (s *MonitoringService) CurrentWorkings(ctx context.Context, _ *CurrentWorkingsRequest) (*CurrentWorkingsResponse, error)
CurrentWorkings handles the CurrentWorkings gRPC request using the actor system.
func (*MonitoringService) WaitUntilAllEnds ¶
func (s *MonitoringService) WaitUntilAllEnds(timeout time.Duration) error
WaitUntilAllEnds waits until all worker tasks finish.
type MonitoringServiceClient ¶
type MonitoringServiceClient interface {
CurrentWorkings(ctx context.Context, in *CurrentWorkingsRequest, opts ...grpc.CallOption) (*CurrentWorkingsResponse, error)
}
MonitoringServiceClient is the client API for MonitoringService service.
For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
func NewMonitoringServiceClient ¶
func NewMonitoringServiceClient(cc grpc.ClientConnInterface) MonitoringServiceClient
type MonitoringServiceServer ¶
type MonitoringServiceServer interface { CurrentWorkings(context.Context, *CurrentWorkingsRequest) (*CurrentWorkingsResponse, error) // contains filtered or unexported methods }
MonitoringServiceServer is the server API for MonitoringService service. All implementations must embed UnimplementedMonitoringServiceServer for forward compatibility
type System ¶
type System struct {
// contains filtered or unexported fields
}
System controls actor system of sqsd.
type SystemBuilder ¶
type SystemBuilder func(*System)
SystemBuilder provides constructor for system object requirements.
func ConsumerBuilder ¶
func ConsumerBuilder(invoker Invoker, parallel int) SystemBuilder
ConsumerBuilder builds consumer for system.
func GatewayBuilder ¶
func GatewayBuilder(queue *sqs.Client, queueURL string, parallel int, timeout time.Duration, params ...GatewayParameter) SystemBuilder
GatewayBuilder builds gateway for system.
func MonitorBuilder ¶
func MonitorBuilder(port int) SystemBuilder
MonitorBuilder sets monitor server port to system.
type Task ¶
type Task struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Receipt string `protobuf:"bytes,2,opt,name=receipt,proto3" json:"receipt,omitempty"` StartedAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` // contains filtered or unexported fields }
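func (*Task) Descriptor
deprecated
func (*Task) Descriptor() ([]byte, []int)
Deprecated: Use Task.ProtoReflect.Descriptor instead.
func (*Task) GetId ¶
func (x *Task) GetId() string
func (*Task) GetReceipt ¶
func (x *Task) GetReceipt() string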
func (*Task) GetStartedAt ¶
func (x *Task) GetStartedAt() *timestamppb.Timestamp
func (*Task) ProtoMessage ¶
func (*Task) ProtoMessage()
func (*Task) ProtoReflect ¶
func (x *Task) ProtoReflect() protoreflect.Message
type UnimplementedMonitoringServiceServer ¶
type UnimplementedMonitoringServiceServer struct { }
UnimplementedMonitoringServiceServer must be embedded to have forward compatible implementations.
func (UnimplementedMonitoringServiceServer) CurrentWorkings ¶
func (UnimplementedMonitoringServiceServer) CurrentWorkings(context.Context, *CurrentWorkingsRequest) (*CurrentWorkingsResponse, error)
type UnsafeMonitoringServiceServer ¶
type UnsafeMonitoringServiceServer interface {
// contains filtered or unexported methods
}
UnsafeMonitoringServiceServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to MonitoringServiceServer will result in compilation errors.