Tuesday, May 20, 2008

Multithreaded Queue Service with TcpListener and TcpClient

Yesterday I wrote about a design issue I have with a report generation system I built.  In particular, I need a way to indicate to a set of Windows services that new reports need to be generated.  The root of the scheme is a trigger that copies data from an invitations table (participants are invited to take an assessment, and once the assessment is complete, the report needs to be generated) to a queue implemented in Microsoft SQL Server 2005.  Read about the details of the problem here.

I have been playing with this problem a little bit tonight.  The requirement is that the trigger must be able to quickly transmit a simple message to the waiting report services and return.  Triggers should never block (in this experiment they do, but I am just having fun here).  The first step is to mark the database as TRUSTWORTHY and deploy the CLR trigger with full access.  As I mentioned in the previous post, there are dangers and side effects, and there is even a better way to enable threading and access to the network stack from within a CLR trigger, but for the purposes of my experiment, this is sufficient.

The newest iteration uses a server (call it QueueFxServer for this discussion), implemented as a Windows service, that waits for incoming TCP connections on port 44444.  The client is a simple console application that simulates the actions taken by the CLR trigger.

QueueFxServer waits for incoming TCP connections and hands the work of reading the client's message to the thread pool.  A thread pool thread unpacks the message, which in this case is a GUID, enqueues it on a queue guarded by a lock, and signals a ManualResetEvent.  Another thread, called the QueueWorkThread thread, blocks on a call to WaitOne on that event and does the work to process the message.  Why separate threads?

The code that accepts client requests must finish very quickly so that the client does not block.  Its whole job is to accept the incoming client request, unpack the GUID, and enqueue it for processing.  The QueueWorkThread can then run when processor usage is low.  Processor time is sampled through performance counters, as the server code below shows.
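
To make the handoff between the fast path and the worker concrete, here is a minimal sketch of just that piece, separate from the service.  GuidHandoff and WaitAndDrain are names I made up for illustration, and the sketch swaps in an AutoResetEvent where the service uses a ManualResetEvent, so treat it as one possible shape under those assumptions rather than the code the service runs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not part of the service): a lock-guarded queue plus an
// event that wakes a single consumer thread.
public sealed class GuidHandoff
{
    private readonly Queue<Guid> _queue = new Queue<Guid>();
    private readonly object _sync = new object();

    // Assumption: AutoResetEvent instead of the service's ManualResetEvent,
    // so the consumer never has to call Reset itself.
    private readonly AutoResetEvent _workQueued = new AutoResetEvent(false);

    // Fast path, called by the connection handler: take a short lock,
    // enqueue, signal, and return.
    public void Enqueue(Guid id)
    {
        lock (_sync)
        {
            _queue.Enqueue(id);
        }

        _workQueued.Set();
    }

    // Slow path, called by the worker thread: wait up to timeoutMs for a
    // signal, then drain whatever is queued at that moment.
    public List<Guid> WaitAndDrain(int timeoutMs)
    {
        List<Guid> drained = new List<Guid>();

        if (_workQueued.WaitOne(timeoutMs))
        {
            lock (_sync)
            {
                while (_queue.Count > 0)
                {
                    drained.Add(_queue.Dequeue());
                }
            }
        }

        return drained;
    }
}
```

A connection handler would call Enqueue(guid) and return right away, while the worker loops on WaitAndDrain(1000) and processes whatever comes back.  A signal that arrives while the worker is draining simply causes one extra wake-up with an empty list, so no item is lost.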

Here is the server implementation:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
    using System.Net;
    using System.Net.Sockets;
    using System.ServiceProcess;
    using System.Text;
    using System.Threading;

    namespace ColbyAfrica.Experiments.QueueFx.Services.Windows.QueueFxServer
    {
        public partial class Service : ServiceBase
        {
            private const int GUID_SIZE = 16;   // Guid.ToByteArray() always yields 16 bytes
            private TcpListener _QueueServer = null;
            private volatile bool _Stop = false;
            private Queue<Guid> _WorkQueue = null;
            private ManualResetEvent _WorkQueuedEvent = new ManualResetEvent(false);

            public Service()
            {
                InitializeComponent();
            }

            private Queue<Guid> WorkQueue
            {
                get { return _WorkQueue; }
                set { _WorkQueue = value; }
            }

            protected override void OnStart(string[] args)
            {
                //  Return from OnStart quickly; the real work runs on its own thread.
                Thread serviceMainThread = new Thread(ServiceMain);

                serviceMainThread.Name = "QueueFxServer";
                serviceMainThread.Start();
            }

            protected override void OnStop()
            {
                _Stop = true;

                //  Stopping the listener makes the blocked AcceptTcpClient call
                //  throw a SocketException, which lets ServiceMain exit its loop.
                TcpListener server = _QueueServer;

                if (server != null)
                {
                    server.Stop();
                }
            }

            private void ServiceMain()
            {
                //  Debugging aid for the experiment; remove it before running unattended.
                Debugger.Break();

                try
                {
                    WorkQueue = new Queue<Guid>();

                    Thread queueWorkThread = new Thread(QueueWorkThread);

                    queueWorkThread.Name = "Queue Work Thread";
                    queueWorkThread.Start();

                    _QueueServer = new TcpListener(IPAddress.Any, 44444);
                    _QueueServer.Start();

                    while (!_Stop)
                    {
                        TcpClient client = null;
                        bool enqueued = false;

                        try
                        {
                            client = _QueueServer.AcceptTcpClient();

                            //  Hand the connection to the pool so the accept loop stays fast.
                            ThreadPool.QueueUserWorkItem(ClientHandlerThread, client);

                            enqueued = true;
                        }
                        catch (SocketException exception)
                        {
                            Debug.Print(exception.Message);
                            if (_Stop)
                            {
                                break;
                            }
                        }
                        catch (ApplicationException exception) //    Per the QueueUserWorkItem docs..
                        {
                            Debug.Print(exception.Message);
                        }
                        catch (OutOfMemoryException exception)
                        {
                            Debug.Print(exception.Message);
                        }
                        finally
                        {
                            //  If the handoff failed, nobody else will close this connection.
                            if (enqueued == false && client != null && client.Connected)
                            {
                                client.Close();
                            }
                        }
                    }
                }
                catch (Exception exception)
                {
                    //  Do not swallow failures silently.
                    Debug.Print(exception.Message);
                }
                finally
                {
                    if (_QueueServer != null)
                    {
                        _QueueServer.Stop();
                    }
                }
            }

            private void QueueWorkThread()
            {
                int workerThreads;
                int ioThreads;

                //  One "% Processor Time" counter per logical processor.
                List<PerformanceCounter> cpuUsageList = new List<PerformanceCounter>();

                for (int i = 0; i < Environment.ProcessorCount; i++)
                {
                    PerformanceCounter cpuUsage = new PerformanceCounter("Processor", "% Processor Time", i.ToString());
                    cpuUsageList.Add(cpuUsage);
                }

                while (!_Stop)
                {
                    ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads);

                    //  Hold off while the pool is exhausted or any CPU is busy.  The
                    //  thread count is re-read on every pass (a stale zero would loop
                    //  forever), and the sleep leaves time between counter samples
                    //  instead of spinning.
                    while (!_Stop && (workerThreads == 0 || IsAnyCpuTimeAboveThreshold(cpuUsageList, 75.00)))
                    {
                        Thread.Sleep(1000);
                        ThreadPool.GetAvailableThreads(out workerThreads, out ioThreads);
                    }

                    if (_WorkQueuedEvent.WaitOne(1000, true))
                    {
                        //  Work items queued
                        List<Guid> batch = new List<Guid>();

                        lock (_WorkQueue)
                        {
                            while (_WorkQueue.Count > 0)
                            {
                                batch.Add(_WorkQueue.Dequeue());
                            }

                            //  Reset while holding the lock, after draining, so a Set
                            //  from a handler that enqueues later is never wiped out.
                            _WorkQueuedEvent.Reset();
                        }

                        foreach (Guid guid in batch)
                        {
                            Debug.Print(guid.ToString());
                        }
                    }
                }
            }

            private bool IsAnyCpuTimeAboveThreshold(List<PerformanceCounter> cpuUsageList, double threshold)
            {
                foreach (PerformanceCounter performanceCounter in cpuUsageList)
                {
                    double value = performanceCounter.NextValue();

                    if (value > threshold)
                    {
                        Debug.Print(string.Format("Processor time @{0}", value));
                        return true;
                    }
                }

                return false;
            }

            private void ClientHandlerThread(object newClient)
            {
                TcpClient client = newClient as TcpClient;

                if (client != null)
                {
                    try
                    {
                        //  The pool thread is not renamed here: on the .NET Framework,
                        //  Thread.Name can only be set once, and pool threads are reused.
                        using (NetworkStream stream = client.GetStream())
                        {
                            if (stream.CanRead)
                            {
                                byte[] buffer = new byte[GUID_SIZE];
                                int received = 0;

                                //  A single Read may return only part of the message,
                                //  so loop until all 16 bytes have arrived.
                                while (received < GUID_SIZE)
                                {
                                    int bytesReceived = stream.Read(buffer, received, GUID_SIZE - received);

                                    if (bytesReceived == 0)
                                    {
                                        throw new IOException("Client disconnected before sending a complete GUID.");
                                    }

                                    received += bytesReceived;
                                }

                                Guid guid = new Guid(buffer);

                                lock (_WorkQueue)
                                {
                                    _WorkQueue.Enqueue(guid);
                                }

                                _WorkQueuedEvent.Set();

                                string response =
                                    string.Format("{0} queued for {1}.", guid.ToString(), client.Client.RemoteEndPoint);

                                buffer = Encoding.Default.GetBytes(response);

                                stream.Write(buffer, 0, buffer.Length);
                                stream.Flush();
                            }
                        }
                    }
                    catch (Exception exception)
                    {
                        Debug.Print(exception.Message);

                        if (Debugger.IsAttached)
                        {
                            Debugger.Break();
                        }
                    }
                    finally
                    {
                        client.Close();
                    }
                }
            }
        }
    }
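
A GUID on the wire is always 16 bytes, but a TCP stream is free to deliver those bytes in pieces, so one call to Read is not guaranteed to return the whole message.  The sketch below shows one way to read a fixed-length message regardless of how it arrives.  StreamReads, ReadFully, and TrickleStream are hypothetical names for illustration only; TrickleStream just imitates a slow network by handing out a few bytes per call.

```csharp
using System;
using System.IO;

public static class StreamReads
{
    // Reads exactly count bytes, looping because Stream.Read may return
    // fewer bytes than requested.  Throws if the stream ends early.
    public static byte[] ReadFully(Stream stream, int count)
    {
        byte[] result = new byte[count];
        int offset = 0;

        while (offset < count)
        {
            int read = stream.Read(result, offset, count - offset);

            if (read == 0)
            {
                throw new EndOfStreamException(
                    "Stream ended after " + offset + " of " + count + " bytes.");
            }

            offset += read;
        }

        return result;
    }
}

// Hypothetical test stream: returns at most maxChunk bytes per Read call,
// imitating a network stream that delivers a message in pieces.
public sealed class TrickleStream : MemoryStream
{
    private readonly int _maxChunk;

    public TrickleStream(byte[] data, int maxChunk) : base(data)
    {
        _maxChunk = maxChunk;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return base.Read(buffer, offset, Math.Min(count, _maxChunk));
    }
}
```

Passing new TrickleStream(guid.ToByteArray(), 5) to ReadFully with a count of 16 takes four reads and still yields bytes that new Guid(...) turns back into the original value.  If the stream ends after fewer than 16 bytes, the caller gets an exception instead of a malformed GUID.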

The client implementation (with functionality that wouldn't be present in the CLR trigger!):

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Net.Sockets;
    using System.Text;

    namespace ColbyAfrica.Experiments.QueueFx.Clients.ConsoleClient
    {
        class Program
        {
            private const int BUFFER_SIZE = 1024;

            static void Main(string[] args)
            {
                TcpClient client = null;
                NetworkStream stream = null;
                byte[] buffer;

                try
                {
                    client = new TcpClient("192.168.1.1", 44444);
                    stream = client.GetStream();

                    //  The whole request is the 16-byte GUID.
                    buffer = Guid.NewGuid().ToByteArray();
                    stream.Write(buffer, 0, buffer.Length);

                    if (stream.CanRead)
                    {
                        List<byte> message = new List<byte>();
                        byte[] responseBuffer = new byte[BUFFER_SIZE];
                        int bytesReceived;

                        //  The server closes the connection after replying, so read
                        //  until Read returns 0 rather than trusting DataAvailable.
                        while ((bytesReceived = stream.Read(responseBuffer, 0, responseBuffer.Length)) > 0)
                        {
                            for (int i = 0; i < bytesReceived; i++)
                            {
                                message.Add(responseBuffer[i]);
                            }
                        }

                        Console.Out.WriteLine(Encoding.Default.GetString(message.ToArray()));
                        Console.Out.WriteLine();
                        //Console.Out.WriteLine("Press any key to continue.");
                        //Console.ReadKey();
                    }
                }
                catch (Exception exception)
                {
                    Console.Error.WriteLine(exception.Message);

                    if (Debugger.IsAttached)
                    {
                        Debugger.Break();
                    }
                }
                finally
                {
                    if (stream != null)
                    {
                        stream.Close();
                    }

                    if (client != null)
                    {
                        client.Close();
                    }
                }
            }
        }
    }

I will discuss how this works in a post tomorrow.  I am done for the evening!

Disclaimer

Content on this site is provided "AS IS" with no warranties and confers no rights. Additionally, all content on this site is my own personal opinion and does not represent my employer's view in any way.