Documentation ¶
Overview ¶
* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * * NOTE: The kafka client is used to publish events on Kafka in voltha * release 2.9. It is no longer used for inter voltha container * communication.
- Copyright 2020-2024 Open Networking Foundation (ONF) and the ONF Contributors *
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at *
- http://www.apache.org/licenses/LICENSE-2.0 *
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
* Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors
* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
Index ¶
- Constants
- func GetDeviceIdFromTopic(topic Topic) string
- func MonitorKafkaReadiness(ctx context.Context, kClient Client, ...)
- func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, ...) error
- type Client
- type KVArg
- type RpcMType
- type RpcResponse
- type SaramaClient
- func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error
- func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error
- func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
- func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool
- func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error)
- func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error
- func (sc *SaramaClient) SendLiveness(ctx context.Context) error
- func (sc *SaramaClient) Start(ctx context.Context) error
- func (sc *SaramaClient) Stop(ctx context.Context)
- func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error)
- func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time))
- func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error
- type SaramaClientOption
- func Address(address string) SaramaClientOption
- func AutoCreateTopic(opt bool) SaramaClientOption
- func ConsumerGroupName(name string) SaramaClientOption
- func ConsumerGroupPrefix(prefix string) SaramaClientOption
- func ConsumerMaxWait(wait int) SaramaClientOption
- func ConsumerType(consumer int) SaramaClientOption
- func LivenessChannelInterval(opt time.Duration) SaramaClientOption
- func MaxProcessingTime(pTime int) SaramaClientOption
- func MetadatMaxRetries(retry int) SaramaClientOption
- func NumPartitions(number int) SaramaClientOption
- func NumReplicas(number int) SaramaClientOption
- func ProducerFlushFrequency(frequency int) SaramaClientOption
- func ProducerFlushMaxMessages(num int) SaramaClientOption
- func ProducerFlushMessages(num int) SaramaClientOption
- func ProducerMaxRetries(num int) SaramaClientOption
- func ProducerRetryBackoff(duration time.Duration) SaramaClientOption
- func ProducerReturnOnErrors(opt bool) SaramaClientOption
- func ProducerReturnOnSuccess(opt bool) SaramaClientOption
- type Topic
Constants ¶
const ( PartitionConsumer = iota GroupCustomer = iota )
const ( OffsetNewest = -1 OffsetOldest = -2 )
const ( GroupIdKey = "groupId" Offset = "offset" )
const ( DefaultKafkaAddress = "127.0.0.1:9092" DefaultGroupName = "voltha" DefaultSleepOnError = 1 DefaultProducerFlushFrequency = 10 DefaultProducerFlushMessages = 10 DefaultProducerFlushMaxmessages = 100 DefaultProducerReturnSuccess = true DefaultProducerReturnErrors = true DefaultProducerRetryMax = 3 DefaultProducerRetryBackoff = time.Millisecond * 100 DefaultConsumerMaxwait = 100 DefaultMaxProcessingTime = 100 DefaultConsumerType = PartitionConsumer DefaultNumberPartitions = 3 DefaultNumberReplicas = 1 DefaultAutoCreateTopic = false DefaultMetadataMaxRetry = 3 DefaultMaxRetries = 3 DefaultLivenessChannelInterval = time.Second * 30 )
const ( TopicSeparator = "_" DeviceIdLength = 24 )
Variables ¶
This section is empty.
Functions ¶
func GetDeviceIdFromTopic ¶
TODO: Remove and provide a better way to get the device id. GetDeviceIdFromTopic extracts the deviceId from the topic name. The topic name is formatted either as:
<any string> or <any string>_<deviceId>. The device Id is 24 characters long.
func MonitorKafkaReadiness ¶
func MonitorKafkaReadiness(ctx context.Context, kClient Client, liveProbeInterval, notLiveProbeInterval time.Duration, serviceName string)
MonitorKafkaReadiness checks the liveness and readiness of the kafka service and updates the status in the probe.
func StartAndWaitUntilKafkaConnectionIsUp ¶
func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error
StartAndWaitUntilKafkaConnectionIsUp waits until the kafka client can establish a connection to the kafka broker or until the context times out.
Types ¶
type Client ¶
type Client interface { Start(ctx context.Context) error Stop(ctx context.Context) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error DeleteTopic(ctx context.Context, topic *Topic) error Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time)) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error SendLiveness(ctx context.Context) error EnableLivenessChannel(ctx context.Context, enable bool) chan bool EnableHealthinessChannel(ctx context.Context, enable bool) chan bool ListTopics(ctx context.Context) ([]string, error) }
Client represents the set of APIs a Kafka client must implement.
type RpcResponse ¶
func NewResponse ¶
func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse
type SaramaClient ¶
type SaramaClient struct { KafkaAddress string // contains filtered or unexported fields }
SaramaClient represents the messaging proxy
func NewSaramaClient ¶
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient
func (*SaramaClient) CreateTopic ¶
func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error
CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to ensure no two go routines are performing operations on the same topic
func (*SaramaClient) DeleteTopic ¶
func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error
DeleteTopic removes a topic from the kafka Broker
func (*SaramaClient) EnableHealthinessChannel ¶
func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
EnableHealthinessChannel enables the healthiness monitor channel. This channel will report "false" if the kafka consumers die, or if some other catastrophic problem occurs that would require re-creating the client.
func (*SaramaClient) EnableLivenessChannel ¶
func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool
EnableLivenessChannel enables the liveness monitor channel. This channel will report a "true" or "false" on every publish, which indicates whether or not the channel is still live. This channel is then picked up by the service (i.e. rw_core / ro_core) to update readiness status and/or take other actions.
func (*SaramaClient) ListTopics ¶
func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error)
func (*SaramaClient) Send ¶
func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error
Send formats and sends the request onto the kafka messaging bus.
func (*SaramaClient) SendLiveness ¶
func (sc *SaramaClient) SendLiveness(ctx context.Context) error
SendLiveness sends an empty message on the liveness channel to check whether connectivity has been restored.
func (*SaramaClient) Stop ¶
func (sc *SaramaClient) Stop(ctx context.Context)
func (*SaramaClient) Subscribe ¶
func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error)
Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive messages from that topic
func (*SaramaClient) SubscribeForMetadata ¶
func (*SaramaClient) UnSubscribe ¶
func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error
UnSubscribe unsubscribes a consumer from a given topic.
type SaramaClientOption ¶
type SaramaClientOption func(*SaramaClient)
func Address ¶
func Address(address string) SaramaClientOption
func AutoCreateTopic ¶
func AutoCreateTopic(opt bool) SaramaClientOption
func ConsumerGroupName ¶
func ConsumerGroupName(name string) SaramaClientOption
func ConsumerGroupPrefix ¶
func ConsumerGroupPrefix(prefix string) SaramaClientOption
func ConsumerMaxWait ¶
func ConsumerMaxWait(wait int) SaramaClientOption
func ConsumerType ¶
func ConsumerType(consumer int) SaramaClientOption
func LivenessChannelInterval ¶
func LivenessChannelInterval(opt time.Duration) SaramaClientOption
func MaxProcessingTime ¶
func MaxProcessingTime(pTime int) SaramaClientOption
func MetadatMaxRetries ¶
func MetadatMaxRetries(retry int) SaramaClientOption
func NumPartitions ¶
func NumPartitions(number int) SaramaClientOption
func NumReplicas ¶
func NumReplicas(number int) SaramaClientOption
func ProducerFlushFrequency ¶
func ProducerFlushFrequency(frequency int) SaramaClientOption
func ProducerFlushMaxMessages ¶
func ProducerFlushMaxMessages(num int) SaramaClientOption
func ProducerFlushMessages ¶
func ProducerFlushMessages(num int) SaramaClientOption
func ProducerMaxRetries ¶
func ProducerMaxRetries(num int) SaramaClientOption
func ProducerRetryBackoff ¶
func ProducerRetryBackoff(duration time.Duration) SaramaClientOption
func ProducerReturnOnErrors ¶
func ProducerReturnOnErrors(opt bool) SaramaClientOption
func ProducerReturnOnSuccess ¶
func ProducerReturnOnSuccess(opt bool) SaramaClientOption
type Topic ¶
type Topic struct { // The name of the topic. It must start with a letter, // and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), // underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent // signs (`%`). Name string }
A Topic definition - may be augmented with additional attributes eventually