JMS - rollback but no redelivery

 
Greenhorn
Posts: 10
All,

I have a simple pub/sub on WebLogic 8.1 that has an independent thread doing synchronous consumption. The session is transacted. Everything is smooth except when I attempt to roll back a transaction. In that case, the message is not redelivered when I re-invoke the receive method 1 minute later. It is discarded.

Looking at the pending messages on the queue, the message is always discarded 40 seconds after the transaction is rolled back. If I shorten my wait and call the receive method 20 seconds after the rollback, the message is discarded after 20 seconds, on the receive call. I was expecting to pull the same message I had previously pulled.

Message has infinite lifespan, redelivery limit is infinite. I also created a template and hardcoded the redelivery limit for the jms server and topic with no success.

Any pointers or leads would be appreciated.

thanks - Dan
 
Sheriff
Posts: 10445
I haven't worked with WebLogic, but how are you rolling back the transaction? Are you throwing an application exception? In that case the transaction will not be rolled back, and you will have to invoke the setRollbackOnly method on the context. Not sure whether this is going to help you.
 
Dan Cleary
Greenhorn
Posts: 10
jaikiran,

Thanks for the reply. To roll back, I'm using the TopicSession's rollback() method. I'm not using EJBs or MDBs.

- Dan
 
Ranch Hand
Posts: 1683
When a queue receiver rolls back a received message, the message is redelivered to the queue. Post your code and we may be able to help. What server are you using?
 
Dan Cleary
Greenhorn
Posts: 10
Roger,

Thanks for the response. I don't want to be misleading when I use the word queue. We're actually using topics and a TopicSubscriber to consume events. Attached is the consumer code (with relevant parts included). We are using WebLogic 8.1 as our app server. Again, the flow here is: consume a message through receive(), attempt to deliver this message to our client, roll back in case of failure, sleep 60 seconds, then attempt redelivery. When receive is invoked the second time, the message is gone. The TopicSession code and the line that invokes the rollback are in bold.

- Dan


public class RubyMessageListener extends Thread
{
private TopicConnection topicConnection = null;
private TopicSession topicSession = null;
private TopicSubscriber topicSubscriber = null;
Context jndiContext = null;

private RubyMessageListener() {;}

public RubyMessageListener(String url, AgmemsFilter aFilter,
ClientManager cmParam)
throws NamingException, JMSException, ConfigurationException
{
/**
Create a JNDI API InitialContext object if none exists
yet.
**/
jndiContext = new InitialContext();

/**
We're taking care of the connection establishment at object
startup time so that if we throw exception, we can throw
exception back up to subscribe caller who can report NG to
client

Look up connection factory and queue. If either does
not exist, exit.
**/

TopicConnectionFactory topicConnectionFactory = null;
Topic topic = null;
TextMessage message = null;

topicConnectionFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(jndiContext.lookup("weblogic.jms.ConnectionFactory"), TopicConnectionFactory.class);
topic = (Topic) PortableRemoteObject.narrow(jndiContext.lookup("GovJMSTopic"), Topic.class);

/**
Create connection.
Create session from connection; true means session is
transacted (the acknowledge mode argument is then ignored).
Create subscriber, then start message delivery.
**/
<b>
topicConnection = topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createSubscriber(topic);
topicConnection.start();
</b>
myLogger.info("RubyMessageListener(): queue connection has been started, now listening on events");
}

public void confirmAndFallout(TopicSession ts, String sleepTime, boolean commitFlag)
{
try
{
if (commitFlag)
{
ts.commit();
myLogger.info("confirmAndFallout(): transaction is committed.");
}
else
{
ts.rollback();
myLogger.info("confirmAndFallout(): transaction successfully rolled back, must wait the sleepTime, " + sleepTime + " ms, before pulling message off of queue.");
try
{
Thread.sleep(Integer.parseInt(sleepTime));
}
catch (Exception e)
{
myLogger.error("confirmAndFallout(): After rollback, thread sleep failed, moving on without sleep.");
}
}
myLogger.info("confirmAndFallout(): sleep has been completed, return to event listening.");
return;
}
catch (JMSException je)
{
try
{
myLogger.error("confirmAndFallout(): Unable to confirm delivery transaction, sleep, then try once more before bailing.");
Thread.sleep(Integer.parseInt(sleepTime));

if (commitFlag)
{
ts.commit();
myLogger.info("confirmAndFallout(): transaction is committed.");
}
else
{
ts.rollback();
myLogger.info("confirmAndFallout(): transaction successfully rolled back.");
}
}
catch (Exception e)
{
myLogger.error("confirmAndFallout(): The second attempt at jms confirmation has failed, return.");
return;
}
}
}

public void run()
{
try
{
myLogger.info("run(): Client has asked us to begin pulling events for url: " + listenerUrl);

while (!haltFlag)
{
myLogger.info("run(): BACK up at top of receive loop");
StringBuffer errstr = new StringBuffer();
//receive will initiate our jms transaction,
//commit or rollback will end the transaction

//receive is blocking, wait 55 minutes before committing
//transaction and looping again. We do this to avoid
//default 60 minute transaction timeout
Message m = topicSubscriber.receive(3300000);

if (m != null)
{
if (m instanceof TextMessage)
{
TextMessage message = (TextMessage) m;
String messageText = "";
String eventType = "";
Document doc;
Node rootNode=null;

try
{
messageText = message.getText();

myLogger.info("run(): the following message has been pulled from the queue: " + messageText);

eventType = "";

doc = rubyxml.stringToXML(messageText);
rootNode = doc.getFirstChild();
}
catch (Exception e)
{
myLogger.error("run(): unable to understand the message we've pulled off the queue, commit transaction and continue. " + listenerUrl + " will not be receiving this message.");
confirmAndFallout(topicSession, listenerSleep, true);
continue;
}

String operationNodeName = "";
if (rootNode!=null)
operationNodeName = rootNode.getNodeName();

myLogger.info("run(): operationNodeName: " + operationNodeName);

if (operationNodeName.equals("tktInfoEvt"))
eventType = "tktInfoEvt";
else if (operationNodeName.equals("prmInfoEvt"))
eventType = "prmInfoEvt";
else if (operationNodeName.equals("rubyFacilityStatusEvt"))
eventType = "rubyFacilityStatusEvt";
else if (operationNodeName.equals("killEvt"))
eventType = "killEvt";
else
eventType = "heartbeatEvt";


/** If the message is a rubyFacilityStatusEvt,
tktInfoEvt, or prmInfoEvt
- perform filter(listenerUrl, mcn) to determine
whether our client cares about this message
- if client cares, stamp(url) to refresh clock,
convert the xml string to java class, and send
message
- if client does not care, invoke discardedMsg(mcn)
and commit the transaction
- if ws request succeeds, commit
- if ws request fails, rollback and sit tight
for one minute
**/

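//a branch appears to have been trimmed from the post here;
//a plain "if" keeps the excerpt syntactically valid
if (eventType.equals("rubyFacilityStatusEvt") ||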
eventType.equals("tktInfoEvt") ||
eventType.equals("prmInfoEvt"))
{
String mcn = rubyxml.getValueByTag((Element)rootNode, "mcn", errstr);


if (gs.filter(listenerUrl, mcn)
== AgmemsFilter.FILTER_PASSED)
{
myLogger.info("run(): we've pulled a " + eventType + " message off of the queue and it's time to send it out to client: " + listenerUrl);

messageCount += 1;
Socket agmemsSocket = null;

try
{
if (eventType.equals("rubyFacilityStatusEvt"))
agmemsSocket = HttpUtilities.sendCommand(cu.populateFacilityEvt(facStatusServiceUrl, messageText, messageCount, facStatusAction, rdate, agmemsUsername, agmemsPassword, "", false), agmemsHost, Integer.parseInt(agmemsPort), Integer.parseInt(agmemsTimeout), facStatusServiceUrl);
else if (eventType.equals("tktInfoEvt"))
agmemsSocket = HttpUtilities.sendCommand(cu.populateTicketEvt(tktInfoServiceUrl, messageText, messageCount, tktInfoAction, rdate, agmemsUsername, agmemsPassword), agmemsHost, Integer.parseInt(agmemsPort), Integer.parseInt(agmemsTimeout), tktInfoServiceUrl);
else
agmemsSocket = HttpUtilities.sendCommand(cu.populatePrmEvt(prmInfoServiceUrl, messageText, messageCount, prmInfoAction, rdate, agmemsUsername, agmemsPassword), agmemsHost, Integer.parseInt(agmemsPort), Integer.parseInt(agmemsTimeout), prmInfoServiceUrl);
}
catch (Exception e)
{
//these are the kind of failures,
//timeout, socketException, etc. that we
//would like to retry
myLogger.error("run(): ERROR, " + eventType + " push was unable to be sent to " + listenerUrl + " because of following exception, rolling back transaction." + e);
confirmAndFallout(topicSession, listenerSleep, false);
continue;
}

myLogger.info("run(): Continued to readReplyFromSocket");

StringBuffer respStr = new StringBuffer();

try
{
HttpUtilities.readReplyFromSocket
(agmemsSocket, respStr);
myLogger.info("run(): response returned was " + respStr);
}
catch (Exception ie)
{

<b>//rollback, something wrong on client side
myLogger.error("run(): ERROR, " + eventType + " NOT delivered to " + listenerUrl + " rolling back transaction. " + ie.toString());
confirmAndFallout(topicSession, listenerSleep, false);
continue;
</b>
}

int agmemsMessageNumber=0;

agmemsMessageNumber =
cu.parseAgmemsResponse(respStr.toString());

if ((agmemsMessageNumber==-1) ||
(agmemsMessageNumber!=messageCount))
{
confirmAndFallout(topicSession, listenerSleep, true);
myLogger.error("run(): ERROR, " + eventType + " push response came back with NOT delivered to " + listenerUrl + " confirming transaction.");
}
else
{
confirmAndFallout(topicSession, listenerSleep, true);
myLogger.info("run(): " + eventType + " delivered to " + listenerUrl + " and transaction is committed.");
}
}
//Assumes CLIENT_INACTIVE return
else
{
//commit for cleanup only
confirmAndFallout(topicSession, listenerSleep, true);
setHaltFlag(true);
myLogger.error("run(): " + eventType + " NOT delivered to " + listenerUrl + ", CLIENT_INACTIVE, halting message receive on this listener.");
}
}
else
{
myLogger.error("run(): we've pulled a text message from the queue that is UNKNOWN, commit and continue.");
confirmAndFallout(topicSession, listenerSleep, true);
}
}
else
{
myLogger.error("run(): we've pulled a message from the queue that is UNKNOWN and not text, commit and continue.");
confirmAndFallout(topicSession, listenerSleep, true);
}
}
else
{
myLogger.error("run(): something strange happened, receive() returned with NO MESSAGE, commit and continue.");
confirmAndFallout(topicSession, listenerSleep, true);
}
}
}
catch (JMSException je)
{
myLogger.error("run(): ERROR, exception, cleanup connections: " + je.toString());
}
catch (Exception e)
{
myLogger.error("run(): ERROR, exception, cleanup connections: " + e.toString());
}
finally
{
myLogger.error("run(): broken out of message receive loop, closing session, subscription, and topicConnection and KILLING this thread.");
if (topicConnection != null)
{
try
{
cm.listenerTable.remove(listenerUrl);
topicSession.close();
topicSubscriber.close();
topicConnection.close();
}
catch (JMSException e)
{
myLogger.error("run(): As part of cleanup process, there was an exception attempting to close our topicConnection.");
}
}
}
}
}
 
Roger Chung-Wee
Ranch Hand
Posts: 1683
My immediate reaction is that I don't like the fact that you have created a thread. Incoming JMS-related requests execute in the JMS execute queue/thread pool, and the effect of an application-created thread is unpredictable.

Also, do you need synchronous JMS? Bear in mind that JMS is basically for asynchronous messaging, and to invoke a blocking receive will tie up resources. This is why it is normally strongly recommended that you do not use synchronous JMS.
 
Dan Cleary
Greenhorn
Posts: 10
Roger,

Architecture-wise, we're a bit bound. There is an existing requirement that messages must be delivered in order to our client. If a message send fails, we must keep retrying that send every minute until their servers come back up (while queuing up new messages from our back end). Consequently, I don't want to pull any new message until I'm sure the last one was successfully delivered.

- Dan
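
For anyone landing here with the same requirement, the "don't pull the next message until the last one is delivered" loop described above can be sketched without any JMS classes. This is only a minimal sketch under assumptions, not the code from this thread: `InOrderRetry`, `deliverAll`, the `Predicate` standing in for the web service call, and the 10 ms delay are all hypothetical names and values chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Predicate;

/** Hypothetical sketch; none of these names come from the thread. */
final class InOrderRetry {

    private InOrderRetry() {}

    /**
     * Delivers messages strictly in order. A message stays at the head of the
     * queue until send.test(message) returns true; after each failure the
     * thread sleeps for retryDelayMillis before retrying the same message.
     * Stops early if the thread is interrupted. Returns the number of attempts.
     */
    static int deliverAll(Deque<String> pending, Predicate<String> send, long retryDelayMillis) {
        int attempts = 0;
        while (!pending.isEmpty()) {
            String head = pending.peekFirst();      // look, but do not remove yet
            attempts++;
            if (send.test(head)) {
                pending.removeFirst();              // plays the role of commit()
            } else {
                try {
                    Thread.sleep(retryDelayMillis); // plays the role of rollback() plus wait
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    break;
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>(Arrays.asList("evt-1", "evt-2", "evt-3"));
        int[] calls = {0};
        // Simulated client that is down for the first two attempts.
        Predicate<String> flakySend = msg -> {
            boolean up = ++calls[0] > 2;
            System.out.println((up ? "sent " : "failed ") + msg);
            return up;
        };
        System.out.println("attempts: " + deliverAll(pending, flakySend, 10L));
    }
}
```

The point of the sketch is only the ordering rule: nothing behind the head message is touched until the head succeeds. Whether the retry is driven by a provider redelivery after rollback() or by holding the message locally before committing is a separate design decision.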
 
Ranch Hand
Posts: 775
With a topic you can't guarantee the messages will be delivered in order. Topics are the UDP of JMS, and the server will likely process their deliveries in whatever fashion creates the least overhead. Since a topic can have multiple consumers the topic isn't going to block waiting on you to decide if a message is going to be acknowledged or not. Even with a queue it isn't healthy to depend on strict ordering because you have to throw away all ability to scale to multiple threads and keep your fingers firmly crossed on redeliveries, particularly with external JMS providers like MQ that have the notion of explicit push-back of delivered messages.
 
Roger Chung-Wee
Ranch Hand
Posts: 1683
WebLogic Server has a proprietary facility to guarantee ordered redeliveries to asynchronous consumers so long as there is a single consumer. This means that only one instance of a MessageListener must be consuming the messages. Maybe this is an avenue worth exploring.
 
Dan Cleary
Greenhorn
Posts: 10
Conclusion: A bonehead mistake on our part (not my code!!). The message producer was setting the message timeToLive in seconds, not milliseconds... The value is correct in our properties file but he had it overridden internally. Thank you to those who took the time to give this a look.

- Dan
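
To make the units problem concrete, here is a small sketch of how a time-to-live meant as seconds behaves when it is passed through as milliseconds. JMS time-to-live values are in milliseconds, and a value of 0 means the message never expires. Everything else here is an assumption for illustration: the class `TtlUnits`, its method names, and the 3600 value are hypothetical and are not the actual settings from this thread.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; the names and values are illustrative, not from the thread. */
final class TtlUnits {

    private TtlUnits() {}

    /** Converts a time-to-live configured in seconds to the milliseconds JMS expects. */
    static long ttlMillis(long ttlSeconds) {
        return TimeUnit.SECONDS.toMillis(ttlSeconds);
    }

    /**
     * Mirrors how a message's expiration is derived: send time plus
     * time-to-live, where a time-to-live of 0 yields expiration 0 (never expires).
     */
    static long expirationAt(long sendTimeMillis, long ttlMillis) {
        return ttlMillis == 0 ? 0 : sendTimeMillis + ttlMillis;
    }

    /** True if a message with this expiration would already be discarded at nowMillis. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long sent = 0L;                                // pretend the message was sent at t = 0
        long retryAt = TimeUnit.SECONDS.toMillis(60);  // consumer retries one minute later

        // Bug pattern: a value meant as seconds handed over unchanged as milliseconds.
        long wrong = expirationAt(sent, 3600);
        // Intended: the same value converted explicitly before use.
        long right = expirationAt(sent, ttlMillis(3600));

        System.out.println("unconverted TTL expired at retry? " + isExpired(wrong, retryAt));
        System.out.println("converted TTL expired at retry?   " + isExpired(right, retryAt));
        // With a real producer, the converted value is what would be passed to
        // javax.jms.MessageProducer#setTimeToLive(long).
    }
}
```

A quick check on the consumer side for this kind of problem is to log getJMSExpiration() and getJMSTimestamp() on the received message and compare the difference with the intended lifespan.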
 