using System;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.MixedReality.WebRTC;
namespace NamedPipeSignaler
{
/// <summary>
/// Exchanges SDP messages and ICE candidates between two local peers over a
/// pair of one-way named pipes.
/// </summary>
/// <remarks>
/// The first instance that manages to create the pipe server named
/// <c>pipeName</c> acts as the server; an instance that gets an
/// <see cref="IOException"/> doing so acts as the client and later creates the
/// reverse pipe <c>pipeName + "_r"</c>.
/// Wire format (text, '\n' separated):
/// <c>ice</c>, sdpMid, sdpMlineIndex, candidate line(s), blank line; or
/// <c>sdp</c>, type, SDP line(s), blank line.
/// </remarks>
public class NamedPipeSignaler : IDisposable
{
    /// <summary>Peer connection whose local SDP/ICE messages are forwarded to the remote peer.</summary>
    public PeerConnection PeerConnection { get; }

    /// <summary><c>true</c> when the pipe server already existed at construction time.</summary>
    public bool IsClient { get; }

    /// <summary>Invoked on the receive thread for every ICE candidate read from the remote peer.</summary>
    public PeerConnection.IceCandidateReadytoSendDelegate IceCandidateReceived;

    /// <summary>Invoked on the receive thread for every SDP message read from the remote peer.</summary>
    public PeerConnection.LocalSdpReadyToSendDelegate SdpMessageReceived;

    // Outgoing (write-only) pipe.
    private NamedPipeClientStream _clientPipe = null;

    // Incoming (read-only) pipe.
    private NamedPipeServerStream _serverPipe = null;

    private readonly string _basePipeName;
    private StreamWriter _sendStream = null;
    private StreamReader _recvStream = null;

    // "." designates the local machine.
    private readonly string _serverName = ".";

    // Queue decoupling the PeerConnection callbacks from the pipe writes.
    private readonly BufferBlock<string> _outgoingMessages = new BufferBlock<string>();

    // Background task draining _outgoingMessages; null until StartAsync() has run.
    private Task _writerTask = null;

    private bool _disposed = false;

    /// <summary>
    /// Creates the signaler and tries to create the pipe server to decide
    /// whether this instance is the server or the client.
    /// </summary>
    /// <param name="peerConnection">Peer connection to relay signaling messages for.</param>
    /// <param name="pipeName">Base name of the named pipe shared by both peers.</param>
    /// <exception cref="ArgumentNullException"><paramref name="peerConnection"/> is null.</exception>
    public NamedPipeSignaler(PeerConnection peerConnection, string pipeName)
    {
        PeerConnection = peerConnection ?? throw new ArgumentNullException(nameof(peerConnection));
        _basePipeName = pipeName;

        // Create the pipe before subscribing to any event so that an unexpected
        // failure here does not leave handlers attached to the peer connection.
        IsClient = false;
        try
        {
            _serverPipe = new NamedPipeServerStream(pipeName, PipeDirection.In);
            Console.WriteLine("Created pipe server; acting as server.");
        }
        catch (IOException)
        {
            Console.WriteLine("Pipe server already exists; acting as client.");
            IsClient = true;
        }

        PeerConnection.DataChannelAdded += PeerConnection_DataChannelAdded;
        PeerConnection.DataChannelRemoved += PeerConnection_DataChannelRemoved;
        PeerConnection.LocalSdpReadytoSend += PeerConnection_LocalSdpReadytoSend;
        PeerConnection.IceCandidateReadytoSend += PeerConnection_IceCandidateReadytoSend;
    }

    /// <summary>
    /// Connects both pipes (forward and reverse) with the remote peer, then
    /// starts the background receive and send loops.
    /// </summary>
    /// <returns>A task completing once both pipes are connected and the loops are started.</returns>
    public async Task StartAsync()
    {
        if (IsClient)
        {
            // Client: connect to the existing server pipe, then host the reverse pipe.
            Console.Write("Attempting to connect to the remote peer...");
            _clientPipe = new NamedPipeClientStream(_serverName, _basePipeName, PipeDirection.Out);
            await _clientPipe.ConnectAsync();
            Console.WriteLine("Connected to the remote peer.");
            Console.WriteLine($"There are currently {_clientPipe.NumberOfServerInstances} pipe server instances open.");

            _serverPipe = new NamedPipeServerStream(_basePipeName + "_r", PipeDirection.In);
            Console.Write("Waiting for the remote peer to connect back...");
            await _serverPipe.WaitForConnectionAsync();
        }
        else
        {
            // Server: wait for the client, then connect to its reverse pipe.
            Console.Write("Waiting for the remote peer to connect...");
            await _serverPipe.WaitForConnectionAsync();
            Console.WriteLine("Remote peer connected.");

            Console.Write("Attempting to connect back to the remote peer...");
            _clientPipe = new NamedPipeClientStream(_serverName, _basePipeName + "_r", PipeDirection.Out);
            await _clientPipe.ConnectAsync();
        }
        Console.WriteLine("Signaler connection established.");

        _sendStream = new StreamWriter(_clientPipe);
        _sendStream.AutoFlush = true;
        _recvStream = new StreamReader(_serverPipe);

        _ = Task.Factory.StartNew(ProcessIncomingMessages, TaskCreationOptions.LongRunning);

        // Keep the writer task so Dispose() can wait for it instead of waiting on
        // the queue's completion, which never finishes if the writer has stopped.
        _writerTask = Task.Run(() => WriteOutgoingMessagesAsync(CancellationToken.None));
    }

    /// <summary>
    /// Receive loop: reads messages from the incoming pipe until end of stream
    /// and dispatches them to <see cref="IceCandidateReceived"/> and
    /// <see cref="SdpMessageReceived"/>.
    /// </summary>
    private void ProcessIncomingMessages()
    {
        try
        {
            string line;
            while ((line = _recvStream.ReadLine()) != null)
            {
                Console.WriteLine($"[<-] {line}");
                if (line == "ice")
                {
                    string sdpMid = _recvStream.ReadLine();
                    string indexText = _recvStream.ReadLine();
                    // Always consume the body so the stream stays aligned on
                    // message boundaries even when the header is malformed.
                    string candidate = ReadUntilBlankLine();

                    if (!int.TryParse(indexText, NumberStyles.Integer, CultureInfo.InvariantCulture, out int sdpMlineindex))
                    {
                        Console.WriteLine($"Ignoring ICE candidate with invalid m-line index '{indexText}'.");
                        continue;
                    }

                    Console.WriteLine($"[<-] ICE candidate: {sdpMid} {sdpMlineindex} {candidate}");
                    var iceCandidate = new IceCandidate
                    {
                        SdpMid = sdpMid,
                        SdpMlineIndex = sdpMlineindex,
                        Content = candidate
                    };
                    IceCandidateReceived?.Invoke(iceCandidate);
                }
                else if (line == "sdp")
                {
                    string type = _recvStream.ReadLine();
                    if (type == null)
                    {
                        // End of stream right after the header: nothing to dispatch.
                        break;
                    }
                    string sdp = ReadUntilBlankLine();

                    Console.WriteLine($"[<-] SDP message: {type} {sdp}");
                    var message = new SdpMessage { Type = SdpMessage.StringToType(type), Content = sdp };
                    SdpMessageReceived?.Invoke(message);
                }
            }
        }
        catch (Exception e) when (e is IOException || e is ObjectDisposedException)
        {
            // The pipe broke or was closed by Dispose(); this loop runs in a
            // discarded task, so report the failure here instead of losing it.
            Console.WriteLine($"Exception: {e.Message}");
        }
        Console.WriteLine("Finished processing messages");
    }

    /// <summary>
    /// Reads lines from the incoming pipe up to (and consuming) the next empty
    /// line or the end of the stream.
    /// </summary>
    /// <returns>The lines read, each terminated by '\n'; empty if none.</returns>
    private string ReadUntilBlankLine()
    {
        var builder = new StringBuilder();
        string line;
        while ((line = _recvStream.ReadLine()) != null && line.Length != 0)
        {
            builder.Append(line).Append('\n');
        }
        return builder.ToString();
    }

    /// <summary>
    /// Send loop: writes queued messages to the outgoing pipe until the queue is
    /// completed and empty, or the pipe fails.
    /// </summary>
    /// <param name="ct">Token used to abort waiting for new messages.</param>
    private async Task WriteOutgoingMessagesAsync(CancellationToken ct)
    {
        try
        {
            while (await _outgoingMessages.OutputAvailableAsync(ct))
            {
                while (_outgoingMessages.TryReceive(out string msg))
                {
                    // The pipe is not opened with PipeOptions.Asynchronous and this
                    // loop runs on its own task, so a plain write is used.
                    _sendStream.Write(msg);
                }
            }
        }
        catch (Exception e) when (e is IOException || e is ObjectDisposedException)
        {
            // Pipe broken or closed: stop sending. Remaining messages are dropped.
            Console.WriteLine($"Exception: {e.Message}");
        }
    }

    /// <summary>Queues a raw protocol message for the send loop.</summary>
    /// <param name="msg">Fully formatted message, including the terminating blank line.</param>
    private void SendMessage(string msg)
    {
        try
        {
            Console.WriteLine($"[->] {msg}");
            if (!_outgoingMessages.Post(msg))
            {
                // Post() declines messages once the queue has been completed (after Dispose()).
                Console.WriteLine("Outgoing message dropped: the signaler is no longer accepting messages.");
            }
        }
        catch (Exception e)
        {
            Console.WriteLine($"Exception: {e.Message}");
            Environment.Exit(-1);
        }
    }

    /// <summary>Forwards a local ICE candidate to the remote peer.</summary>
    private void PeerConnection_IceCandidateReadytoSend(IceCandidate candidate)
    {
        // Invariant formatting keeps the m-line index parseable by the receiver
        // regardless of either process's current culture.
        SendMessage(FormattableString.Invariant($"ice\n{candidate.SdpMid}\n{candidate.SdpMlineIndex}\n{candidate.Content}\n\n"));
    }

    /// <summary>Forwards a local SDP message to the remote peer.</summary>
    private void PeerConnection_LocalSdpReadytoSend(SdpMessage message)
    {
        string typeStr = SdpMessage.TypeToString(message.Type);
        SendMessage($"sdp\n{typeStr}\n{message.Content}\n\n");
    }

    /// <summary>Logs the new data channel and its subsequent state changes.</summary>
    private void PeerConnection_DataChannelAdded(DataChannel channel)
    {
        Console.WriteLine($"Event: DataChannel Added {channel.Label}");
        channel.StateChanged += () => { Console.WriteLine($"DataChannel '{channel.Label}': StateChanged '{channel.State}'"); };
    }

    /// <summary>Logs the removal of a data channel.</summary>
    private void PeerConnection_DataChannelRemoved(DataChannel channel)
    {
        Console.WriteLine($"Event: DataChannel Removed {channel.Label}");
    }

    /// <summary>
    /// Unsubscribes from the peer connection, lets the send loop flush what it
    /// can, and releases the pipes. Safe to call more than once and before
    /// <see cref="StartAsync"/>.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }
        _disposed = true;

        PeerConnection.LocalSdpReadytoSend -= PeerConnection_LocalSdpReadytoSend;
        PeerConnection.DataChannelAdded -= PeerConnection_DataChannelAdded;
        PeerConnection.DataChannelRemoved -= PeerConnection_DataChannelRemoved;
        PeerConnection.IceCandidateReadytoSend -= PeerConnection_IceCandidateReadytoSend;

        // Streams are null when StartAsync() was never called or failed early.
        _recvStream?.Dispose();

        // Stop accepting messages, then wait for the writer task rather than for
        // the queue's Completion: the latter never completes if messages remain
        // queued and nothing is (or ever was) draining them.
        _outgoingMessages.Complete();
        try
        {
            _writerTask?.Wait();
        }
        catch (AggregateException e)
        {
            // Dispose() must not throw; report and carry on releasing resources.
            Console.WriteLine($"Exception: {e.InnerException?.Message ?? e.Message}");
        }

        // Dispose the writer before the pipe it wraps, then the pipes themselves
        // (needed when the wrapping streams were never created).
        _sendStream?.Dispose();
        _clientPipe?.Dispose();
        _serverPipe?.Dispose();
    }
}
}
|