Mar 13 2010

A simple custom thread queue

Category: Threading | Mike Lovell @ 8:05 pm

If you use the built-in ‘ThreadPool’, you lose a lot of control over the thread after you queue up your work.  Here’s an example of a custom thread queue.  As with the built-in ‘ThreadPool’, you can add more work to it and adjust the maximum number of threads whilst it’s running, but you can also issue a ‘Stop’ to terminate all the currently running threads.  You can expand the classes to add even more functionality, but I’ve just covered the basics.

As always, I’m going to handle all my ‘work’ using the ‘Action’ delegate.  I’ll be using the ‘Queue’ class to hold all my ‘work’, and a ‘List’ to hold my threads, for which I’ll be making a simple wrapper around the ‘Thread’ class called ‘ManagedThread’.

First, let’s define the ‘ManagedThread’ class.  We’ll tack a new delegate onto the end of the ‘work’ to raise a ‘Completed’ event.  This code will run after the ‘work’ is completed and fire an event that can be monitored later in our code.

    // Author: Mike Lovell (mike.lovell@gotinker.com)

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    class ManagedThread
    {
        private Thread        thread;
        private Action        work;
        private volatile bool isRunning = false;    // Read and written from several threads


        public bool IsRunning
        {
            get { return isRunning; }
        }


        public event EventHandler<EventArgs> Completed;


        public void AssignWork(Action work)
        {
            this.work = work;    // Replaces any old work
        }


        public void Start()
        {
            if (isRunning) throw new InvalidOperationException("The thread is already started");

            var job = work;    // Local copy, so repeated starts don't stack up completed handlers

            job += delegate()    // Add our completed handler to the end of the work
                {
                    var handler = Completed;

                    if (handler != null) handler(this, EventArgs.Empty);    // If there's a completed
                                                                            // handler, call it!

                    thread = null;        // Clear the thread first...

                    isRunning = false;    // ...then flag this wrapper as free for reuse
                };

            thread = new Thread(new ThreadStart(job));

            isRunning = true;

            thread.Start();
        }


        public void Stop()
        {
            var current = thread;    // Local copy: the completed handler may clear the field at any moment

            if (!isRunning || current == null) return;    // Not running anyway

            // Note: Thread.Abort only works on the .NET Framework; .NET Core and
            // later throw PlatformNotSupportedException
            if (current.ThreadState != ThreadState.Stopped) current.Abort();
        }
    }

Now let’s create a class to hold the threads and handle the work queue.  We’re going to add a ‘Completed’ event to this class as well, but this one only fires when we’ve completed ALL the work.  It can potentially fire more than once: it fires whenever the queue is empty and there are no threads running, i.e. all the work assigned so far has been completed.

    class ThreadQueue
    {
        public enum QueueStatus
        {
            Stopped,
            Stopping,
            Started
        }


        public event EventHandler<EventArgs> Completed;

        private Queue<Action>        workQueue = new Queue<Action>();
        private List<ManagedThread>  threads   = new List<ManagedThread>();

        public  int                  MaxThreads        = 10;
        private int                  runningThreads    =  0;
        private volatile bool        threadingMoreWork = false;
        private volatile QueueStatus status            = QueueStatus.Stopped;


        public QueueStatus Status
        {
            get { return status; }
            set { status = value; }
        }


        public int RunningThreads
        {
            get { return runningThreads; }
            set { runningThreads = value; }
        }


        public void AssignWork(Action work)
        {
            lock (workQueue)
            {
                workQueue.Enqueue(work);
            }

            // If the queue is already started, hand the new work out straight away
            if (status == QueueStatus.Started && !threadingMoreWork) ThreadMoreWork();
        }


        public int WorkQueueSize()
        {
            lock (workQueue)
            {
                return workQueue.Count;
            }
        }


        public void Start()
        {
            if (status != QueueStatus.Stopped) throw new InvalidOperationException("Cannot start when Status != Stopped");

            status = QueueStatus.Started;

            ThreadMoreWork();
        }


        private void ThreadMoreWork()
        {
            threadingMoreWork = true;

            while (status == QueueStatus.Started && runningThreads < MaxThreads)
            {    // If there's more to do, thread more work
                lock (workQueue)
                {
                    if (workQueue.Count == 0) break;    // Nothing to hand out, so don't spin

                    lock (threads)
                    {
                        var thread = AvailableThread();

                        if (thread == null)
                        {    // If there's no ManagedThread that is not 'Running', create a new one
                            thread = new ManagedThread();

                            thread.Completed += delegate(object sender, EventArgs args)
                                {    // When the thread is completed, reduce the running thread counter
                                    Interlocked.Decrement(ref runningThreads);

                                    lock (workQueue)
                                    {
                                        if (workQueue.Count == 0 && runningThreads == 0)
                                        {    // All work is done, queue is empty
                                            lock (threads)
                                            {
                                                threads.Clear();
                                            }

                                            status            = QueueStatus.Stopped;
                                            threadingMoreWork = false;

                                            var handler = Completed;

                                            if (handler != null) handler(this, EventArgs.Empty);
                                        }
                                    }

                                    if (!threadingMoreWork) ThreadMoreWork();    // Thread more work again
                                };

                            threads.Add(thread);
                        }

                        thread.AssignWork(workQueue.Dequeue());    // Assign work from the queue to the thread

                        Interlocked.Increment(ref runningThreads);    // Increase the running thread counter

                        thread.Start();    // Start the thread
                    }
                }
            }

            threadingMoreWork = false;
        }


        public void Stop()
        {
            status = QueueStatus.Stopping;    // Indicate stopping is in progress

            lock (threads)
            {
                var active =
                    (    // Get a list of the active threads
                        from   item
                        in     threads
                        where  item.IsRunning
                        select item
                    ).ToList();

                active.ForEach(delegate(ManagedThread thread)
                    {    // Stop each thread
                        thread.Stop();
                    });

                RunningThreads = 0;    // There are no more threads running, so set to 0

                threads.Clear();    // Clear out the ManagedThread List

                status = QueueStatus.Stopped;    // Stopping has finished, so the queue can be started again

                var handler = Completed;

                if (handler != null) handler(this, EventArgs.Empty);    // If there's a completed
                                                                        // handler, call it
            }
        }


        private ManagedThread AvailableThread()
        {
            // Use LINQ to get the first thread that is not running, or null if there is none
            return threads.FirstOrDefault(item => !item.IsRunning);
        }
    }

As you can see, when new work is assigned, we enter it into our ‘Queue<Action>’ called ‘workQueue’.  The ‘ThreadMoreWork’ method is used to start up new threads (if needed).  When we create a new thread and add it to ‘threads’, we hook up the ‘Completed’ event of that thread to call ‘ThreadMoreWork’ (if it’s not already running).  This means that as soon as a thread is completed, we check whether we need to start another one.  If we do, well, we do!  In our ‘Stop’ method, we’re using LINQ to find all the running threads, and the very handy ‘ForEach’ method to stop each individual thread.  Then we fire the ‘Completed’ event.
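
One thing worth knowing about the ‘Stop’ approach: ‘Thread.Abort’ is only available on the .NET Framework, and newer runtimes throw ‘PlatformNotSupportedException’ instead.  A rough sketch of a cooperative alternative is below.  It is not part of the original classes: ‘CooperativeStopSketch’ and ‘RunAll’ are hypothetical names, and it assumes .NET 4.5 or later for ‘CancelAfter’ and that each piece of work is willing to check a ‘CancellationToken’ now and then.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: illustrates stopping work cooperatively instead of aborting threads
static class CooperativeStopSketch
{
    // Runs every work item on its own thread, waits for them all,
    // and returns how many ran to completion without being cancelled
    public static int RunAll(IEnumerable<Action<CancellationToken>> work, CancellationTokenSource cts)
    {
        int completed = 0;
        var threads   = new List<Thread>();

        foreach (var item in work)
        {
            var current = item;    // Copy for the closure
            var thread  = new Thread(() =>
                {
                    try
                    {
                        current(cts.Token);
                        Interlocked.Increment(ref completed);
                    }
                    catch (OperationCanceledException)
                    {
                        // The work noticed the stop request and bailed out early
                    }
                });

            threads.Add(thread);
            thread.Start();
        }

        threads.ForEach(t => t.Join());    // Wait for every thread to finish or cancel

        return completed;
    }
}

class Program
{
    static void Main()
    {
        var cts  = new CancellationTokenSource();
        var work = new List<Action<CancellationToken>>();

        for (int i = 0; i < 5; i++)
        {
            work.Add(token =>
                {    // Roughly 5 seconds of 'work', checking for a stop request between steps
                    for (int step = 0; step < 50; step++)
                    {
                        token.ThrowIfCancellationRequested();
                        Thread.Sleep(100);
                    }
                });
        }

        cts.CancelAfter(TimeSpan.FromSeconds(1));    // Plays the part of 'Stop'

        int done = CooperativeStopSketch.RunAll(work, cts);

        Console.WriteLine("Completed {0} of {1} items before the stop", done, work.Count);
    }
}
```

The trade-off is that work which never checks the token can’t be stopped this way, whereas ‘Abort’ interrupts it whether it likes it or not.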

Now we have a simple Console application to demonstrate the classes.  It’s going to queue up 100 pieces of ‘work’ (each just prints a ‘.’ to the Console and then sleeps for up to 5 seconds).  We’ll leave the maximum threads at the default we defined earlier (10).  We also have a timer set up to run every 5 seconds and report the progress.  We’ve hooked up the ‘Completed’ event to print ‘Completed!’ to the Console when we’re all done.

    class Program
    {
        // One shared generator: separate 'new Random()' instances created at the
        // same moment can end up with the same seed
        private static readonly Random random = new Random();


        static void Main(string[] args)
        {
            var threadQueue = new ThreadQueue();
            var checkTimer  = new Timer(delegate
                {    // Poll for updated progress every 5 seconds
                    Console.WriteLine("\nThread Information Update! Running={0}, Remaining={1}", threadQueue.RunningThreads, threadQueue.WorkQueueSize());

                }, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));

            threadQueue.Completed += delegate(object sender, EventArgs e)
                {
                    // We've finished, stop the checkTimer and notify Console
                    checkTimer.Dispose();
                    Console.WriteLine("\nCompleted!");
                };

            for (int i = 0; i < 100; i++)
            {    // Queue up our test work
                threadQueue.AssignWork(delegate()
                    {
                        Console.Write(".");

                        int pause;

                        lock (random)    // Random is not thread-safe
                        {
                            pause = random.Next(5000);
                        }

                        Thread.Sleep(pause);
                    });
            }

            Console.WriteLine("(Press enter to stop early)");

            threadQueue.Start();    // Start processing

            Console.ReadLine();

            if (threadQueue.Status != ThreadQueue.QueueStatus.Stopped) threadQueue.Stop();

            Console.ReadLine();    // Pause so we can see the output
        }
    }

Run it, and the output should look a little like this…

(Press enter to stop early)
......................
Thread Information Update! Running=10, Remaining=78
................
Thread Information Update! Running=10, Remaining=62
....................
Thread Information Update! Running=10, Remaining=42
....................
Thread Information Update! Running=10, Remaining=22
......................
Thread Information Update! Running=10, Remaining=0
Completed!

Job done.

Download Visual Studio 2010 Project (8.03k)



Mar 10 2010

Object Initializers, delegates, anonymous types, type inference and LINQ

Category: Uncategorized | Mike Lovell @ 9:04 pm

I thought I’d put together a quick and nasty demo showing the use of object initializers, delegates, anonymous types, type inference and LINQ in one place.

The main reason for the existence of ‘var’ is to support anonymous types (although you’ll notice I use ‘var’ practically everywhere, that’s just my style!).

I can define a new type in my code simply by implying its properties. For example:

    var myType = new
        {
            ParamA = "valuea",
            ParamB = "valueb",
            Number = 123
        };

The compiler creates the class definition internally, and the Visual Studio environment knows about it too, so we can use IntelliSense against it. We can now access the properties of that object, just as if we had defined the class explicitly:

    Console.WriteLine("{0} {1} {2}", myType.ParamA, myType.ParamB, myType.Number);
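
To make the ‘creates the class definition internally’ point a bit more concrete, here’s a small sketch of how the generated type behaves. ‘AnonymousTypeSketch’ and its method names are made up for illustration. It shows that two anonymous objects with the same property names and types (in the same order) share one generated type and compare equal by value, and that the properties are read-only.

```csharp
using System;

// Hypothetical helper class, only here to illustrate how anonymous types behave
static class AnonymousTypeSketch
{
    // True when two anonymous objects of the same 'shape' share a type and compare equal
    public static bool SameShapeIsEqual()
    {
        var first  = new { ParamA = "valuea", Number = 123 };
        var second = new { ParamA = "valuea", Number = 123 };

        return first.GetType() == second.GetType() && first.Equals(second);
    }

    // Reads the properties back, just like a class we had written ourselves
    public static string Describe()
    {
        var myType = new { ParamA = "valuea", ParamB = "valueb", Number = 123 };

        // myType.Number = 456;    // Would not compile: the properties are read-only

        return String.Format("{0} {1} {2}", myType.ParamA, myType.ParamB, myType.Number);
    }
}

class Program
{
    static void Main()
    {
        Console.WriteLine(AnonymousTypeSketch.Describe());            // valuea valueb 123
        Console.WriteLine(AnonymousTypeSketch.SameShapeIsEqual());    // True
    }
}
```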

Why? Well I guess the most common reason for using anonymous types is to return a subset of data from a LINQ query. This is something we’ll do in the demo.

I like to use the type inference capability of ‘var’ to make my code (in my mind!) cleaner. For example:

    MyClass classObject = new MyClass();

Can be written as:

    var classObject = new MyClass();

I think the latter looks better and seems kinder on my eyes, but it’s a personal preference thing, so don’t follow me on that habit just for the sake of it! :-)

I’m also a heavy user of ‘Object initializers’ rather than constructors, although with more complex classes there is often a need to control the instantiation of an object more tightly and go down the constructor route. ‘Object initializers’ let me write nice readable code because every value is named (note: you can now use named parameters with constructors too), and it’s nice not to have to write constructors for simple classes. So my code might look a little like this:

    var classObject = new MyPersonClass()
        {
            FirstName = "Mike",
            LastName  = "Lovell"
        };
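
For comparison, here’s a sketch of the same object built both ways: with an object initializer and with a constructor using named parameters (which needs C# 4). ‘MyPersonClass’ isn’t defined anywhere in this post, so the fields, the two constructors and the ‘FullName’ method below are my assumptions for the sake of the example.

```csharp
using System;

// Hypothetical definition: the post uses 'MyPersonClass' without showing it
class MyPersonClass
{
    public string FirstName = "";
    public string LastName  = "";

    public MyPersonClass() { }    // Parameterless constructor, used by the object initializer

    public MyPersonClass(string firstName, string lastName)
    {
        FirstName = firstName;
        LastName  = lastName;
    }

    public string FullName()
    {
        return String.Format("{0} {1}", FirstName, LastName);
    }
}

class Program
{
    static void Main()
    {
        // Object initializer: the values are set after the parameterless constructor runs
        var viaInitializer = new MyPersonClass()
            {
                FirstName = "Mike",
                LastName  = "Lovell"
            };

        // Named parameters: the order no longer matters, the names do
        var viaConstructor = new MyPersonClass(lastName: "Lovell", firstName: "Mike");

        Console.WriteLine(viaInitializer.FullName());    // Mike Lovell
        Console.WriteLine(viaConstructor.FullName());    // Mike Lovell
    }
}
```

Both read about equally well. The constructor version earns its keep once the class needs to validate its values or make them read-only.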

Delegates and LINQ are pretty vast topics, so I’m not going to attempt to cover them beyond demonstrating their use in a way that might be helpful. So, on to the demo…

First, let’s declare a couple of classes to use.  For this we’re going to use a simple class called ‘Person’, and a collection of ‘Person’ (by inheriting List<Person>) called ‘People’.

    // Author: Mike Lovell (mike.lovell@gotinker.com)

    using System;
    using System.Collections.Generic;
    using System.Linq;

    class Person
    {
        public string FirstName = "";
        public string LastName  = "";
    }


    class People : List<Person>
    {
    }

Now let’s define a new instance of our collection and fill it with some test data.  As we haven’t defined any constructors, we’re going to use ‘Object initializers’ to set the values of ‘FirstName’ and ‘LastName’.

    class Program
    {
        static void Main(string[] args)
        {
            var people = new People();

            people.Add(new Person()
                {
                    FirstName = "Bob",
                    LastName  = "Smith"
                });

            people.Add(new Person()
                {
                    FirstName = "John",
                    LastName  = "Doe"
                });

            people.Add(new Person()
                {
                    FirstName = "Bill",
                    LastName  = "Johnson"
                });

Now the delegate: we can use an anonymous delegate to make a rather nifty ‘ForEach’ loop that displays the value of each item.

            Console.WriteLine("All People:\n");

            people.ForEach(delegate(Person person)
                {
                    Console.WriteLine("{0} {1}", person.FirstName, person.LastName);
                });

Now the LINQ and anonymous type.  We want to find all the instances of ‘Person’ whose first name begins with a ‘b’, then we’re going to create an anonymous type to store their full name in (of course we could just return it as a string array, but this is a contrived example).  We’ll then display the results on the ‘Console’.  IEnumerable<T> does not have a ‘ForEach’ method, so I will be using a more conventional ‘foreach’ loop!

            var bNames = from i
                         in people
                         where i.FirstName.ToLower().StartsWith("b")
                         select new
                             {
                                 FullName = String.Format("{0} {1}", i.FirstName, i.LastName)
                             };

            Console.WriteLine("\nFirst names that start with a 'b':\n");

            foreach (var person in bNames)
            {
                Console.WriteLine(person.FullName);
            }

            Console.ReadLine();
        }
    }
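
If you’d rather keep the delegate style for query results as well, one option is to write a small extension method yourself. This is only a sketch: ‘EnumerableExtensions’ is a hypothetical helper and isn’t part of the framework or of the download below. Calling ‘ToList()’ on the query and using the built-in ‘List<T>.ForEach’ would also work, at the cost of copying the results into a list first.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: not part of the .NET Framework or the original demo
static class EnumerableExtensions
{
    // Calls 'action' once for each item, in enumeration order
    public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (action == null) throw new ArgumentNullException("action");

        foreach (var item in source) action(item);
    }
}

class Program
{
    static void Main()
    {
        var firstNames = new[] { "Bob", "John", "Bill" };

        var bNames = from name
                     in firstNames
                     where name.ToLower().StartsWith("b")
                     select new
                         {
                             Display = name.ToUpper()
                         };

        // T is inferred as the anonymous type, so 'item.Display' is strongly typed
        bNames.ForEach(item => Console.WriteLine(item.Display));    // Prints BOB, then BILL
    }
}
```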

And there we have it.

Download Visual Studio 2010 Project (6.69k)
