Using ActiveMQ and bitronix-transaction-manager redelivery issue

neoman
Hello everyone,
I'm trying to test bitronix-transaction-manager with ActiveMQ and am running into a problem in my test cases. It would be great if anyone could help.
When the JMS listener throws an exception while processing a message, the transaction is rolled back, but the message is redelivered to the queue without following the redelivery policy, and messages are getting lost.
This is the configuration:

@Bean(initMethod = "init", destroyMethod = "close")
public ConnectionFactory jmsConnectionFactory() {
    // Bitronix pool wrapping the ActiveMQ XA connection factory
    PoolingConnectionFactory btmPoolingConnectionFactory = new PoolingConnectionFactory();
    btmPoolingConnectionFactory.setClassName("org.apache.activemq.ActiveMQXAConnectionFactory");
    btmPoolingConnectionFactory.setUniqueName(options.getClientId());
    btmPoolingConnectionFactory.setMinPoolSize(1);
    btmPoolingConnectionFactory.setMaxPoolSize(5);
    btmPoolingConnectionFactory.setAllowLocalTransactions(true);

    // Properties passed through to the underlying ActiveMQXAConnectionFactory
    Properties driverProperties = btmPoolingConnectionFactory.getDriverProperties();
    driverProperties.setProperty("redeliveryPolicy.backOffMultiplier", "2");
    driverProperties.setProperty("redeliveryPolicy.useExponentialBackOff", "true");
    driverProperties.setProperty("redeliveryPolicy.maximumRedeliveries", "3");
    driverProperties.setProperty("redeliveryPolicy.initialRedeliveryDelay", "1000");

    driverProperties.setProperty("prefetchPolicy.queuePrefetch", "1");
    driverProperties.setProperty("prefetchPolicy.topicPrefetch", "1");
    driverProperties.setProperty("brokerURL", options.getConnectionUrl());
    driverProperties.setProperty("nonBlockingRedelivery", "true");

    btmPoolingConnectionFactory.setDriverProperties(driverProperties);
    // Return the configured pool
    return btmPoolingConnectionFactory;
}
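
For reference, the redelivery schedule I expect from the settings above (initial delay 1000 ms, multiplier 2, at most 3 redeliveries) can be sketched as follows. This is only a simplified model of exponential back-off, not ActiveMQ's actual implementation: the class and method names are made up for illustration, and it ignores collision avoidance and any maximum delay cap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ or Bitronix.
final class RedeliverySchedule {

    // Returns the delay (in ms) expected before each redelivery attempt,
    // assuming plain exponential back-off starting at initialDelayMs.
    static List<Long> delays(long initialDelayMs, double multiplier, int maxRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxRedeliveries; attempt++) {
            result.add(delay);
            delay = (long) (delay * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the driver properties in the configuration above
        System.out.println(delays(1000, 2.0, 3)); // prints [1000, 2000, 4000]
    }
}
```

Under that model, a failing message would be retried after roughly 1 s, 2 s, and 4 s before being given up on, which is not what I observe.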

// container - consumes from the request queue
container = new DefaultMessageListenerContainer();
container.setSessionTransacted(true);
container.setConnectionFactory(jmsConnectionFactory);
container.setTransactionManager(platformTransactionManager); // Bitronix transaction manager
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setMessageListener(this);
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
container.setDestination(new ActiveMQQueue(options.getTopicJndiName()));


