////////////////////////////////////////////////////////////////////////////////
//
//  Microsoft Research Singularity
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File:   FSFileInstanceWorker.sg
//
//  Note:   Handles servicing read/write requests for all active files
//

using System;
using System.Threading;
using Microsoft.SingSharp;
using Microsoft.Singularity;
using Microsoft.Singularity.Channels;
using Microsoft.Singularity.Directory;
using Iso9660;

namespace Microsoft.Singularity.FileSystem
{
    /// <summary>
    /// Event identifiers for file-system monitoring.  Within this file only
    /// <c>ProcessRequest</c> is emitted (see
    /// <c>FSFileInstanceWorker.ProcessRequest</c>).
    /// </summary>
    [CLSCompliant(false)]
    public enum FileSystemEvent : ushort
    {
        StartRead = 1,
        StopRead = 2,
        CacheHit = 3,
        ProcessRequest = 4,
        StartTrackEndpoint = 5,
        StopTrackEndpoint = 6,
        StartBind = 7,
        CacheMiss = 8,
        StopBind = 9,
    }

    /// <summary>
    /// Owns the set of open file endpoints and services their requests.
    ///
    /// A dedicated thread (<c>Run</c>) sits in a <c>switch receive</c> over a
    /// control channel and over every tracked file endpoint.  Read requests
    /// are handed to a thread pool (<c>ProcessRequest</c>), which gives the
    /// endpoint back through <c>TrackEndpoint</c> once the reply is sent.
    ///
    /// All shared static state below is guarded by <c>lockToken</c>.
    /// </summary>
    public class FSFileInstanceWorker
    {
        // NOTE(review): TRef and EMap appear without type arguments in this
        // copy of the file; presumably the generic argument lists were lost
        // somewhere along the way -- confirm against the repository version.

        // Set by Run once the control channel exists; Start waits on it.
        private static bool started = false;
        // Set by Run when a Terminate message has been received.
        private static bool terminating = false;
        // Set by Run when it has serviced the current TrackEndpoint message.
        private static bool endpointCaptured = false;
        // False while a TrackEndpoint call is in its handshake with Run (and
        // therefore holds the control endpoint taken from signalEPT).
        private static bool endpointAvailable = true;
        private static object lockToken = new Object();

        // Hand-off slots from TrackEndpoint to Run: the file stream and the
        // endpoint that Run should add to its endpoint set.
        private static Iso9660FileStream pendingFile;
        private static TRef pendingEPT = null;

        // Importing side of the control channel created by Run.
        private static TRef signalEPT;
        private static FSThreadPool threadPool;

        /// <summary>
        /// Creates the request thread pool, starts the worker thread and
        /// blocks until the worker reports (via <c>started</c>) that its
        /// control channel is ready.
        /// </summary>
        public static void Start()
        {
            terminating = false;
            started = false;
            threadPool = new FSThreadPool(1, new FSThreadPoolCallback(ProcessRequest));

            Thread t = new Thread(new ThreadStart(Run));
            t.Start();

            lock (lockToken) {
                while (!started) {
                    Monitor.Wait(lockToken);
                }
            }
        }

        // called from FSWorker when a connect message is received
        // also when a worker thread is finished
        /// <summary>
        /// Hands a file endpoint to the worker thread so that it is included
        /// in the worker's receive set.
        /// </summary>
        /// <param name="s">Endpoint to track.  It is consumed in every case:
        /// either the worker takes it, or it is deleted here.</param>
        /// <param name="file">File stream associated with the endpoint.</param>
        /// <returns>true if the worker took the endpoint; false if the worker
        /// is terminating, in which case the endpoint has been deleted.</returns>
        public static bool TrackEndpoint([Claims] FileContract.Exp:Ready s,
                                         Iso9660FileStream file)
        {
            lock (lockToken) {
                // Only one hand-off may be in flight at a time.
                while (!endpointAvailable && !terminating) {
                    Monitor.Wait(lockToken);
                }
                if (terminating) {
                    delete s;
                    return false;
                }

                endpointAvailable = false;
                endpointCaptured = false;
                pendingFile = file;
                pendingEPT = new TRef(s);

                // Wake the worker out of its switch receive.
                ThreadPoolControlContract.Imp signalEP = signalEPT.Acquire();
                signalEP.SendTrackEndpoint();
                //signalEP.RecvAckTrackEndpoint();

                // Monitor.Wait releases lockToken, letting the worker enter
                // its TrackEndpoint handler and capture the pending endpoint.
                while (!endpointCaptured && !terminating) {
                    Monitor.Wait(lockToken);
                }

                endpointAvailable = true;
                Monitor.PulseAll(lockToken);

                if (pendingEPT != null) {
                    // The worker never took the endpoint; dispose of it here.
                    signalEPT.Release(signalEP);
                    delete pendingEPT.Acquire();
                    return false;
                }
                else {
                    // The worker sent its ack after taking the endpoint.
                    signalEP.RecvAckTrackEndpoint();
                    signalEPT.Release(signalEP);
                    return true;
                }
            }
        }

        /// <summary>
        /// Asks the worker thread to shut down and waits for its
        /// acknowledgement.  The control endpoint is deleted afterwards.
        /// </summary>
        public static void Terminate()
        {
            lock (lockToken) {
                // A TrackEndpoint caller that is mid-handshake is parked in
                // Monitor.Wait while still holding the endpoint taken from
                // signalEPT.  TRef.Acquire blocks until the TRef is full, so
                // acquiring here would block with lockToken held, the worker
                // could never enter its handler, and all three threads would
                // deadlock.  Wait (releasing lockToken) until the handshake
                // has finished; TrackEndpoint pulses when it sets
                // endpointAvailable back to true.
                while (!endpointAvailable) {
                    Monitor.Wait(lockToken);
                }

                ThreadPoolControlContract.Imp signalEP = signalEPT.Acquire();
                signalEP.SendTerminate();

                while (!terminating) {
                    Monitor.Wait(lockToken);
                }

                signalEP.RecvAckTerminate();
                started = false;
                delete signalEP;
            }
        }

        // worker thread that actually services requests
        /// <summary>
        /// Body of the worker thread.  Creates the control channel, publishes
        /// its importing end in <c>signalEPT</c>, signals <c>Start</c>, and
        /// then loops on a <c>switch receive</c> over the control channel and
        /// all tracked file endpoints until told to terminate (or until no
        /// case can ever be satisfied).
        /// </summary>
        protected static void Run()
        {
            bool success;
            EMap fileSet = new EMap();

            ThreadPoolControlContract.Imp! imp;
            ThreadPoolControlContract.Exp! exp;
            ThreadPoolControlContract.NewChannel(out imp, out exp);
            signalEPT = new TRef(imp);

            lock (lockToken) {
                endpointAvailable = true;
                started = true;
                Monitor.Pulse(lockToken);
            }

            for (bool run = true; run;) {
                //fileSet = fileSetT.Acquire();
                //byte[] buf;
                switch receive {
                    case exp.TrackEndpoint():
                        lock (lockToken) {
                            endpointCaptured = true;
                            Tracing.Log(Tracing.Debug,"Track Start");
                            if (!terminating) {
                                // Take ownership of the pending endpoint and
                                // clear the slot so TrackEndpoint can tell
                                // that it was consumed.
                                FileContract.Exp newEP = pendingEPT.Acquire();
                                pendingEPT = null;
                                fileSet.Add(newEP, pendingFile);
                            }
                            Monitor.PulseAll(lockToken);
                            exp.SendAckTrackEndpoint();
                            Tracing.Log(Tracing.Debug,"Track end");
                        }
                        break;

                    case exp.Terminate():
                        lock (lockToken) {
                            terminating = true;
                            threadPool.KillPool();
                            // Release everyone blocked in TrackEndpoint or
                            // Terminate.
                            Monitor.PulseAll(lockToken);
                        }
                        exp.SendAckTerminate();
                        run = false;
                        break;

                    case ep.Read(shBuf, bufOffset, fileOffset, maxLength) in fileSet~>file:
                        //Stdio.RawDevice.SetDebug();
                        Tracing.Log(Tracing.Debug,"Read start");
                        // The endpoint leaves the set here and travels with
                        // the request; ProcessRequest re-registers it via
                        // TrackEndpoint after replying.
                        threadPool.EnqueueItem(
                            new FSFileRequestInfo(
                                FSRequestAction.Read,
                                new TRef(ep),
                                new ByteContainer(shBuf),
                                file,
                                bufOffset,
                                fileOffset,
                                maxLength));
                        break;

                    case ep.Close() in fileSet~>file:
                        file.Close();
                        ep.SendAckClose();
                        delete ep;
                        break;

                    case ep.ChannelClosed() in fileSet~>file:
                        file.Close();
                        delete ep;
                        break;

                    case unsatisfiable:
                        //DebugStub.Print("FSFileWorker shutting down\n");
                        run = false;
                        break;
                        //return;
                }//switch
                //fileSetT.Release(fileSet);
            } //for

            fileSet.Dispose();
            delete exp;
        } //run

        /// <summary>
        /// Thread-pool callback: performs one queued file request, replies on
        /// the request's endpoint and hands the endpoint back to the worker.
        /// </summary>
        /// <param name="request">Must be an <c>FSFileRequestInfo</c>.</param>
        /// <exception cref="Exception">The request's action is not
        /// <c>FSRequestAction.Read</c>.</exception>
        protected static void ProcessRequest(FSRequestInfo request)
        {
            FSFileRequestInfo info = (FSFileRequestInfo)request;
            FileContract.Exp ep = info.endpointT.Acquire();

            Monitoring.Log(Monitoring.Provider.Iso9660,
                           (ushort)FileSystemEvent.ProcessRequest);

            if (info.action == FSRequestAction.Read) {
                info.fileStream.Seek(info.fileOffset, System.IO.SeekOrigin.Begin);
                long totalRead = (long)info.fileStream.Read(info.buf,
                                                            (int)info.bufOffset,
                                                            (int)info.maxLength);
                ep.SendAckRead(info.buf.GetVector(), totalRead, 0);
                Tracing.Log(Tracing.Debug,"Read end");

                // Give the endpoint back to the worker's receive set.  If the
                // worker is terminating, TrackEndpoint deletes it instead.
                TrackEndpoint(ep, info.fileStream);
            }
            else {
                // The endpoint was taken out of its TRef above; delete it so
                // it is not leaked when the exception unwinds.
                delete ep;
                throw new Exception("Invalid FS Request Action!");
            }
        } // ProcessRequest
    } //FSFileWorker class
} //namespace