Sockets in .Net Core – Part 3

Framing using Pipelines

One of the latest additions to .NET Core 2.x was Pipelines, which I consider a “Microsoft to the rescue” concept. In a previous article I talked about manual framing, which was the most popular solution, with all its optimizations and nuances, until Microsoft released the Pipelines package.

If you haven’t read the previous article about framing, I suggest you review it first; it will help you understand Pipelines much better.

Socket in .Net Core – Part 2

Going back to the definition from the previous article:

 In a nutshell, framing is the process of messages interpretation, which requires to identify message boundaries in a stream context.

This process of identifying boundaries presents several problems:

  • Efficiency: ASCII, Unicode and raw byte interpretation all have different complexity and different potential solutions
  • Boilerplate code that tackles edge cases in inefficient ways
  • Buffering problems that turn into flow control issues

Pipelines to the rescue

Microsoft released System.IO.Pipelines as a NuGet package alongside .NET Core 2.1. Using Pipelines is extremely simple; there are two steps:

  1. FillPipeAsync reads from an existing socket and writes into the PipeWriter
  2. ReadPipeAsync reads from an entity called PipeReader and returns parsed lines

The advantages are clear: simplicity, optimization, and buffers managed by the library instead of by hand.

All the code you saw in the previous article is reduced to a few elegant methods.

Part 1: Orchestration

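using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading.Tasks;

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    // Start both halves without awaiting them so they run concurrently
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);
    // Return only when both the socket side and the parsing side are done
    await Task.WhenAll(reading, writing);
}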

This code creates a Pipe and starts a writer method and a reader method. Something very interesting is the decoupling of reader and writer. In theory, nothing forces us to write what we read from the socket into the pipe, which leaves room for a null writer implementation.

Names can be a bit confusing in this case. The “Writer” is the PipeWriter: it consumes the socket and writes to the Pipe. The Pipe is a highly optimized buffer queue designed to manage back pressure and flow control efficiently. It saves us from building our own buffers, and it also controls very important aspects of network communication.
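To make the back pressure idea concrete, here is a minimal sketch, assuming the pipe is configured through PipeOptions with very small thresholds. The class name, method name and the values 64 and 32 are hypothetical and purely illustrative; real code would normally keep the defaults or use much larger numbers.

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

static class BackPressureDemo
{
    // Returns true when the writer was paused by the pipe until the reader caught up.
    public static async Task<bool> WriterPausesUntilReaderCatchesUpAsync()
    {
        // Illustrative thresholds: pause the writer once 64 bytes are unread,
        // resume it when fewer than 32 bytes remain unread.
        var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 64, resumeWriterThreshold: 32));

        // WriteAsync copies the bytes into the pipe and flushes them.
        ValueTask<FlushResult> flush = pipe.Writer.WriteAsync(new byte[128]);

        // 128 unread bytes exceed the pause threshold, so the flush is still pending.
        bool paused = !flush.IsCompleted;

        // The reader consumes everything, which drops the unread count below
        // the resume threshold and lets the pending flush complete.
        ReadResult read = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(read.Buffer.End);
        await flush;

        return paused;
    }
}
```

The point of the sketch is that the writer never has to implement flow control itself: awaiting the flush is enough, and the pipe decides when the writer may continue.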

Both tasks run until the connection is closed. It will be easier to see in the following code.

Part 2: Socket reading – Pipe writing

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 1024;
    while (true)
    {
        // Allocate at least 1024 bytes from the PipeWriter
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket
            writer.Advance(bytesRead);
        }
        catch (SocketException socketException)
        {
            // …
        }
        catch (Exception ex)
        {
            // …
        }
        // Make the data available to the PipeReader
        FlushResult result = await writer.FlushAsync();
        if (result.IsCompleted)
        {
            break;
        }
    }
    // Tell the PipeReader that there's no more data coming
    writer.Complete();
}

At first this code might seem intimidating, but it is actually very simple. First, we ask the PipeWriter for a buffer at the application level. Socket reading is exactly the same as before; the difference is that the bytes now land in memory owned by the Pipe, which manages our buffers efficiently. Exception handling is added as a real-life example.

Once we have written data to the writer, we need to inform the reader that data is ready by calling FlushAsync. This method has two goals: making the data available, and returning a flag that indicates whether communication is finished. IsCompleted means the reader has completed and will not consume any more data, so we can exit, which is what this code does. The writer signals that it is done with the Complete method.
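A small sketch of that flag, assuming a pipe whose reader gives up before the writer does. The class and method names are hypothetical and exist only for this illustration.

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

static class FlushCompletionDemo
{
    // Returns the IsCompleted flag the writer observes after the reader has completed.
    public static async Task<bool> WriterSeesReaderCompletionAsync()
    {
        var pipe = new Pipe();

        // The reading side announces that it will not consume anything else.
        pipe.Reader.Complete();

        // The next flush reports that through FlushResult.IsCompleted,
        // which is the signal FillPipeAsync uses to leave its loop.
        FlushResult result = await pipe.Writer.FlushAsync();

        pipe.Writer.Complete();
        return result.IsCompleted;
    }
}
```

In other words, the flag travels from the reader to the writer, so the socket loop stops filling a pipe that nobody is reading anymore.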

Part 3: Pipe reading – Framing

I have to tell you the truth: framing is not entirely eliminated. It is still necessary to determine boundaries, but Pipelines make the code much simpler and let you focus on the main part, which becomes trivial once the heavy work is handed over to the pipes.

Just check the code and you will agree with me:

// Byte sequence that marks the end of each message
byte[] _messageBoundary;
async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        int marker = BuffersCustomExtensions.LocationNotFound;
        do
        {
            // Look for the message boundary in the buffer
            marker = buffer.Locate(_messageBoundary);
            if (marker != BuffersCustomExtensions.LocationNotFound)
            {
                var bufferSlice = buffer.Slice(0, marker + _messageBoundary.Length);
                var slice = bufferSlice.ToArray();
                await ProcessCommand(slice); // your code to process the message
                // Keep only what follows the message we just processed
                buffer = buffer.Slice(marker + _messageBoundary.Length);
            }
        }
        while (marker != BuffersCustomExtensions.LocationNotFound);
        // Tell the PipeReader how much of the buffer we have consumed
        reader.AdvanceTo(buffer.Start, buffer.End);
        // Stop reading if there's no more data coming
        if (result.IsCompleted)
        {
            break;
        }
    }
    // Mark the PipeReader as complete
    reader.Complete();
}

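The code is extremely simple and much cleaner. We read from the PipeReader, which handles the buffer for us and hands back a ReadOnlySequence of bytes. The code that searches for boundaries slices that sequence around a marker to extract each message. Locate and LocationNotFound come from BuffersCustomExtensions, a custom helper class that is not part of System.IO.Pipelines.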
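The Locate helper is not listed in this article, so the following is only a sketch of what such an extension could look like, assuming it returns the offset of the boundary from the start of the buffer and -1 when the boundary is missing or still incomplete. The body and the private StartsWith helper are my assumptions for illustration; only PositionOf, Slice and the enumeration of segments are standard System.Buffers APIs.

```csharp
using System;
using System.Buffers;

public static class BuffersCustomExtensions
{
    public const int LocationNotFound = -1;

    // Returns the offset of the first occurrence of boundary in buffer,
    // or LocationNotFound when it is absent or only partially received.
    public static int Locate(this ReadOnlySequence<byte> buffer, byte[] boundary)
    {
        if (boundary == null || boundary.Length == 0)
            return LocationNotFound;

        ReadOnlySequence<byte> remaining = buffer;
        while (remaining.Length >= boundary.Length)
        {
            // Jump to the next byte that could start the boundary
            SequencePosition? candidate = remaining.PositionOf(boundary[0]);
            if (!candidate.HasValue)
                return LocationNotFound;

            ReadOnlySequence<byte> tail = remaining.Slice(candidate.Value);
            if (tail.Length < boundary.Length)
                return LocationNotFound;

            if (StartsWith(tail, boundary))
                return (int)(buffer.Length - tail.Length); // offset from the buffer start

            remaining = tail.Slice(1); // false start: skip one byte and keep searching
        }
        return LocationNotFound;
    }

    // Hypothetical helper: compares the first prefix.Length bytes of the sequence.
    private static bool StartsWith(ReadOnlySequence<byte> sequence, byte[] prefix)
    {
        int i = 0;
        // A sequence may span several segments, so compare segment by segment
        foreach (ReadOnlyMemory<byte> segment in sequence.Slice(0, prefix.Length))
        {
            ReadOnlySpan<byte> span = segment.Span;
            for (int j = 0; j < span.Length; j++)
            {
                if (span[j] != prefix[i++])
                    return false;
            }
        }
        return true;
    }
}
```

Working on the sequence directly, instead of copying it to an array first, keeps the search compatible with buffers that the pipe has split across several segments.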

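In the same way we advance the writer position in the pipe, we need to advance the reader, indicating how far we got. AdvanceTo takes two positions: everything before the first one is consumed and released, and everything up to the second one has been examined, so the next ReadAsync only returns once more data arrives. The same concept applies to stopping: if the completed flag is set, the writer side has finished reading the socket and it’s safe to leave. The pipe reader also needs to mark the pipe as complete before ending.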
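Here is a minimal sketch of that behaviour, assuming a message that arrives in two pieces. The class name, method name and the sample text are hypothetical; the pipe calls are the same ones used in the methods above.

```csharp
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

static class AdvanceDemo
{
    // Returns the text seen by the second read and the completion flag of that read.
    public static async Task<(string Text, bool Completed)> ReadAcrossWritesAsync()
    {
        var pipe = new Pipe();

        // First chunk: no boundary yet
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("par"));
        ReadResult first = await pipe.Reader.ReadAsync();

        // Nothing consumed, everything examined: the pipe keeps "par" for later
        pipe.Reader.AdvanceTo(first.Buffer.Start, first.Buffer.End);

        // Second chunk completes the message, then the writer finishes
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("tial\n"));
        pipe.Writer.Complete();

        // The second read contains the retained bytes plus the new ones
        ReadResult second = await pipe.Reader.ReadAsync();
        string text = Encoding.ASCII.GetString(second.Buffer.ToArray());
        bool completed = second.IsCompleted;

        pipe.Reader.AdvanceTo(second.Buffer.End);
        pipe.Reader.Complete();
        return (text, completed);
    }
}
```

This is why ReadPipeAsync can simply loop: incomplete messages stay in the pipe until the rest of their bytes show up, without any manual copying into a side buffer.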

Once both tasks exit, the orchestrating method detects that they have finished and exits as well.

Summary

If you are using .NET Core 2.2 or greater and you are still handling buffers manually, switch to Pipelines immediately. It will simplify the code, and performance will almost always be better, especially if you are managing multiple threads at the same time. I noticed significantly better CPU utilization after switching to Pipelines.

Published by maxriosflores

Solution Architect for a decade. I designed, built and implemented software solutions for more than 25 years and every single day more interested on technology. I learned to code in a Texas Instruments with 16kb at 8 years old. I shared this passion with friends coding CZ Spectrums, MSX's and C64's. I worked in computers since my early 17's with super old tools like plain C and Quick Basic. I love math and computers as much as outdoors and family life.
