/////////////////////////////////////////////////////////////////////////////// // // Microsoft Research Singularity // // Copyright (c) Microsoft Corporation. All rights reserved. // // File: NetStackThreadPool.sg // // Note: adapted from CassiniThreadPool.sg // #if THREAD_POOL using System.Collections; using System.Threading; namespace NetStack.Runtime { /// This class implements a non-Singleton /// threadpool and has a configurable work queue /// size. public sealed class NetStackThreadPool { object monitor; Queue commands; // Queue of ThreadStart int queuedItems; readonly int maxQueuedItems; volatile int threadCount; volatile bool shutdown; ManualResetEvent shutdownEvent; [Microsoft.Contracts.NotDelayed] public NetStackThreadPool(int maxThreads, int maxQueuedItems) requires maxThreads > 0; requires maxQueuedItems > 0; requires maxThreads <= maxQueuedItems; { this.monitor = new object(); this.commands = new Queue(); this.queuedItems = 0; this.maxQueuedItems = maxQueuedItems; this.threadCount = maxThreads; this.shutdown = false; this.shutdownEvent = new ManualResetEvent(false); for (int i = 0; i < maxThreads; i++) { Thread t = new Thread(new ThreadStart(WorkerMain)); t.Start(); } } /// Queues call to constructor supplied /// delegate with user supplied object. This method /// blocks if maximum queue size is reached. public void QueueUserWorkItem(ThreadStart! command) { Monitor.Enter(this.monitor); while (this.queuedItems == this.maxQueuedItems) { Monitor.Wait(this.monitor); } commands.Enqueue(command); this.queuedItems++; Monitor.Pulse(this.monitor); Monitor.Exit(this.monitor); } private void WorkerMain() { while (true) { ThreadStart command; lock (this.monitor) { while (queuedItems == 0 && !shutdown) { Monitor.Wait(this.monitor); } if (queuedItems == 0 && shutdown) { break; } command = (ThreadStart) (commands.Dequeue()); assert(command != null); if (this.queuedItems-- == this.maxQueuedItems) { Monitor.Pulse(this.monitor); } } command(); } lock (this.monitor) { if (--this.threadCount == 0) { shutdownEvent.Set(); } } } /// /// Shutdown thread pool instance. Caller blocks until /// the threads in the pool have finished outstanding /// work items. /// internal void Shutdown() { lock (this.monitor) { shutdown = true; Monitor.PulseAll(this.monitor); } shutdownEvent.WaitOne(); shutdownEvent.Reset(); Microsoft.Singularity.DebugStub.Assert(commands.Count == 0); } } } #endif // THREAD_POOL