C# 8 introduces async streams that make async programming in .NET more enjoyable. In this post, I’ll explore what it takes to unit test applications that are using async streams.

Prerequisites:

  1. You are familiar with asynchronous programming in .NET. If you are not, I recommend taking a fantastic pluralgsight course.

  2. You should have Visual Studio 2019 version >=16.3 or .NET Core 3.0 SDK.

Real-world applications often process data streams. Data might be coming from a database, a web-service, or an async file operation. An example application I created, mimics this by matching words coming from an async stream and yielding a “word-occurrences count” object once a match has been found. This is the worst way to count matching words, but a good example of dealing with async streams.

I was also curious about what it takes to test exception handling, cancellation, and progress reporting, so I added some examples of such tests.

Let’s try to create our first failed test.

[Test]
public async Task GetWordCountUpdates_NoWordsToCount_ReturnsEmptyResult()
{
    var dataSource = new Mock<IDataSource>();
    dataSource.Setup(x => x.GetData()).Returns(new string[] { }.ToAsyncEnumerable);
    var wordCountService = new WordCounterService(dataSource.Object);
    var result = new List<Result>();

    await foreach (var update in wordCountService.GetWordCountUpdates(new string[] { }))
    {
        result.Add(update);
    }

    Assert.That(result, Is.Empty);
}

Three things here to notice:

  1. The test is defined as async Task. We need async because there is await inside the test body. Why no async void? Well, Using async void is a bad practice. Anyways, NUnit will not let you run the test and complain, Async test method must have a non-void return type. Good job, NUnit!

  2. ToAsyncEnumerable is our good friend. It comes with `System.Linq.Async’ which can be added as the Nuget package. In this case, we apply it to the empty collection.

  3. await foreach body executes in the captured context. It means we can use non-thread safe such as List. Async doesn't mean parallel in this case, and all the updates to `result` happen sequentially.

He is the minimal test thas compiles and fails:

public interface IDataSource
{
    IAsyncEnumerable<string> GetData();
}

public class WordCounterService
{
    public WordCounterService(IDataSource dataSource)
    {
    }

    // Note, that method doesn’t have async modified yet, but tests will guide us there later
    public IAsyncEnumerable<WordCountUpdate> GetWordCountUpdates(string[] words)
    {
        throw new NotImplementedException();
    }
} 

After a series of red-green-refactor cycles, I evolved the code under test and structured. Each test defines stream behavior; Here is how typical test look like:

[Test]
public async Task GetWordCountUpdates_WordMatchesInStream_ReturnsExpectedUpdates()
{
    SetupDataSource("foo");

    var result = (await GetWordCountUpdate("foo")).First();

    Assert.That(result.OccurrencesCount, Is.EqualTo(1));
}

WordCounterService.cs WordCounterServiceTests.cs

Testing Exception handling

This part went smoothly. Thanks to Task.FromException that constructs a task that completed with a specific exception.

There is a whole family of Task.From* functions that are very helpful when it comes to creating tasks that completed with a specific state, such as success, cancel, or exception.

Here is how the test looks like:

[Test]
public void GetWordCountUpdates_DataSourceThrowsAndException_RethrowsTheException()
{
    // emulate exception while reading data stream
    async IAsyncEnumerable<string> StreamData()
    {
        await Task.FromException(new IOException("hard disk is corrupted"));
        yield break; // otherwise compiler will complain "not all path return a value"
    }
    var dataSource = new Mock<IDataSource>();
    dataSource.Setup(x => x.GetData()).Returns(StreamData);
    var service = new WordCounterService(dataSource.Object);

    var firstUpdate = service.GetWordCountUpdates(new[] { "foo" }).FirstAsync();

    // ensure that exception is not swallowed
    Assert.That(async () => await firstUpdate, Throws.InstanceOf<IOException>());
}

Note that the test method is void; this is because the only place we await is inside the lambda expression inside the async statement.

Testing cancelation

Let’s start by test that complies and fail.

[Test]
public void GetWordCountUpdates_CanBeCancelledAfterItemInStream()
{
    var dataSource = new Mock<IDataSource>();
    var source = new CancellationTokenSource();
    var cancellationToken = source.Token;
    async IAsyncEnumerable<string> StreamData()
    {
        yield return "foo";
        await Task.CompletedTask;
    }
    dataSource.Setup(x => x.GetData()).Returns(StreamData);
    var service = new WordCounterService(dataSource.Object);
    var enumerator = service.GetWordCountUpdates(new[] { "foo" }, cancellationToken).GetAsyncEnumerator(cancellationToken);

    source.Cancel();

    Assert.That(async () => await enumerator.MoveNextAsync(), Throws.TypeOf<OperationCanceledException>());
}

To make test compile and pass I added CancellationToken cancellationToken = default to WordCounterService.GetWordCountUpdates.

public async IAsyncEnumerable<WordCountUpdate> GetWordCountUpdates(string[] words, CancellationToken cancellationToken = default)
{
    await foreach (var text in m_DataSource.GetData())
    {
        cancellationToken.ThrowIfCancellationRequested();
        foreach (var word in words)
        {
            ...
        }
    }
}

Done.

Testing progress reporting

I decided that progress will be reported after reading a line stream. In the real world, it can be something more sophisticated.

[Test]
public async Task GetWordCountUpdates_ReportsProgressForEachElementInStream()
{
    var dataSource = new Mock<IDataSource>();
    static async IAsyncEnumerable<string> StreamData()
    {
        yield return "foo";
        yield return "bar";
        yield return "baz";
        await Task.CompletedTask;
    }
    dataSource.Setup(x => x.GetData()).Returns(StreamData);
    var service = new WordCounterService(dataSource.Object);
    var progressMock = new Mock<IProgress<int>>();

    await foreach (var unused in service.GetWordCountUpdates(new[] { "foo" }, CancellationToken.None, progressMock.Object))
    {
        // do nothing
    }

    progressMock.Verify(x => x.Report(1), Times.Exactly(3));
}

To make it compile I added IProgress<int> progress to WordCounterService.GetWordCountUpdates

public async IAsyncEnumerable<WordCountUpdate> GetWordCountUpdates(string[] words, CancellationToken cancellationToken = default, IProgress<int> progress = default)
{
    await foreach (var text in m_DataSource.GetData())
    {
        cancellationToken.ThrowIfCancellationRequested();
        progress?.Report(1);
        foreach (var word in words)
        {
            var occurrencesCount = text.Split(' ', '.', ';').Count(x => x.Equals(word));
            if (occurrencesCount > 0)
            {
                yield return new WordCountUpdate(word, occurrencesCount);
            }
        }
    }
}

Putting it all together

Let’s read use real data. The first thing that came to mind is to count “war” and “peace” in the self-titled book by Leo Tolstoy. To do that I created an implementation of IDataSource that reads data from async API:

class FileDataSource : IDataSource
{
        readonly string m_FileName;

        public FileDataSource(string fileName)
        {
            m_FileName = fileName;
        }

        public async IAsyncEnumerable<string> GetData()
        {
            using var reader = File.OpenText(m_FileName);
            string line = null;
            while ((line = await reader.ReadLineAsync()) != null)
            {
                yield return line;
            }
        }
    }

and the console program itself:

class Program
{
    static async Task Main()
    {
        var wordsToCount = new[] { "war", "peace" };
        var dataSource = new FileDataSource("WarAndPeace.txt");
        var wordCountService = new WordCounterService(dataSource);
        var warCount = 0;
        var peaceCount = 0;
        var linesProcessed = 0;

        var progress = new Progress<int>(n => linesProcessed += n);
        await foreach (var update in wordCountService.GetWordCountUpdates(wordsToCount, CancellationToken.None, progress))
        {
            if (update.Word == "war")
            {
                warCount += update.OccurrencesCount;
            }
            else
            {
                peaceCount += update.OccurrencesCount;
            }
            
            Console.SetCursorPosition(0, 0);
            Console.WriteLine($"Lines processed: {linesProcessed} war: {warCount} peace {peaceCount}");
        }
    }
}

Summary

I was able to emulate all the scenarios I wanted: pure data, exceptions, cancelation, and even progress tracking. Everything is done in-memory and works super-fast.

The part that I enjoyed most is that I can define stream behavior like this:

static async IAsyncEnumerable<string> StreamData()
{
    yield return "foo";
    yield return "bar";
    yield return "baz";
    await Task.CompletedTask;
}

Don’t know about you, but when I see this sequence of yields, I immediately start to wonder about other use-cases: what if there is an exception or cancellation or empty string or multiple matches in the same line and so on. I can express every what-if by writing a test for each case. Task.From* make this process effortless.

However, when I created a close to a real-word application that counts “war” and “peace” in the self-titled Leo Tolstoy book, I found a couple of cases I didn’t think about (mostly missed punctuation characters). I converted them to the unit tests.

Overall, playing with unit-testing Async Stream very was a very positive experience for me. I attribute this to the fact that .net async APIs were designed with testability in mind. I hope you’ll enjoy unit-testing your async application too.