Mar 13 2010

A simple custom thread queue

Category: ThreadingMike Lovell @ 8:05 pm

If you use the built-in ‘ThreadPool’, you lose a lot of control over the thread after you queue up your work.  Here’s an example of a custom thread queue.  You can add more work to it and adjust the maximum threads whilst it’s running, like the built-in ‘ThreadPool’But you can issue a ‘Stop’ and terminate all the currently running threads.  You can expand the classes to add even more functionality but I’ve just covered the basics.

As always, I’m going to handle all my ‘work’ using the ‘Action’ class.  I’ll be using the ‘Queue’ class to hold all my ‘work’ inside, and a ‘List’ to hold my threads, for which I’ll be making a simple wrapper for the ‘Thread’ class called ‘ManagedThread’.

First lets define the ‘ManagedThread’ class.  We’ll tack onto the end of the ‘work’ a new delegate to call a ‘Completed’ event.  So this will code will run after the ‘work’ is completed and fire an event that can be monitored later in our code.

   1:  // Author: Mike Lovell (mike.lovell@gotinker.com)
   2:   
   3:  class ManagedThread
   4:  {
   5:      private    Thread    thread;
   6:      private    Action    work;
   7:      private bool    isRunning = false;
   8:   
   9:   
  10:      public bool IsRunning
  11:      {
  12:          get { return isRunning; }
  13:      }
  14:   
  15:   
  16:      public    event    EventHandler<EventArgs>    Completed;
  17:   
  18:   
  19:      public void AssignWork(Action work)
  20:      {
  21:          this.work = null;    // Clear old work
  22:   
  23:          this.work += work;
  24:      }
  25:   
  26:   
  27:      public void Start()
  28:      {
  29:          if (isRunning) throw new InvalidOperationException("The thread is already started");
  30:   
  31:          work += delegate()    // Add our completed handler to the end of the work
  32:              {
  33:                  if (Completed != null) Completed(this, EventArgs.Empty);    // If there's a completed
  34:                                                                              // handled, call it!
  35:   
  36:                  isRunning = false;
  37:   
  38:                  thread = null;
  39:              };
  40:   
  41:          thread = new Thread(new ThreadStart(work));
  42:   
  43:          isRunning = true;
  44:   
  45:          thread.Start();
  46:      }
  47:   
  48:   
  49:      public void Stop()
  50:      {
  51:          if (!isRunning) return;    // Not running anyway
  52:   
  53:          if (thread.ThreadState != ThreadState.Stopped) thread.Abort();
  54:      }
  55:  }
  56:   

Now lets create a class to hold the threads and handle the work queue.  We’re going to add a ‘Completed’ event to this class also, but this is going to only fire when we’re ‘Completed’ ALL the work.  It can potential fire more than once (it will fire when the queue is empty, and there are no threads running – i.e. all the work assigned has been completed).

  57:   
  58:  class ThreadQueue
  59:  {
  60:      public enum QueueStatus
  61:      {
  62:          Stopped,
  63:          Stopping,
  64:          Started
  65:      }
  66:   
  67:   
  68:      public    event    EventHandler<EventArgs>    Completed;
  69:   
  70:      private            Queue<Action>            workQueue    = new Queue<Action>();
  71:      private            List<ManagedThread>        threads        = new List<ManagedThread>();
  72:   
  73:      public    int         MaxThreads            = 10;
  74:      private    int         runningThreads        =  0;
  75:      private    bool        threadingMoreWork    = false;
  76:      private QueueStatus status                = QueueStatus.Stopped;
  77:   
  78:   
  79:      public QueueStatus Status
  80:      {
  81:          get { return status; }
  82:          set { status = value; }
  83:      }
  84:   
  85:   
  86:      public int RunningThreads
  87:      {
  88:          get { return runningThreads; }
  89:          set { runningThreads = value; }
  90:      }
  91:   
  92:   
  93:      public void AssignWork(Action work)
  94:      {
  95:          lock (workQueue)
  96:          {
  97:              workQueue.Enqueue(work);
  98:          }
  99:      }
 100:   
 101:   
 102:      public int WorkQueueSize()
 103:      {
 104:          return workQueue.Count;
 105:      }
 106:   
 107:   
 108:      public void Start()
 109:      {
 110:          if (status != QueueStatus.Stopped) throw new InvalidOperationException("Cannot start when Status != Stopped");
 111:   
 112:          status = QueueStatus.Started;
 113:   
 114:          ThreadMoreWork();
 115:      }
 116:   
 117:   
 118:      private void ThreadMoreWork()
 119:      {
 120:          threadingMoreWork = true;
 121:   
 122:          while (status == QueueStatus.Started && runningThreads < MaxThreads)
 123:          {    // If there's more to do, thread more work
 124:              lock (workQueue)
 125:              {
 126:                  if (workQueue.Count > 0)
 127:                  {
 128:                      lock (threads)
 129:                      {
 130:                          var thread = (ManagedThread)null;
 131:   
 132:                          if ((thread = AvailableThread()) == null)
 133:                          {    // If there's no ManagedThreads not 'Running', create a new one
 134:                              thread = new ManagedThread();
 135:   
 136:                              thread.Completed += delegate(object sender, EventArgs args)
 137:                                  {    // When the thread is completed, reduce the runningthread counter
 138:                                      runningThreads--;
 139:  
 140:                                      lock (workQueue)
 141:                                      {
 142:                                          if (workQueue.Count == 0 && runningThreads == 0)
 143:                                          {    // All work is done, queue is empty
 144:                                              lock (threads)
 145:                                              {
 146:                                                  threads.Clear();
 147:                                              }
 148:   
 149:                                              status                = QueueStatus.Stopped;
 150:                                              threadingMoreWork    = false;
 151:   
 152:                                              if (Completed != null)Completed(this, EventArgs.Empty);
 153:                                          }
 154:                                      }
 155:   
 156:                                      if (!threadingMoreWork) ThreadMoreWork();    // Thread more work again
 157:                                  };
 158:   
 159:                              threads.Add(thread);
 160:                          }
 161:   
 162:                          thread.AssignWork(workQueue.Dequeue());    // Assign work from the queue to the new thread
 163:   
 164:                          runningThreads++;    // Increase the runningthread counter
 165:   
 166:                          thread.Start();        // Start the thread
 167:                      }
 168:                  }
 169:              }
 170:          }
 171:   
 172:          threadingMoreWork = false;
 173:      }
 174:   
 175:   
 176:      public void Stop()
 177:      {
 178:          status = QueueStatus.Stopping;    // Indicate stopping is in progess
 179:   
 180:          lock (threads)
 181:          {
 182:   
 183:              var active    =
 184:                  (    // Get a list of the active threads
 185:                      from    item
 186:                      in        threads
 187:                      where    item.IsRunning
 188:                      select    item
 189:                  ).ToList();
 190:   
 191:              active.ForEach(delegate(ManagedThread thread)
 192:                  {    // Stop each thread
 193:                      thread.Stop();
 194:                  });
 195:   
 196:              RunningThreads = 0;    // There are no more threads running, so set to 0
 197:   
 198:              threads.Clear();    // Clear out the ManagedThread List
 199:   
 200:              if (Completed != null) Completed(this, EventArgs.Empty);    // If there's a completed
 201:                                                                          // handler call it
 202:          }
 203:      }
 204:   
 205:   
 206:      private ManagedThread AvailableThread()
 207:      {
 208:          var matches =
 209:              (    // Use LINQ to get the threads not running
 210:                  from    item
 211:                  in        threads
 212:                  where    !item.IsRunning
 213:                  select    item
 214:              );
 215:   
 216:          if (matches.Count() == 0) return null;    // if there is none, return a null
 217:   
 218:          return matches.First();    // Otherwise, return the first one in the enumeration
 219:      }
 220:  }
 221:   

As you can see, when new work is assigned, we enter it into our ‘Queue<Action>’ class called ‘workQueue’ .  The function ‘ThreadMoreWork’ is used to start up new threads (if needed).  When we create a new thread and add it into ‘threads’ we hook up the ‘Completed’ event of that thread to call ‘ThreadMoreWork’ (if it’s not already running).  This means as soon as a thread is completed, we’re going to check if we need to start another one – If we do, well, we do!  In our ‘stop’ function, we’re using LINQ to find all the running threads, and using the very handy ‘ForEach’ function to Stop all the individual threads.  Then we fire the ‘Completed’ event.

Now we have a simple Console application to demonstrate the classes.  It’s going to queue up 100 pieces of ‘work’ (all will just print a ‘.’ to Console).  We’ll leave the maximum threads at the default we defined earlier (10).  We also have a timer setup to run every 5 seconds and report the progress.  We’ve hooked up the ‘Completed’ event to print ‘Completed’ to console when we’re all done.

 222:   
 223:  class Program
 224:  {
 225:      static void Main(string[] cargs)
 226:      {
 227:          var threadQueue = new ThreadQueue();
 228:          var checkTimer    = new Timer(delegate
 229:              {    // Poll for updated progress every 5 seconds
 230:                  Console.WriteLine("nThread Information Update! Running={0}, Remaining={1}", threadQueue.RunningThreads, threadQueue.WorkQueueSize());
 231:   
 232:              }, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
 233:   
 234:          threadQueue.Completed += delegate(object sender, EventArgs args)
 235:              {
 236:                  // We've finished, stop the checkTimer and notify Console
 237:                  checkTimer.Dispose();
 238:                  Console.WriteLine("nCompleted!");
 239:              };
 240:   
 241:          for (int i=0; i<100; i++)
 242:          {    // Queue up our test work
 243:              threadQueue.AssignWork(delegate()
 244:                  {
 245:                      Console.Write(".");
 246:   
 247:                      Thread.Sleep(new Random().Next(5000));
 248:                  });
 249:          }
 250:   
 251:          Console.WriteLine("(Press enter to stop early)");
 252:   
 253:          threadQueue.Start();    // Start processing
 254:   
 255:          Console.ReadLine();
 256:   
 257:          if (threadQueue.Status != ThreadQueue.QueueStatus.Stopped) threadQueue.Stop();
 258:   
 259:          Console.ReadLine();    // Pause so we can see the output
 260:      }
 261:  }

Run it, and the output should look a little like this…

(Press enter to stop early)
......................
Thread Information Update! Running=10, Remaining=78
................
Thread Information Update! Running=10, Remaining=62
....................
Thread Information Update! Running=10, Remaining=42
....................
Thread Information Update! Running=10, Remaining=22
......................
Thread Information Update! Running=10, Remaining=0
Completed!

Job done.

Download Visual Studio 2010 Project (8.03k)

Tags: , , , , ,


Mar 11 2010

Locking and multi-threading

Category: ThreadingMike Lovell @ 9:58 pm

I noted over the last few days a lot of questions about ‘lock’ came up on the Microsoft Newsgroups.  I’ve come up with a nice demonstration of the flow of a multi-threaded application with and without locking.

Often in a multi-threaded application, you will have shared resources that all the threads need to use.  Now if those resources do not allow simultaneous access (a good example of this is writing to a text file) you’re going to end up throwing an exception if you try and use the same shared resource from two different threads at once, and you don’t use locking.

Lets move onto the example (the project can be downloaded at the bottom of this article)…

First lets make a class which can simulate a shared object and doesn’t like being called simultaneously.  It’s going to print ‘!’ to console if a ‘collision’ has occurred (where in a real world application, an Exception would occur) when the function ‘Write’ is used - If the function isn’t called simultaneously, it will just print ‘.’

   1:  class KindOfASync
   2:  {
   3:      bool writing = false;
   4:   
   5:      public void Write()
   6:      {
   7:          if (writing)
   8:          {
   9:              Console.Write("!");
  10:              return;
  11:          }
  12:   
  13:          writing = true;
  14:   
  15:          Console.Write(".");
  16:   
  17:          writing = false;
  18:      }
  19:  }
  20:   

Okay, pretty simple class.  Lets up the complication 100x! :-)   We’ll now create a few constants to define how many threads we want to use at once and the amount of iterations (the amount of times we’ll make the ‘Write’ call to our class above).  We’ll then make two ‘Thread’ and ‘Action’ arrays, ‘___A’ will demonstrate not using locks, and ‘___B’ will demonstrate using locks.  The ‘Action’ class is a great way for us to queue up all the work we want our threads to do.

  21:   
  22:  class Program
  23:  {
  24:      const    int    ThreadsToUse    = 10;
  25:      const    int    Iterations        = 300;
  26:   
  27:   
  28:      static void Main(string[] args)
  29:      {
  30:          var myClass        = new KindOfASync();
  31:   
  32:          var threadPoolA    = new Thread[ThreadsToUse];
  33:          var threadPoolB    = new Thread[ThreadsToUse];
  34:  
  35:          var workA        = new Action[ThreadsToUse];
  36:          var workB        = new Action[ThreadsToUse];

Now lets queue up all the work we want our threads to handle.  We’ll evenly divide the iterations between our threads by using the ‘modulus %’.  We’re going to use the int ’completed’ to detect when we’ve completed our iterations

  37:   
  38:          var completed    = 0;
  39:   
  40:          for (int i=0; i < Iterations; i++)
  41:          {
  42:              var threadInUse = i % ThreadsToUse;
  43:   
  44:              workA[threadInUse] += delegate()
  45:                  {
  46:                      myClass.Write();
  47:   
  48:                      completed++;
  49:                  };
  50:   
  51:              workB[threadInUse] += delegate()
  52:                  {
  53:                      lock (myClass)
  54:                          {
  55:                              myClass.Write();
  56:                          }
  57:   
  58:                      completed++;
  59:                  };
  60:          }
  61:   

Now lets tell ‘threadPoolA’ to work through ‘workA’ (no locking) and wait for it to complete.

  62:          Console.WriteLine("Running 'threadPoolA' (WITHOUT locking)");
  63:   
  64:          for (int i=0; i<ThreadsToUse; i++)
  65:          {
  66:              threadPoolA[i] = new Thread(new ThreadStart(workA[i]));
  67:              threadPoolA[i].Start();
  68:          }
  69:   
  70:          while (completed < Iterations)
  71:          {
  72:              Thread.Sleep(10);
  73:          }
  74:   

Now lets tell ‘threadPoolB’ to work through ‘workB’ (locking) and wait for it to complete

  75:          Console.WriteLine("n");
  76:  
  77:   
  78:          completed = 0;
  79:   
  80:          Console.WriteLine("Running 'threadPoolB' (WITH locking)");
  81:   
  82:          for (int i=0; i<ThreadsToUse; i++)
  83:          {
  84:              threadPoolB[i] = new Thread(new ThreadStart(workB[i]));
  85:              threadPoolB[i].Start();
  86:          }
  87:   
  88:          while (completed < Iterations)
  89:          {
  90:              Thread.Sleep(10);
  91:          }
  92:   
  93:   
  94:          Console.ReadLine();
  95:      }
  96:  }

And the results

Running 'threadPoolA' (WITHOUT locking)
...!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!...............................................
................................................................................
..................................................!.............................
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!..............................
Running 'threadPoolB' (WITH locking)
................................................................................
................................................................................
................................................................................
............................................................

The amount of ‘!’ you get will vary depending on the amount of iterations, the amount of threads, and your processor and the other work your computer is doing when it runs.  ‘lock’ allows you to obtain a mutual-exclusive lock on an object whilst inside the statement block, releasing it when it’s complete.  Not using it correctly can cause many headaches.

Some other blogs covering the subject:

Download Visual Studio 2010 Project (6.48k)

Tags: , , , , , , ,