////////////////////////////////////////////////////////////////////////////////
//
// Microsoft Research Singularity
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// Note: Multiplexes any number of input pipes to a single output pipe.
// Also features a channel to submit new input pipes.
//
using System;
using System.Threading;
using Microsoft.SingSharp;
using Microsoft.Singularity;
using Microsoft.Singularity.Channels;
using Tty = Microsoft.Singularity.Io.Tty;
namespace Microsoft.Singularity.Io
{
public contract PipeMultiplexControl {
in message NewInput(UnicodePipeContract.Exp:EXPMOVABLE! client);
out message Ack();
state START: NewInput? -> Ack! -> START;
}
///
/// Allows creating and managing a multiplexer that forwards data from several pipes
/// to a single output pipe.
///
public sealed class PipeMultiplexer : ITracked {
private class PipeThread {
TRef initialClientArgument;
TRef! threadStartupArgument1;
TRef! threadStartupArgument2;
public PipeThread([Claims] UnicodePipeContract.Imp:READY! output,
[Claims] PipeMultiplexControl.Exp:START! control,
[Claims] UnicodePipeContract.Exp:READY? initialClient)
{
this.threadStartupArgument1 = new TRef(output);
this.threadStartupArgument2 = new TRef(control);
if (initialClient != null) {
this.initialClientArgument =
new TRef(initialClient);
}
}
public void Start() {
UnicodePipeContract.Exp:READY? initialClient = null;
if (this.initialClientArgument != null) {
initialClient = this.initialClientArgument.Acquire();
}
MessagePump(this.threadStartupArgument1.Acquire(),
this.threadStartupArgument2.Acquire(),
initialClient);
}
private static void MessagePump([Claims] UnicodePipeContract.Imp:READY! output,
[Claims] PipeMultiplexControl.Exp:START! control,
[Claims] UnicodePipeContract.Exp:READY? initialClient)
{
ESet waitingForOutputAck =
new ESet();
ESet clientWaitingForAck =
new ESet();
ESet outputReady =
new ESet();
ESet clients =
new ESet();
if (initialClient != null) {
clients.Add(initialClient);
}
outputReady.Add(output);
bool done = false;
try {
while (!done) {
switch receive {
case control.NewInput(client):
//DebugStub.WriteLine("pipe multiplexer new client [{0}]",
// __arglist(clients.Count+1));
client.SendMoved();
clients.Add(client);
control.SendAck();
break;
case control.ChannelClosed():
//DebugStub.WriteLine("pipe multiplexer control channel closed");
done = true;
break;
case client.Write(buffer, index, count) in clients &&
outputReady.Head(outputEp):
// a client wants to write and the output is ready
//DebugStub.WriteLine("pipe multiplexer [{0}] client writes",
// __arglist(clients.Count+1));
outputEp.SendWrite(buffer, index, count);
clientWaitingForAck.Add(client);
waitingForOutputAck.Add(outputEp);
continue;
case outputEp.AckWrite(buffer) in waitingForOutputAck &&
clientWaitingForAck.Head(client):
// a client is waiting for ack and output sent us ack
//DebugStub.WriteLine("pipe multiplexer: output Acks");
client.SendAckWrite(buffer);
clients.Add(client);
outputReady.Add(outputEp);
continue;
case client.ChannelClosed() in clients:
//DebugStub.WriteLine("pipe multiplexer: client departs");
delete client;
continue;
case outputEp.ChannelClosed() in waitingForOutputAck:
DebugStub.WriteLine("pipe multiplexer: output closes");
delete outputEp;
done = true;
break;
}
}
}
finally {
waitingForOutputAck.Dispose();
outputReady.Dispose();
clientWaitingForAck.Dispose();
clients.Dispose();
delete control;
}
}
}
/// Can be null when control channel closed
private PipeMultiplexControl.Imp:START controlChannel;
private PipeMultiplexer([Claims] PipeMultiplexControl.Imp:START! control) {
this.controlChannel = control;
}
public static PipeMultiplexer!
Start([Claims] UnicodePipeContract.Imp:READY! output,
[Claims] UnicodePipeContract.Exp:READY? initialClient)
{
PipeMultiplexControl.Imp! controlImp;
PipeMultiplexControl.Exp! controlExp;
PipeMultiplexControl.NewChannel(out controlImp, out controlExp);
PipeThread pipeThread = new PipeThread(output, controlExp, initialClient);
Thread t = new Thread(new ThreadStart(pipeThread.Start));
t.Start();
return new PipeMultiplexer(controlImp);
}
///
/// Adds a fresh input pipe to the multiplexer and returns the client end.
///
public UnicodePipeContract.Imp:READY FreshClient() {
expose (this) {
PipeMultiplexControl.Imp control = this.controlChannel;
if (control == null) return null; // channel closed
UnicodePipeContract.Imp! client;
UnicodePipeContract.Exp! exp;
UnicodePipeContract.NewChannel(out client, out exp, UnicodePipeContract.EXPMOVABLE.Value);
control.SendNewInput(exp);
switch receive {
case control.Ack():
// put client into correct state:
switch receive {
case client.Moved():
return client;
case client.ChannelClosed():
delete client;
return null;
}
case control.ChannelClosed():
delete control;
delete client;
this.controlChannel = null;
return null;
}
}
}
public void Dispose()
{
delete this.controlChannel;
}
void ITracked.Acquire() {}
void ITracked.Release() {}
void ITracked.Expose() {}
void ITracked.UnExpose() {}
}
}