我有一个环境,其中一些AMQP 1.0和一些AMQP 0.9.1客户端需要写入/读取RabbitMQ队列。我启用了AMQP 1.0兔子插件并且它正在工作,但是我在每个AMQP 1.0消息的正文中获得了额外的字节。
我通过AMQP1.0使用ria
(typescript)发送消息:
const connection: Connection = new Connection (
{
host: 'localhost',
port: 5672,
id: 'my_id',
reconnect: true
}
);
const senderName = "sender01";
const senderOptions: SenderOptions = {
name: senderName,
target: {
address: "target.queue"
},
onError: (context: EventContext) => {},
onSessionError: (context: EventContext) => {}
};
await connection.open();
const sender: Sender = await connection.createSender(senderOptions);
sender.send({
body: JSON.stringify({"one": "two", "three": "four"}),
content_encoding: 'UTF-8',
content_type: 'application/json'
});
console.log("sent");
await sender.close();
await connection.close();
console.log("connection closed");
此示例有效,但这是存储在队列中的内容:
base64编码消息为AFN3oRx7Im9uZSI6InR3byIsInRocmVlIjoiZm91ciJ9
,解码后变为:
Sw{"one":"two","three":"four"}
有一个额外的Sw
我没有发送。
我尝试使用官方RabbitMQ库(与AMQP 0.9.1对话)设置一个java客户端,看看这些额外的字节是否被发送到客户端:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(
"target.queue",
true,
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
},
ignored -> {}
);
这是输出:
[x] Received ' Sw�{"one":"two","three":"four"}'
奇怪的是,如果我尝试使用AMQP 1.0客户端使用完全相同的消息,这些额外的字节不会出现在接收到的消息正文中,额外的字节只会在使用AMQP 1.0发布和订阅AMQP 0.9.1时出现。
为什么?在使用两个AMQP版本时,有什么方法可以避免额外的字节?
更新
我还尝试了SwiftMQ:
int nMsgs = 100;
int qos = QoS.AT_MOST_ONCE;
AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
String host = "localhost";
int port = 5672;
String queue = "target.queue";
try {
Connection connection = new Connection(ctx, host, port, false);
connection.setContainerId(UUID.randomUUID().toString());
connection.setIdleTimeout(-1);
connection.setMaxFrameSize(1024 * 4);
connection.setExceptionListener(Exception::printStackTrace);
connection.connect();
{
Session session = connection.createSession(10, 10);
Producer p = session.createProducer(queue, qos);
for (int i = 0; i < nMsgs; i++) {
AMQPMessage msg = new AMQPMessage();
System.out.println("Sending " + i);
msg.setAmqpValue(new AmqpValue(new AMQPString("{\"one\":\"two\",\"three\":\"four\"}")));
p.send(msg);
}
p.close();
session.close();
}
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
问题仍然存在,但第一个字节发生了变化,现在我得到:
[x] Received '□�□□□□□□□w�{"one":"two","three":"four"}'
请参阅此响应,它阐明了如何对有效负载进行编码并避免这些额外的字节:
如果AMQP 1.0客户端向0-9-1客户端发送消息并在“数据部分”(即不在amqp序列部分,不在amqp值部分)中将她的有效负载编码为二进制,则0-9-1客户端应获得完整的有效负载而无需任何额外字节
注意:RabbitMQ团队监控Rabbitmq-user
邮件列表,有时只回答StackOverflow上的问题。