Documentation
¶
Index ¶
- Constants
- func AddAuthFlags(fs *flag.FlagSet) func() (*azservicebus.Client, *admin.Client, error)
- func ConstantlyUpdateQueue(ctx context.Context, adminClient *admin.Client, queue string, ...) error
- func ForceQueueDetach(ctx context.Context, adminClient *admin.Client, queue string) error
- func LoadEnvironment() error
- func MustCreateAutoDeletingQueue(sc *StressContext, queueName string, qp *admin.QueueProperties) *admin.Client
- func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string, ...) func()
- func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimit int, numExtraBytes int)
- func NewCtrlCContext() (context.Context, context.CancelFunc)
- func TrackDuration(ctx context.Context, tc *TelemetryClientWrapper, name Metric) func(map[string]string)
- func TrackError(ctx context.Context, tc *TelemetryClientWrapper, err error)
- func TrackMetric(ctx context.Context, tc *TelemetryClientWrapper, name Metric, value float64, ...)
- func UpdateBaggage(ctx context.Context, baggage map[string]string) map[string]string
- func WithBaggage(ctx context.Context, baggage map[string]string) context.Context
- type BaseTelemetry
- type EventTelemetry
- type ExceptionTelemetry
- type Metric
- type MetricTelemetry
- type MustCreateSubscriptionsOptions
- type StreamingMessageBatch
- type StressContext
- func (tracker *StressContext) Assert(condition bool, message string)
- func (sc *StressContext) End()
- func (tracker *StressContext) Equal(val1 any, val2 any)
- func (tracker *StressContext) Failf(format string, args ...any)
- func (sc *StressContext) LogIfFailed(message string, err error)
- func (tracker *StressContext) Nil(val1 any)
- func (tracker *StressContext) NoError(err error)
- func (tracker *StressContext) NoErrorf(err error, format string, args ...any)
- func (tracker *StressContext) PanicOnError(message string, err error)
- func (sc *StressContext) Start(entityName string, attributes map[string]string)
- type StressContextOptions
- type TelemetryClientWrapper
- type TelemetryClientWrapperContext
- type TestContext
- type TrackingReceiver
- func (tr *TrackingReceiver) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, ...) error
- func (tr *TrackingReceiver) Close(ctx context.Context) error
- func (tr *TrackingReceiver) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, ...) error
- func (tr *TrackingReceiver) PeekMessages(ctx context.Context, maxMessageCount int, ...) ([]*azservicebus.ReceivedMessage, error)
- func (tr *TrackingReceiver) ReceiveMessages(ctx context.Context, maxMessages int, ...) ([]*azservicebus.ReceivedMessage, error)
- func (tr *TrackingReceiver) RenewMessageLock(ctx context.Context, msg *azservicebus.ReceivedMessage, ...) error
- type TrackingSender
- func (ts *TrackingSender) Close(ctx context.Context) error
- func (ts *TrackingSender) NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error)
- func (ts *TrackingSender) SendMessage(ctx context.Context, message *azservicebus.Message, ...) error
- func (ts *TrackingSender) SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, ...) error
Constants ¶
const ( AttrAMQPDeliveryState string = "amqp.delivery_state" AttrAMQPStatusCode string = "amqp.status_code" // TODO: I made these up entirely AttrMessageCount string = "amqp.message_count" )
const (
MetricStressSuccessfulCancels = "stress.cancels"
)
These metrics are specific to stress tests and wouldn't be in customer code.
Variables ¶
This section is empty.
Functions ¶
func AddAuthFlags ¶
AddAuthFlags adds the flags needed for authenticating to Service Bus. Returns a function that can be called after the flags have been parsed, which will create an *azservicebus.Client and an *admin.Client.
func ConstantlyUpdateQueue ¶
func ConstantlyUpdateQueue(ctx context.Context, adminClient *admin.Client, queue string, updateInterval time.Duration) error
ConstantlyUpdateQueue updates queue, changing the MaxDeliveryCount property between 11 and 10, every `updateInterval`. This will cause Service Bus to issue force-detaches to our links, allowing us to exercise our recovery logic.
func ForceQueueDetach ¶ added in v0.3.3
func LoadEnvironment ¶
func LoadEnvironment() error
LoadEnvironment loads an .env file. If the env var `ENV_FILE` exists, we assume the value is a path to an .env file. Otherwise we fall back to loading from the current directory.
func MustCreateAutoDeletingQueue ¶
func MustCreateAutoDeletingQueue(sc *StressContext, queueName string, qp *admin.QueueProperties) *admin.Client
MustCreateAutoDeletingQueue creates a queue that will auto-delete 10 minutes after activity has ceased.
func MustCreateSubscriptions ¶
func MustCreateSubscriptions(sc *StressContext, topicName string, subscriptionNames []string, options *MustCreateSubscriptionsOptions) func()
func MustGenerateMessages ¶
func MustGenerateMessages(sc *StressContext, sender *TrackingSender, messageLimit int, numExtraBytes int)
func NewCtrlCContext ¶
func NewCtrlCContext() (context.Context, context.CancelFunc)
NewCtrlCContext creates a context that cancels if the user hits ctrl+c.
func TrackDuration ¶ added in v1.6.0
func TrackDuration(ctx context.Context, tc *TelemetryClientWrapper, name Metric) func(map[string]string)
TrackDuration tracks durations (as a metric), using the initial call to TrackDuration as the start. The duration is ended when you call the returned function. TrackDuration respects any included baggage in the context.
func TrackError ¶ added in v1.6.0
func TrackError(ctx context.Context, tc *TelemetryClientWrapper, err error)
TrackError tracks an error (using the AppInsights exceptions table). TrackError respects any included baggage in the context.
NOTE: this function does not consider context cancellations/deadlines as errors.
func TrackMetric ¶ added in v1.6.0
func TrackMetric(ctx context.Context, tc *TelemetryClientWrapper, name Metric, value float64, attrs map[string]string)
TrackMetric tracks a metric and respects any included baggage in the context.
func UpdateBaggage ¶ added in v1.6.0
Types ¶
type BaseTelemetry ¶ added in v1.7.2
type EventTelemetry ¶ added in v1.7.2
func NewEventTelemetry ¶ added in v1.7.2
func NewEventTelemetry(name string) *EventTelemetry
type ExceptionTelemetry ¶ added in v1.7.2
type ExceptionTelemetry struct {
BaseTelemetry BaseTelemetry
}
func NewExceptionTelemetry ¶ added in v1.7.2
func NewExceptionTelemetry(err error) *ExceptionTelemetry
type Metric ¶ added in v1.6.0
type Metric string
const ( MetricConnectionLost Metric = "messaging.servicebus.connectionlost" MetricMessagesSent Metric = "messaging.servicebus.messages.sent" // metrics related to Service Bus sessions (NOT amqp sessions) MetricSessionAccept Metric = "messaging.servicebus.session.accept" MetricSessionTimeoutMS Metric = "messaging.servicebus.session.timeout" MetricSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration" MetricReceiveLag Metric = "messaging.servicebus.receiver.lag" MetricAMQPSendDuration Metric = "messaging.az.amqp.producer.send.duration" MetricAMQPMgmtRequestDuration Metric = "messaging.az.amqp.management.request.duration" MetricAMQPSettlementRequestDuration Metric = "messaging.servicebus.settlement.request.duration" MetricAMQPSettlementSequenceNum Metric = "messaging.servicebus.settlement.sequence_number" // TODO: I've made these up entirely. MetricMessageReceived Metric = "messaging.servicebus.messages.received" MetricMessagePeeked Metric = "messaging.servicebus.messages.peeked" MetricCloseDuration Metric = "messaging.servicebus.close.duration" MetricLockRenew Metric = "messaging.servicebus.lockrenew.duration" // TODO: separate for session vs message lock? )
These names are modeled off of the metrics from Java https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/instrumentation/ServiceBusMeter.java
and from our standard for attributes: https://gist.github.com/lmolkova/e4215c0f44a49ef824983382762e6b92
type MetricTelemetry ¶ added in v1.7.2
type MetricTelemetry struct { Name string Value any BaseTelemetry BaseTelemetry }
type MustCreateSubscriptionsOptions ¶ added in v1.1.4
type MustCreateSubscriptionsOptions struct { Topic *admin.CreateTopicOptions Subscription *admin.CreateSubscriptionOptions }
type StreamingMessageBatch ¶
type StreamingMessageBatch struct {
// contains filtered or unexported fields
}
func NewStreamingMessageBatch ¶
func NewStreamingMessageBatch(ctx context.Context, sender internalBatchSender, expectedTotal int) (*StreamingMessageBatch, error)
func (*StreamingMessageBatch) Add ¶
func (sb *StreamingMessageBatch) Add(ctx context.Context, msg *azservicebus.Message, options *azservicebus.AddMessageOptions) error
Add appends to the current batch. If the batch is full, it is sent and a new one is allocated.
type StressContext ¶
type StressContext struct { TC *TelemetryClientWrapper Context context.Context // TestRunID represents the test run and can be used to tie into other container metrics generated within the test cluster. TestRunID string // Nano is the nanoseconds start time for the stress test run Nano string // Endpoint is the value from SERVICEBUS_ENDPOINT Endpoint string Cred azcore.TokenCredential // contains filtered or unexported fields }
StressContext holds onto some common useful state for stress tests, including some simple stats tracking, a telemetry client and a context that represents the lifetime of the test itself (and will be cancelled if the user quits out of the stress test).
func MustCreateStressContext ¶
func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext
func (*StressContext) Assert ¶ added in v0.3.4
func (tracker *StressContext) Assert(condition bool, message string)
func (*StressContext) End ¶
func (sc *StressContext) End()
func (*StressContext) Equal ¶ added in v1.1.4
func (tracker *StressContext) Equal(val1 any, val2 any)
func (*StressContext) Failf ¶ added in v1.1.0
func (tracker *StressContext) Failf(format string, args ...any)
func (*StressContext) LogIfFailed ¶
func (sc *StressContext) LogIfFailed(message string, err error)
func (*StressContext) Nil ¶ added in v1.1.4
func (tracker *StressContext) Nil(val1 any)
func (*StressContext) NoError ¶ added in v1.1.4
func (tracker *StressContext) NoError(err error)
func (*StressContext) NoErrorf ¶ added in v1.1.4
func (tracker *StressContext) NoErrorf(err error, format string, args ...any)
func (*StressContext) PanicOnError ¶
func (tracker *StressContext) PanicOnError(message string, err error)
PanicOnError logs, sends telemetry and then closes on error
type StressContextOptions ¶ added in v1.3.0
type StressContextOptions struct { // Duration is the amount of time the stress test should run before // the StressContext.Context expires. Duration time.Duration // CommonBaggage will be added as part of the telemetry client, and will be included in each // metric/event/error that's reported. CommonBaggage map[string]string // EmitStartEvent enables the automatic sending of the "Start" event for our test to telemetry. EmitStartEvent bool }
type TelemetryClientWrapper ¶ added in v1.7.2
type TelemetryClientWrapper struct {
// contains filtered or unexported fields
}
TelemetryClientWrapper is a wrapper for the telemetry client, once we get that phased back in.
func (*TelemetryClientWrapper) Context ¶ added in v1.7.2
func (tc *TelemetryClientWrapper) Context() *TelemetryClientWrapperContext
func (*TelemetryClientWrapper) Flush ¶ added in v1.7.2
func (tc *TelemetryClientWrapper) Flush()
func (*TelemetryClientWrapper) Track ¶ added in v1.7.2
func (tc *TelemetryClientWrapper) Track(evt any)
func (*TelemetryClientWrapper) TrackEvent ¶ added in v1.7.2
func (tc *TelemetryClientWrapper) TrackEvent(name string)
func (*TelemetryClientWrapper) TrackException ¶ added in v1.7.2
func (tc *TelemetryClientWrapper) TrackException(err error)
type TelemetryClientWrapperContext ¶ added in v1.7.2
type TestContext ¶
type TestContext struct { *StressContext Client *azservicebus.Client }
type TrackingReceiver ¶ added in v1.6.0
type TrackingReceiver struct {
// contains filtered or unexported fields
}
TrackingReceiver reports metrics and errors automatically for its methods.
func NewTrackingReceiverForQueue ¶ added in v1.6.0
func NewTrackingReceiverForQueue(tc *TelemetryClientWrapper, client *azservicebus.Client, queueName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)
func NewTrackingReceiverForSubscription ¶ added in v1.6.0
func NewTrackingReceiverForSubscription(tc *TelemetryClientWrapper, client *azservicebus.Client, topicName string, subscriptionName string, options *azservicebus.ReceiverOptions) (*TrackingReceiver, error)
func (*TrackingReceiver) AbandonMessage ¶ added in v1.6.0
func (tr *TrackingReceiver) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error
func (*TrackingReceiver) Close ¶ added in v1.6.0
func (tr *TrackingReceiver) Close(ctx context.Context) error
func (*TrackingReceiver) CompleteMessage ¶ added in v1.6.0
func (tr *TrackingReceiver) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error
func (*TrackingReceiver) PeekMessages ¶ added in v1.6.0
func (tr *TrackingReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *azservicebus.PeekMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
func (*TrackingReceiver) ReceiveMessages ¶ added in v1.6.0
func (tr *TrackingReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
func (*TrackingReceiver) RenewMessageLock ¶ added in v1.6.0
func (tr *TrackingReceiver) RenewMessageLock(ctx context.Context, msg *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
type TrackingSender ¶ added in v1.6.0
type TrackingSender struct {
// contains filtered or unexported fields
}
TrackingSender reports metrics and errors automatically for its methods.
func NewTrackingSender ¶ added in v1.6.0
func NewTrackingSender(tc *TelemetryClientWrapper, client *azservicebus.Client, queueOrTopic string, options *azservicebus.NewSenderOptions) (*TrackingSender, error)
func (*TrackingSender) Close ¶ added in v1.6.0
func (ts *TrackingSender) Close(ctx context.Context) error
func (*TrackingSender) NewMessageBatch ¶ added in v1.6.0
func (ts *TrackingSender) NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error)
func (*TrackingSender) SendMessage ¶ added in v1.6.0
func (ts *TrackingSender) SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
func (*TrackingSender) SendMessageBatch ¶ added in v1.6.0
func (ts *TrackingSender) SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error