If you use the built-in ‘ThreadPool’, you lose a lot of control over the thread after you queue up your work. Here’s an example of a custom thread queue. You can add more work to it and adjust the maximum threads whilst it’s running, like the built-in ‘ThreadPool’ – But you can issue a ‘Stop’ and terminate all the currently running threads. You can expand the classes to add even more functionality but I’ve just covered the basics.
As always, I’m going to handle all my ‘work’ using the ‘Action’ class. I’ll be using the ‘Queue’ class to hold all my ‘work’ inside, and a ‘List’ to hold my threads, for which I’ll be making a simple wrapper for the ‘Thread’ class called ‘ManagedThread’.
First lets define the ‘ManagedThread’ class. We’ll tack onto the end of the ‘work’ a new delegate to call a ‘Completed’ event. So this will code will run after the ‘work’ is completed and fire an event that can be monitored later in our code.
1: // Author: Mike Lovell (mike.lovell@gotinker.com)
2:
3: class ManagedThread
4: {
5: private Thread thread;
6: private Action work;
7: private bool isRunning = false;
8:
9:
10: public bool IsRunning
11: {
12: get { return isRunning; }
13: }
14:
15:
16: public event EventHandler<EventArgs> Completed;
17:
18:
19: public void AssignWork(Action work)
20: {
21: this.work = null; // Clear old work
22:
23: this.work += work;
24: }
25:
26:
27: public void Start()
28: {
29: if (isRunning) throw new InvalidOperationException("The thread is already started");
30:
31: work += delegate() // Add our completed handler to the end of the work
32: {
33: if (Completed != null) Completed(this, EventArgs.Empty); // If there's a completed
34: // handled, call it!
35:
36: isRunning = false;
37:
38: thread = null;
39: };
40:
41: thread = new Thread(new ThreadStart(work));
42:
43: isRunning = true;
44:
45: thread.Start();
46: }
47:
48:
49: public void Stop()
50: {
51: if (!isRunning) return; // Not running anyway
52:
53: if (thread.ThreadState != ThreadState.Stopped) thread.Abort();
54: }
55: }
56:
Now lets create a class to hold the threads and handle the work queue. We’re going to add a ‘Completed’ event to this class also, but this is going to only fire when we’re ‘Completed’ ALL the work. It can potential fire more than once (it will fire when the queue is empty, and there are no threads running – i.e. all the work assigned has been completed).
57:
58: class ThreadQueue
59: {
60: public enum QueueStatus
61: {
62: Stopped,
63: Stopping,
64: Started
65: }
66:
67:
68: public event EventHandler<EventArgs> Completed;
69:
70: private Queue<Action> workQueue = new Queue<Action>();
71: private List<ManagedThread> threads = new List<ManagedThread>();
72:
73: public int MaxThreads = 10;
74: private int runningThreads = 0;
75: private bool threadingMoreWork = false;
76: private QueueStatus status = QueueStatus.Stopped;
77:
78:
79: public QueueStatus Status
80: {
81: get { return status; }
82: set { status = value; }
83: }
84:
85:
86: public int RunningThreads
87: {
88: get { return runningThreads; }
89: set { runningThreads = value; }
90: }
91:
92:
93: public void AssignWork(Action work)
94: {
95: lock (workQueue)
96: {
97: workQueue.Enqueue(work);
98: }
99: }
100:
101:
102: public int WorkQueueSize()
103: {
104: return workQueue.Count;
105: }
106:
107:
108: public void Start()
109: {
110: if (status != QueueStatus.Stopped) throw new InvalidOperationException("Cannot start when Status != Stopped");
111:
112: status = QueueStatus.Started;
113:
114: ThreadMoreWork();
115: }
116:
117:
118: private void ThreadMoreWork()
119: {
120: threadingMoreWork = true;
121:
122: while (status == QueueStatus.Started && runningThreads < MaxThreads)
123: { // If there's more to do, thread more work
124: lock (workQueue)
125: {
126: if (workQueue.Count > 0)
127: {
128: lock (threads)
129: {
130: var thread = (ManagedThread)null;
131:
132: if ((thread = AvailableThread()) == null)
133: { // If there's no ManagedThreads not 'Running', create a new one
134: thread = new ManagedThread();
135:
136: thread.Completed += delegate(object sender, EventArgs args)
137: { // When the thread is completed, reduce the runningthread counter
138: runningThreads--;
139:
140: lock (workQueue)
141: {
142: if (workQueue.Count == 0 && runningThreads == 0)
143: { // All work is done, queue is empty
144: lock (threads)
145: {
146: threads.Clear();
147: }
148:
149: status = QueueStatus.Stopped;
150: threadingMoreWork = false;
151:
152: if (Completed != null)Completed(this, EventArgs.Empty);
153: }
154: }
155:
156: if (!threadingMoreWork) ThreadMoreWork(); // Thread more work again
157: };
158:
159: threads.Add(thread);
160: }
161:
162: thread.AssignWork(workQueue.Dequeue()); // Assign work from the queue to the new thread
163:
164: runningThreads++; // Increase the runningthread counter
165:
166: thread.Start(); // Start the thread
167: }
168: }
169: }
170: }
171:
172: threadingMoreWork = false;
173: }
174:
175:
176: public void Stop()
177: {
178: status = QueueStatus.Stopping; // Indicate stopping is in progess
179:
180: lock (threads)
181: {
182:
183: var active =
184: ( // Get a list of the active threads
185: from item
186: in threads
187: where item.IsRunning
188: select item
189: ).ToList();
190:
191: active.ForEach(delegate(ManagedThread thread)
192: { // Stop each thread
193: thread.Stop();
194: });
195:
196: RunningThreads = 0; // There are no more threads running, so set to 0
197:
198: threads.Clear(); // Clear out the ManagedThread List
199:
200: if (Completed != null) Completed(this, EventArgs.Empty); // If there's a completed
201: // handler call it
202: }
203: }
204:
205:
206: private ManagedThread AvailableThread()
207: {
208: var matches =
209: ( // Use LINQ to get the threads not running
210: from item
211: in threads
212: where !item.IsRunning
213: select item
214: );
215:
216: if (matches.Count() == 0) return null; // if there is none, return a null
217:
218: return matches.First(); // Otherwise, return the first one in the enumeration
219: }
220: }
221:
As you can see, when new work is assigned, we enter it into our ‘Queue<Action>’ class called ‘workQueue’ . The function ‘ThreadMoreWork’ is used to start up new threads (if needed). When we create a new thread and add it into ‘threads’ we hook up the ‘Completed’ event of that thread to call ‘ThreadMoreWork’ (if it’s not already running). This means as soon as a thread is completed, we’re going to check if we need to start another one – If we do, well, we do! In our ‘stop’ function, we’re using LINQ to find all the running threads, and using the very handy ‘ForEach’ function to Stop all the individual threads. Then we fire the ‘Completed’ event.
Now we have a simple Console application to demonstrate the classes. It’s going to queue up 100 pieces of ‘work’ (all will just print a ‘.’ to Console). We’ll leave the maximum threads at the default we defined earlier (10). We also have a timer setup to run every 5 seconds and report the progress. We’ve hooked up the ‘Completed’ event to print ‘Completed’ to console when we’re all done.
222:
223: class Program
224: {
225: static void Main(string[] cargs)
226: {
227: var threadQueue = new ThreadQueue();
228: var checkTimer = new Timer(delegate
229: { // Poll for updated progress every 5 seconds
230: Console.WriteLine("nThread Information Update! Running={0}, Remaining={1}", threadQueue.RunningThreads, threadQueue.WorkQueueSize());
231:
232: }, null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
233:
234: threadQueue.Completed += delegate(object sender, EventArgs args)
235: {
236: // We've finished, stop the checkTimer and notify Console
237: checkTimer.Dispose();
238: Console.WriteLine("nCompleted!");
239: };
240:
241: for (int i=0; i<100; i++)
242: { // Queue up our test work
243: threadQueue.AssignWork(delegate()
244: {
245: Console.Write(".");
246:
247: Thread.Sleep(new Random().Next(5000));
248: });
249: }
250:
251: Console.WriteLine("(Press enter to stop early)");
252:
253: threadQueue.Start(); // Start processing
254:
255: Console.ReadLine();
256:
257: if (threadQueue.Status != ThreadQueue.QueueStatus.Stopped) threadQueue.Stop();
258:
259: Console.ReadLine(); // Pause so we can see the output
260: }
261: }
Run it, and the output should look a little like this…
(Press enter to stop early)
......................
Thread Information Update! Running=10, Remaining=78
................
Thread Information Update! Running=10, Remaining=62
....................
Thread Information Update! Running=10, Remaining=42
....................
Thread Information Update! Running=10, Remaining=22
......................
Thread Information Update! Running=10, Remaining=0
Completed!
Job done.
