No-nonsense gRPC guide for the C# developers, Part Three: Streaming


source code

Why do we sometimes care about streaming?

Many, many microservices are fundamentally designed like the Calculate service we built in parts one and two. Of course, the payloads of the request and reply messages may vary widely, but the pattern is fundamentally the same: the client sends a request message, the service does something with the supplied parameters and sends a response back. But some cases require a slightly adjusted point of view. Consider an IoT (internet-of-things) scenario: we have a device which continuously reports some measurement at regular intervals, say temperature, and the service provides storage and/or analytics for this data. Granted, we could create a gRPC method ReportTemperature or the like which accepts a temperature reading, and the service would do its job in the method implementation. But this is sub-optimal: establishing a secure channel to the service is not entirely free, and if the device had to do it, say, every second, the performance costs would add up. It would be better to establish a connection once and keep sending temperature readings on that connection for as long as we can. The temperature reports would then essentially look like a stream of readings.

Another thing to mention is that the stream is not necessarily directed from client to service. The opposite case is also possible. For example, what if our temperature service pushed some analytical data to another (or even the same) client? Say we are interested in the median of every ten temperature readings. That would require the server to stream data to the client. Luckily, gRPC supports all of these cases: client-to-service streaming, service-to-client streaming and duplex (bidirectional) streaming. In our exercise we will add bidirectional streaming. Once we know how to do that, we’ll be able to implement both cases of unidirectional streaming, as they are, in essence, simplified versions of bidirectional streaming.

Add streaming support to the proto file

First, let’s add a message that represents a single temperature reading to contracts.proto:

// new
message Temperature {
    int64 timestamp = 1;
    double value = 2;
}

timestamp indicates the time when the temperature reading was captured and value is the actual reading. Next, add the new method to the service:

service Svc {
    rpc Calculate (CalculateRequest) returns (CalculateReply);
    // new
    rpc Median (stream Temperature) returns (stream Temperature);
}

Notice the stream keyword preceding a message type: it indicates that the method accepts and/or emits a stream of such messages. In our case the service is going to accept a stream of temperature readings and emit a stream of medians, one for each consecutive batch of 10 readings.

These are all the changes we are going to make to the proto file. Note that our existing Calculator functionality is not affected at all and it will continue to work as before.
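The proto file leaves the unit of timestamp open; it is just an int64. If you want real clock values in that field, one possible convention (an assumption made here, not something the contract requires) is milliseconds since the Unix epoch. A minimal sketch of the conversion in both directions, with hypothetical helper names UnixTime.ToMillis and UnixTime.FromMillis:

```csharp
using System;

public static class UnixTime
{
    // Converts a point in time to milliseconds since 1970-01-01T00:00:00Z.
    public static long ToMillis(DateTimeOffset moment) => moment.ToUnixTimeMilliseconds();

    // Converts epoch milliseconds back to a UTC DateTimeOffset.
    public static DateTimeOffset FromMillis(long millis) => DateTimeOffset.FromUnixTimeMilliseconds(millis);
}

public static class TimestampDemo
{
    public static void Main()
    {
        // Round-trip the current time through the int64 representation.
        long now = UnixTime.ToMillis(DateTimeOffset.UtcNow);
        Console.WriteLine($"{now} -> {UnixTime.FromMillis(now):O}");
    }
}
```

Any other unit works just as well, as long as the client and the service agree on it.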

Client changes

We will start with the client. We are going to emulate the temperature readings by starting from some fixed temperature and randomly adjusting it up or down with each reading (in time series analysis, this is called a random walk).
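To get a feel for what a random walk looks like before any gRPC is involved, here is a small standalone sketch. The class name RandomWalk and its Generate method are made up for this illustration; the step distribution (uniform in [-0.5, 0.5)) is an assumption that happens to match what the client will use.

```csharp
using System;
using System.Collections.Generic;

public static class RandomWalk
{
    // Yields `count` readings: the first is `start`, and each later one
    // differs from its predecessor by a uniform step in [-0.5, 0.5).
    public static IEnumerable<double> Generate(double start, int count, Random rnd)
    {
        double temp = start;
        for (int i = 0; i < count; i++)
        {
            yield return temp;
            temp += rnd.NextDouble() - 0.5;
        }
    }
}

public static class RandomWalkDemo
{
    public static void Main()
    {
        // Print five emulated readings starting at 10 degrees.
        foreach (var reading in RandomWalk.Generate(10.0, 5, new Random()))
        {
            Console.WriteLine(reading);
        }
    }
}
```

Passing the Random instance in, rather than creating it inside the method, makes it possible to use a fixed seed when repeatable output is needed.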

Ok, the client now will be able to do both “calculator” and “time series” functionality. How do we distinguish between these two?

For simplicity, we will use the following rule: if only two arguments are passed (service address and port), we will assume that we want the “time series” demo, otherwise it is going to be the “calculator” demo. It is clumsy, I admit, and you would probably never do this in practice, but it will serve our purposes. The channel, client and certificate business is the same for both cases, so let’s extract the calculator-specific functionality into a separate method:

static void DoCalculator(Svc.SvcClient client, String[] args)
{
    long x = long.Parse(args[2]);
    string op = args[3];
    long y = long.Parse(args[4]);
    var reply = client.Calculate(new CalculateRequest
    {
        X = x,
        Y = y,
        Op = op
    });
    Console.WriteLine($"The calculated result is: {reply.Result}");
}

In the same manner, add an empty (for now) method DoTimeSeries():

static void DoTimeSeries(Svc.SvcClient client, String[] args)
{

}

Now we are ready to change the Main method to do our silly dispatching logic:

// changed
static void Main(string[] args)
{
    string host = args[0];
    int port = int.Parse(args[1]);

    var creds = new SslCredentials(
        File.ReadAllText("cert/ca.pem")
    );
    var channel = new Channel(
        host,
        port,
        creds
        );
    var client = new Svc.SvcClient(channel);
    if (args.Length == 2)
    {
        DoTimeSeries(client, args);
    }
    else
    {
        DoCalculator(client, args);
    }
}

Sanity check

Make sure that the old calculator functionality still works. You know the drill: In one terminal

dotnet run -p Service 9000

In the second terminal

dotnet run -p Client localhost 9000 17 + 25

It works? Great! Let’s move on and fill in the time series stuff. We are going to be sending the temperature readings and receiving the medians at the same time, so we will need to run one of these jobs on the thread pool using Task.Run. client.Median() returns an object with two properties, RequestStream and ResponseStream. The former can be used to push messages to the server, the latter to process the messages coming from the server. First, the processing part:

static void DoTimeSeries(Svc.SvcClient client, String[] args)
{
    using var duplex = client.Median();
    // print each median as it arrives from the service
    var responseTask = Task.Run(async () =>
    {
        while (await duplex.ResponseStream.MoveNext())
        {
            var resp = duplex.ResponseStream.Current;
            Console.WriteLine($"{resp.Timestamp}: {resp.Value}");
        }
    });
}

Next, let’s take care of the pushing part. As discussed, we are using a random walk to push slightly randomized readings to the service:

// changed
static async Task DoTimeSeries(Svc.SvcClient client, String[] args)
{
    // as before ...
    int ts = 1;
    double temp = 10.0;
    var rnd = new Random();
    while (true)
    {
        await duplex.RequestStream.WriteAsync(new Temperature { Timestamp = ts, Value = temp });
        ts += 1;
        temp += rnd.NextDouble() - 0.5;
    }
}

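As we are using await to push asynchronously, we need to mark the method as async and make it return a Task to keep the async plumbing happy. For the same reason, Main becomes async Task Main and awaits DoTimeSeries, as the complete listing shows.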

As you can see, our pushing part never stops, as indicated by the while (true) loop. As we expect to receive a median on every 10th push, processing of the incoming stream never stops either. You’ll have to Ctrl-C to stop the process once you are bored of contemplating incoming median temperatures. For reference, the complete Client.cs as of now:

using System;
using System.IO;
using System.Threading.Tasks;
using Grpc.Core;
using Shared;

namespace Client
{
    class Program
    {
        static void DoCalculator(Svc.SvcClient client, String[] args)
        {
            long x = long.Parse(args[2]);
            string op = args[3];
            long y = long.Parse(args[4]);
            var reply = client.Calculate(new CalculateRequest
            {
                X = x,
                Y = y,
                Op = op
            });
            Console.WriteLine($"The calculated result is: {reply.Result}");
        }

        static async Task DoTimeSeries(Svc.SvcClient client, String[] args)
        {
            Console.WriteLine("doing time series");
            using var duplex = client.Median();
            var responseTask = Task.Run(async () =>
            {
                while (await duplex.ResponseStream.MoveNext())
                {
                    var resp = duplex.ResponseStream.Current;
                    Console.WriteLine($"{resp.Timestamp}: {resp.Value}");
                }
            });
            int ts = 1;
            double temp = 10.0;
            var rnd = new Random();
            while (true)
            {
                await duplex.RequestStream.WriteAsync(new Temperature { Timestamp = ts, Value = temp });
                ts += 1;
                temp += rnd.NextDouble() - 0.5;
            }
        }

        static async Task Main(string[] args)
        {
            string host = args[0];
            int port = int.Parse(args[1]);

            var creds = new SslCredentials(
                File.ReadAllText("cert/ca.pem")
            );
            var channel = new Channel(
                host,
                port,
                creds
                );
            var client = new Svc.SvcClient(channel);
            if (args.Length == 2)
            {
                await DoTimeSeries(client, args);
            }
            else
            {
                DoCalculator(client, args);
            }
        }
    }
}
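The client above runs until you press Ctrl-C. If you would rather have it finish on its own, one option is to send a fixed number of readings, signal the end of the request stream with CompleteAsync(), and then wait for the reader task to drain the remaining medians. The sketch below is a hypothetical variant (the class FiniteTimeSeries, the method DoFiniteTimeSeries and its count parameter are not part of the article's code). It relies on the generated Svc.SvcClient and Temperature types, so it only compiles inside the Client project.

```csharp
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Shared;

static class FiniteTimeSeries
{
    // Hypothetical variant of DoTimeSeries that sends `count` readings and then stops.
    public static async Task DoFiniteTimeSeries(Svc.SvcClient client, int count)
    {
        using var duplex = client.Median();

        // Reader: its loop ends once the service finishes the call
        // and the response stream is closed.
        var responseTask = Task.Run(async () =>
        {
            while (await duplex.ResponseStream.MoveNext())
            {
                var resp = duplex.ResponseStream.Current;
                Console.WriteLine($"{resp.Timestamp}: {resp.Value}");
            }
        });

        double temp = 10.0;
        var rnd = new Random();
        for (int ts = 1; ts <= count; ts++)
        {
            await duplex.RequestStream.WriteAsync(new Temperature { Timestamp = ts, Value = temp });
            temp += rnd.NextDouble() - 0.5;
        }

        // Tell the service that no more readings are coming;
        // its requestStream.MoveNext() then returns false.
        await duplex.RequestStream.CompleteAsync();

        // Wait for the remaining medians before `duplex` is disposed.
        await responseTask;
    }
}
```

With count set to 100, for example, the client should print ten medians and exit, provided the service returns from its handler when the request stream ends.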

On to the service streaming!

Streaming in the service

Below the Calculate method, add an implementation of Median to the MyService class with the following signature:

public override async Task Median(IAsyncStreamReader<Temperature> requestStream, IServerStreamWriter<Temperature> responseStream, ServerCallContext context)
{

}

As you can observe, in the service you also have access to both the request and response streams. Our median logic will be pretty straightforward and dumb. We will keep around a buffer of at most 10 readings. Once it is full, we sort it, take the average of the two middle values (with an even count there is no single middle element) and send it to the response stream:

public override async Task Median(IAsyncStreamReader<Temperature> requestStream, IServerStreamWriter<Temperature> responseStream, ServerCallContext context)
{
    // new
    var vals = new List<double>();
    while (await requestStream.MoveNext())
    {
        var temp = requestStream.Current;
        vals.Add(temp.Value);
        double med = 0;
        if (vals.Count == 10)
        {
            var arr = vals.ToArray();
            Array.Sort(arr);
            med = (arr[4] + arr[5]) / 2;
            vals.Clear();
            await responseStream.WriteAsync(new Temperature { Timestamp = temp.Timestamp, Value = med });
        }
    }
}

That should do the trick. For reference, the full Service.cs:

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Grpc.Core;
using Shared;

namespace Service
{
    public class MyService : Svc.SvcBase
    {
        public override Task<CalculateReply> Calculate(CalculateRequest request, ServerCallContext context)
        {
            long result = -1;
            switch (request.Op)
            {
                case "+":
                    result = request.X + request.Y;
                    break;
                case "-":
                    result = request.X - request.Y;
                    break;
                case "*":
                    result = request.X * request.Y;
                    break;
                case "/":
                    if (request.Y != 0)
                    {
                        result = (long)request.X / request.Y;
                    }
                    break;
                default:
                    break;
            }
            return Task.FromResult(new CalculateReply { Result = result });
        }

        public override async Task Median(IAsyncStreamReader<Temperature> requestStream, IServerStreamWriter<Temperature> responseStream, ServerCallContext context)
        {
            Console.WriteLine("Median");
            var vals = new List<double>();
            while (await requestStream.MoveNext())
            {
                var temp = requestStream.Current;
                vals.Add(temp.Value);
                double med = 0;
                if (vals.Count == 10)
                {
                    var arr = vals.ToArray();
                    Array.Sort(arr);
                    med = (arr[4] + arr[5]) / 2;
                    vals.Clear();
                    await responseStream.WriteAsync(new Temperature { Timestamp = temp.Timestamp, Value = med });
                }
            }
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            int port = int.Parse(args[0]);
            var pair = new KeyCertificatePair(
                File.ReadAllText("cert/service.pem"),
                File.ReadAllText("cert/service-key.pem")
            );
            var creds = new SslServerCredentials(new[] { pair });
            var server = new Server
            {
                Services = { Svc.BindService(new MyService()) },
                Ports = { new ServerPort("0.0.0.0", port, creds) }
            };
            server.Start();
            Console.WriteLine($"Server listening at port {port}. Press any key to terminate");
            Console.ReadKey();
        }
    }

}
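The buffering and median logic inside Median can also be tried out without any gRPC plumbing. The following standalone sketch uses made-up names (WindowMedian, Of, Tumbling) and generalizes slightly beyond the service code: it handles odd window sizes too, which the service does not need since its window is always 10.

```csharp
using System;
using System.Collections.Generic;

public static class WindowMedian
{
    // Median of a non-empty window: the middle element for odd sizes,
    // the mean of the two middle elements for even sizes.
    public static double Of(IReadOnlyList<double> window)
    {
        if (window.Count == 0)
        {
            throw new ArgumentException("window must not be empty", nameof(window));
        }
        var sorted = new double[window.Count];
        for (int i = 0; i < sorted.Length; i++) sorted[i] = window[i];
        Array.Sort(sorted);
        int mid = sorted.Length / 2;
        return sorted.Length % 2 == 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2;
    }

    // Splits readings into consecutive, non-overlapping windows of `size`
    // and yields one median per full window; a trailing partial window is dropped.
    public static IEnumerable<double> Tumbling(IEnumerable<double> readings, int size)
    {
        var buf = new List<double>(size);
        foreach (var r in readings)
        {
            buf.Add(r);
            if (buf.Count == size)
            {
                yield return Of(buf);
                buf.Clear();
            }
        }
    }
}

public static class WindowMedianDemo
{
    public static void Main()
    {
        var readings = new List<double>();
        for (int i = 1; i <= 25; i++) readings.Add(i);
        // Two full windows of 10; the last five readings are dropped.
        foreach (var m in WindowMedian.Tumbling(readings, 10)) Console.WriteLine(m);
    }
}
```

For the readings 1 through 25 and a window of 10, this yields the medians 5.5 and 15.5. For a window of 10 the two middle elements are at indexes 4 and 5, the same ones the service averages.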

Give it a shot: run the service

dotnet run -p Service 9000

and, in a separate terminal, the client in “time series” mode

dotnet run -p Client localhost 9000

You should see a continuous stream of calculated medians coming to the client’s terminal. Ctrl-C when bored. Try doing the same across two different computers.

It should work, just make sure that the certificate for the service contains its hostname/IP address.

This concludes our mini-series. As a bonus, I will shortly publish how we can replicate the whole functionality in Python, so that a Python client can talk to the C# service and vice versa. This is optional, but if you are interested, stay tuned!

