singrdk/base/Applications/Network/TcpBlast/TcpBlast.sg

461 lines
16 KiB
Plaintext

////////////////////////////////////////////////////////////////////////////////
//
// Microsoft Research Singularity
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// File: TcpBlast.cs
//
// Note: Simple Singularity test program.
//
using System;
using System.Diagnostics;
using System.Net.IP;
using Microsoft.SingSharp;
using Microsoft.Singularity;
using Microsoft.Singularity.Channels;
using Microsoft.Singularity.Directory;
using NetStack.Contracts;
using NetStack.Channels.Public;
using Microsoft.Contracts;
using Microsoft.SingSharp.Reflection;
using Microsoft.Singularity.Applications;
using Microsoft.Singularity.Io;
using Microsoft.Singularity.Configuration;
[assembly: Transform(typeof(ApplicationResourceTransform))]
namespace Microsoft.Singularity.Applications.Network
{
[ConsoleCategory(HelpMessage="Blast TCP data to a ipAddress, port", DefaultAction=true)]
internal class Parameters {
[InputEndpoint("data")]
public readonly TRef<UnicodePipeContract.Exp:READY> Stdin;
[OutputEndpoint("data")]
public readonly TRef<UnicodePipeContract.Imp:READY> Stdout;
reflective internal Parameters();
internal int AppMain() {
Console.WriteLine("Use @accept or @connect to choose which mode.");
return -1;
}
}
[ConsoleCategory(HelpMessage="Wait for clients to connect and blast/gulp data.", Action="accept")]
internal class AcceptParameters
{
[InputEndpoint("data")]
public readonly TRef<UnicodePipeContract.Exp:READY> Stdin;
[OutputEndpoint("data")]
public readonly TRef<UnicodePipeContract.Imp:READY> Stdout;
[Endpoint]
public readonly TRef<TcpContract.Imp:Start> tcpRef;
[LongParameter("port", Mandatory=false, HelpMessage="Port to accept connections on. Default is 4000.", Default=4000)]
internal long port;
[BoolParameter("blast", Mandatory=false, Default=false, HelpMessage="Specifies to blast. Otherwise, gulp.")]
internal bool blast;
[LongParameter( "numBytes", Mandatory=false, HelpMessage="Total bytes to send", Default = 1000000)]
internal long numBytes;
[LongParameter( "chunkSize", Mandatory=false, HelpMessage="Chunks size to send", Default = 256)]
internal long chunkSize;
[StringParameter("seq", Mandatory=false, HelpMessage="Sequence to use ('fib' or 'add')", Default="add")]
internal string seq;
reflective internal AcceptParameters();
internal int AppMain() {
TcpBlast.RunAccept(this);
return 0;
}
}
[ConsoleCategory(HelpMessage="Connect to a TCP/IP service and blast/gulp data.", Action="connect")]
internal class ConnectParameters
{
[InputEndpoint("data")]
public readonly TRef<UnicodePipeContract.Exp:READY> Stdin;
[OutputEndpoint("data")]
public readonly TRef<UnicodePipeContract.Imp:READY> Stdout;
[Endpoint]
public readonly TRef<TcpContract.Imp:Start> tcpRef;
[BoolParameter("blast", Mandatory=false, Default=false, HelpMessage="Specifies to blast. Otherwise, gulp.")]
internal bool blast;
[StringParameter( "address", Mandatory=true, Position=0, HelpMessage="IP Address to send to")]
internal string address;
[LongParameter( "port", Mandatory=true, HelpMessage="Port to connect to, default 4000", Default=4000)]
internal long port;
[LongParameter( "numBytes", Mandatory=false, HelpMessage="Total bytes to send", Default = 10000)]
internal long numBytes;
[LongParameter( "chunkSize", Mandatory=false, HelpMessage="Chunks size to send", Default = 256)]
internal long chunkSize;
[StringParameter("seq", Mandatory=false, HelpMessage="Sequence to use ('fib' or 'add')", Default="add")]
internal string seq;
reflective internal ConnectParameters();
internal int AppMain() {
TcpBlast.RunConnect(this);
return 0;
}
}
public static class TcpBlast
{
private const string RepeatPattern = "Here is a repeating string of text that serves as content. ";
public static int Blast([Claims] TcpConnectionContract.Imp:ReadyState! tcpConn,
IPv4 address, ushort port, uint numBytes, uint chunkSize)
{
Console.WriteLine("TcpBlast to {0} at port {1} for {2} bytes in {3}-byte chunks",
address, port, numBytes, chunkSize);
Console.Write("Connecting...");
try {
// Try to connect to the remote host
tcpConn.SendConnect((uint)address, port);
switch receive {
case tcpConn.CouldNotConnect(TcpError error) :
Console.WriteLine("Failed to connect: TcpError = " + System.Net.Sockets.TcpException.GetMessageForTcpError(error));
delete tcpConn;
return -1;
break;
case tcpConn.OK() :
// success;
break;
case tcpConn.ChannelClosed() :
// how rude
Console.WriteLine("Netstack channel closed unexpectedly");
delete tcpConn;
return -1;
break;
}
Console.WriteLine("Connected");
byte[] in ExHeap buf;
uint bytesLeft = numBytes, patternLength = (uint)RepeatPattern.Length;
while (bytesLeft > 0) {
uint thisPass = chunkSize < bytesLeft ? chunkSize : bytesLeft;
buf = new[ExHeap] byte[thisPass];
for (uint i = 0; i < thisPass; i++) {
buf[(int)i] = (byte)RepeatPattern[(int)i % (int)patternLength];
}
tcpConn.SendWrite(buf);
switch receive {
case tcpConn.OK() :
Debug.WriteLine(String.Format("Sent 0x{0:x} bytes.", thisPass));
break;
case tcpConn.CantSend() :
Console.WriteLine("Connection closed unexpectedly");
delete tcpConn;
return -1;
break;
}
bytesLeft -= thisPass;
}
tcpConn.SendClose();
delete tcpConn;
Console.WriteLine("Done");
}
catch (Exception e) {
Console.WriteLine("Unexpected exception: {0}", e);
}
return 0;
}
public static void Usage()
{
Console.WriteLine("tcpgulp <ipAddress> <port> [numBytes] [chunkSize]");
}
internal static int RunConnect(ConnectParameters! config)
{
IPv4 host;
try {
host = IPv4.Parse(config.address);
}
catch (FormatException e) {
Console.WriteLine("{0}: {1}", e, config.address);
return -1;
}
if (config.port > 65536 || config.port < 0) {
Console.WriteLine("Port number out of range: {0}", config.port);
return -1;
}
ushort port = (ushort) config.port;
uint numBytes = (uint) config.numBytes;
uint chunkSize = (uint) config.chunkSize;
TcpContract.Imp tcpConn = ((!)config.tcpRef).Acquire();
if (tcpConn == null) {
Console.WriteLine("Could not initialize TCP endpoint.");
return 1;
}
tcpConn.RecvReady();
TcpConnectionContract.Imp! connImp;
TcpConnectionContract.Exp! connExp;
TcpConnectionContract.NewChannel(out connImp, out connExp);
tcpConn.SendCreateTcpSession(connExp);
connImp.RecvReady();
delete tcpConn;
return Blast(connImp, host, port, numBytes, chunkSize);
}
internal static void RunAccept(AcceptParameters! config)
{
if (config.seq == null) {
config.seq = "";
}
IInt32SequenceFactory sequenceFactory = WellKnownSequences.GetWellKnownSequence(config.seq);
if (sequenceFactory == null) {
Console.WriteLine("The sequence '{0}' is not recognized.", config.seq);
return;
}
Console.WriteLine("Initializing TCP...");
TcpContract.Imp! tcp = ((!)config.tcpRef).Acquire();
tcp.RecvReady();
Console.WriteLine("Creating TCP listener...");
TcpConnectionContract.Imp! listener;
TcpConnectionContract.Exp! listenerExp;
TcpConnectionContract.NewChannel(out listener, out listenerExp);
tcp.SendCreateTcpSession(listenerExp);
tcp.RecvAck();
listener.RecvReady();
delete tcp;
Console.WriteLine("Binding...");
listener.SendBindLocalEndPoint(0, (ushort)config.port);
switch receive {
case listener.OK():
Console.WriteLine("Successfully bound to port.");
break;
case listener.InvalidEndPoint():
Console.WriteLine("Bind failed. InvalidEndPoint.");
delete listener;
return;
}
listener.SendListen(50);
switch receive {
case listener.OK():
Console.WriteLine("Listening...");
break;
case listener.CouldNotListen():
Console.WriteLine("Listen failed. No details are available.");
delete listener;
return;
}
for (;;) {
TcpConnectionContract.Imp! client;
TcpConnectionContract.Exp! clientExp;
TcpConnectionContract.NewChannel(out client, out clientExp, TcpConnectionContract.PreConnected.Value);
listener.SendAccept(clientExp);
switch receive {
case listener.OK():
Console.WriteLine("Listener returned 'OK' from 'Accept'. Waiting for 'Ready' on client...");
switch receive {
case client.Ready():
RunClient(client, config.blast, sequenceFactory);
break;
case client.ChannelClosed():
Console.WriteLine("Client closed channel!");
break;
}
delete client;
break;
case listener.ChannelClosed():
Console.WriteLine("Listener closed channel.");
delete client;
delete listener;
return;
}
}
}
static void RunClient(TcpConnectionContract.Imp:ReadyState! client, bool blast, IInt32SequenceFactory! sequenceFactory)
{
if (blast) {
BlastClient(client, sequenceFactory);
}
else {
GulpClient(client, sequenceFactory);
}
}
static void BlastClient(TcpConnectionContract.Imp:ReadyState! client, IInt32SequenceFactory! sequenceFactory)
{
Console.WriteLine("Blasting TCP client...");
IInt32Sequence! sequence = sequenceFactory();
int blockSize = 5112;
int iteration = 0;
long sequenceNumber = 0;
for (;;) {
byte[]! in ExHeap block = new[ExHeap] byte[blockSize];
for (int i = 0; i < blockSize; i++) {
int next = sequence.GetNext();
block[i] = (byte)(next & 0xff);
}
client.SendWrite(block);
switch receive {
case client.OK():
break;
case client.CantSend():
Console.WriteLine("The TCP/IP stack returned 'CantSend' on a TCP session. Shouldn't happen.");
return;
case client.ChannelClosed():
Console.WriteLine("The TCP/IP stack closed the client channel unexpectedly.");
return;
}
Console.Write('.');
sequenceNumber += blockSize;
if (iteration == 64) {
Console.WriteLine("");
iteration = 0;
}
}
}
static void GulpClient(TcpConnectionContract.Imp:ReadyState! client, IInt32SequenceFactory! sequenceFactory)
{
IInt32Sequence! sequence = sequenceFactory();
Console.WriteLine("Gulping from client...");
int iteration = 0;
long sequenceNumber = 0;
for (;;) {
client.SendRead();
switch receive {
case client.Data(byte[]! in ExHeap block):
// Verify that the contents match the expected sequence.
for (int i = 0; i < block.Length; i++) {
int next = sequence.GetNext();
byte expected = (byte)(next & 0xff);
if (block[i] != expected) {
Console.WriteLine("Received sequence does not match expected sequence!");
Console.WriteLine("At stream offset {0} (0x{0:x}), expected 0x{1:x}, received 0x{2:x}.",
sequenceNumber,
expected,
block[i]);
delete block;
return;
}
}
sequenceNumber += block.Length;
delete block;
break;
case client.NoData():
Console.WriteLine("Peer has closed connection.");
return;
case client.ChannelClosed():
Console.WriteLine("The TCP/IP stack has closed the client channel unexpectedly.");
return;
}
// Console.Write('.');
if (iteration == 64) {
// Console.WriteLine("");
iteration = 0;
}
}
}
}
class AFewPrimes : IInt32Sequence
{
static readonly int[]! primes = { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31 };
int index;
public int GetNext()
{
index = (index + 1) % primes.Length;
return primes[index];
}
}
}