Class: MQConsumer

MQConsumer

MQ的消息消费者


new MQConsumer(client, instanceId, topic, consumer, messageTag)

消费者构造函数

Parameters:
Name Type Description
client MQClient

MQ客户端

instanceId string

实例ID

topic string

主题名字

consumer string

消费者名字(CID)a

messageTag string

消费消息的过滤标签,可空

Source:
Returns:
Type
MQConsumer

Methods


<async> ackMessage(receiptHandles)

确认消息消费成功,消费成功后需要调用该接口否则会重复消费消息

Parameters:
Name Type Description
receiptHandles array

消息句柄数组

Source:
Throws:

err 请求失败或者其它网络异常

{
 // MQ服务端返回的错误Code,如ReceiptHandleError,表示消息句柄非法,MessageNotExist表示超过了ack的时间,即NextConsumeTime
 Code: ""
 // 请求ID
 RequestId: ""
}
Type
exception
Returns:
{
 // 请求成功
 code:204,
 // 请求ID
 requestId:""
}
Type
object

<async> consumeMessage(numOfMessages, waitSeconds)

消费消息,默认如果该条消息没有被 {ackMessage} 确认消费成功,即在NextConsumeTime时会再次消费到该条消息

Parameters:
Name Type Description
numOfMessages int

每次从服务端消费条消息

waitSeconds int

长轮询的等待时间(可空),如果服务端没有消息请求会在该时间之后返回等于请求阻塞在服务端,如果期间有消息立刻返回

Source:
Throws:

err MQ服务端返回的错误或者其它网络异常

 {
   // MQ服务端返回的错误Code,其中MessageNotExist是正常现象,表示没有可消费的消息
   Code: "",
   // 请求ID
   RequestId: ""
 }
```json
Type
exception
Returns:
{
 code: 200,
 requestId: "",
 body: [
   {
     // 消息ID
     MessageId: "",
     // 消息体MD5
     MessageBodyMD5: "",
     // 发送消息的时间戳,毫秒
     PublishTime: {long},
     // 下次重试消费的时间,前提是这次不调用{ackMessage} 确认消费消费成功,毫秒
     NextConsumeTime: {long},
     // 第一次消费的时间,毫秒
     FirstConsumeTime: {long},
     // 消费的次数
     ConsumedTimes: {long},
     // 消息句柄,调用 {ackMessage} 需要将消息句柄传入,用于确认该条消息消费成功
     ReceiptHandle: "",
     // 消息内容
     MessageBody: "",
     // 消息标签
     MessageTag: ""
   }
 ]
}

Type
object

<async> consumeMessageOrderly(numOfMessages, waitSeconds)

顺序消费消息,拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
对于顺序消费,如果一个分区内的消息只要有没有被确认消费 {ackMessage} 成功,则对于这个分区在NextConsumeTime后还会消费到相同的消息
对于一个分区,只有所有消息确认消费成功才能消费下一批消息

Parameters:
Name Type Description
numOfMessages int

每次从服务端消费条消息

waitSeconds int

长轮询的等待时间(可空),如果服务端没有消息请求会在该时间之后返回等于请求阻塞在服务端,如果期间有消息立刻返回

Source:
Throws:

err MQ服务端返回的错误或者其它网络异常

 {
   // MQ服务端返回的错误Code,其中MessageNotExist是正常现象,表示没有可消费的消息
   Code: "",
   // 请求ID
   RequestId: ""
 }
```json
Type
exception
Returns:
{
 code: 200,
 requestId: "",
 body: [
   {
     // 消息ID
     MessageId: "",
     // 消息体MD5
     MessageBodyMD5: "",
     // 发送消息的时间戳,毫秒
     PublishTime: {long},
     // 下次重试消费的时间,前提是这次不调用{ackMessage} 确认消费消费成功,毫秒
     NextConsumeTime: {long},
     // 第一次消费的时间,毫秒,顺序消费无意义
     FirstConsumeTime: {long},
     // 消费的次数
     ConsumedTimes: {long},
     // 消息句柄,调用 {ackMessage} 需要将消息句柄传入,用于确认该条消息消费成功
     ReceiptHandle: "",
     // 消息内容
     MessageBody: "",
     // 消息标签
     MessageTag: ""
   }
 ]
}

Type
object