In my application Mule (v2.2.1) is listing to WMQ and we have defined 10 consumer for the Mule connector.
We have implemented retry policy also as below .
So 10 consumer is listing the queue and now due to some unwanted reason some consumer are getting disconnected , due to that huge backlog is getting created in MQ.
We want - if any consumer is getting disconnected i.e if consumer number <10 then MULE should retry and will get 10 consumer. how could we do that ?
In our case when all the consumer is getting down then only MULE is retrying i.e retrying only in case of connector down .
we want retry in case of consumer is down also.
Spring Config:
<bean id="threadingPolicyTemplate"
class="com.test.platform.mule.mqpolicies.async.AsynchronousRetryTemplate">
<constructor-arg index="0">
<bean class="com.test.platform.mule.mqpolicies.policies.SimpleRetryPolicyTemplate">
<constructor-arg index="0" value="15000"/>
<constructor-arg index="1" value="1"/>
</bean>
</constructor-arg>
</bean>
Java Class:
import java.util.Map;
import javax.resource.spi.work.WorkException;
import org.mule.api.context.WorkManager;
import org.mule.api.retry.RetryCallback;
import org.mule.api.retry.RetryContext;
import org.mule.api.retry.RetryNotifier;
import org.mule.api.retry.RetryPolicy;
import org.mule.api.retry.RetryPolicyTemplate;
import org.mule.retry.RetryPolicyExhaustedException;
import org.mule.util.concurrent.Latch;
import com.test.platform.mule.mqpolicies.policies.RetryForeverPolicyTemplate;
/**
* The Class AsynchronousRetryTemplate.
*/
public class AsynchronousRetryTemplate implements RetryPolicyTemplate {
/** The Constant RETRY_TEMPLTE_300000. */
private static final String RETRY_TEMPLTE_300000 = "300000";
/** The delegate. */
private final RetryPolicyTemplate delegate;
/** The start latch. */
private Latch startLatch;
/**
* Instantiates a new asynchronous retry template.
*
* @param delegate the delegate
*/
public AsynchronousRetryTemplate(RetryPolicyTemplate delegate) {
this.delegate = delegate;
}
/**
* Execute.
*
* @param callback the callback
* @param workManager the work manager
*
* @return the retry context
*
* @throws Exception the exception
*/
public RetryContext execute(RetryCallback callback, WorkManager workManager)
throws Exception {
RetryWorker worker = new RetryWorker(this.delegate, callback, workManager, this.startLatch);
FutureRetryContext context = worker.getRetryContext();
try {
System.out.println("After Inside sync block****");
workManager.doWork(worker);
System.out.println("After Do Work" + context.isOk());
} catch (Exception e) {
e.printStackTrace();
System.out.println("****Inside Exception block****");
try {
RetryPolicyTemplate delegateForever = new RetryForeverPolicyTemplate(Long.parseLong(RETRY_TEMPLTE_300000));
RetryWorker workerTemp = new RetryWorker(delegateForever, callback, workManager, this.startLatch);
FutureRetryContext contextTemp = workerTemp.getRetryContext();
System.out.println("Inside Async block****");
workManager.scheduleWork(workerTemp);
return contextTemp;
} catch (WorkException w) {
throw new RetryPolicyExhaustedException(w, null);
}
}
return context;
}
Mule Config:
<jms:connector name="mqConnector"
specification="1.0.2b"
jndiInitialFactory="com.sun.jndi.fscontext.RefFSContextFactory"
jndiProviderUrl="file:///mqlocal/home/mqm/jmsadmin/Jndi"
connectionFactoryJndiName="TestApp"
numberOfConsumers="10">
<spring:property name="retryPolicyTemplate" ref="threadingPolicyTemplate" />
<spring:property name="jmsSupport" ref="customJmsSupport" />
</jms:connector>