////////////////////////////////////////////////////////////////////////////////
//
//  Microsoft Research Singularity
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File:   TcpBlast.cs
//
//  Note:   Simple Singularity test program.
//

using System;
using System.Diagnostics;
using System.Net.IP;

using Microsoft.SingSharp;
using Microsoft.Singularity;
using Microsoft.Singularity.Channels;
using Microsoft.Singularity.Directory;
using NetStack.Contracts;
using NetStack.Channels.Public;

using Microsoft.Contracts;
using Microsoft.SingSharp.Reflection;
using Microsoft.Singularity.Applications;
using Microsoft.Singularity.Io;
using Microsoft.Singularity.Configuration;
[assembly: Transform(typeof(ApplicationResourceTransform))]

namespace Microsoft.Singularity.Applications.Network
{
    // Default console category: used when neither the "accept" nor the
    // "connect" action is selected.  It only prints a hint and fails.
    //
    // NOTE(review): TRef appears without type arguments throughout this file;
    // confirm the intended endpoint types against the original declarations.
    [ConsoleCategory(HelpMessage="Blast TCP data to a ipAddress, port",
                     DefaultAction=true)]
    internal class Parameters
    {
        [InputEndpoint("data")]
        public readonly TRef Stdin;

        [OutputEndpoint("data")]
        public readonly TRef Stdout;

        reflective internal Parameters();

        // Prints the mode hint and returns -1.
        internal int AppMain()
        {
            Console.WriteLine("Use @accept or @connect to choose which mode.");
            return -1;
        }
    }

    // Command-line parameters for the "accept" action (listen for clients and
    // then blast data to them or gulp data from them).
    [ConsoleCategory(HelpMessage="Wait for clients to connect and blast/gulp data.",
                     Action="accept")]
    internal class AcceptParameters
    {
        [InputEndpoint("data")]
        public readonly TRef Stdin;

        [OutputEndpoint("data")]
        public readonly TRef Stdout;

        [Endpoint]
        public readonly TRef tcpRef;

        [LongParameter("port", Mandatory=false,
                       HelpMessage="Port to accept connections on. Default is 4000.",
                       Default=4000)]
        internal long port;

        [BoolParameter("blast", Mandatory=false, Default=false,
                       HelpMessage="Specifies to blast. Otherwise, gulp.")]
        internal bool blast;

        [LongParameter("numBytes", Mandatory=false,
                       HelpMessage="Total bytes to send", Default = 1000000)]
        internal long numBytes;

        [LongParameter("chunkSize", Mandatory=false,
                       HelpMessage="Chunks size to send", Default = 256)]
        internal long chunkSize;

        [StringParameter("seq", Mandatory=false,
                         HelpMessage="Sequence to use ('fib' or 'add')",
                         Default="add")]
        internal string seq;

        reflective internal AcceptParameters();

        // Runs the accept loop.  RunAccept reports its own errors on the
        // console and has no result, so this always returns 0.
        internal int AppMain()
        {
            TcpBlast.RunAccept(this);
            return 0;
        }
    }

    // Command-line parameters for the "connect" action (connect to a remote
    // host and send it data).
    [ConsoleCategory(HelpMessage="Connect to a TCP/IP service and blast/gulp data.",
                     Action="connect")]
    internal class ConnectParameters
    {
        [InputEndpoint("data")]
        public readonly TRef Stdin;

        [OutputEndpoint("data")]
        public readonly TRef Stdout;

        [Endpoint]
        public readonly TRef tcpRef;

        [BoolParameter("blast", Mandatory=false, Default=false,
                       HelpMessage="Specifies to blast. Otherwise, gulp.")]
        internal bool blast;

        [StringParameter("address", Mandatory=true, Position=0,
                         HelpMessage="IP Address to send to")]
        internal string address;

        [LongParameter("port", Mandatory=true,
                       HelpMessage="Port to connect to, default 4000",
                       Default=4000)]
        internal long port;

        [LongParameter("numBytes", Mandatory=false,
                       HelpMessage="Total bytes to send", Default = 10000)]
        internal long numBytes;

        [LongParameter("chunkSize", Mandatory=false,
                       HelpMessage="Chunks size to send", Default = 256)]
        internal long chunkSize;

        [StringParameter("seq", Mandatory=false,
                         HelpMessage="Sequence to use ('fib' or 'add')",
                         Default="add")]
        internal string seq;

        reflective internal ConnectParameters();

        // Runs the connect/blast path and propagates its status as the
        // process result, so failures are no longer reported as success.
        internal int AppMain()
        {
            return TcpBlast.RunConnect(this);
        }
    }

    // Entry points for the TCP blast/gulp test program.
    public static class TcpBlast
    {
        private const string RepeatPattern = "Here is a repeating string of text that serves as content. ";

        // Largest value that survives the cast to the 16-bit port type.
        private const long MaxPort = 65535;

        // Connects tcpConn to address:port and writes numBytes bytes of
        // RepeatPattern in chunks of at most chunkSize bytes.  The pattern
        // restarts at the beginning of every chunk.
        //
        // Takes ownership of tcpConn ([Claims]).
        // Returns 0 on success and -1 on any failure, including an
        // unexpected exception.
        public static int Blast([Claims] TcpConnectionContract.Imp:ReadyState! tcpConn,
                                IPv4 address,
                                ushort port,
                                uint numBytes,
                                uint chunkSize)
        {
            Console.WriteLine("TcpBlast to {0} at port {1} for {2} bytes in {3}-byte chunks",
                              address, port, numBytes, chunkSize);

            // A zero chunk size would never reduce bytesLeft, so the send
            // loop below would spin forever issuing empty writes.
            if (chunkSize == 0 && numBytes != 0) {
                Console.WriteLine("Chunk size must be greater than zero.");
                delete tcpConn;
                return -1;
            }

            Console.Write("Connecting...");

            try {
                // Try to connect to the remote host
                tcpConn.SendConnect((uint)address, port);

                switch receive {
                    case tcpConn.CouldNotConnect(TcpError error) :
                        Console.WriteLine("Failed to connect: TcpError = " + System.Net.Sockets.TcpException.GetMessageForTcpError(error));
                        delete tcpConn;
                        return -1;

                    case tcpConn.OK() :
                        // success;
                        break;

                    case tcpConn.ChannelClosed() :
                        // how rude
                        Console.WriteLine("Netstack channel closed unexpectedly");
                        delete tcpConn;
                        return -1;
                }

                Console.WriteLine("Connected");

                byte[] in ExHeap buf;
                uint bytesLeft = numBytes, patternLength = (uint)RepeatPattern.Length;

                while (bytesLeft > 0) {
                    // The final chunk may be shorter than chunkSize.
                    uint thisPass = chunkSize < bytesLeft ? chunkSize : bytesLeft;
                    buf = new[ExHeap] byte[thisPass];

                    for (uint i = 0; i < thisPass; i++) {
                        buf[(int)i] = (byte)RepeatPattern[(int)i % (int)patternLength];
                    }

                    tcpConn.SendWrite(buf);

                    switch receive {
                        case tcpConn.OK() :
                            Debug.WriteLine(String.Format("Sent 0x{0:x} bytes.", thisPass));
                            break;

                        case tcpConn.CantSend() :
                            Console.WriteLine("Connection closed unexpectedly");
                            delete tcpConn;
                            return -1;
                    }

                    bytesLeft -= thisPass;
                }

                tcpConn.SendClose();
                delete tcpConn;
                Console.WriteLine("Done");
            }
            catch (Exception e) {
                Console.WriteLine("Unexpected exception: {0}", e);
                // An exception means the transfer did not complete.
                return -1;
            }

            return 0;
        }

        // Prints a one-line usage string.
        public static void Usage()
        {
            Console.WriteLine("tcpgulp [numBytes] [chunkSize]");
        }

        // Validates the "connect" parameters, creates a TCP session through
        // config.tcpRef and hands the connection to Blast.
        //
        // Returns Blast's result, -1 when a parameter is invalid, or 1 when
        // the TCP endpoint could not be acquired.
        internal static int RunConnect(ConnectParameters! config)
        {
            IPv4 host;
            try {
                host = IPv4.Parse(config.address);
            }
            catch (FormatException e) {
                Console.WriteLine("{0}: {1}", e, config.address);
                return -1;
            }

            // 65536 must be rejected too: it would wrap to 0 in the cast.
            if (config.port > MaxPort || config.port < 0) {
                Console.WriteLine("Port number out of range: {0}", config.port);
                return -1;
            }

            // Reject values that would be silently truncated by the uint
            // casts below; a chunk size of zero can never make progress.
            if (config.numBytes < 0 || config.numBytes > uint.MaxValue) {
                Console.WriteLine("Byte count out of range: {0}", config.numBytes);
                return -1;
            }

            if (config.chunkSize < 1 || config.chunkSize > uint.MaxValue) {
                Console.WriteLine("Chunk size out of range: {0}", config.chunkSize);
                return -1;
            }

            ushort port = (ushort) config.port;
            uint numBytes = (uint) config.numBytes;
            uint chunkSize = (uint) config.chunkSize;

            TcpContract.Imp tcpConn = ((!)config.tcpRef).Acquire();
            if (tcpConn == null) {
                Console.WriteLine("Could not initialize TCP endpoint.");
                return 1;
            }
            tcpConn.RecvReady();

            TcpConnectionContract.Imp! connImp;
            TcpConnectionContract.Exp! connExp;
            TcpConnectionContract.NewChannel(out connImp, out connExp);

            tcpConn.SendCreateTcpSession(connExp);
            connImp.RecvReady();
            delete tcpConn;

            return Blast(connImp, host, port, numBytes, chunkSize);
        }

        // Binds a listener to config.port and serves accepted clients one at
        // a time until the listener channel closes.  Each client is blasted
        // to or gulped from depending on config.blast.  Errors are reported
        // on the console.
        internal static void RunAccept(AcceptParameters! config)
        {
            if (config.seq == null) {
                config.seq = "";
            }

            IInt32SequenceFactory sequenceFactory = WellKnownSequences.GetWellKnownSequence(config.seq);
            if (sequenceFactory == null) {
                Console.WriteLine("The sequence '{0}' is not recognized.", config.seq);
                return;
            }

            // Same range check as RunConnect; without it the cast to ushort
            // below would silently wrap an out-of-range port.
            if (config.port > MaxPort || config.port < 0) {
                Console.WriteLine("Port number out of range: {0}", config.port);
                return;
            }

            Console.WriteLine("Initializing TCP...");
            TcpContract.Imp! tcp = ((!)config.tcpRef).Acquire();
            tcp.RecvReady();

            Console.WriteLine("Creating TCP listener...");
            TcpConnectionContract.Imp! listener;
            TcpConnectionContract.Exp! listenerExp;
            TcpConnectionContract.NewChannel(out listener, out listenerExp);
            tcp.SendCreateTcpSession(listenerExp);
            tcp.RecvAck();
            listener.RecvReady();
            delete tcp;

            Console.WriteLine("Binding...");
            listener.SendBindLocalEndPoint(0, (ushort)config.port);
            switch receive {
                case listener.OK():
                    Console.WriteLine("Successfully bound to port.");
                    break;

                case listener.InvalidEndPoint():
                    Console.WriteLine("Bind failed. InvalidEndPoint.");
                    delete listener;
                    return;
            }

            listener.SendListen(50);
            switch receive {
                case listener.OK():
                    Console.WriteLine("Listening...");
                    break;

                case listener.CouldNotListen():
                    Console.WriteLine("Listen failed. No details are available.");
                    delete listener;
                    return;
            }

            for (;;) {
                // A fresh channel is created for every accepted client.
                TcpConnectionContract.Imp! client;
                TcpConnectionContract.Exp! clientExp;
                TcpConnectionContract.NewChannel(out client, out clientExp, TcpConnectionContract.PreConnected.Value);

                listener.SendAccept(clientExp);

                switch receive {
                    case listener.OK():
                        Console.WriteLine("Listener returned 'OK' from 'Accept'. Waiting for 'Ready' on client...");
                        switch receive {
                            case client.Ready():
                                RunClient(client, config.blast, sequenceFactory);
                                break;

                            case client.ChannelClosed():
                                Console.WriteLine("Client closed channel!");
                                break;
                        }
                        delete client;
                        break;

                    case listener.ChannelClosed():
                        Console.WriteLine("Listener closed channel.");
                        delete client;
                        delete listener;
                        return;
                }
            }
        }

        // Dispatches an accepted client to BlastClient or GulpClient.
        static void RunClient(TcpConnectionContract.Imp:ReadyState! client,
                              bool blast,
                              IInt32SequenceFactory! sequenceFactory)
        {
            if (blast) {
                BlastClient(client, sequenceFactory);
            }
            else {
                GulpClient(client, sequenceFactory);
            }
        }

        // Writes blocks filled with the low byte of successive sequence
        // values to the client until a write is refused or the channel
        // closes.  Prints one '.' per block, 64 per console line.
        static void BlastClient(TcpConnectionContract.Imp:ReadyState! client,
                                IInt32SequenceFactory! sequenceFactory)
        {
            Console.WriteLine("Blasting TCP client...");

            IInt32Sequence! sequence = sequenceFactory();

            int blockSize = 5112;
            int iteration = 0;

            for (;;) {
                byte[]! in ExHeap block = new[ExHeap] byte[blockSize];
                for (int i = 0; i < blockSize; i++) {
                    int next = sequence.GetNext();
                    block[i] = (byte)(next & 0xff);
                }

                client.SendWrite(block);

                switch receive {
                    case client.OK():
                        break;

                    case client.CantSend():
                        Console.WriteLine("The TCP/IP stack returned 'CantSend' on a TCP session. Shouldn't happen.");
                        return;

                    case client.ChannelClosed():
                        Console.WriteLine("The TCP/IP stack closed the client channel unexpectedly.");
                        return;
                }

                Console.Write('.');

                // Count the dot just written; the counter was previously
                // never advanced, so the line wrap could not trigger.
                iteration++;
                if (iteration >= 64) {
                    Console.WriteLine("");
                    iteration = 0;
                }
            }
        }

        // Reads blocks from the client and checks every byte against the low
        // byte of the next expected sequence value.  Returns on the first
        // mismatch, when the peer reports no more data, or when the channel
        // closes.
        static void GulpClient(TcpConnectionContract.Imp:ReadyState! client,
                               IInt32SequenceFactory! sequenceFactory)
        {
            IInt32Sequence! sequence = sequenceFactory();

            Console.WriteLine("Gulping from client...");

            // Stream offset of the first byte of the block being verified.
            long sequenceNumber = 0;

            for (;;) {
                client.SendRead();

                switch receive {
                    case client.Data(byte[]! in ExHeap block):
                        // Verify that the contents match the expected sequence.
                        for (int i = 0; i < block.Length; i++) {
                            int next = sequence.GetNext();
                            byte expected = (byte)(next & 0xff);
                            if (block[i] != expected) {
                                Console.WriteLine("Received sequence does not match expected sequence!");
                                Console.WriteLine("At stream offset {0} (0x{0:x}), expected 0x{1:x}, received 0x{2:x}.",
                                                  sequenceNumber, expected, block[i]);
                                delete block;
                                return;
                            }
                        }
                        sequenceNumber += block.Length;
                        delete block;
                        break;

                    case client.NoData():
                        Console.WriteLine("Peer has closed connection.");
                        return;

                    case client.ChannelClosed():
                        Console.WriteLine("The TCP/IP stack has closed the client channel unexpectedly.");
                        return;
                }
            }
        }
    }

    // Cyclic sequence over a small fixed table of primes.
    class AFewPrimes : IInt32Sequence
    {
        static readonly int[]! primes = { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31 };

        int index;

        // Advances the index (wrapping at the end of the table) and returns
        // the entry at the new position.  Because the index is advanced
        // before the read, the first value returned is primes[1].
        public int GetNext()
        {
            index = (index + 1) % primes.Length;
            return primes[index];
        }
    }
}