////////////////////////////////////////////////////////////////////////////////
//
//  Microsoft Research Singularity
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File:   PipeMultiplexer.sg
//
//  Note:   Multiplexes any number of input pipes to a single output pipe.
//          Also features a channel to submit new input pipes.
//
//  NOTE(review): in this copy TRef and ESet appear without generic type
//  arguments; presumably they were lost in extraction - confirm against the
//  original source before building.
//

using System;
using System.Threading;
using Microsoft.SingSharp;
using Microsoft.Singularity;
using Microsoft.Singularity.Channels;
// NOTE(review): the Tty alias is not referenced anywhere in the visible code.
using Tty = Microsoft.Singularity.Io.Tty2006;

namespace Microsoft.Singularity.Io
{
    // Control protocol between a PipeMultiplexer handle and its pump thread.
    // The importing side submits the exporting end of a new input pipe with
    // NewInput and receives Ack once the pump has taken it over; the protocol
    // then returns to START, so any number of inputs can be added one by one.
    public contract PipeMultiplexControl {
        // Hands over the Exp end of a new input pipe (in state EXPMOVABLE).
        in message NewInput(UnicodePipeContract.Exp:EXPMOVABLE! client);
        // Confirms that the submitted input pipe has been registered.
        out message Ack();

        state START: NewInput? -> Ack! -> START;
    }

    /// <summary>
    /// Allows creating and managing a multiplexer that forwards data from several pipes
    /// to a single output pipe.
    /// </summary>
    /// <remarks>
    /// The forwarding itself runs on a separate thread created by
    /// <c>Start</c>; an instance of this class only holds the importing end of
    /// the control channel used to submit further input pipes.
    /// </remarks>
    public sealed class PipeMultiplexer : ITracked {

        // Carries the endpoints from the creating thread to the pump thread.
        // The endpoints are stored in TRef cells by the constructor and taken
        // out again by Start(), which is the parameterless entry point given
        // to ThreadStart.
        private class PipeThread {
            // Optional first input pipe; stays null when none was supplied.
            TRef initialClientArgument;
            // Holds the output pipe endpoint until the thread starts.
            TRef! threadStartupArgument1;
            // Holds the exporting end of the control channel until the thread starts.
            TRef! threadStartupArgument2;

            // Stores the claimed endpoints so the new thread can acquire them.
            //   output        - importing end of the single output pipe.
            //   control       - exporting end of the multiplexer control channel.
            //   initialClient - optional first input pipe; may be null.
            public PipeThread([Claims] UnicodePipeContract.Imp:READY! output,
                              [Claims] PipeMultiplexControl.Exp:START! control,
                              [Claims] UnicodePipeContract.Exp:READY? initialClient)
            {
                this.threadStartupArgument1 = new TRef(output);
                this.threadStartupArgument2 = new TRef(control);
                if (initialClient != null) {
                    this.initialClientArgument = new TRef(initialClient);
                }
            }

            // Thread entry point: acquires the stored endpoints and runs the
            // message pump with them until the pump returns.
            public void Start()
            {
                UnicodePipeContract.Exp:READY? initialClient = null;
                if (this.initialClientArgument != null) {
                    initialClient = this.initialClientArgument.Acquire();
                }
                MessagePump(this.threadStartupArgument1.Acquire(),
                            this.threadStartupArgument2.Acquire(),
                            initialClient);
            }

            // Forwards Write requests from the input pipes to the output pipe
            // and relays the output's AckWrite back to the writing client.
            // Runs until the control channel closes or the output pipe is seen
            // to close; on exit all endpoint sets are disposed and the control
            // endpoint is deleted.
            //
            // Endpoints are moved between four sets according to what is
            // expected from them next:
            //   clients             - inputs from which a Write (or a close) is awaited.
            //   clientWaitingForAck - input whose Write was forwarded and not yet acked.
            //   outputReady         - the output while no forwarded Write is outstanding.
            //   waitingForOutputAck - the output while its AckWrite is awaited.
            // The output is placed in outputReady once at start-up and again
            // only after an AckWrite, so forwarding is serialized on it
            // (presumably Head() removes the endpoint it binds - TODO confirm
            // against the ESet API).
            private static void MessagePump([Claims] UnicodePipeContract.Imp:READY! output,
                                            [Claims] PipeMultiplexControl.Exp:START! control,
                                            [Claims] UnicodePipeContract.Exp:READY? initialClient)
            {
                ESet waitingForOutputAck = new ESet();
                ESet clientWaitingForAck = new ESet();
                ESet outputReady = new ESet();
                ESet clients = new ESet();

                if (initialClient != null) {
                    clients.Add(initialClient);
                }
                outputReady.Add(output);

                bool done = false;
                try {
                    while (!done) {
                        // "break" only leaves the switch; the loop condition
                        // then decides whether to go on. "continue" goes
                        // straight to the next iteration.
                        switch receive {
                            case control.NewInput(client):
                                //DebugStub.WriteLine("pipe multiplexer new client [{0}]",
                                //                    __arglist(clients.Count+1));
                                // Send Moved to the new pipe before registering
                                // it; FreshClient() waits for this message
                                // before returning the other end to its caller.
                                client.SendMoved();
                                clients.Add(client);
                                control.SendAck();
                                break;

                            case control.ChannelClosed():
                                //DebugStub.WriteLine("pipe multiplexer control channel closed");
                                // The controlling side has gone away: stop pumping.
                                done = true;
                                break;

                            case client.Write(buffer, index, count) in clients && outputReady.Head(outputEp):
                                // a client wants to write and the output is ready
                                //DebugStub.WriteLine("pipe multiplexer [{0}] client writes",
                                //                    __arglist(clients.Count+1));
                                // Pass the request on unchanged and park both
                                // endpoints until the output acknowledges.
                                outputEp.SendWrite(buffer, index, count);
                                clientWaitingForAck.Add(client);
                                waitingForOutputAck.Add(outputEp);
                                continue;

                            case outputEp.AckWrite(buffer) in waitingForOutputAck && clientWaitingForAck.Head(client):
                                // a client is waiting for ack and output sent us ack
                                //DebugStub.WriteLine("pipe multiplexer: output Acks");
                                // Relay the ack (with the buffer) and make both
                                // endpoints available for the next write.
                                client.SendAckWrite(buffer);
                                clients.Add(client);
                                outputReady.Add(outputEp);
                                continue;

                            case client.ChannelClosed() in clients:
                                //DebugStub.WriteLine("pipe multiplexer: client departs");
                                // Drop the departed input; the pump keeps
                                // running for the remaining and future inputs.
                                delete client;
                                continue;

                            // NOTE(review): closure of the output is only
                            // matched while it sits in waitingForOutputAck,
                            // i.e. while a forwarded write is outstanding -
                            // confirm that an idle output closing is meant to
                            // go unnoticed until the next write.
                            case outputEp.ChannelClosed() in waitingForOutputAck:
                                DebugStub.WriteLine("pipe multiplexer: output closes");
                                delete outputEp;
                                done = true;
                                break;
                        }
                    }
                }
                finally {
                    // Runs on every exit path from the loop. Presumably
                    // disposing a set also releases the endpoints still held
                    // in it - TODO confirm against the ESet API.
                    waitingForOutputAck.Dispose();
                    outputReady.Dispose();
                    clientWaitingForAck.Dispose();
                    clients.Dispose();
                    delete control;
                }
            }
        }

        /// Importing end of the control channel to the pump thread.
        /// Can be null when control channel closed
        private PipeMultiplexControl.Imp:START controlChannel;

        // Wraps the importing end of an already established control channel.
        // Private: instances are created through Start().
        private PipeMultiplexer([Claims] PipeMultiplexControl.Imp:START! control)
        {
            this.controlChannel = control;
        }

        /// <summary>
        /// Creates a multiplexer for the given output pipe and starts the
        /// thread that runs its message pump.
        /// </summary>
        /// <param name="output">Importing end of the pipe all input is forwarded to.</param>
        /// <param name="initialClient">Optional first input pipe; may be null.</param>
        /// <returns>A handle that can add further input pipes via FreshClient().</returns>
        public static PipeMultiplexer! Start([Claims] UnicodePipeContract.Imp:READY! output,
                                             [Claims] UnicodePipeContract.Exp:READY? initialClient)
        {
            PipeMultiplexControl.Imp! controlImp;
            PipeMultiplexControl.Exp! controlExp;
            PipeMultiplexControl.NewChannel(out controlImp, out controlExp);

            // The exporting end goes to the pump thread, the importing end
            // stays with the returned handle.
            PipeThread pipeThread = new PipeThread(output, controlExp, initialClient);
            Thread t = new Thread(new ThreadStart(pipeThread.Start));
            t.Start();

            return new PipeMultiplexer(controlImp);
        }

        /// <summary>
        /// Adds a fresh input pipe to the multiplexer and returns the client end.
        /// </summary>
        /// <returns>
        /// The importing end of the new input pipe, or null when the control
        /// channel is already closed, closes while the request is pending, or
        /// the new pipe is closed before its Moved message arrives.
        /// </returns>
        public UnicodePipeContract.Imp:READY FreshClient()
        {
            expose (this) {
                PipeMultiplexControl.Imp control = this.controlChannel;
                if (control == null) return null; // channel closed

                UnicodePipeContract.Imp! client;
                UnicodePipeContract.Exp! exp;
                // The channel is created with its exporting end in the
                // EXPMOVABLE state, as required by the NewInput message.
                UnicodePipeContract.NewChannel(out client, out exp, UnicodePipeContract.EXPMOVABLE.Value);

                control.SendNewInput(exp);
                switch receive {
                    case control.Ack():
                        // put client into correct state:
                        // the pump sends Moved on the new pipe when it accepts
                        // it; consume that message before handing the endpoint
                        // out. Both inner branches return.
                        switch receive {
                            case client.Moved():
                                return client;

                            case client.ChannelClosed():
                                delete client;
                                return null;
                        }

                    case control.ChannelClosed():
                        // The pump is gone: release both endpoints and record
                        // that no further requests are possible.
                        delete control;
                        delete client;
                        this.controlChannel = null;
                        return null;
                }
            }
        }

        // Deletes the control endpoint held by this handle. The pump treats a
        // closed control channel as its signal to stop (see the
        // control.ChannelClosed case in MessagePump); presumably deleting this
        // endpoint is what closes the channel - TODO confirm.
        // NOTE(review): unlike FreshClient() this neither uses expose(this)
        // nor clears the field afterwards - confirm that is intended.
        public void Dispose()
        {
            delete this.controlChannel;
        }

        // ITracked members, all implemented as empty methods.
        void ITracked.Acquire() {}
        void ITracked.Release() {}
        void ITracked.Expose() {}
        void ITracked.UnExpose() {}
    }
}