215 lines
8.5 KiB
Plaintext
215 lines
8.5 KiB
Plaintext
////////////////////////////////////////////////////////////////////////////////
|
|
//
|
|
// Microsoft Research Singularity
|
|
//
|
|
// Copyright (c) Microsoft Corporation. All rights reserved.
|
|
//
|
|
// Note: Multiplexes any number of input pipes to a single output pipe.
|
|
// Also features a channel to submit new input pipes.
|
|
//
|
|
|
|
using System;
|
|
using System.Threading;
|
|
using Microsoft.SingSharp;
|
|
using Microsoft.Singularity;
|
|
using Microsoft.Singularity.Channels;
|
|
using Tty = Microsoft.Singularity.Io.Tty;
|
|
|
|
namespace Microsoft.Singularity.Io
|
|
{
|
|
|
|
public contract PipeMultiplexControl {
|
|
in message NewInput(UnicodePipeContract.Exp:EXPMOVABLE! client);
|
|
out message Ack();
|
|
|
|
state START: NewInput? -> Ack! -> START;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Allows creating and managing a multiplexer that forwards data from several pipes
|
|
/// to a single output pipe.
|
|
/// </summary>
|
|
public sealed class PipeMultiplexer : ITracked {
|
|
|
|
private class PipeThread {
|
|
TRef<UnicodePipeContract.Exp:READY> initialClientArgument;
|
|
TRef<UnicodePipeContract.Imp:READY>! threadStartupArgument1;
|
|
TRef<PipeMultiplexControl.Exp:START>! threadStartupArgument2;
|
|
|
|
|
|
public PipeThread([Claims] UnicodePipeContract.Imp:READY! output,
|
|
[Claims] PipeMultiplexControl.Exp:START! control,
|
|
[Claims] UnicodePipeContract.Exp:READY? initialClient)
|
|
{
|
|
this.threadStartupArgument1 = new TRef<UnicodePipeContract.Imp:READY>(output);
|
|
this.threadStartupArgument2 = new TRef<PipeMultiplexControl.Exp:START>(control);
|
|
if (initialClient != null) {
|
|
this.initialClientArgument =
|
|
new TRef<UnicodePipeContract.Exp:READY>(initialClient);
|
|
}
|
|
}
|
|
|
|
|
|
public void Start() {
|
|
UnicodePipeContract.Exp:READY? initialClient = null;
|
|
if (this.initialClientArgument != null) {
|
|
initialClient = this.initialClientArgument.Acquire();
|
|
}
|
|
MessagePump(this.threadStartupArgument1.Acquire(),
|
|
this.threadStartupArgument2.Acquire(),
|
|
initialClient);
|
|
}
|
|
|
|
|
|
private static void MessagePump([Claims] UnicodePipeContract.Imp:READY! output,
|
|
[Claims] PipeMultiplexControl.Exp:START! control,
|
|
[Claims] UnicodePipeContract.Exp:READY? initialClient)
|
|
{
|
|
ESet<UnicodePipeContract.Imp:ACK> waitingForOutputAck =
|
|
new ESet<UnicodePipeContract.Imp:ACK>();
|
|
|
|
ESet<UnicodePipeContract.Exp:ACK> clientWaitingForAck =
|
|
new ESet<UnicodePipeContract.Exp:ACK>();
|
|
|
|
ESet<UnicodePipeContract.Imp:READY> outputReady =
|
|
new ESet<UnicodePipeContract.Imp:READY>();
|
|
|
|
ESet<UnicodePipeContract.Exp:READY> clients =
|
|
new ESet<UnicodePipeContract.Exp:READY>();
|
|
|
|
if (initialClient != null) {
|
|
clients.Add(initialClient);
|
|
}
|
|
outputReady.Add(output);
|
|
|
|
bool done = false;
|
|
|
|
try {
|
|
while (!done) {
|
|
switch receive {
|
|
|
|
case control.NewInput(client):
|
|
//DebugStub.WriteLine("pipe multiplexer new client [{0}]",
|
|
// __arglist(clients.Count+1));
|
|
client.SendMoved();
|
|
clients.Add(client);
|
|
control.SendAck();
|
|
break;
|
|
|
|
case control.ChannelClosed():
|
|
//DebugStub.WriteLine("pipe multiplexer control channel closed");
|
|
done = true;
|
|
break;
|
|
|
|
case client.Write(buffer, index, count) in clients &&
|
|
outputReady.Head(outputEp):
|
|
// a client wants to write and the output is ready
|
|
//DebugStub.WriteLine("pipe multiplexer [{0}] client writes",
|
|
// __arglist(clients.Count+1));
|
|
outputEp.SendWrite(buffer, index, count);
|
|
clientWaitingForAck.Add(client);
|
|
waitingForOutputAck.Add(outputEp);
|
|
continue;
|
|
|
|
case outputEp.AckWrite(buffer) in waitingForOutputAck &&
|
|
clientWaitingForAck.Head(client):
|
|
// a client is waiting for ack and output sent us ack
|
|
//DebugStub.WriteLine("pipe multiplexer: output Acks");
|
|
client.SendAckWrite(buffer);
|
|
clients.Add(client);
|
|
outputReady.Add(outputEp);
|
|
continue;
|
|
|
|
case client.ChannelClosed() in clients:
|
|
//DebugStub.WriteLine("pipe multiplexer: client departs");
|
|
delete client;
|
|
continue;
|
|
|
|
case outputEp.ChannelClosed() in waitingForOutputAck:
|
|
DebugStub.WriteLine("pipe multiplexer: output closes");
|
|
delete outputEp;
|
|
done = true;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
finally {
|
|
waitingForOutputAck.Dispose();
|
|
outputReady.Dispose();
|
|
clientWaitingForAck.Dispose();
|
|
clients.Dispose();
|
|
delete control;
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Can be null when control channel closed
|
|
private PipeMultiplexControl.Imp:START controlChannel;
|
|
|
|
private PipeMultiplexer([Claims] PipeMultiplexControl.Imp:START! control) {
|
|
this.controlChannel = control;
|
|
}
|
|
|
|
public static PipeMultiplexer!
|
|
Start([Claims] UnicodePipeContract.Imp:READY! output,
|
|
[Claims] UnicodePipeContract.Exp:READY? initialClient)
|
|
{
|
|
PipeMultiplexControl.Imp! controlImp;
|
|
PipeMultiplexControl.Exp! controlExp;
|
|
PipeMultiplexControl.NewChannel(out controlImp, out controlExp);
|
|
|
|
PipeThread pipeThread = new PipeThread(output, controlExp, initialClient);
|
|
|
|
Thread t = new Thread(new ThreadStart(pipeThread.Start));
|
|
t.Start();
|
|
return new PipeMultiplexer(controlImp);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Adds a fresh input pipe to the multiplexer and returns the client end.
|
|
/// </summary>
|
|
public UnicodePipeContract.Imp:READY FreshClient() {
|
|
expose (this) {
|
|
PipeMultiplexControl.Imp control = this.controlChannel;
|
|
if (control == null) return null; // channel closed
|
|
|
|
UnicodePipeContract.Imp! client;
|
|
UnicodePipeContract.Exp! exp;
|
|
UnicodePipeContract.NewChannel(out client, out exp, UnicodePipeContract.EXPMOVABLE.Value);
|
|
control.SendNewInput(exp);
|
|
switch receive {
|
|
case control.Ack():
|
|
// put client into correct state:
|
|
|
|
switch receive {
|
|
case client.Moved():
|
|
return client;
|
|
|
|
case client.ChannelClosed():
|
|
delete client;
|
|
return null;
|
|
}
|
|
|
|
case control.ChannelClosed():
|
|
delete control;
|
|
delete client;
|
|
this.controlChannel = null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
delete this.controlChannel;
|
|
}
|
|
|
|
void ITracked.Acquire() {}
|
|
void ITracked.Release() {}
|
|
void ITracked.Expose() {}
|
|
void ITracked.UnExpose() {}
|
|
|
|
}
|
|
}
|