Hi everyone,
I am trying to send a queue message and get a reply. The message is sent and received successfully, but the queue sender never receives the reply from the receiver.
The debug tells me that: before the TextMessage was sent, I can see that the replyTo is a "TemporaryQueue". When the TextMessage was received, the string message and long property "ID" were all correct, but the "replyTo" of the TextMessage is "null".
I think the "null" replyTo is the reason the reply message is never received, but I do not know what is wrong. The following is my source code.
Thanks a lot for help!
package com.jmstest;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.logging.LogFactory;
import com.cot.express.jms.cache.JmsConnectionFactoryCache;
import com.cot.express.jms.core.JmsConstants;
public class QueueMessageSender {
private static QueueMessageSender instance;
private QueueConnectionFactory qFactory = null;
private QueueConnection qconn = null;
private QueueSession qSession = null;
private Queue queue = null;
QueueSender qSender = null;
/* temp queue for reply */
private TemporaryQueue replyQueue = null;
private long queueId = -1;
private QueueReceiver replyReceiver = null;
private QueueMessageSender() {
}
public static QueueMessageSender getInstance(){
if(instance == null){
instance = new QueueMessageSender();
}
return instance;
}
public void send() throws Exception {
try{
//(1) get Cached QueueConnectionFactory
qFactory = JmsConnectionFactoryCache.getQueueConnectionFactory(JmsConstants.QUEUE_CONNECTION_FACTORY);
//(2) create a QueueConnection with the queue connection factory
qconn = qFactory.createQueueConnection();
//(3) create a QueueSession with the QueueConnection
//qSession = qconn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
qSession = qconn.createQueueSession(false,Session.CLIENT_ACKNOWLEDGE);
//(4) JNDI lookup for the Queue
Context ctx = new InitialContext();
queue = (Queue)ctx.lookup(JmsConstants.QUEUE_NAME);
//(5) create a QueueSender with the QueueSession
qSender = qSession.createSender(queue);
//(6) create a temp queue for reply
replyQueue = qSession.createTemporaryQueue();
//Using time as topic ID
queueId = System.currentTimeMillis();
//(7)create a text message
String message = "This is a test.";
TextMessage msg = qSession.createTextMessage();
msg.setText(message);
//(8) set reply message
msg.setJMSReplyTo(replyQueue);
msg.setLongProperty("ID",queueId);
//(9) send the text message
qSender.send(msg);
//send a non-text control message to indicate the end of the message stream
qSender.send(qSession.createMessage());
//wait some time and try to get the reply message
boolean received = false;
Message replyMessage = null;
int waittime = new Integer(JmsConstants.RECEIVE_WAIT_TIME).intValue();
replyReceiver = qSession.createReceiver(replyQueue);
while (!received){
Thread.sleep(1500);
qconn.start();
replyMessage = replyReceiver.receive(waittime*1000);
String m = ((TextMessage)replyMessage).getText();
if(m.equals("success")){
LogFactory.getLog(getClass()).debug("++++++++++++++++++++++++++");
LogFactory.getLog(getClass()).debug("Message received and processed successfully!");
LogFactory.getLog(getClass()).debug("++++++++++++++++++++++++++");
}
}
}catch(Exception e){
throw e;
}finally{
try{
if(qconn != null){
qconn.close();
}
}catch(JMSException e){
//do nothing
}
}
}
}
****************************************
package com.jmstest;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.cot.express.jms.cache.JmsConnectionFactoryCache;
import com.cot.express.jms.core.JmsConstants;
import com.cot.framework.core.exception.BaseException;
/**
 * Receives one message from the queue named by {@code JmsConstants.QUEUE_NAME}
 * and, when the message names a reply queue, answers it with a
 * {@code "success"} text message.
 *
 * <p>Receive and reply happen in one transacted session that is committed at
 * the end; without the commit the reply would never be delivered and the
 * received message would be rolled back when the connection closes.
 */
public class QueueMessageReceiver {
    private Log log = LogFactory.getLog(getClass());
    private static QueueMessageReceiver instance;
    private QueueConnectionFactory qFactory = null;
    private QueueConnection qconn = null;
    private QueueSession qSession = null;
    private Queue queue = null;
    private QueueReceiver qReceiver = null;
    private Message msg = null;
    private QueueSender sender = null;
    private TextMessage replyMsg = null;
    private QueueSession replySession = null;

    private QueueMessageReceiver() {
    }

    /**
     * Returns the single shared instance, creating it on first use.
     *
     * <p>Synchronized so that two threads racing on the first call cannot
     * each create their own instance.
     *
     * @return the shared {@code QueueMessageReceiver}
     */
    public static synchronized QueueMessageReceiver getInstance() {
        if (instance == null) {
            instance = new QueueMessageReceiver();
        }
        return instance;
    }

    /**
     * Receives a single message and replies to it if it carries a queue in
     * its {@code JMSReplyTo} header.
     *
     * <p>Returns without replying when no message arrives within the wait
     * time, or when the message has no usable reply destination (such a
     * message is still consumed). The connection is always closed before
     * returning.
     *
     * @throws BaseException if any step of receiving or replying fails
     */
    private void receive() throws BaseException {
        try {
            //(1) get Cached QueueConnectionFactory
            qFactory = JmsConnectionFactoryCache.getQueueConnectionFactory(JmsConstants.QUEUE_CONNECTION_FACTORY);
            //(2) Create a QueueConnection with the QueueConnectionFactory
            qconn = (QueueConnection) qFactory.createQueueConnection();
            //(3) create a transacted QueueSession (the acknowledge mode is ignored when transacted)
            qSession = qconn.createQueueSession(true, 0);
            //(4) JNDI lookup for the Queue
            Context ctx = new InitialContext();
            queue = (Queue) ctx.lookup(JmsConstants.QUEUE_NAME);
            //(5) create a QueueReceiver with the QueueSession
            qReceiver = qSession.createReceiver(queue);
            //(6) start the message receiver from the QueueConnection
            qconn.start();
            //(7) receive the message
            // NOTE(review): the wait time is treated as seconds (x1000 -> ms) to agree with
            // QueueMessageSender; confirm the unit of JmsConstants.RECEIVE_WAIT_TIME
            int waittime = Integer.valueOf(JmsConstants.RECEIVE_WAIT_TIME).intValue();
            msg = qReceiver.receive(waittime * 1000L);
            if (msg == null) {
                // receive(timeout) returns null when nothing arrived in time
                log.warn("No message received within " + waittime + " seconds");
                return;
            }

            //(8) send the reply message, if the request says where to send it
            Object replyTo = msg.getJMSReplyTo();
            if (replyTo instanceof Queue) {
                sender = qSession.createSender((Queue) replyTo);
                replyMsg = qSession.createTextMessage("success");
                // echo the request's correlation ID; fall back to its message ID when none was set
                String correlationId = msg.getJMSCorrelationID();
                if (correlationId == null) {
                    correlationId = msg.getJMSMessageID();
                }
                replyMsg.setJMSCorrelationID(correlationId);
                // reading a missing property as long would throw, so copy it only when present
                if (msg.propertyExists("ID")) {
                    replyMsg.setLongProperty("ID", msg.getLongProperty("ID"));
                }
                sender.send(replyMsg);
            } else {
                log.warn("Received message has no reply queue (JMSReplyTo=" + replyTo + "); no reply sent");
            }

            //(9) commit: in a transacted session this is what actually consumes the
            // received message and releases the reply for delivery
            qSession.commit();
        } catch (Exception e) {
            log.error("Error in Queue receiver initialization: ", e);
            throw new BaseException("Error in Queue receiver initialization: " + e.getMessage());
        } finally {
            if (qconn != null) {
                try {
                    qconn.close();
                } catch (JMSException e) {
                    // best effort: a failure to close must not mask the real outcome
                    log.debug("Error closing queue connection", e);
                }
            }
        }
    }
}