Roger,
Thanks for the response. I don't want to be misleading when I use the
word queue. We're actually using topics and topicSubscriber to consume events. Attached is the consumer code (with relevant parts included). We are using weblogic 8.1 as our app server. Again, the flow here is consume a message through receive(), attempt to deliver this message to our client, rollback in case of failure, sleep 60 seconds, then attempt redelivery. When receive is invoked the second time, the message is gone. TopicSession code and the line that does invokes the rollback are in bold.
- Dan
public class RubyMessageListener extends Thread
{
private TopicConnection topicConnection = null;
private TopicSession topicSession = null;
private TopicSubscriber topicSubscriber = null;
Context jndiContext = null;
private RubyMessageListener() {;}
public RubyMessageListener(
String url, AgmemsFilter aFilter,
ClientManager cmParam)
throws NamingException, JMSException, ConfigurationException
{
/**
Create a JNDI API InitialContext object if none exists
yet.
**/
jndiContext = new InitialContext();
/**
We're taking care of the connection establishment at object
startup time so that if we throw exception, we can throw
exception back up to subscribe caller who can report NG to
client
Look up connection factory and queue. If either does
not exist, exit.
**/
TopicConnectionFactory topicConnectionFactory = null;
Topic topic = null;
TextMessage message = null;
topicConnectionFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(jndiContext.lookup("weblogic.jms.ConnectionFactory"), TopicConnectionFactory.class);
topic = (Topic) PortableRemoteObject.narrow(jndiContext.lookup("GovJMSTopic"), Topic.class);
/**
Create connection.
Create session from connection; true means session is
transacted.
Create receiver, then start message delivery.
Close connection.
**/
<b>
topicConnection = topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createSubscriber(topic);
topicConnection.start();
</b>
myLogger.info("RubyMessageListener(): queue connection has been started, now listening on events");
}
public void confirmAndFallout(TopicSession ts, String sleepTime, boolean commitFlag)
{
try
{
if (commitFlag)
{
ts.commit();
myLogger.info("confirmAndFallout(): transaction is committed.");
}
else
{
ts.rollback();
myLogger.info("confirmAndFallout(): transaction successfully rolled back, must wait the sleepTime, " + sleepTime + " ms, before pulling message off of queue.");
try
{
Thread.sleep(Integer.parseInt(sleepTime));
}
catch (Exception e)
{
myLogger.error("confirmAndFallout(): After rollback, thread sleep failed, moving on without sleep.");
}
}
myLogger.info("confirmAndFallout(): sleep has been completed, return to event listening.");
return;
}
catch (JMSException je)
{
try
{
myLogger.error("confirmAndFallout(): Unable to confirm delivery transaction, sleep, then try once more before bailing.");
Thread.sleep(Integer.parseInt(sleepTime));
if (commitFlag)
{
ts.commit();
myLogger.info("confirmAndFallout(): transaction is committed.");
}
else
{
ts.rollback();
myLogger.info("confirmAndFallout(): transaction successfully rolled back.");
}
}
catch (Exception e)
{
myLogger.error("confirmAndFallout(): The second attempt at jms confirmation has failed, return.");
return;
}
}
}
public void run()
{
try
{
myLogger.info("run(): Client has asked us to begin pulling events for url: " + listenerUrl);
while (!haltFlag)
{
myLogger.info("run(): BACK up at top of receive loop");
StringBuffer errstr = new StringBuffer();
//receive will initiate our jms transaction,
//commit or rollback will end the transaction
//receive is blocking, wait 55 minutes before committing
//transaction and looping again. We do this to avoid
//default 60 minute transaction timeout
Message m = topicSubscriber.receive(3300000);
if (m != null)
{
if (m instanceof TextMessage)
{
TextMessage message = (TextMessage) m;
String messageText = "";
String eventType = "";
Document
doc;
Node rootNode=null;
try
{
messageText = message.getText();
myLogger.info("run(): the following message has been pulled from the queue: " + messageText);
eventType = "";
doc = rubyxml.stringToXML(messageText);
rootNode = doc.getFirstChild();
}
catch (Exception e)
{
myLogger.error("run(): unable to understand the message we've pulled off the queue, commit transaction and continue. " + listenerUrl + " will not be receiving this message.");
confirmAndFallout(topicSession, listenerSleep, true);
continue;
}
String operationNodeName = "";
if (rootNode!=null)
operationNodeName = rootNode.getNodeName();
myLogger.info("run(): operationNodeName: " + operationNodeName);
if (operationNodeName.equals("tktInfoEvt"))
eventType = "tktInfoEvt";
else if (operationNodeName.equals("prmInfoEvt"))
eventType = "prmInfoEvt";
else if (operationNodeName.equals("rubyFacilityStatusEvt"))
eventType = "rubyFacilityStatusEvt";
else if (operationNodeName.equals("killEvt"))
eventType = "killEvt";
else
eventType = "heartbeatEvt";
/** If the message is a rubyFacilityStatusEvt,
tktInfoEvt, or prmInfoEvt
- perform filter(listenerUrl, mcn) to determine
whether our client cares about this message
- if client cares, stamp(url) to refresh clock,
convert the xml string to
java class, and send
message
- if client does not care, invoke discardedMsg(mcn)
and commit the transaction
- if ws request succeeds, commit
- if ws request fails, rollback and sit tight
for one minute
**/
else if (eventType.equals("rubyFacilityStatusEvt") ||
eventType.equals("tktInfoEvt") ||
eventType.equals("prmInfoEvt"))
{
String mcn = rubyxml.getValueByTag((Element)rootNode, "mcn", errstr);
if (gs.filter(listenerUrl, mcn)
== AgmemsFilter.FILTER_PASSED)
{
myLogger.info("run(): we've pulled a " + eventType + " message off of the queue and it's time to send it out to client: " + listenerUrl);
messageCount += 1;
Socket agmemsSocket = null;
try
{
if (eventType.equals("rubyFacilityStatusEvt"))
agmemsSocket = HttpUtilities.sendCommand(cu.populateFacilityEvt(facStatusServiceUrl, messageText, messageCount, facStatusAction, rdate, agmemsUsername, agmemsPassword, "", false), agmemsHost, Integer.parseInt(agmemsPort), Integer.parseInt(agmemsTimeout), facStatusServiceUrl);
else if (eventType.equals("tktInfoEvt"))
agmemsSocket = HttpUtilities.sendCommand(cu.populateTicketEvt(tktInfoServiceUrl, messageText, messageCount, tktInfoAction, rdate, agmemsUsername, agmemsPassword), agmemsHost, Integer.parseInt(agmemsPort), Integer.parseInt(agmemsTimeout), tktInfoServiceUrl);
else
agmemsSocket = HttpUtilities.sendCommand(cu.populatePrmEvt(prmInfoServiceUrl, messageText, messageCount, prmInfoAction, rdate, agmemsUsername, agmemsPassword), agmemsHost, Integer.parseInt(agmemsPort), Integer.parseInt(agmemsTimeout), prmInfoServiceUrl);
}
catch (Exception e)
{
//these are the kind of failures,
//timeout, socketException, etc. that we
//would like to retry
myLogger.error("run(): ERROR, " + eventType + " push was unable to be sent to " + listenerUrl + " because of following exception, rolling back transaction." + e);
confirmAndFallout(topicSession, listenerSleep, false);
continue;
}
myLogger.info("run(): Continued to readReplyFromSocket");
StringBuffer respStr = new StringBuffer();
try
{
HttpUtilities.readReplyFromSocket
(agmemsSocket, respStr);
myLogger.info("run(): response returned was " + respStr);
}
catch (Exception ie)
{
<b>//rollback, something wrong on client side
myLogger.error("run(): ERROR, " + eventType + " NOT delivered to " + listenerUrl + " rolling back transaction. " + ie.toString());
confirmAndFallout(topicSession, listenerSleep, false);
continue;
</b>
}
int agmemsMessageNumber=0;
agmemsMessageNumber =
cu.parseAgmemsResponse(respStr.toString());
if ((agmemsMessageNumber==-1) ||
(agmemsMessageNumber!=messageCount))
{
confirmAndFallout(topicSession, listenerSleep, true);
myLogger.error("run(): ERROR, " + eventType + " push response came back with NOT delivered to " + listenerUrl + " confirming transaction.");
}
else
{
confirmAndFallout(topicSession, listenerSleep, true);
myLogger.info("run(): " + eventType + " delivered to " + listenerUrl + " and transaction is committed.");
}
}
//Assumes CLIENT_INACTIVE return
else
{
//commit for cleanup only
confirmAndFallout(topicSession, listenerSleep, true);
setHaltFlag(true);
myLogger.error("run(): " + eventType + " NOT delivered to " + listenerUrl + ", CLIENT_INACTIVE, halting message receive on this listener.");
}
}
else
{
myLogger.error("run(): we've pulled a text message from the queue that is UNKNOWN, commit and continue.");
confirmAndFallout(topicSession, listenerSleep, true);
}
}
else
{
myLogger.error("run(): we've pulled a message from the queue that is UNKNOWN and not text, commit and continue.");
confirmAndFallout(topicSession, listenerSleep, true);
}
}
else
{
myLogger.error("run(): something strange happened, receive() returned with NO MESSAGE, commit and continue.");
confirmAndFallout(topicSession, listenerSleep, true);
}
}
}
catch (JMSException je)
{
myLogger.error("run(): ERROR, exception, cleanup connections: " + je.toString());
}
catch (Exception e)
{
myLogger.error("run(): ERROR, exception, cleanup connections: " + e.toString());
}
finally
{
myLogger.error("run(): broken out of message receive loop, closing session, subscription, and topicConnection and KILLING this thread.");
if (topicConnection != null)
{
try
{
cm.listenerTable.remove(listenerUrl);
topicSession.close();
topicSubscriber.close();
topicConnection.close();
}
catch (JMSException e)
{
myLogger.error("run(): As part of cleanup process, there was an exception attempting to close our topicConnection.");
}
}
}
}
}