Sunday, January 22, 2012

Tech : Singleton MDB on Glassfish 3 on OpenMq

With the EJB 3.1 specification, message-driven beans (MDBs) can be packaged directly into a .war file and deployed to GlassFish 3.  The container pools MDB instances out of the box with minimal configuration.  Note that MDBs are not part of EJB Lite, so this needs the full GlassFish distribution rather than the Web Profile.

With all these new changes, it can be hard for an IDE like NetBeans to keep up.  In my experience, configuring JMS in NetBeans became a bit of a chore once I started customizing.  I needed to connect to a public REST interface with a rate limit, so I wanted only one JMS message to be processed at a time, which was more work than I thought.

It is key to note that the pool sizes of the consumer and the connection are not what make the JMS consumer concurrent; the MDB bean pool is.  Keep this in mind before you start changing your <connector-connection-pool> to max-pool-size="1" steady-pool-size="1".  Doing so only changes the maximum number of connections allowed, which is typically not what you want and causes timeouts as soon as more than one client connects.  Likewise, imqConnectionFlowLimit="50" and imqConsumerFlowLimit="1000" only change how messages are batched from the queue broker to the consumer.  Again, they do not affect concurrency.
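
To make the point about the bean pool concrete, here is a toy model in plain Java.  It is only a sketch and not how GlassFish is implemented: a Semaphore stands in for the MDB bean pool, and the class and method names (BeanPoolModel, maxConcurrentHandlers) are made up for illustration.  However many delivery threads there are, the number of handlers running at once never exceeds the pool size.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: a Semaphore stands in for the container's MDB bean pool.
class BeanPoolModel {

    /** Delivers `messages` from `deliveryThreads` threads through a pool of
     *  `poolSize` permits and returns the highest concurrency observed. */
    static int maxConcurrentHandlers(int poolSize, int deliveryThreads, int messages) {
        Semaphore beanPool = new Semaphore(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService delivery = Executors.newFixedThreadPool(deliveryThreads);
        for (int i = 0; i < messages; i++) {
            delivery.execute(() -> {
                try {
                    beanPool.acquire();               // borrow a bean instance
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(5);                  // stand-in for onMessage() work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    beanPool.release();               // return the instance to the pool
                }
            });
        }
        delivery.shutdown();
        try {
            delivery.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("pool=1 -> peak " + maxConcurrentHandlers(1, 8, 20));
        System.out.println("pool=4 -> peak " + maxConcurrentHandlers(4, 8, 20));
    }
}
```

With a pool size of 1 the peak is always 1, which is the behaviour I was after for the rate-limited API.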

Finally, the synchronized keyword has no effect on an MDB.  The container hands each concurrent message to a different bean instance from the pool, so an instance-level lock is never contended, and because MDBs are designed to run on clusters, a JVM lock could not synchronize across the cluster anyway.  You should never try to manage threading yourself in any bean, including an MDB.
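
The per-instance nature of synchronized can be shown outside the container.  This is a plain-Java sketch, with ToyBean as a hypothetical stand-in for an MDB instance: two threads each call a synchronized method on their own instance, and a latch confirms that both are inside the method at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Plain-Java illustration; ToyBean is a hypothetical stand-in for an MDB instance.
class SynchronizedPerInstanceDemo {

    static class ToyBean {
        // Locks on `this`, so it only excludes other callers of the SAME instance.
        synchronized boolean handle(CountDownLatch bothInside) {
            bothInside.countDown();
            try {
                // True only if the other thread is also inside handle() right now.
                return bothInside.await(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Runs handle() on two separate instances from two threads. */
    static boolean twoInstancesOverlap() {
        CountDownLatch bothInside = new CountDownLatch(2);
        boolean[] results = new boolean[2];
        Thread a = new Thread(() -> results[0] = new ToyBean().handle(bothInside));
        Thread b = new Thread(() -> results[1] = new ToyBean().handle(bothInside));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return results[0] && results[1];
    }
}
```

twoInstancesOverlap() returns true because each thread takes a different lock, which is the situation you are in when the pool holds more than one bean instance.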

Below are the steps to configure a simple MDB with a queue connection.  I'm using CMT (container-managed transactions) and an ObjectMessage carrying a DTO to serialize the message.  A thread sleep is put in to achieve quick and dirty rate buffering.

1) Add the queue broker client jars, imq.jar and jms.jar, to your project.  They are available from http://mq.java.net/.

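2) Add connector resources for jms/javaee6/FranchiseLocationQueue and jms/javaee6/FranchiseLocationFactory in glassfish-resources.xml (formerly named sun-resources.xml): an admin-object-resource for the queue, plus a connector-connection-pool and a connector-resource for the connection factory.
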
3) Create a message-driven consumer.  Note that transactions are managed by the container and the database connection by the persistence unit.  The container starts a transaction automatically before onMessage() is invoked.
 
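import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@MessageDriven(mappedName = MMSG_FranchiseLocationConsumer.QUEUE_NAME, description = "Geoencodes Franchises", activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")
})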
public class MMSG_FranchiseLocationConsumer implements MessageListener 
{
    
    public final static String CONN_FACTORY_NAME = "jms/javaee6/FranchiseLocationFactory";    
    public final static String QUEUE_NAME = "jms/javaee6/FranchiseLocationQueue"; 

    @PersistenceContext(unitName = "myPU")
    private EntityManager _em;
       
    //This is the context of this bean.
    @Resource
    private MessageDrivenContext context;      
    
    private static final Log log = LogFactory.getLog(MMSG_FranchiseLocationConsumer.class);

    // The container instantiates the bean; the constructor only logs which queue it listens on.
    public MMSG_FranchiseLocationConsumer()
    {
        log.info("Starting JMS Queue " + QUEUE_NAME);
    }
        
    // Receive a message from the queue.

    /*
     * By default the container starts a transaction before onMessage() is invoked and commits it when the method returns,
     * unless the transaction was marked for rollback through the message-driven context.
     * There is more to say about transactions, but for our discussion of MDBs this will suffice.
     */
    @Override
    public void onMessage(Message message) {

        String msgId = "";
        MMSG_FranchiseDTO franchiseDTO = null;
        try
        {
            ObjectMessage objectMessage = (ObjectMessage) message;

            msgId = objectMessage.getJMSMessageID();
            franchiseDTO = (MMSG_FranchiseDTO) objectMessage.getObject();

            log.info("Received msg with id: " + msgId + " with address: " + franchiseDTO.getFr().getAddress1());
        }
        catch (JMSException e)
        {
            log.error("Failed to process message: " + msgId + " with exception " + e);
            context.setRollbackOnly(); // mark the container-managed transaction for rollback
            return;
        }
        addFranchiseToDb(franchiseDTO.getFr(), franchiseDTO.getBusinessGroupId());
    }
    
    private void addFranchiseToDb(FranchisesRaw fr, int businessGroupId)
    {
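        try
        {
            // The argument is in milliseconds; raise it to match your API's rate limit.
            // Note we could be prudent and keep the last accessed time and put a timer on it.
            Thread.sleep(1);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            log.error("Interrupted while throttling: " + e);
        }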
        //Call your rate limited API here
    }         
}
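
The comment next to Thread.sleep() mentions keeping the last accessed time instead of sleeping blindly.  A minimal sketch of that idea follows.  MinIntervalThrottle is a hypothetical helper, not something from my project, and it deliberately does no locking: it assumes it lives in a bean whose pool holds a single instance, so only one message is processed at a time.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the original consumer.
class MinIntervalThrottle {
    private final long minIntervalMillis;
    private final LongSupplier clock;   // injectable so the logic can be checked without sleeping
    private long lastCallMillis;
    private boolean calledBefore;

    MinIntervalThrottle(long minIntervalMillis, LongSupplier clock) {
        this.minIntervalMillis = minIntervalMillis;
        this.clock = clock;
    }

    /** Returns how long the caller still has to wait and records when the call will happen. */
    long reserve() {
        long now = clock.getAsLong();
        long wait = 0;
        if (calledBefore) {
            wait = Math.max(0, lastCallMillis + minIntervalMillis - now);
        }
        lastCallMillis = now + wait;
        calledBefore = true;
        return wait;
    }

    /** Blocks until the next call is allowed. */
    void await() throws InterruptedException {
        long wait = reserve();
        if (wait > 0) {
            Thread.sleep(wait);
        }
    }
}
```

In the consumer this could be a field such as new MinIntervalThrottle(1000, System::currentTimeMillis), with await() called in addFranchiseToDb() in place of the fixed sleep.  The 1000 ms interval is only an example value.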

4) Add the producer code wherever you need to send messages:

Context ctx = new InitialContext();
_connFactory = (ConnectionFactory) ctx.lookup(MMSG_FranchiseLocationConsumer.CONN_FACTORY_NAME);
_franchiseQueue = (Queue) ctx.lookup(MMSG_FranchiseLocationConsumer.QUEUE_NAME);
initConnection(); // helper that sets _connection, e.g. via _connFactory.createConnection()
try {
    Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    MessageProducer producer = session.createProducer(_franchiseQueue);
    ObjectMessage message = session.createObjectMessage();
    // JMSMessageID is assigned by the provider on send(), so there is no point in setting it here.
    message.setObject(new MMSG_FranchiseDTO(franchiseRaw, businessGroupId));
    producer.send(message);
    session.close();
} finally {
    _connection.close(); // always release the connection, even if the send fails
}

5) Add the DTO object that is passed across as the message.  (Note that you can bundle other POJOs, such as database entity classes, as long as they are Serializable.)
        
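import java.io.Serializable;
public class MMSG_FranchiseDTO implements Serializable
{
    private static final long serialVersionUID = 1L;
    private final int _businessGroupId;
    private final FranchisesRaw _fr; // must itself be Serializable
    public MMSG_FranchiseDTO(FranchisesRaw fr, int businessGroupId) { _fr = fr; _businessGroupId = businessGroupId; }
    public FranchisesRaw getFr() { return _fr; }
    public int getBusinessGroupId() { return _businessGroupId; }
}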
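
Since an ObjectMessage can only carry a Serializable payload, it is worth checking a DTO before putting it on the queue.  Below is a small sketch using plain Java serialization.  ToyFranchise and ToyDTO are hypothetical stand-ins for FranchisesRaw and MMSG_FranchiseDTO, and roundTrip() is a made-up helper name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCheck {

    // Hypothetical stand-ins for FranchisesRaw and MMSG_FranchiseDTO.
    static class ToyFranchise implements Serializable {
        private static final long serialVersionUID = 1L;
        final String address1;
        ToyFranchise(String address1) { this.address1 = address1; }
    }

    static class ToyDTO implements Serializable {
        private static final long serialVersionUID = 1L;
        final int businessGroupId;
        final ToyFranchise fr;
        ToyDTO(ToyFranchise fr, int businessGroupId) {
            this.fr = fr;
            this.businessGroupId = businessGroupId;
        }
    }

    /** Serializes and deserializes a value; fails if any reachable field is not Serializable. */
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("not serializable: " + e, e);
        }
    }
}
```

If a nested entity does not implement Serializable, roundTrip() throws instead of returning a copy, which is easier to debug in a unit test than on the queue.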

6) Now, if you want a singleton bean, the following needs to be added manually to glassfish-ejb-jar.xml in the web/WEB-INF folder.  By default NetBeans copies everything in WEB-INF, so you don't need to worry about adding it to build-impl.xml.  Also, if you ask NetBeans to add a "standard-deployment-descriptor", it will only add an ejb-jar.xml, which DOES NOT have all the options you get from the GlassFish deployment descriptor.

  
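<glassfish-ejb-jar>
  <enterprise-beans>
    <name>First Module</name>
    <ejb>
      <ejb-name>MMSG_FranchiseLocationConsumer</ejb-name>
      <jndi-name>jms/javaee6/FranchiseLocationQueue</jndi-name>
      <bean-pool>
        <steady-pool-size>1</steady-pool-size>
        <resize-quantity>1</resize-quantity>
        <max-pool-size>1</max-pool-size>
        <pool-idle-timeout-in-seconds>600</pool-idle-timeout-in-seconds>
      </bean-pool>
    </ejb>
    <property>
      <name>singleton-bean-pool</name>
      <value>true</value>
    </property>
  </enterprise-beans>
</glassfish-ejb-jar>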


And there you go.  You can manipulate the pool size and make the MDB a singleton or not as you wish.  By default the JMS broker runs in the same JVM as GlassFish in "embedded" mode, which in my case was exactly what I needed.
