Documentation
¶
Overview ¶
+k8s:deepcopy-gen=package
Index ¶
Constants ¶
const (
// DefaultMaxWait is the default number of waiting pull requests.
DefaultMaxWait = 512
)
Variables ¶
This section is empty.
Functions ¶
func ConfigV1Alpha1ToNats ¶
func ConfigV1Alpha1ToNats(name string, config *ConsumerConfig) (*nats.ConsumerConfig, error)
Types ¶
type ClusterInfo ¶
type ClusterInfo struct { // Name is the name of the cluster. Name string `json:"name,omitempty"` // Leader is the leader of the cluster. Leader string `json:"leader,omitempty"` // Replicas are the replicas of the cluster. Replicas []*PeerInfo `json:"replicas,omitempty"` }
StreamObservationClusterInfo shows information about the underlying set of servers that make up the stream or consumer.
func (*ClusterInfo) DeepCopy ¶
func (in *ClusterInfo) DeepCopy() *ClusterInfo
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterInfo.
func (*ClusterInfo) DeepCopyInto ¶
func (in *ClusterInfo) DeepCopyInto(out *ClusterInfo)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
type ConsumerConfig ¶
type ConsumerConfig struct { // Description is a human readable description of the consumer. // This can be particularly useful for ephemeral consumers to indicate their purpose since the durable name cannot be provided. // +kubebuilder:validation:Optional Description string `json:"description,omitempty"` // DeliverPolicy defines the point in the stream to receive messages from, either All, Last, New, ByStartSequence, ByStartTime, or LastPerSubject. // Fore more information see https://docs.nats.io/jetstream/concepts/consumers#deliverpolicy // +kubebuilder:validation:Enum=All;Last;New;ByStartSequence;ByStartTime;LastPerSubject // +kubebuilder:default=All // +kubebuilder:validation:Required DeliverPolicy string `json:"deliverPolicy"` // OptStartSeq is an optional start sequence number and is used with the DeliverByStartSequence deliver policy. // +kubebuilder:validation:Optional OptStartSeq uint64 `json:"optStartSeq,omitempty"` // OptStartTime is an optional start time and is used with the DeliverByStartTime deliver policy. // The time format is RFC 3339, e.g. 2023-01-09T14:48:32Z // +kubebuilder:validation:Pattern="^((?:(\\d{4}-\\d{2}-\\d{2})T(\\d{2}:\\d{2}:\\d{2}(?:\\.\\d+)?))(Z|[\\+-]\\d{2}:\\d{2})?)$" // +kubebuilder:validation:Optional // +kubebuilder:validation:Type=string OptStartTime string `json:"optStartTime,omitempty"` // AckPolicy describes the requirement of client acknowledgements, either Explicit, None, or All. // For more information see https://docs.nats.io/nats-concepts/jetstream/consumers#ackpolicy // +kubebuilder:validation:Enum=Explicit;None;All // +kubebuilder:validation:Required // +kubebuilder:validation:Type=string // +kubebuilder:default=Explicit AckPolicy string `json:"ackPolicy"` // AckWait is the duration that the server will wait for an ack for any individual message once it has been delivered to a consumer. // If an ack is not received in time, the message will be redelivered. // Format is a string duration, e.g. 1h, 1m, 1s, 1h30m or 2h3m4s. // +kubebuilder:validation:Pattern="([0-9]+h)?([0-9]+m)?([0-9]+s)?" // +kubebuilder:validation:Required // +kubebuilder:validation:Type=string // +kubebuilder:default="30s" AckWait string `json:"ackWait"` // MaxDeliver is the maximum number of times a specific message delivery will be attempted. // Applies to any message that is re-sent due to ack policy (i.e. due to a negative ack, or no ack sent by the client). // +kubebuilder:validation:Optional // +kubebuilder:default=-1 MaxDeliver int `json:"maxDeliver,omitempty"` // Backoff is a list of time durations that represent the time to delay based on delivery count. // Format of the durations is a string duration, e.g. 1h, 1m, 1s, 1h30m or 2h3m4s where multiple durations are separated by commas. // Example: `1s,2s,3s,4s,5s`. // +kubebuilder:validation:Pattern="^(([0-9]+h)?([0-9]+m)?([0-9]+s)?)(?:,\\s*(([0-9]+h)?([0-9]+m)?([0-9]+s)?))*$" // +kubebuilder:validation:Optional BackOff string `json:"backoff,omitempty"` // FilterSubject defines an overlapping subject with the subjects bound to the stream which will filter the set of messages received by the consumer. FilterSubject string `json:"filterSubject,omitempty"` // ReplayPolicy is used to define the mode of message replay. // If the policy is Instant, the messages will be pushed to the client as fast as possible while adhering to the Ack Policy, Max Ack Pending and the client's ability to consume those messages. // If the policy is Original, the messages in the stream will be pushed to the client at the same rate that they were originally received, simulating the original timing of messages. // +kubebuilder:validation:Enum=Instant;Original // +kubebuilder:validation:Required // +kubebuilder:default=Instant ReplayPolicy string `json:"replayPolicy"` // SampleFrequency sets the percentage of acknowledgements that should be sampled for observability. // +kubebuilder:validation:Pattern="^([1-9][0-9]?|100)%?$" // +kubebuilder:validation:Optional SampleFrequency string `json:"sampleFreq,omitempty"` // MaxAckPending sets the number of outstanding acks that are allowed before message delivery is halted. // +kubebuilder:validation:Optional // +kubebuilder:default=1000 MaxAckPending int `json:"maxAckPending,omitempty"` // InactiveThreshold defines the duration that instructs the server to cleanup consumers that are inactive for that long. // Format is a string duration, e.g. 1h, 1m, 1s, 1h30m or 2h3m4s. // +kubebuilder:validation:Pattern="([0-9]+h)?([0-9]+m)?([0-9]+s)?" // +kubebuilder:validation:Optional InactiveThreshold string `json:"inactiveThreshold,omitempty"` // Replicas sets the number of replicas for the consumer's state. // By default, when the value is set to zero, consumers inherit the number of replicas from the stream. // +kubebuilder:validation:Required // +kubebuilder:default=0 Replicas int `json:"numReplicas"` // MemoryStorage if set, forces the consumer state to be kept in memory rather than inherit the storage type of the stream (file in this case). // +kubebuilder:validation:Optional MemoryStorage bool `json:"memStorage,omitempty"` // PullConsumer defines the pull-based consumer configuration. // +kubebuilder:validation:Optional PullConsumer *PullConsumerSpec `json:"pull,omitempty"` // PushConsumer defines the push-based consumer configuration. // +kubebuilder:validation:Optional PushConsumer *PushConsumerSpec `json:"push,omitempty"` }
+kubebuilder:object:generate=true ConsumerConfig will determine the properties for a JetStream consumer. For more information see https://docs.nats.io/jetstream/concepts/consumers
func (*ConsumerConfig) DeepCopy ¶
func (in *ConsumerConfig) DeepCopy() *ConsumerConfig
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConsumerConfig.
func (*ConsumerConfig) DeepCopyInto ¶
func (in *ConsumerConfig) DeepCopyInto(out *ConsumerConfig)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (*ConsumerConfig) SetDefaults ¶
func (c *ConsumerConfig) SetDefaults(t ConsumerType)
type ConsumerObservationState ¶
type ConsumerObservationState struct { // Domain is the domain of the consumer. Domain string `json:"domain"` // Stream is the stream name. Stream string `json:"streamName"` // Name is the consumer name. Name string `json:"name"` // Durable is the durable name. Durable string `json:"durableName"` // Created is the time the consumer was created. // needs to be converted to time.Time Created string `json:"created"` // Delivered is the consumer sequence and last activity. Delivered SequenceInfo `json:"delivered"` // AckFloor TBD AckFloor SequenceInfo `json:"ackFloor"` // NumAckPending is the number of messages pending acknowledgement. NumAckPending int `json:"numAckPending"` // NumRedelivered is the number of redelivered messages. NumRedelivered int `json:"numRedelivered"` // NumWaiting is the number of messages waiting to be delivered. NumWaiting int `json:"numWaiting"` // NumPending is the number of messages pending. NumPending uint64 `json:"numPending"` // Cluster is the cluster information. Cluster *ClusterInfo `json:"cluster,omitempty"` // PushBound is whether the consumer is push bound. PushBound string `json:"pushBound,omitempty"` }
ConsumerInfo is the info from a JetStream consumer.
func (*ConsumerObservationState) DeepCopy ¶
func (in *ConsumerObservationState) DeepCopy() *ConsumerObservationState
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConsumerObservationState.
func (*ConsumerObservationState) DeepCopyInto ¶
func (in *ConsumerObservationState) DeepCopyInto(out *ConsumerObservationState)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
type ConsumerType ¶
type ConsumerType int
const ( ConsumerTypePull ConsumerType = iota ConsumerTypePush ConsumerTypeNone )
type PeerInfo ¶
type PeerInfo struct { Name string `json:"name"` Current bool `json:"current"` Offline bool `json:"offline,omitempty"` Active string `json:"active"` Lag uint64 `json:"lag,omitempty"` }
PeerInfo shows information about all the peers in the cluster that are supporting the stream or consumer.
func (*PeerInfo) DeepCopy ¶
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PeerInfo.
func (*PeerInfo) DeepCopyInto ¶
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
type PullConsumerSpec ¶
type PullConsumerSpec struct { // MaxWaiting defines the maximum number of waiting pull requests. // This is a pull consumer specific setting. // +kubebuilder:validation:Optional // +kubebuilder:default=512 MaxWaiting *int `json:"maxWaiting,omitempty"` // MaxRequestExpires defines the maximum duration a single pull request will wait for messages to be available to pull. // This is a pull consumer specific setting. // +kubebuilder:validation:Pattern="([0-9]+h)?([0-9]+m)?([0-9]+s)?" // +kubebuilder:validation:Optional MaxRequestExpires string `json:"maxExpires,omitempty"` // MaxRequestBatch defines th maximum batch size a single pull request can make. // When set with MaxRequestMaxBytes, the batch size will be constrained by whichever limit is hit first. // This is a pull consumer specific setting. // +kubebuilder:validation:Optional MaxRequestBatch int `json:"maxBatch,omitempty"` // MaxRequestMaxBytes defines the maximum total bytes that can be requested in a given batch. // When set with MaxRequestBatch, the batch size will be constrained by whichever limit is hit first. // This is a pull consumer specific setting. // +kubebuilder:validation:Optional MaxRequestMaxBytes int `json:"maxBytes,omitempty"` }
PullConsumerSpec defines the pull-based consumer configuration. For more information, see https://docs.nats.io/nats-concepts/jetstream/consumers#pull-specific
func (*PullConsumerSpec) DeepCopy ¶
func (in *PullConsumerSpec) DeepCopy() *PullConsumerSpec
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PullConsumerSpec.
func (*PullConsumerSpec) DeepCopyInto ¶
func (in *PullConsumerSpec) DeepCopyInto(out *PullConsumerSpec)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
type PushConsumerSpec ¶
type PushConsumerSpec struct { // RateLimit is used to throttle the delivery of messages to the consumer, in bits per second. // +kubebuilder:validation:Optional RateLimit uint64 `json:"rateLimitBps,omitempty"` // HeadersOnly delivers, if set, only the headers of messages in the stream and not the bodies. // Additionally adds Nats-Msg-Size header to indicate the size of the removed payload. // +kubebuilder:validation:Optional HeadersOnly bool `json:"headersOnly,omitempty"` // DeliverSubject defines the subject to deliver messages to. // Note, setting this field implicitly decides whether the consumer is push or pull-based. // With a deliver subject, the server will push messages to client subscribed to this subject. // This is a push consumer specific setting. // +kubebuilder:validation:Required DeliverSubject string `json:"deliverSubject,omitempty"` // DeliverGroup defines the queue group name which, if specified, is then used to distribute the messages between the subscribers to the consumer. // This is analogous to a queue group in core NATS. See https://docs.nats.io/nats-concepts/core-nats/queue for more information on queue groups. // This is a push consumer specific setting. // +kubebuilder:validation:Optional DeliverGroup string `json:"deliverGroup,omitempty"` // FlowControl enables per-subscription flow control using a sliding-window protocol. // This protocol relies on the server and client exchanging messages to regulate when and how many messages are pushed to the client. // This one-to-one flow control mechanism works in tandem with the one-to-many flow control imposed by MaxAckPending across all subscriptions bound to a consumer. // This is a push consumer specific setting. // +kubebuilder:validation:Optional FlowControl bool `json:"flowControl,omitempty"` // IdleHeartbeat defines, if set, that the server will regularly send a status message to the client (i.e. when the period has elapsed) while there are no new messages to send. // This lets the client know that the JetStream service is still up and running, even when there is no activity on the stream. // The message status header will have a code of 100. Unlike FlowControl, it will have no reply to address. // It may have a description such "Idle Heartbeat". // Note that this heartbeat mechanism is all handled transparently by supported clients and does not need to be handled by the application. // Format is a string duration, e.g. 1h, 1m, 1s, 1h30m or 2h3m4s. // This is a push consumer specific setting. // +kubebuilder:validation:Pattern="([0-9]+h)?([0-9]+m)?([0-9]+s)?" // +kubebuilder:validation:Optional IdleHeartbeat string `json:"idleHeartbeat,omitempty"` }
PushConsumerSpec defines the pull-based consumer configuration. For more information, see https://docs.nats.io/nats-concepts/jetstream/consumers#push-specific
func (*PushConsumerSpec) DeepCopy ¶
func (in *PushConsumerSpec) DeepCopy() *PushConsumerSpec
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PushConsumerSpec.
func (*PushConsumerSpec) DeepCopyInto ¶
func (in *PushConsumerSpec) DeepCopyInto(out *PushConsumerSpec)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
type SequenceInfo ¶
type SequenceInfo struct { // Consumer is the consumer name Consumer uint64 `json:"consumerSeq"` // Stream is the name of the stream Stream uint64 `json:"streamSeq"` // Last is the last time the consumer was active // needs to be converted to time.Time Last string `json:"lastActive,omitempty"` }
SequenceInfo has both the consumer and the stream sequence and last activity.
func (*SequenceInfo) DeepCopy ¶
func (in *SequenceInfo) DeepCopy() *SequenceInfo
DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SequenceInfo.
func (*SequenceInfo) DeepCopyInto ¶
func (in *SequenceInfo) DeepCopyInto(out *SequenceInfo)
DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.