java - Active MQ 5.10.0 message re-delivery policy not working -
i have never worked activemq's before. doing research project need redeliver message according listeners logic. per activemq's documentation, am:
using session.client_acknowledge mode depending on logic, acknowledging message or doing session.recover message again.using current code able read messages off testqueue once. re-delivery policies arn't working. tried settign initial delay message delivered automatically. wan message redelivered 5 times. thats not happening.
i creating connection factory, connection, session , queue using next code:
redeliverypolicy redeliverypolicy = new redeliverypolicy(); redeliverypolicy.setinitialredeliverydelay(0); redeliverypolicy.setmaximumredeliveries(5); redeliverypolicy.setredeliverydelay(1000); redeliverypolicy.setqueue("*"); redeliverypolicy.setbackoffmultiplier(2); mill = new activemqconnectionfactory(activemqconnection.default_broker_url); factory.setredeliverypolicy(redeliverypolicy); connection = (activemqconnection) factory.createconnection(); // connection.setredeliverypolicy(redeliverypolicy); connection.start(); session session = connection.createsession(false, session.client_acknowledge); string queuename = "testqueue"+(uuid.randomuuid()); system.out.println("creating queue: "+queuename); testqueue = session.createqueue(queuename); i tried setting redeliverypolicy activemqconnectionfactory , activemqconnection both (didnt work me.)
i in consumer doing next set message listener:
messageconsumer consumer = session.createconsumer(testqueue); consumer.setmessagelistener(new mymessagelistener(id, session)); mymessagelistener:
public void onmessage(message message) { seek { if (message instanceof textmessage) { textmessage textmessage = (textmessage) message; string text = textmessage.gettext(); system.out.println("consuming["+threadid+"] message: " + text); if(text.contains("rd")){ system.out.println("acknowledgethismessage"); ((com.sun.messaging.jms.message)message).acknowledgethismessage(); } else{ session.recover(); } } else{ system.out.println("consuming["+threadid+"] weird message: " + message); } } grab (jmsexception e) { e.printstacktrace(); } } if need more information, allow me know can update post. in advance.
i able redeliverypolicies work correctly if create listener within consumer. not sure why works , making seperate class doesn't. consumer looks like:
try { messageconsumer consumer = session.createconsumer(testqueue); consumer.setmessagelistener(new messagelistener() { public void onmessage(message message) { seek { if (message instanceof textmessage) { textmessage textmessage = (textmessage) message; string text = textmessage.gettext(); system.out.println("consuming["+threadid+"] message: " + text); if(text.contains("rd")){ system.out.println("acknowledgethismessage"); ((com.sun.messaging.jms.message)message).acknowledgethismessage(); } else{ system.out.println("recovering message"); session.recover(); } } else{ system.out.println("consuming["+threadid+"] weird message: " + message); } } grab (jmsexception e) { e.printstacktrace(); } } }); } grab (jmsexception e) { e.printstacktrace(); } hope helps someone.
java activemq
No comments:
Post a Comment