Unit testing and .NET async streams
C# 8 introduces async streams that make async programming in .NET more enjoyable. In this post, I’ll explore what it takes to unit test applications that are using async streams.
Prerequisites:
- You are familiar with asynchronous programming in .NET. If you are not, I recommend taking a fantastic Pluralsight course.
- You have Visual Studio 2019 version 16.3 or later, or the .NET Core 3.0 SDK.
Real-world applications often process data streams. Data might come from a database, a web service, or an async file operation. The example application I created mimics this by matching words coming from an async stream and yielding a “word-occurrences count” object each time a match is found. This is probably the worst way to count matching words, but it is a good example of dealing with async streams.
I was also curious about what it takes to test exception handling, cancellation, and progress reporting, so I added some examples of such tests.
Let’s try to create our first failing test.
[Test]
public async Task GetWordCountUpdates_NoWordsToCount_ReturnsEmptyResult()
{
    var dataSource = new Mock<IDataSource>();
    dataSource.Setup(x => x.GetData()).Returns(new string[] { }.ToAsyncEnumerable());
    var wordCountService = new WordCounterService(dataSource.Object);
    // The service yields WordCountUpdate items, so that is what we collect
    var result = new List<WordCountUpdate>();
    await foreach (var update in wordCountService.GetWordCountUpdates(new string[] { }))
    {
        result.Add(update);
    }
    Assert.That(result, Is.Empty);
}
Three things to notice here:
- The test is defined as `async Task`. We need `async` because there is an `await` inside the test body. Why not `async void`? Well, using `async void` is a bad practice. In any case, NUnit will refuse to run the test and complain: "Async test method must have a non-void return type." Good job, NUnit!
- `ToAsyncEnumerable` is our good friend. It comes with `System.Linq.Async`, which can be added as a NuGet package. In this case, we apply it to an empty collection.
- The `await foreach` body resumes on the captured context when there is one, and in any case it runs one iteration at a time. This means we can use non-thread-safe collections such as `List<T>`. Async doesn't mean parallel in this case, and all the updates to `result` happen sequentially.
Here is the minimal code under test that compiles and makes the test fail:
public interface IDataSource
{
    IAsyncEnumerable<string> GetData();
}
public class WordCounterService
{
    public WordCounterService(IDataSource dataSource)
    {
    }
    // Note that the method doesn’t have the async modifier yet, but the tests will guide us there later
    public IAsyncEnumerable<WordCountUpdate> GetWordCountUpdates(string[] words)
    {
        throw new NotImplementedException();
    }
}
After a series of red-green-refactor cycles, I evolved the code under test and structured the tests so that each one defines the stream behavior. Here is what a typical test looks like:
[Test]
public async Task GetWordCountUpdates_WordMatchesInStream_ReturnsExpectedUpdates()
{
    SetupDataSource("foo");
    var result = (await GetWordCountUpdate("foo")).First();
    Assert.That(result.OccurrencesCount, Is.EqualTo(1));
}
The full code is in WordCounterService.cs and WordCounterServiceTests.cs.
Testing exception handling
This part went smoothly, thanks to `Task.FromException`, which constructs a task that has already completed with a specific exception.
There is a whole family of `Task.From*` methods that are very helpful when it comes to creating tasks that have completed in a specific state, such as success, cancellation, or failure.
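As a minimal sketch of that family, the three factory methods can be put side by side. The class and method names below (`TaskFromExamples`, `Succeeded`, `Faulted`, `Canceled`) are made up for illustration and are not part of the example application; only the `Task.From*` calls come from the framework.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class TaskFromExamples
{
    // A task that has already completed successfully with a value.
    public static Task<int> Succeeded() => Task.FromResult(42);

    // A task that has already faulted with the given exception.
    public static Task<int> Faulted() =>
        Task.FromException<int>(new IOException("hard disk is corrupted"));

    // A task that is already canceled; the token passed in must itself be canceled.
    public static Task<int> Canceled() =>
        Task.FromCanceled<int>(new CancellationToken(canceled: true));
}
```

Awaiting `Faulted()` rethrows the `IOException`, and awaiting `Canceled()` throws an `OperationCanceledException` (a `TaskCanceledException`, to be precise), so each of these can stand in for a slow or failing data source inside a test.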
Here is what the test looks like:
[Test]
public void GetWordCountUpdates_DataSourceThrowsAndException_RethrowsTheException()
{
    // Emulate an exception while reading the data stream
    async IAsyncEnumerable<string> StreamData()
    {
        await Task.FromException(new IOException("hard disk is corrupted"));
        yield break; // otherwise the compiler will complain "not all code paths return a value"
    }
    var dataSource = new Mock<IDataSource>();
    dataSource.Setup(x => x.GetData()).Returns(StreamData);
    var service = new WordCounterService(dataSource.Object);
    var firstUpdate = service.GetWordCountUpdates(new[] { "foo" }).FirstAsync();
    // Ensure that the exception is not swallowed
    Assert.That(async () => await firstUpdate, Throws.InstanceOf<IOException>());
}
Note that the test method returns `void`: the only `await` is inside the async lambda passed to `Assert.That`, so the test method itself does not need to be `async`.
Testing cancelation
Let’s start with a test that fails. At first it does not even compile, because `GetWordCountUpdates` has no `CancellationToken` parameter yet.
[Test]
public void GetWordCountUpdates_CanBeCancelledAfterItemInStream()
{
    var dataSource = new Mock<IDataSource>();
    var source = new CancellationTokenSource();
    var cancellationToken = source.Token;
    async IAsyncEnumerable<string> StreamData()
    {
        yield return "foo";
        await Task.CompletedTask;
    }
    dataSource.Setup(x => x.GetData()).Returns(StreamData);
    var service = new WordCounterService(dataSource.Object);
    var enumerator = service.GetWordCountUpdates(new[] { "foo" }, cancellationToken).GetAsyncEnumerator(cancellationToken);
    source.Cancel();
    Assert.That(async () => await enumerator.MoveNextAsync(), Throws.TypeOf<OperationCanceledException>());
}
To make the test compile and pass, I added `CancellationToken cancellationToken = default` to `WordCounterService.GetWordCountUpdates`.
public async IAsyncEnumerable<WordCountUpdate> GetWordCountUpdates(string[] words, CancellationToken cancellationToken = default)
{
    await foreach (var text in m_DataSource.GetData())
    {
        cancellationToken.ThrowIfCancellationRequested();
        foreach (var word in words)
        {
            ...
        }
    }
}
Done.
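A related option, shown here only as a sketch and not as what the example application does, is the `[EnumeratorCancellation]` attribute. With it, a token passed to `GetAsyncEnumerator` or `WithCancellation` flows into the iterator's token parameter, so the caller does not have to pass the same token twice. The `CancellableStream` class and its methods are hypothetical names used for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example, not part of the word counter application.
public static class CancellableStream
{
    // [EnumeratorCancellation] tells the compiler to feed the token given to
    // GetAsyncEnumerator / WithCancellation into this parameter.
    public static async IAsyncEnumerable<int> Numbers(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; ; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield();
        }
    }

    // Consumes the stream and cancels it after the third item has been seen.
    public static async Task<int> CountUntilCanceled()
    {
        using var source = new CancellationTokenSource();
        var seen = 0;
        try
        {
            // The token is supplied by the consumer, not as a method argument.
            await foreach (var n in Numbers().WithCancellation(source.Token))
            {
                seen++;
                if (n == 2)
                {
                    source.Cancel();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the iterator observes the canceled token on its next step.
        }
        return seen;
    }
}
```

In this sketch `CountUntilCanceled` sees the items 0, 1, and 2, and the next `MoveNextAsync` throws, which is the same behavior the cancellation test above asserts on.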
Testing progress reporting
I decided that progress will be reported after each line is read from the stream. In the real world, it can be something more sophisticated.
[Test]
public async Task GetWordCountUpdates_ReportsProgressForEachElementInStream()
{
    var dataSource = new Mock<IDataSource>();
    static async IAsyncEnumerable<string> StreamData()
    {
        yield return "foo";
        yield return "bar";
        yield return "baz";
        await Task.CompletedTask;
    }
    dataSource.Setup(x => x.GetData()).Returns(StreamData);
    var service = new WordCounterService(dataSource.Object);
    var progressMock = new Mock<IProgress<int>>();
    await foreach (var unused in service.GetWordCountUpdates(new[] { "foo" }, CancellationToken.None, progressMock.Object))
    {
        // do nothing
    }
    progressMock.Verify(x => x.Report(1), Times.Exactly(3));
}
To make it compile, I added `IProgress<int> progress` to `WordCounterService.GetWordCountUpdates`:
public async IAsyncEnumerable<WordCountUpdate> GetWordCountUpdates(string[] words, CancellationToken cancellationToken = default, IProgress<int> progress = default)
{
    await foreach (var text in m_DataSource.GetData())
    {
        cancellationToken.ThrowIfCancellationRequested();
        progress?.Report(1);
        foreach (var word in words)
        {
            var occurrencesCount = text.Split(' ', '.', ';').Count(x => x.Equals(word));
            if (occurrencesCount > 0)
            {
                yield return new WordCountUpdate(word, occurrencesCount);
            }
        }
    }
}
Putting it all together
Let’s use real data. The first thing that came to mind was to count “war” and “peace” in Leo Tolstoy’s book of the same name. To do that, I created an implementation of `IDataSource` that reads a file through an async API:
class FileDataSource : IDataSource
{
    readonly string m_FileName;
    public FileDataSource(string fileName)
    {
        m_FileName = fileName;
    }
    public async IAsyncEnumerable<string> GetData()
    {
        using var reader = File.OpenText(m_FileName);
        string line = null;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            yield return line;
        }
    }
}
and the console program itself:
class Program
{
    static async Task Main()
    {
        var wordsToCount = new[] { "war", "peace" };
        var dataSource = new FileDataSource("WarAndPeace.txt");
        var wordCountService = new WordCounterService(dataSource);
        var warCount = 0;
        var peaceCount = 0;
        var linesProcessed = 0;
        var progress = new Progress<int>(n => linesProcessed += n);
        await foreach (var update in wordCountService.GetWordCountUpdates(wordsToCount, CancellationToken.None, progress))
        {
            if (update.Word == "war")
            {
                warCount += update.OccurrencesCount;
            }
            else
            {
                peaceCount += update.OccurrencesCount;
            }
            // Redraw the status line in place
            Console.SetCursorPosition(0, 0);
            Console.WriteLine($"Lines processed: {linesProcessed} war: {warCount} peace: {peaceCount}");
        }
    }
}
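One detail worth keeping in mind with this program: `Progress<T>` invokes its handler through the `SynchronizationContext` captured at construction, and a console application has none, so the handler runs on the thread pool and `linesProcessed` may lag slightly behind the loop. If exact, in-order counts matter, a synchronous implementation of `IProgress<T>` is one option. The sketch below is an assumption of mine, not part of the example application, and `SynchronousProgress<T>` is a hypothetical name.

```csharp
using System;

// Hypothetical helper: calls the handler inline, on the thread that reports
// progress, instead of posting it to a SynchronizationContext or the thread pool.
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> m_Handler;

    public SynchronousProgress(Action<T> handler)
    {
        m_Handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    // Runs the handler before returning to the caller of Report.
    public void Report(T value) => m_Handler(value);
}
```

It can be dropped in where `Progress<int>` is constructed, for example `new SynchronousProgress<int>(n => linesProcessed += n)`. The trade-off is that a slow handler now blocks the stream that reports progress.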
Summary
I was able to emulate all the scenarios I wanted: pure data, exceptions, cancelation, and even progress tracking. Everything is done in-memory and works super-fast.
The part that I enjoyed most is that I can define stream behavior like this:
static async IAsyncEnumerable<string> StreamData()
{
    yield return "foo";
    yield return "bar";
    yield return "baz";
    await Task.CompletedTask;
}
Don’t know about you, but when I see this sequence of yields, I immediately start to wonder about other use cases: what if there is an exception, a cancellation, an empty string, multiple matches in the same line, and so on. I can express every what-if by writing a test for each case. The `Task.From*` methods make this process effortless.
However, when I created a close-to-real-world application that counts “war” and “peace” in Leo Tolstoy’s book of the same name, I found a couple of cases I hadn’t thought about (mostly missed punctuation characters). I converted them into unit tests.
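To illustrate the kind of punctuation case that slips through when splitting only on spaces, periods, and semicolons, here is a sketch of a matcher that treats every character that is not a letter or digit as a separator. This is an assumption about one possible fix, not the code from the example application, and `WordMatcher.CountOccurrences` is a hypothetical name.

```csharp
using System;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class WordMatcher
{
    // Counts whole-word, case-sensitive occurrences of `word` in `text`,
    // splitting on every character that is not a letter or a digit.
    public static int CountOccurrences(string text, string word)
    {
        // Collect the separators that actually occur in this text.
        var separators = text
            .Where(c => !char.IsLetterOrDigit(c))
            .Distinct()
            .ToArray();

        return text
            .Split(separators, StringSplitOptions.RemoveEmptyEntries)
            .Count(token => token.Equals(word, StringComparison.Ordinal));
    }
}
```

With this sketch, a line such as `war, peace; war!` counts "war" twice, while "warfare" does not count as a match for "war".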
Overall, playing with unit testing async streams was a very positive experience for me. I attribute this to the fact that the .NET async APIs were designed with testability in mind. I hope you’ll enjoy unit testing your async applications too.