rocketmq

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2019 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.

Index

Constants

View Source
const (
	CommonProducer  = ProducerModel(1)
	OrderlyProducer = ProducerModel(2)
)
View Source
const (
	BroadCasting = MessageModel(1)
	Clustering   = MessageModel(2)
)
View Source
const (
	CoCurrently = ConsumerModel(1)
	Orderly     = ConsumerModel(2)
)
View Source
const (
	NIL                        = rmqError(C.OK)
	ErrNullPoint               = rmqError(C.NULL_POINTER)
	ErrMallocFailed            = rmqError(C.MALLOC_FAILED)
	ErrProducerStartFailed     = rmqError(C.PRODUCER_START_FAILED)
	ErrSendSyncFailed          = rmqError(C.PRODUCER_SEND_SYNC_FAILED)
	ErrSendOnewayFailed        = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED)
	ErrSendOrderlyFailed       = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED)
	ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START)
	ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED)
	ErrFetchMQFailed           = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED)
	ErrFetchMessageFailed      = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED)
)
View Source
const (
	LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL)
	LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR)
	LogLevelWarn  = LogLevel(C.E_LOG_LEVEL_WARN)
	LogLevelInfo  = LogLevel(C.E_LOG_LEVEL_INFO)
	LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG)
	LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE)
	LogLevelNum   = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
)

predefined log level

View Source
const (
	SendOK                = SendStatus(C.E_SEND_OK)
	SendFlushDiskTimeout  = SendStatus(C.E_SEND_FLUSH_DISK_TIMEOUT)
	SendFlushSlaveTimeout = SendStatus(C.E_SEND_FLUSH_SLAVE_TIMEOUT)
	SendSlaveNotAvailable = SendStatus(C.E_SEND_SLAVE_NOT_AVAILABLE)
)
View Source
const (
	PullFound         = PullStatus(C.E_FOUND)
	PullNoNewMsg      = PullStatus(C.E_NO_NEW_MSG)
	PullNoMatchedMsg  = PullStatus(C.E_NO_MATCHED_MSG)
	PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL)
	PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
)

predefined pull status

View Source
const (
	ConsumeSuccess = ConsumeStatus(C.E_CONSUME_SUCCESS)
	ReConsumeLater = ConsumeStatus(C.E_RECONSUME_LATER)
)
View Source
const GoClientVersion = "Go Client V1.2.0, Support CPP Core:V1.2.X"

Variables

This section is empty.

Functions

func GetVersion

func GetVersion() (version string)

func Version

func Version() (version string)

Types

type ClientConfig

type ClientConfig struct {
	GroupID          string
	NameServer       string
	NameServerDomain string
	GroupName        string
	InstanceName     string
	Credentials      *SessionCredentials
	LogC             *LogConfig
}

func (*ClientConfig) String

func (config *ClientConfig) String() string

type ConsumeStatus

type ConsumeStatus int

func (ConsumeStatus) String

func (status ConsumeStatus) String() string

type ConsumerModel

type ConsumerModel int

func (ConsumerModel) String

func (mode ConsumerModel) String() string

type LogConfig

type LogConfig struct {
	Path     string
	FileNum  int
	FileSize int64
	Level    LogLevel
}

LogConfig the log configuration for the pull consumer

func (*LogConfig) String

func (lc *LogConfig) String() string

type LogLevel

type LogLevel int

LogLevel the log level

func (LogLevel) String

func (l LogLevel) String() string

type Message

type Message struct {
	Topic          string
	Tags           string
	Keys           string
	Body           string
	DelayTimeLevel int
	Property       map[string]string
}

func (*Message) String

func (msg *Message) String() string

type MessageExt

type MessageExt struct {
	Message
	MessageID                 string
	QueueId                   int
	ReconsumeTimes            int
	StoreSize                 int
	BornTimestamp             int64
	StoreTimestamp            int64
	QueueOffset               int64
	CommitLogOffset           int64
	PreparedTransactionOffset int64
	// contains filtered or unexported fields
}

func (*MessageExt) GetProperty

func (msgExt *MessageExt) GetProperty(key string) string

func (*MessageExt) String

func (msgExt *MessageExt) String() string

type MessageModel

type MessageModel int

func (MessageModel) String

func (mode MessageModel) String() string

type MessageQueue

type MessageQueue struct {
	Topic  string
	Broker string
	ID     int
}

MessageQueue the queue of the message

func (*MessageQueue) String

func (q *MessageQueue) String() string

type MessageQueueSelector

type MessageQueueSelector interface {
	Select(size int, m *Message, arg interface{}) int
}

MessageQueueSelector select one message queue

type Producer

type Producer interface {

	// SendMessageSync send a message with sync
	SendMessageSync(msg *Message) (*SendResult, error)

	// SendMessageOrderly send the message orderly
	SendMessageOrderly(
		msg *Message,
		selector MessageQueueSelector,
		arg interface{},
		autoRetryTimes int) (*SendResult, error)

	// SendMessageOneway send a message with oneway
	SendMessageOneway(msg *Message) error

	SendMessageOrderlyByShardingKey(msg *Message, shardingkey string) (*SendResult, error)
	// contains filtered or unexported methods
}

func NewProducer

func NewProducer(config *ProducerConfig) (Producer, error)

NewProducer create a new producer with config

type ProducerConfig

type ProducerConfig struct {
	ClientConfig
	SendMsgTimeout int
	CompressLevel  int
	MaxMessageSize int
	ProducerModel  ProducerModel
}

ProducerConfig define a producer

func (*ProducerConfig) String

func (config *ProducerConfig) String() string

type ProducerModel

type ProducerModel int

func (ProducerModel) String

func (mode ProducerModel) String() string

type PullConsumer

type PullConsumer interface {

	// Pull returns the messages from the consume queue by specify the offset and the max number
	Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult

	// FetchSubscriptionMessageQueues returns the consume queue of the topic
	FetchSubscriptionMessageQueues(topic string) []MessageQueue
	// contains filtered or unexported methods
}

PullConsumer consumer pulling the message

func NewPullConsumer

func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error)

NewPullConsumer creates a pull consumer

type PullConsumerConfig

type PullConsumerConfig struct {
	ClientConfig
}

PullConsumerConfig the configuration for the pull consumer

func (*PullConsumerConfig) String

func (config *PullConsumerConfig) String() string

type PullResult

type PullResult struct {
	NextBeginOffset int64
	MinOffset       int64
	MaxOffset       int64
	Status          PullStatus
	Messages        []*MessageExt
}

PullResult the pull result

func (*PullResult) String

func (pr *PullResult) String() string

type PullStatus

type PullStatus int

PullStatus pull status

func (PullStatus) String

func (ps PullStatus) String() string

type PushConsumer

type PushConsumer interface {

	// Subscribe a new topic with specify filter expression and consume function.
	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
	// contains filtered or unexported methods
}

func NewPushConsumer

func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error)

NewPushConsumer create a new consumer with config.

type PushConsumerConfig

type PushConsumerConfig struct {
	ClientConfig
	ThreadCount         int
	MessageBatchMaxSize int
	Model               MessageModel
	ConsumerModel       ConsumerModel
}

PushConsumerConfig define a new consumer.

func (*PushConsumerConfig) String

func (config *PushConsumerConfig) String() string

type SendResult

type SendResult struct {
	Status SendStatus
	MsgId  string
	Offset int64
}

func (*SendResult) String

func (result *SendResult) String() string

type SendStatus

type SendStatus int

func (SendStatus) String

func (status SendStatus) String() string

type SessionCredentials

type SessionCredentials struct {
	AccessKey string
	SecretKey string
	Channel   string
}

func (*SessionCredentials) String

func (session *SessionCredentials) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL