The code is available on GitHub here
This article implements, with Named Pipes, the Interprocess Communication (IPC) design discussed in a previous article. The PipeStream class from the System.IO.Pipes namespace is used to implement the IServer and IClient interfaces.
Named Pipes implementation using PipeStream
To begin, a core component is created that will handle common functionalities for both NamedPipeClient and NamedPipeServer. This component implements the IPCConnection interface and is named NamedPipeBase. It takes a generic type parameter, T, which must inherit from PipeStream. The NamedPipeBase class has three fields: _name, Pipe, and _stream, where _name is the name of the named pipe, Pipe is the pipe stream, and _stream is the StreamString class used to encapsulate the reading and writing of the PipeStream.
Two events are defined in the NamedPipeBase class: Disconnected and MessageReceived. The Disconnected event is raised when the connection is lost, and the MessageReceived event is raised when a message is received.
The Initialize() method sets the Pipe and _stream fields. The StartReading() method starts a new Task that continuously reads messages from the named pipe and raises the MessageReceived event when a message is received. The Send() method writes a message to the named pipe.
using System;
using System.IO.Pipes;
using System.Threading.Tasks;

public abstract class NamedPipeBase<T> : IPCConnection
    where T : PipeStream
{
    protected readonly string _name;   // name of the named pipe
    protected T Pipe;                  // underlying pipe stream
    private StreamString _stream;      // string reader/writer over Pipe

    public NamedPipeBase(string pipeName)
    {
        _name = pipeName;
    }

    public event EventHandler Disconnected;
    private void OnDisconnected()
    {
        Disconnected?.Invoke(this, EventArgs.Empty);
    }

    public event EventHandler<MessageReceivedEventArgs> MessageReceived;
    private void OnMessageReceived(string message)
    {
        MessageReceived?.Invoke(this, new MessageReceivedEventArgs(message));
    }

    protected void Initialize(T pipeStream)
    {
        Pipe = pipeStream;
        _stream = new StreamString(pipeStream);
    }

    protected async Task StartReading()
    {
        // StartNew with an async lambda yields a Task<Task>. Awaiting it only waits
        // until the loop reaches its first pending read, so this method returns
        // while the read loop keeps running in the background.
        await Task.Factory.StartNew(async () =>
        {
            try
            {
                while (true)
                {
                    var message = await _stream.ReadString();
                    OnMessageReceived(message);
                }
            }
            catch (InvalidOperationException)
            {
                // ReadString() throws this when the other end has disconnected.
                OnDisconnected();
                Dispose();
            }
        });
    }

    public async Task Send(string message)
    {
        await _stream.WriteString(message);
    }

    public abstract void Dispose();
}
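One detail of StartReading() is easy to miss: Task.Factory.StartNew with an async lambda returns a Task<Task>, and awaiting the outer task does not wait for the lambda to finish. The following minimal sketch shows this in isolation; StartNewDemo and its method are hypothetical names used only for illustration and are not part of the article's code.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Returns true if the awaited outer task finished while the
    // async lambda (the "inner" task) was still running.
    public static async Task<bool> OuterCompletesBeforeInner()
    {
        // The lambda stays pending until the gate is opened.
        var gate = new TaskCompletionSource<bool>();

        // An async lambda with a block body makes StartNew return Task<Task>.
        Task<Task> outer = Task.Factory.StartNew(async () => { await gate.Task; });

        // Completes as soon as the lambda hits its first pending await.
        Task inner = await outer;
        bool innerStillRunning = !inner.IsCompleted;

        // Let the lambda finish and wait for it properly.
        gate.SetResult(true);
        await inner;

        return innerStillRunning;
    }
}
```

Task.Run would unwrap the inner task automatically, so awaiting it would last as long as the read loop. In this design the early return appears to be intentional, because it lets the caller continue while messages are read in the background.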
Reading and Writing to a PipeStream
Next, the StreamString class is defined, which encapsulates the reading and writing of the PipeStream. The ReadString() method reads a message from the named pipe and returns it as a string. The WriteString() method writes a message to the named pipe.
using System;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;

public class StreamString
{
    private readonly PipeStream ioStream;

    public StreamString(PipeStream ioStream)
    {
        this.ioStream = ioStream;
    }

    public async Task<string> ReadString()
    {
        byte[] buffer = new byte[256];
        using (var messageBytes = new MemoryStream())
        {
            if (ioStream.CanRead)
            {
                // Loop until the entire message is read.
                do
                {
                    var bytesRead = await ioStream.ReadAsync(buffer, 0, buffer.Length);
                    // A read of zero bytes is treated as a disconnect.
                    if (bytesRead == 0)
                        throw new InvalidOperationException("disconnected.");
                    messageBytes.Write(buffer, 0, bytesRead);
                }
                while (!ioStream.IsMessageComplete);
            }
            // Decode once, so a character split across two reads is not corrupted.
            return Encoding.Unicode.GetString(messageBytes.ToArray());
        }
    }

    public async Task WriteString(string message)
    {
        var messageBytes = Encoding.Unicode.GetBytes(message);
        if (ioStream.CanWrite)
        {
            await ioStream.WriteAsync(messageBytes, 0, messageBytes.Length);
            await ioStream.FlushAsync();
            // Blocks until the other end has read everything that was written.
            ioStream.WaitForPipeDrain();
        }
    }
}
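When a message spans several reads, the point at which the bytes are decoded matters. UTF-16 encodes characters outside the Basic Multilingual Plane as a surrogate pair of four bytes, and decoding each chunk separately can cut such a pair in half. The sketch below compares the two approaches on an in-memory byte array; ChunkedDecoding and its methods are hypothetical names, and no pipe is involved.

```csharp
using System;
using System.IO;
using System.Text;

public static class ChunkedDecoding
{
    // Decodes every chunk on its own and concatenates the results.
    public static string DecodePerChunk(byte[] data, int chunkSize)
    {
        var text = new StringBuilder();
        for (int offset = 0; offset < data.Length; offset += chunkSize)
        {
            int count = Math.Min(chunkSize, data.Length - offset);
            text.Append(Encoding.Unicode.GetString(data, offset, count));
        }
        return text.ToString();
    }

    // Collects all chunks first and decodes the complete message once.
    public static string DecodeOnce(byte[] data, int chunkSize)
    {
        using (var buffer = new MemoryStream())
        {
            for (int offset = 0; offset < data.Length; offset += chunkSize)
            {
                int count = Math.Min(chunkSize, data.Length - offset);
                buffer.Write(data, offset, count);
            }
            return Encoding.Unicode.GetString(buffer.ToArray());
        }
    }
}
```

With a chunk size of 4 and the text "a" followed by an emoji, the first chunk ends in the middle of the surrogate pair, so only DecodeOnce returns the original string. With a 256-byte buffer the problem would only show up when such a character happens to straddle a buffer boundary, which makes it hard to spot in testing.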
NamedPipeServer
The NamedPipeServer class implements the IServer interface and utilizes the NamedPipeBase for common functionality. The class has two additional events: ClientConnected and ServerStarted. The ClientConnected event is raised when a client connects to the named pipe and the ServerStarted event is raised when the server is started.
The Start() method initializes the named pipe server stream and starts waiting for client connections. When a client connects, the WaitForConnectionCallBack() method is called, which raises the ClientConnected event and starts reading messages from the named pipe. The Dispose() method disconnects and disposes the named pipe server stream.
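using System;
using System.IO.Pipes;
using System.Threading.Tasks;

public class NamedPipeServer : NamedPipeBase<NamedPipeServerStream>, IServer
{
    public NamedPipeServer(string name)
        : base(name)
    {
    }

    public event EventHandler ClientConnected;
    private void OnClientConnected()
    {
        ClientConnected?.Invoke(this, EventArgs.Empty);
    }

    public event EventHandler ServerStarted;
    private void OnServerStarted()
    {
        ServerStarted?.Invoke(this, EventArgs.Empty);
    }

    public Task Start()
    {
        // One server instance, message mode, asynchronous I/O.
        Initialize(new NamedPipeServerStream(_name, PipeDirection.InOut, 1,
            PipeTransmissionMode.Message, PipeOptions.Asynchronous));
        try
        {
            // Returns immediately; the callback runs once a client connects.
            Pipe.BeginWaitForConnection(WaitForConnectionCallBack, null);
            OnServerStarted();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        // Nothing is awaited here, so return an already completed task.
        return Task.CompletedTask;
    }

    private void WaitForConnectionCallBack(IAsyncResult result)
    {
        Pipe.EndWaitForConnection(result);
        OnClientConnected();
        // StartReading() returns once the read loop is running, so this does not block for long.
        StartReading().GetAwaiter().GetResult();
    }

    public override void Dispose()
    {
        // Disconnect() throws InvalidOperationException when no client is connected.
        if (Pipe != null && Pipe.IsConnected)
            Pipe.Disconnect();
        Pipe?.Dispose();
    }
}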
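Start() waits for a client with the BeginWaitForConnection/EndWaitForConnection pair. NamedPipeServerStream also offers the Task-based WaitForConnectionAsync, which can be awaited directly and accepts a CancellationToken. The sketch below is a possible alternative, not the article's implementation; AsyncAccept and AcceptOneAsync are hypothetical names, and Byte mode is used only to keep the sketch platform-neutral, whereas the server above uses Message mode.

```csharp
using System;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncAccept
{
    // Creates a server stream and waits for a single client to connect.
    // The caller owns the returned stream and must dispose it.
    public static async Task<NamedPipeServerStream> AcceptOneAsync(
        string pipeName, CancellationToken token = default(CancellationToken))
    {
        var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut, 1,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
        try
        {
            // Completes when a client connects or the token is cancelled.
            await server.WaitForConnectionAsync(token);
            return server;
        }
        catch
        {
            // Do not leak the pipe handle if waiting fails or is cancelled.
            server.Dispose();
            throw;
        }
    }
}
```

Because the stream is created before the first await, the pipe already exists when AcceptOneAsync returns its task, so a client may start connecting right away.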
NamedPipeClient
The NamedPipeClient class implements the IClient interface and utilizes the NamedPipeBase for common functionality. The class has two events: ConnectedToServer and ClientStarted. The ConnectedToServer event is raised when the client successfully connects to the named pipe server, and the ClientStarted event is raised when the client is started.
The Connect() method initializes the named pipe client stream, connects to the named pipe server, sets the read mode, and starts reading messages from the named pipe. The Dispose() method disposes the named pipe client stream.
using System;
using System.IO.Pipes;
using System.Threading.Tasks;

public class NamedPipeClient : NamedPipeBase<NamedPipeClientStream>, IClient
{
    public NamedPipeClient(string name) : base(name)
    {
    }

    public event EventHandler ConnectedToServer;
    private void OnConnectedToServer()
    {
        ConnectedToServer?.Invoke(this, EventArgs.Empty);
    }

    public event EventHandler ClientStarted;
    private void OnClientStarted()
    {
        ClientStarted?.Invoke(this, EventArgs.Empty);
    }

    public async Task Connect()
    {
        // "." addresses a pipe on the local machine.
        Initialize(new NamedPipeClientStream(".", _name, PipeDirection.InOut, PipeOptions.Asynchronous));
        try
        {
            OnClientStarted();
            // Waits until the server accepts the connection.
            await Pipe.ConnectAsync();
            // The read mode can only be set once the pipe is connected.
            Pipe.ReadMode = PipeTransmissionMode.Message;
            OnConnectedToServer();
            await StartReading();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }

    public override void Dispose()
    {
        Pipe?.Dispose();
    }
}
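Connect() calls ConnectAsync() without a timeout, so it waits indefinitely when no server is listening. NamedPipeClientStream also has a ConnectAsync(int) overload that throws a TimeoutException once the given number of milliseconds has elapsed. A minimal sketch of how a bounded connect could look, with ClientConnect and TryConnectAsync as hypothetical names:

```csharp
using System;
using System.IO.Pipes;
using System.Threading.Tasks;

public static class ClientConnect
{
    // Tries to connect within timeoutMs milliseconds.
    // Returns false instead of throwing when the server does not answer in time.
    public static async Task<bool> TryConnectAsync(NamedPipeClientStream pipe, int timeoutMs)
    {
        try
        {
            await pipe.ConnectAsync(timeoutMs);
            return true;
        }
        catch (TimeoutException)
        {
            // No server accepted the connection before the timeout elapsed.
            return false;
        }
    }
}
```

Whether a timeout is appropriate depends on the application; a client that is expected to start before its server may prefer the unbounded wait used in the article.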
Usage example
To use the NamedPipeServer and NamedPipeClient classes, first create a NamedPipeServer instance and bind callback functions to its events. Then call the Start method of the NamedPipeServer instance to start the server and wait for connections. Create a new NamedPipeClient instance with a name matching the NamedPipeServer instance, bind callback functions to its events, and call the Connect method to connect to the NamedPipeServer instance.
static async Task Main(string[] args)
{
    // Server: subscribe to the events, then start listening.
    IServer server = new NamedPipeServer("mypipe");
    server.ServerStarted += (_, e) =>
        Console.WriteLine("SERVER => Server started.");
    server.ClientConnected += (_, e) =>
        Console.WriteLine("SERVER => A client connected.");
    server.MessageReceived += (_, e) =>
        Console.WriteLine($"SERVER => Message received from client: {(e as MessageReceivedEventArgs).Message}");
    server.Disconnected += (_, e) =>
        Console.WriteLine("SERVER => A client disconnected.");
    await server.Start();

    // Client: same pipe name as the server, then connect.
    IClient client = new NamedPipeClient("mypipe");
    client.ClientStarted += (_, e) =>
        Console.WriteLine("CLIENT => Client started.");
    client.ConnectedToServer += (_, e) =>
        Console.WriteLine("CLIENT => Client connected to server.");
    client.MessageReceived += (_, e) =>
        Console.WriteLine($"CLIENT => Message received from server: {(e as MessageReceivedEventArgs).Message}");
    client.Disconnected += (_, e) =>
        Console.WriteLine("CLIENT => Server disconnected.");
    await client.Connect();
}
Once the server and the client have started and connected to each other, the output is:
SERVER => Server started.
CLIENT => Client started.
SERVER => A client connected.
CLIENT => Client connected to server.
Once connected, use the Send method to send messages and the MessageReceived event to receive messages.
while (true)
{
    var command = Console.ReadLine();
    // ReadLine() returns null at the end of the input.
    if (command == null)
        break;

    if (command.StartsWith("s:"))
    {
        // Strip only the leading prefix; Replace would also remove "s:" inside the message.
        var message = command.Substring(2);
        if (message == "stop")
            server.Dispose();
        else if (message == "start")
            await server.Start();
        else
            await server.Send(message);
    }
    else if (command.StartsWith("c:"))
    {
        var message = command.Substring(2);
        if (message == "stop")
            client.Dispose();
        else if (message == "start")
            await client.Connect();
        else
            await client.Send(message);
    }
    else
        break;   // any other input ends the loop
}
A command prefixed with ‘c:’ is sent by the client to the server, and a command prefixed with ‘s:’ is sent by the server to the client. The special messages stop and start dispose and restart the corresponding side.
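The prefix handling can also be factored into a small helper that is easy to test on its own. The sketch below strips the prefix by position, so a message such as "s:this: is kept" keeps the "s:" that appears inside the word "this:". CommandParser and Parse are hypothetical names and are not part of the article's code.

```csharp
using System;

public static class CommandParser
{
    // Returns the target ('s' for server, 'c' for client) and the text after
    // the prefix, or null when the line is null or carries neither prefix.
    public static (char Target, string Message)? Parse(string line)
    {
        if (line == null)
            return null;
        if (line.StartsWith("s:", StringComparison.Ordinal))
            return ('s', line.Substring(2));
        if (line.StartsWith("c:", StringComparison.Ordinal))
            return ('c', line.Substring(2));
        return null;
    }
}
```

The loop could then switch on the returned Target and treat a null result as the signal to exit.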
Below we send messages, disconnect, reconnect and then send messages again.
c: a message to the server # user input
SERVER => Message received from client: a message to the server # output
s:a command send to the client # user input
CLIENT => Message received from server: a command send to the client # output
c:stop # user input
CLIENT => Server disconnected. # output
SERVER => A client disconnected. # output
s:start # user input
SERVER => Server started. # output
c:start # user input
CLIENT => Client started. # output
CLIENT => Client connected to server. # output
SERVER => A client connected. # output
c:a new message is send from client # user input
SERVER => Message received from client: a new message is send from client # output