c# - MSMQ ReceiveById failing -
i have next code
public class msmqqueueprovider : iqueueprovider { public void veryfyisavailable(string name) { var queueaddress = string.format(@" .\private$\{0}", name); var message = "there problem while starting neasymessaging."; if (messagequeue.exists(queueaddress)) { using (var queue = new messagequeue(queueaddress)) { if (queue.canwrite && queue.canread) return; if (queue.canread == false) { message += string.format("queue {0} reachable not readable", queueaddress); throw new queueproviderproviderexception(message); } message += string.format("queue {0} reachable not writable", queueaddress); throw new queueproviderproviderexception(message); } } message += string.format("queue {0} cannot found", queueaddress); throw new queueproviderproviderexception(message); } public queuemessage peek(string queuename) { var queue = new messagequeue(string.format(@" .\private$\{0}", queuename), queueaccessmode.peek); var message = queue.peek(); // resharper disable 1 time possiblenullreferenceexception homecoming new queuemessage(message.id, message.label, new streamreader(message.bodystream).readtoend()); } public queuemessage receive(string queuename) { var queue = new messagequeue(string.format(@" .\private$\{0}", queuename), queueaccessmode.receive); var message = queue.receive(messagequeuetransactiontype.automatic); // resharper disable 1 time possiblenullreferenceexception homecoming new queuemessage(message.id, message.label, new streamreader(message.bodystream).readtoend()); } public queuemessage receivebyid(string queuename, string messageid) { var queue = new messagequeue(string.format(@" .\private$\{0}", queuename), queueaccessmode.receive); var message = queue.receivebyid(messageid, messagequeuetransactiontype.automatic); // resharper disable 1 time possiblenullreferenceexception homecoming new queuemessage(message.id, message.label, new streamreader(message.bodystream).readtoend()); } public void queuemessage(string messagecontent, string messagename, string queuename) { var queueaddress = string.format(@" .\private$\{0}", queuename); using (var streamreader = new stringreader(messagecontent)) { var message = new message { timetobereceived = message.infinitetimeout, timetoreachqueue = message.infinitetimeout, label = messagename, useauthentication = false, recoverable = true }; using (var queue = new messagequeue(queueaddress, queueaccessmode.send)) { using (var streamwriter = new streamwriter(message.bodystream)) { streamwriter.write(streamreader.readtoend()); streamwriter.flush(); queue.send(message, messagequeuetransactiontype.automatic); } queue.close(); } } } } public class unitofwork : iunitofwork { private transactionscope _transaction; public void start() { var transactionoptions = new transactionoptions { timeout = transactionmanager.maximumtimeout }; _transaction = new transactionscope(transactionscopeoption.requiresnew, transactionoptions); } public void completedwithsuccess() { if (transaction.current.transactioninformation.status == transactionstatus.active) { _transaction.complete(); } _transaction.dispose(); } public void completedwithfail() { _transaction.dispose(); } } public sealed partial class service : servicebase { private readonly ilog _log = logmanager.getlogger(typeof(service)); private readonly manualresetevent _shutdownevent = new manualresetevent(false); private thread _workerthread; private iqueueprovider _queueprovider; private iendpointconfiguration _configuration; private icontainer _container; public service() { initializecomponent(); servicename = ""; eventlog.log = ""; } public void init() { var endpointbootstrap = new endpointbootstrap(); endpointbootstrap.initialize(); _container = endpointbootstrap.ioccontainer; _queueprovider = _container.resolve<iqueueprovider>(); _configuration = _container.resolve<iendpointconfiguration>(); _workerthread = new thread(dowork) { name = "worker thread", isbackground = true }; _workerthread.start(); } protected override void onstart(string[] args) { init(); } protected override void onstop() { _shutdownevent.set(); if (!_workerthread.join(3000)) { _workerthread.abort(); } } private void dowork() { while (!_shutdownevent.waitone(0)) { var queuemessage = _queueprovider.peek(_configuration.queuename); seek { processmessage(queuemessage); } grab (exception ex) { _log.error(ex); movemessagetoerrorqueue(queuemessage.id); } } } private void processmessage(queuemessage message) { using (var dependencyscope = _container.beginlifetimescope()) { var unitofwork = dependencyscope.resolve<iunitofwork>(); unitofwork.start(); var messageprocessor = new messageprocessor(dependencyscope); seek { messageprocessor.handlemessage(message); _queueprovider.receivebyid(_configuration.queuename, message.id); } grab (exception ex) { _log.error(ex); unitofwork.completedwithfail(); throw; } unitofwork.completedwithsuccess(); } } private void movemessagetoerrorqueue(string messageid) { seek { using (var dependencyscope = _container.beginlifetimescope()) { var unitofwork = dependencyscope.resolve<iunitofwork>(); unitofwork.start(); var message = _queueprovider.receivebyid(_configuration.queuename, messageid); seek { _queueprovider.queuemessage(message.body, message.name, _configuration.queueerrorname); unitofwork.completedwithsuccess(); } grab { unitofwork.completedwithfail(); throw; } } } grab (exception ex) { _log.error(ex); } } } basically thought simple @ to the lowest degree on paper. messages taken queue fine , on dev machine works fine, problem when deploy code our server (windows 2008). if message not processed remove message queue , set error queue, problem is, method getbyid cannot find message:
private void movemessagetoerrorqueue(string messageid) var message = _queueprovider.receivebyid(_configuration.queuename, messageid); it works fine on dev boxes, can find way prepare this.
any help welcome.
thanks
update
following paul's comment:
hi paul, help. unfortunately unless understood wrong not begin received do. right pick message queue , since there 1 thread reading queue if later on receive message id, seems logic message still there. why think need things way do. peek message, create transaction scope , processing, , of course of study sql server sessions created during execution enroll transaction. if goes wrong during message processing need rollback changes done database , rollback transaction, need set failing message in error queue. can't in 1 transaction, remove message queue seek process message , if fail set error queue, because still need rollback database changes.
rather peek , trying message id etc, why not utilize beginreceive/receive or similar - way have message already.
just create sure set queue properties include body of message (can't remember if that's default receive etc)
msdn - messagequeue.beginreceive method msdn - messagequeue.receive method
(edit...)
if want improve visibility of messages etc seek downloading msmq inspector (and yes tool, created these sorts of scenarios, i.e. what's going on?!)
if utilize it, turn on "constant peek mode" , should see massages coming in. enable journal queues using messages processed see them in journal etc. perhaps double check message id's not beingness modified @ times not expect etc. also, different os , msmq run times , setup may causing different behaviours... hard without running code (which looks of posted could!!)
pk :-)
c# msmq
No comments:
Post a Comment