提问者:小点点

amqp-在连接中断的情况下使用自动恢复


我正在尝试在案例连接中断时使用自动恢复功能。我有2个问题:

1)自动恢复代码似乎完全忽略了networkRecoveryInterval。在我的日志文件中,在连接中断的1分钟内,文件增长到1.5 GB。以下错误不断重复。

2)最后,当我打开路由器时,网络恢复不起作用。我在日志文件中得到了这个。

这是我相当简单的消费者端代码。请注意,我将所有代码都写在一个新的线程中,因为我不希望我的构造函数阻塞。

private ConnectionFactory factory = null;
private Connection connection = null;
private Channel channel = null;

private PaymentInfoFromGlobalServerConsumer() {
    new Thread(new Runnable() {
        public void run() {
            factory = new ConnectionFactory();
            try {
                factory.setUri(amqpServerUrl);
                factory.setAutomaticRecoveryEnabled(true);
                factory.setNetworkRecoveryInterval(30000); // In case of broken connection, try again every 30 seconds (hope this is correct understanding)
                factory.setRequestedHeartbeat(45); //Keep sending the heartbeat every 45 seconds to prevent any routers from considering the connection stale.
            } catch (KeyManagementException | NoSuchAlgorithmException | URISyntaxException e) {
                //Will never happen if configured properly
                logger.error(e);
                return;
            }

            try {
                connection = factory.newConnection();
                channel = connection.createChannel();
                //Create a durable queue (if not already present)
                channel.queueDeclare(merchantId, true, false, false, null);

                QueueingConsumer consumer = new QueueingConsumer(channel);
                channel.basicConsume(merchantId, false, consumer);

                while (true) {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String billId = new String(delivery.getBody());

                    //TODO - Redeliveries are possible as per design
                    System.out.println(" [x] Received '" + billId + "'");
                    System.out.println(" [x] Done" );

                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }               
            } catch (IOException | ConsumerCancelledException | InterruptedException e) {
                e.printStackTrace();
                logger.error(e);
            } catch (ShutdownSignalException e) {
                System.out.println(e.isInitiatedByApplication() + " " + e.isHardError());
            } finally {
                close();
            }
        }           
    }).start();
}

public void close() {
    try {
        if (channel != null) channel.close();
    } catch (IOException | AlreadyClosedException e) {
        //Cannot do anything now
    }
    try {
        if (connection != null) connection.close();
    } catch (IOException | AlreadyClosedException e) {
        //Cannot do anything now
    }
}

我是amqp的新手,所以任何帮助都很感激。谢谢


共1个答案

匿名用户

尝试恢复拓扑以及连接。

factory.setTopologyRecoveryEnabled(true);