////////////////////////////////////////////////////////////////////////////////
//
//  Microsoft Research Singularity
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  Note:   Provider-side helper for the TCP connection channel contract
//          (TcpConnectionContract).
//

using System;
using System.Net.IP;
using System.Threading;
using Microsoft.SingSharp;
using Microsoft.Singularity.Channels;
using NetStack.Contracts;
using NetStack.Runtime;

namespace NetStack.Channels.Private
{
    // Pairs one TcpConnectionContract export endpoint with one TcpSession
    // and services the endpoint's messages on its own thread (or on a
    // thread-pool work item when THREAD_POOL is defined).
    public class TcpConnectionExpThread
    {
        // Holder for the export endpoint until the worker acquires it.
        // NOTE(review): TRef carries no type argument in this copy of the
        // file - confirm against the canonical source.
        private TRef! chanEP;

        // The TCP session that every message handler below operates on.
        private TcpSession! session;

        // Wraps a fresh endpoint (ReadyState) and asks the NetStack core
        // for a new "TCP" session to back it.
        public TcpConnectionExpThread([Claims]TcpConnectionContract.Exp:ReadyState! ep)
        {
            chanEP = new TRef(ep);
            session = (!)((TcpSession)Core.Instance().CreateSession("TCP"));
        }

        // Wraps an endpoint that is already in the Connected state together
        // with an existing session. Used by the Accept handler below.
        private TcpConnectionExpThread([Claims]TcpConnectionContract.Exp:Connected! ep,
                                       TcpSession! newSession)
        {
            chanEP = new TRef(ep);
            session = (!)newSession;
        }

        // Begins servicing the endpoint: queues Run on the core's thread
        // pool when THREAD_POOL is defined, otherwise starts a new Thread.
        public void Start()
        {
#if THREAD_POOL
            Core.Instance().ThreadPool.QueueUserWorkItem(new ThreadStart(Run));
#else
            new Thread(new ThreadStart(Run)).Start();
#endif
        }

        // Worker entry point. Runs the message pump and, however the pump
        // exits (normal return or exception), recycles the session.
        public void Run()
        {
            try {
                RunInternal();
            }
            finally {
                // This breaks the NetStack's abstraction - we should be
                // calling a new Core method to recycle the session.
                TcpSessionPool.Recycle(session);
            }
        }

        // Message pump: acquires the endpoint from the TRef and answers
        // messages until the channel is closed (or a handler throws).
        private void RunInternal()
        {
            TcpConnectionContract.Exp! ep = chanEP.Acquire();

            for (;;) {
                switch receive {
                    //
                    // ======================= Start state =======================
                    //
                    case ep.Connect(uint dstIP, ushort dstPort) :
                        {
                            // Log the state-change contract call.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.Connect);

                            TcpError connectError;
                            if (session.Connect(new IPv4(dstIP), dstPort, out connectError)) {
                                ep.SendOK();
                            }
                            else {
                                ep.SendCouldNotConnect(connectError);
                            }
                        }
                        break;

                    case ep.BindLocalEndPoint(uint localIP, ushort localPort) :
                        {
                            // Log the state-change contract call.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.BindLocalEndPoint);

                            if (session.BindLocalEndPoint(new IPv4(localIP), localPort)) {
                                ep.SendOK();
                            }
                            else {
                                ep.SendInvalidEndPoint();
                            }
                        }
                        break;

                    //
                    // ======================= Bound state =======================
                    //
                    case ep.Listen(int backlog) :
                        {
                            // Log the state-change contract call.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.Listen);

                            if (session.Listen(backlog)) {
                                ep.SendOK();
                            }
                            else {
                                ep.SendCouldNotListen();
                            }
                        }
                        break;

                    //
                    // ======================= Listening state =======================
                    //
                    case ep.IsSessionAvailable() :
                        {
                            // Log the query contract call.
                            session.LogQueryContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.IsSessionAvailable);

                            bool anyWaiting = session.GetNumWaitingListenSessions() > 0;
                            ep.SendSessionIsAvailable(anyWaiting);
                        }
                        break;

                    case ep.Accept(TcpConnectionContract.Exp:PreConnected! newEP) :
                        {
                            // Log the state-change contract call.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.Accept);

                            TcpSession accepted = (TcpSession)session.Accept();
                            if (accepted == null) {
                                // Not expected: Accept() is supposed to block
                                // until there is a new, established connection.
                                // Release the endpoint we were handed before
                                // bailing out.
                                delete newEP;
                                throw new Exception("Unexpected null return value from TcpSession.Accept");
                            }

                            // Transition the new endpoint to the Connected state.
                            newEP.SendReady();

                            // Give the accepted session its own worker, then
                            // acknowledge the Accept on the listening endpoint.
                            TcpConnectionExpThread worker =
                                new TcpConnectionExpThread(newEP, accepted);
                            worker.Start();
                            ep.SendOK();
                        }
                        break;

                    //
                    // ======================= Connected state =======================
                    //
                    case ep.Read() :
                        {
                            // Log the data-transfer contract call.
                            session.LogDataTransferContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.Read);

                            byte[] in ExHeap payload = session.ReadData();
                            if (payload != null) {
                                // Note the unfortunate data copy.
                                ep.SendData(payload);
                            }
                            else {
                                if (ep.InState(TcpConnectionContract.ReadResult.Value)) {
                                    if (session.ValidForWrite) {
                                        ep.SendNoMoreData();        // -> SendOnly
                                    }
                                    else {
                                        ep.SendConnectionClosed();  // -> Zombie
                                    }
                                }
                                else {
                                    // Must be in the ROReadResult state. Here,
                                    // there is no ConnectionClosed message.
                                    ep.SendNoMoreData();            // -> Zombie
                                }
                            }
                        }
                        break;

                    case ep.PollRead(int timeout) :
                        {
                            // Log the data-transfer contract call.
                            session.LogDataTransferContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.PollRead);

                            // The timeout argument is interpreted as milliseconds.
                            byte[] in ExHeap payload =
                                session.PollData(TimeSpan.FromMilliseconds(timeout));

                            if (payload != null) {
                                ep.SendData(payload);
                            }
                            else {
                                if (ep.InState(TcpConnectionContract.PollReadResult.Value)) {
                                    if (session.ValidForRead) {
                                        // Just no data for now. -> Connected
                                        ep.SendNoData();
                                    }
                                    else if (session.ValidForWrite) {
                                        // Can't read anymore, but can write -> SendOnly
                                        ep.SendNoMoreData();
                                    }
                                    else {
                                        // Can't read or write -> Zombie
                                        ep.SendConnectionClosed();
                                    }
                                }
                                else {
                                    // Must be in the ROPollReadResult state. Here,
                                    // there is no ConnectionClosed message.
                                    if (session.ValidForRead) {
                                        ep.SendNoData();        // -> ReceiveOnly
                                    }
                                    else {
                                        ep.SendNoMoreData();    // -> Zombie
                                    }
                                }
                            }
                        }
                        break;

                    case ep.Write(byte[]! in ExHeap data) :
                        {
                            // Log the data-transfer contract call.
                            session.LogDataTransferContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.Write);

                            if (session.ValidForWrite) {
                                // Capture the length before the buffer is
                                // handed to WriteData.
                                int expected = data.Length;
                                int sent = session.WriteData(data);

                                if (sent == expected) {
                                    ep.SendOK();
                                }
                                else if (sent == -1) {
                                    // A -1 return value indicates the connection
                                    // has been closed underneath us while we were
                                    // trying to write.
                                    ep.SendCantSend();
                                }
                                else {
                                    // This is unexpected; the current implementation
                                    // always blocks and writes as much data as is
                                    // provided.
                                    throw new Exception("Unexpected partial write in TcpSession.WriteData");
                                }
                            }
                            else {
                                // The session cannot be written to: free the
                                // buffer ourselves and report the failure.
                                delete data;
                                ep.SendCantSend();  // -> Zombie
                            }
                        }
                        break;

                    case ep.IsDataAvailable() :
                        {
                            // Log the query contract call.
                            session.LogQueryContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.IsDataAvailable);

                            bool hasData = session.IsDataAvailable();
                            ep.SendDataIsAvailable(hasData);
                        }
                        break;

                    case ep.DoneSending() :
                        {
                            // Log the state-change contract call; no reply is sent.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.DoneSending);

                            session.DoneSending();
                        }
                        break;

                    case ep.DoneReceiving() :
                        {
                            // Log the state-change contract call; no reply is sent.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.DoneReceiving);

                            session.DoneReceiving();
                        }
                        break;

                    case ep.Abort() :
                        {
                            // Log the state-change contract call; no reply is sent.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.Abort);

                            session.Abort();
                        }
                        break;

                    //
                    // ======================= Messages from multiple states =======================
                    //
                    case ep.GetLocalAddress() :
                        {
                            // Log the info contract call.
                            session.LogInfoContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.GetLocalAddress);

                            ep.SendIPAddress((uint)session.LocalAddress);
                        }
                        break;

                    case ep.GetLocalPort() :
                        {
                            // Log the info contract call.
                            session.LogInfoContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.GetLocalPort);

                            ep.SendPort(session.LocalPort);
                        }
                        break;

                    case ep.GetRemoteAddress() :
                        {
                            // Log the info contract call.
                            session.LogInfoContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.GetRemoteAddress);

                            ep.SendIPAddress((uint)session.RemoteAddress);
                        }
                        break;

                    case ep.GetRemotePort() :
                        {
                            // Log the info contract call.
                            session.LogInfoContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.GetRemotePort);

                            ep.SendPort(session.RemotePort);
                        }
                        break;

                    case ep.Close() :
                        {
                            // Log the state-change contract call; no reply is sent.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.Close);

                            session.Close();
                        }
                        break;

                    case ep.ChannelClosed() :
                        {
                            // Log the state-change contract call.
                            session.LogStateChangeContractCall(
                                TcpSessionEventsSource.TcpSessionContractEntrypoints.ChannelClosed);

                            // The peer is gone: clean up the session and the
                            // endpoint, then leave the pump (ending this thread's
                            // work).
                            session.Close();
                            session.Dispose();
                            delete ep;
                            return;
                        }
                        break;
                }
            }
        }
    }
}