我正在尝试在案例连接中断时使用自动恢复功能。我有2个问题:
1)自动恢复代码似乎完全忽略了networkRecoveryInterval。在我的日志文件中,在连接中断的1分钟内,文件增长到1.5 GB。以下错误不断重复。
2)最后,当我打开路由器时,网络恢复不起作用。我在日志文件中得到了这个。
这是我相当简单的消费者端代码。请注意,我将所有代码都写在一个新的线程中,因为我不希望我的构造函数阻塞。
private ConnectionFactory factory = null;
private Connection connection = null;
private Channel channel = null;
private PaymentInfoFromGlobalServerConsumer() {
new Thread(new Runnable() {
public void run() {
factory = new ConnectionFactory();
try {
factory.setUri(amqpServerUrl);
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(30000); // In case of broken connection, try again every 30 seconds (hope this is correct understanding)
factory.setRequestedHeartbeat(45); //Keep sending the heartbeat every 45 seconds to prevent any routers from considering the connection stale.
} catch (KeyManagementException | NoSuchAlgorithmException | URISyntaxException e) {
//Will never happen if configured properly
logger.error(e);
return;
}
try {
connection = factory.newConnection();
channel = connection.createChannel();
//Create a durable queue (if not already present)
channel.queueDeclare(merchantId, true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(merchantId, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String billId = new String(delivery.getBody());
//TODO - Redeliveries are possible as per design
System.out.println(" [x] Received '" + billId + "'");
System.out.println(" [x] Done" );
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (IOException | ConsumerCancelledException | InterruptedException e) {
e.printStackTrace();
logger.error(e);
} catch (ShutdownSignalException e) {
System.out.println(e.isInitiatedByApplication() + " " + e.isHardError());
} finally {
close();
}
}
}).start();
}
public void close() {
try {
if (channel != null) channel.close();
} catch (IOException | AlreadyClosedException e) {
//Cannot do anything now
}
try {
if (connection != null) connection.close();
} catch (IOException | AlreadyClosedException e) {
//Cannot do anything now
}
}
我是amqp的新手,所以任何帮助都很感激。谢谢
尝试恢复拓扑以及连接。
factory.setTopologyRecoveryEnabled(true);