///////////////////////////////////////////////////////////////////////////////
//
//  Microsoft Research Singularity
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File:   FatThreadPool.sg
//
//  Note:
//

using System.Collections;
using System.Threading;

using Microsoft.SingSharp;
using Microsoft.Singularity.Channels;

namespace Microsoft.Singularity.Services.Fat.Fs
{
    /// <remarks>
    /// This class implements a non-Singleton thread pool with a
    /// configurable work queue size. Worker threads are started in the
    /// constructor and run until <see cref="Shutdown"/> is called and the
    /// queue has been drained.
    ///
    /// A single monitor guards the queue bookkeeping and serves as the
    /// wait/notify point for both producers (blocked on a full queue) and
    /// workers (blocked on an empty queue).
    /// </remarks>
    internal sealed class FatThreadPool
    {
        // Guards queuedItems and the shutdown hand-off; also the only
        // wait/pulse object shared by producers and workers.
        object                              monitor;
        // Pending commands; entries are added at the head and extracted
        // from the tail.
        TContainer<TQueue<AsyncCommand>>!   commandContainer;
        // Number of commands currently held in commandContainer.
        int                                 queuedItems;
        // Upper bound on queuedItems; producers block when it is reached.
        readonly int                        maxQueuedItems;
        // Worker threads that have not yet exited WorkerMain.
        volatile int                        threadCount;
        // Set once by Shutdown() to tell workers to exit when idle.
        volatile bool                       shutdown;
        // Signalled by the last worker to exit.
        ManualResetEvent                    shutdownEvent;

        /// <summary>
        /// Creates the pool and immediately starts
        /// <paramref name="maxThreads"/> worker threads.
        /// </summary>
        /// <param name="maxThreads">Number of worker threads to start.</param>
        /// <param name="maxQueuedItems">
        /// Maximum number of commands that may be queued before
        /// <see cref="QueueUserWorkItem"/> blocks.
        /// </param>
        [ Microsoft.Contracts.NotDelayed ]
        internal FatThreadPool(int maxThreads, int maxQueuedItems)
            requires maxThreads > 0;
            requires maxQueuedItems > 0;
            requires maxThreads <= maxQueuedItems;
        {
            this.monitor          = new object();
            this.commandContainer =
                new TContainer<TQueue<AsyncCommand>>(
                    new TQueue<AsyncCommand>()
                    );
            this.queuedItems      = 0;
            this.maxQueuedItems   = maxQueuedItems;
            this.threadCount      = maxThreads;
            this.shutdown         = false;
            this.shutdownEvent    = new ManualResetEvent(false);

            base();

            for (int i = 0; i < maxThreads; i++) {
                Thread t = new Thread(new ThreadStart(WorkerMain));
                t.Start();
            }
        }

        /// <summary>
        /// Queues a command for execution on one of the pool's worker
        /// threads. This method blocks while the maximum queue size is
        /// reached.
        /// </summary>
        /// <param name="command">
        /// Command to run; ownership is claimed by the pool, which
        /// disposes the command after executing it.
        /// </param>
        internal void QueueUserWorkItem([Claims] AsyncCommand! command)
        {
            // NOTE(review): explicit Enter/Exit (rather than lock) is kept
            // from the original; an exception between them would leave the
            // monitor held -- confirm whether that is intentional.
            Monitor.Enter(this.monitor);

            while (this.queuedItems == this.maxQueuedItems) {
                Monitor.Wait(this.monitor);
            }

            TQueue<AsyncCommand> commands = commandContainer.Acquire();
            commands.AddHead(command);
            commandContainer.Release(commands);
            this.queuedItems++;

            // Producers and workers wait on the same monitor, so a single
            // Pulse may wake another blocked producer instead of a worker
            // and the notification would be lost. Wake every waiter; each
            // one re-checks its own condition in a loop.
            Monitor.PulseAll(this.monitor);

            Monitor.Exit(this.monitor);
        }

        /// <summary>
        /// Worker thread body: repeatedly takes a command from the queue,
        /// executes it outside the lock and disposes it. Exits once
        /// shutdown has been requested and the queue is empty; the last
        /// worker to exit signals the shutdown event.
        /// </summary>
        private void WorkerMain()
        {
            while (true) {
                AsyncCommand command;

                lock (this.monitor) {
                    while (queuedItems == 0 && !shutdown) {
                        Monitor.Wait(this.monitor);
                    }

                    // The wait loop only ends with an empty queue when
                    // shutdown was requested; remaining items are still
                    // processed before exiting.
                    if (queuedItems == 0 && shutdown) {
                        break;
                    }

                    TQueue<AsyncCommand> commands = commandContainer.Acquire();
                    command = commands.ExtractTail();
                    assert(command != null);
                    commandContainer.Release(commands);

                    bool wasFull = (this.queuedItems == this.maxQueuedItems);
                    this.queuedItems--;
                    if (wasFull) {
                        // Space just became available. PulseAll (not
                        // Pulse) because idle workers may be waiting on
                        // the same monitor and must not swallow the
                        // notification meant for blocked producers.
                        Monitor.PulseAll(this.monitor);
                    }
                }

                // Run the command without holding the monitor so other
                // workers and producers can make progress.
                command.Execute();
                ((ITracked)command).Dispose();
            }

            lock (this.monitor) {
                if (--this.threadCount == 0) {
                    shutdownEvent.Set();
                }
            }
        }

        /// <summary>
        /// Shutdown thread pool instance. Caller blocks until the
        /// threads in the pool have finished outstanding work items
        /// and exited.
        /// </summary>
        internal void Shutdown()
        {
            lock (this.monitor) {
                shutdown = true;
                // Wake every idle worker so it can observe the flag.
                Monitor.PulseAll(this.monitor);
            }

            shutdownEvent.WaitOne();

            // All workers have exited; nothing should be left queued.
            TQueue<AsyncCommand> commands = commandContainer.Acquire();
            DebugStub.Assert(commands.Empty);
            commandContainer.Release(commands);
        }
    }
}