transaction problem with Tomcat + Spring + OpenMQ


stevenmaring
I am using Sun Message Queue (OpenMQ) for its shared/common message store capabilities in a cluster and need to use Tomcat.  So, all roads seemed to lead me to BTM for JMS connection pooling and transaction support.

First, I will lay out my rather lengthy configuration and the error, with comments to follow.

bitronix-default-config.properties:

bitronix.tm.serverId = spring-btm
bitronix.tm.2pc.async = false
bitronix.tm.2pc.warnAboutZeroResourceTransactions = true
bitronix.tm.disableJmx = false
bitronix.tm.jndi.userTransactionName = java:comp/UserTransaction
bitronix.tm.journal = disk
bitronix.tm.journal.disk.logPart1Filename = btm1.tlog
bitronix.tm.journal.disk.logPart2Filename = btm2.tlog
bitronix.tm.journal.disk.forcedWriteEnabled = true
bitronix.tm.journal.disk.forceBatchingEnabled = true
bitronix.tm.journal.disk.maxLogSize = 2
bitronix.tm.journal.disk.filterLogStatus = false
bitronix.tm.journal.disk.skipCorruptedLogs = false
bitronix.tm.timer.defaultTransactionTimeout = 60
bitronix.tm.timer.gracefulShutdownInterval = 60
bitronix.tm.timer.backgroundRecoveryInterval = 0


applicationContext.xml:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jaxws="http://cxf.apache.org/jaxws"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
              http://www.springframework.org/schema/beans 
              http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
              http://cxf.apache.org/jaxws
              http://cxf.apache.org/schemas/jaxws.xsd
              http://www.springframework.org/schema/aop
              http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
              http://www.springframework.org/schema/tx
              http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">

    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
    <import resource="classpath:META-INF/cxf/cxf-servlet.xml" />

    <jaxws:endpoint
        id="voiceRequest"
        implementor="com.ess.tts.voicerequest.VoiceRequestImpl"
        address="/voiceRequest"
    />

    <bean id="imqConnectionFactory"
        class="bitronix.tm.resource.jms.PoolingConnectionFactory"
        init-method="init"
        destroy-method="close">
        <property name="className" value="com.ess.tts.util.IMQConnectionFactory" />
        <property name="uniqueName" value="imqbroker" />
        <property name="minPoolSize" value="1" />
        <property name="maxPoolSize" value="5" />
        <property name="driverProperties">
            <props>
                <prop key="imqBrokerHostName">localhost</prop>
                <prop key="imqBrokerHostPort">7676</prop>
                <prop key="imqDefaultUsername">admin</prop>
                <prop key="imqDefaultPassword">admin</prop>
            </props>
        </property>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="imqConnectionFactory"/>
    </bean>

    <bean id="voiceRequestImpl" class="com.ess.tts.voicerequest.VoiceRequestImpl">
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <property name="requestQueue" ref="imqRequestQueue"/>
    </bean>

    <bean id="voiceRequestConsumer" class="com.ess.tts.voicerequest.queue.VoiceRequestConsumer">
        <property name="jmsTemplate" ref="jmsTemplate"/>
        <property name="responseQueue" ref="imqResponseQueue"/>
    </bean>

    <bean id="btmConfig"
        factory-method="getConfiguration"
        class="bitronix.tm.TransactionManagerServices">
        <property name="serverId" value="spring-btm" />
    </bean>

    <bean id="bitronixTransactionManager"
        factory-method="getTransactionManager"
        class="bitronix.tm.TransactionManagerServices"
        depends-on="btmConfig"
        destroy-method="shutdown" />

    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="bitronixTransactionManager" />
        <property name="userTransaction" ref="bitronixTransactionManager" />
    </bean>

    <aop:aspectj-autoproxy/>

    <tx:advice id="txAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="*" propagation="REQUIRED" />
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <aop:pointcut
            id="voiceRequestSender"
            expression="execution(String com.ess.tts.voicerequest.VoiceRequestImpl.submitVoiceRequest(..))" />
        <aop:pointcut
            id="messageReceptionOperations"
            expression="execution(void com.ess.tts..*.onMessage(..)) and target(javax.jms.MessageListener)" />
        <aop:advisor advice-ref="txAdvice" pointcut-ref="voiceRequestSender" />
        <aop:advisor advice-ref="txAdvice" pointcut-ref="messageReceptionOperations" />
    </aop:config>

    <bean id="imqRequestQueue" class="com.ess.tts.util.IMQQueue">
        <property name="destinationName" value="voiceRequestQueue"/>
    </bean>
    <bean id="imqResponseQueue" class="com.ess.tts.util.IMQQueue">
        <property name="destinationName" value="voiceResponseQueue"/>
    </bean>

    <bean id="voiceResponseConsumer" class="com.ess.tts.voiceresponse.queue.VoiceResponseConsumer" />

    <bean id="jmsRequestContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="imqConnectionFactory"/>
        <property name="destination" ref="imqRequestQueue"/>
        <property name="messageListener" ref="voiceRequestConsumer"/>
        <property name="concurrentConsumers" value="5" />
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    <bean id="jmsResponseContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="imqConnectionFactory"/>
        <property name="destination" ref="imqResponseQueue"/>
        <property name="messageListener" ref="voiceResponseConsumer"/>
        <property name="concurrentConsumers" value="5" />
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

</beans>


IMQConnectionFactory.java:

package com.ess.tts.util;

import javax.jms.JMSException;

import com.sun.messaging.ConnectionConfiguration;

// Exposes the OpenMQ connection settings as bean-style setters so that the
// driverProperties from the Spring config can be applied to the factory.
public class IMQConnectionFactory extends com.sun.messaging.XAQueueConnectionFactory {

    public void setImqBrokerHostName(String hostname) throws JMSException {
        this.setProperty(ConnectionConfiguration.imqBrokerHostName, hostname);
    }

    public void setImqBrokerHostPort(String port) throws JMSException {
        this.setProperty(ConnectionConfiguration.imqBrokerHostPort, port);
    }

    public void setImqDefaultUsername(String username) throws JMSException {
        this.setProperty(ConnectionConfiguration.imqDefaultUsername, username);
    }

    public void setImqDefaultPassword(String password) throws JMSException {
        this.setProperty(ConnectionConfiguration.imqDefaultPassword, password);
    }
}


VoiceRequestImpl.java:

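package com.ess.tts.voicerequest;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jws.WebService;

// Logger/LoggerFactory are assumed to be SLF4J.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

// VoicingRequestData is assumed to live in this package; import it otherwise.
@WebService(endpointInterface="com.ess.tts.voicerequest.VoiceRequest",
        serviceName="VoiceRequestService",
        portName="VoiceRequestPort")
public class VoiceRequestImpl implements VoiceRequest {

    private static final Logger log = LoggerFactory.getLogger(VoiceRequestImpl.class);

    // Static, so the values injected into the Spring bean are visible to every
    // instance of this class, however that instance was created.
    private static JmsTemplate jmsTemplate;
    private static Queue requestQueue;

    public String submitVoiceRequest(final VoicingRequestData voicingData) {
        if (log.isDebugEnabled()) {
            log.debug("request to voice received with id " + voicingData.getId());
        }
        try {
            // Wrap the request in an ObjectMessage and send it to the request queue.
            jmsTemplate.send(requestQueue, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createObjectMessage(voicingData);
                }
            });
        } catch (RuntimeException e) {
            log.error("processing of request failed", e);
            throw e;
        }
        if (log.isDebugEnabled()) {
            log.debug("request " + voicingData.getId() + " queued for voicing");
        }
        return "voicing request received";
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        VoiceRequestImpl.jmsTemplate = jmsTemplate;
    }

    public void setRequestQueue(Queue requestQueue) {
        VoiceRequestImpl.requestQueue = requestQueue;
    }
}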


Caused by: javax.jms.JMSException: error enlisting a MessageProducerWrapper of a DualSessionWrapper in state ACCESSIBLE of a JmsPooledConnection of pool imqbroker in state ACCESSIBLE with underlying connection BrokerAddress=localhost:7676(1060), ConnectionID=905260681592230144, ReconnectEnabled: false, IsConnectedToHABroker: false with 1 opened session(s)
        at bitronix.tm.resource.jms.MessageProducerWrapper.enlistResource(MessageProducerWrapper.java:44)
        at bitronix.tm.resource.jms.MessageProducerWrapper.send(MessageProducerWrapper.java:58)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574)
        at org.springframework.jms.core.JmsTemplate$3.doInJms(JmsTemplate.java:541)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)
        ... 37 more
Caused by: bitronix.tm.internal.BitronixSystemException: resource 'imqbroker' cannot be used outside XA transaction scope. Set allowLocalTransactions to true if you want to allow this and you know your resource supports this.
        at bitronix.tm.resource.common.TransactionContextHelper.enlistInCurrentTransaction(TransactionContextHelper.java:58)
        at bitronix.tm.resource.jms.MessageProducerWrapper.enlistResource(MessageProducerWrapper.java:42)
        ... 42 more



My VoiceRequestConsumer onMessage method is also the method that sends a message to the next queue.

I have read that the pointcuts won't work if the objects being instrumented for transactions are not created by Spring. However, the MessageListener(s) are created by the Spring JMS container, and my first sender is a JAX-WS web service that is also instantiated by Spring.

So ... where am I going wrong here?

Thanks,
Steve Maring

Re: transaction problem with Tomcat + Spring + OpenMQ

stevenmaring
If I add ...

<property name="allowLocalTransactions" value="true" />

to the Spring config, my messages make it into the queue without errors, but what is the downside to this?



Re: transaction problem with Tomcat + Spring + OpenMQ

Dennis Brakhane-2
On Wed, Jun 10, 2009 at 9:41 PM, stevenmaring<[hidden email]> wrote:
>
> If I add ...
>
> <property name="allowLocalTransactions" value="true" />
>
> to the Spring config, my messages make it into the queue without errors, but
> what is the downside to this?

The downside is that those sends run as local transactions and don't take part
in global (XA) transactions, which is the whole point of a transaction manager.



Re: transaction problem with Tomcat + Spring + OpenMQ

Ludovic Orban
Administrator
In reply to this post by stevenmaring
You haven't started a JTA transaction before calling JmsTemplate.send(). One way or another, your Spring config is wrong: you configured an AOP pointcut to manage transactions, but no transaction is actually being started.

I advise you to enable debug logging on bitronix.tm.BitronixTransaction so you can see when transactions are started, committed, and rolled back. Then try different ways of managing transactions in your Spring config until you find out what you did wrong.
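
One mechanism that could produce this symptom, offered as a guess rather than a confirmed diagnosis: Spring's transaction advice lives in a proxy wrapped around the bean, so a call only starts a transaction when it goes through that proxy. In the posted applicationContext.xml, jaxws:endpoint names its implementor by class name, so CXF may be creating its own VoiceRequestImpl instead of using the advised voiceRequestImpl bean (the static fields would still let that instance send). Pointing the endpoint at the bean with implementor="#voiceRequestImpl" is worth trying. The plain-Java sketch below imitates the effect with a JDK dynamic proxy. ProxyBypassDemo, Sender, SenderImpl and withTransaction are hypothetical names, and the ThreadLocal flag merely stands in for a JTA transaction.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyBypassDemo {

    /** Hypothetical stand-in for the web service interface. */
    public interface Sender {
        String submit(String payload);
    }

    /** Pretend transaction state for the current thread (not a real JTA transaction). */
    public static final ThreadLocal<Boolean> TX_ACTIVE = ThreadLocal.withInitial(() -> false);

    /** Reports whether it was called while the pretend transaction was active. */
    public static class SenderImpl implements Sender {
        public String submit(String payload) {
            return TX_ACTIVE.get() ? "sent in tx" : "no tx";
        }
    }

    /** Wraps the target so each call runs between "begin" and "end", like tx advice. */
    public static Sender withTransaction(Sender target) {
        InvocationHandler handler = (proxy, method, args) -> {
            TX_ACTIVE.set(true);          // begin
            try {
                return method.invoke(target, args);
            } finally {
                TX_ACTIVE.set(false);     // commit or roll back
            }
        };
        return (Sender) Proxy.newProxyInstance(
                Sender.class.getClassLoader(), new Class<?>[] {Sender.class}, handler);
    }

    public static void main(String[] args) {
        Sender proxied = withTransaction(new SenderImpl()); // what the container hands out
        Sender raw = new SenderImpl();                      // created outside the container

        System.out.println(proxied.submit("a")); // prints "sent in tx"
        System.out.println(raw.submit("b"));     // prints "no tx"
    }
}
```

The second call is the analogue of the "cannot be used outside XA transaction scope" error: the code is identical, but nothing began a transaction around it. The debug logging suggested above should show whether this is what is happening.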