I recently had the pleasure of working on a feature that resulted in me inflicting the Thundering Herd problem on a web service at work. This article goes over how I found the issue, and how we wrote the client to tackle it and reduce the program's execution time by 85%. I also had great help from a Senior Engineer who knew the cause off the bat, but let me come to the conclusions on my own.
The feature was basically a glorified version of the following: traverse a file system, do a bunch of string manipulation, and ingest the results into an external service.
The code for traversing the file system is well optimized to take advantage of concurrency, the string manipulation operations are embarrassingly parallel, and I am super happy with the performance of those sections.
However, during ingestion into the external service, each of the hundreds of API calls is taking upwards of 10 minutes. It's late, so I shoot a message letting my senior know I'm done on my side, but the external service is just not able to handle the incoming requests, and the usual 2-10 second API response time is now 5-10 minutes.
We find the root cause of the increased latency: the storm of concurrent requests was causing a downstream dependency of the external service to throttle it. My HTTP client has all the basics set up, including retry backoff and timeouts. However, the way I decided to ingest was to kick off all requests asynchronously and then await the completion of all of them. In my head, the backoff policy would introduce the appropriate jitter, and the dotnet runtime would handle thread scheduling in a way that would create a sort of natural rate limiter.
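A minimal sketch of that naive fire-everything-and-await approach (IngestFileAsync here is a hypothetical stand-in for our real ingestion call, not the actual code):

using System.Collections.Generic;
using System.Threading.Tasks;

public static class NaiveIngestion
{
    // Hypothetical stand-in for our real ingestion API call.
    private static Task IngestFileAsync(string path) => Task.Delay(100);

    public static async Task IngestAllAsync(IEnumerable<string> files)
    {
        // Kick off every request at once; each Task returned here is already
        // running ("hot"), so all of the HTTP calls start immediately.
        var requests = new List<Task>();
        foreach (var file in files)
        {
            requests.Add(IngestFileAsync(file));
        }

        // Then await everything, trusting backoff + jitter to smooth it out.
        await Task.WhenAll(requests);
    }
}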
Looking at the graphs for requests and latency, it seemed like the initial burst of requests was causing the Thundering Herd problem on the external service, and even with the jitter the concurrent requests to this API were leaving the service dysfunctional.
At this point we realized we needed to slow down the rate at which we make requests to this service; the external service does not give us the ability to do a smarter retry, and by the time we get our "Too Many Requests" response, too many requests have already been scheduled. The brute force way of doing this was to batch the requests in a loop and await the completion of each batch, but this introduced "head of line" blocking, where a new batch of requests would not be kicked off until the slowest request from the previous batch had completed. Down this route, previous batches block new batches despite there being more than enough spare capacity under the concurrency limit we set. This is too slow; an ideal solution should not waste any slots of concurrent tasks under the set limit.
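For illustration, a rough sketch of that brute force batching (IngestFileAsync is again a hypothetical stand-in), where each batch waits on its slowest member before the next batch can start:

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchedIngestion
{
    // Hypothetical stand-in for our real ingestion API call.
    private static Task IngestFileAsync(string path) => Task.Delay(100);

    public static async Task IngestInBatchesAsync(IReadOnlyList<string> files, int batchSize)
    {
        for (int i = 0; i < files.Count; i += batchSize)
        {
            var batch = files.Skip(i).Take(batchSize)
                             .Select(IngestFileAsync)
                             .ToList();

            // Head-of-line blocking: even if most of this batch finishes
            // quickly, the next batch cannot start until the slowest
            // request here completes.
            await Task.WhenAll(batch);
        }
    }
}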
Taking a dependency on an external package was not an option, since we wanted finer-grained control over this part of the code; the non-functional requirement is to achieve the shortest possible execution time.
This is where I got to write our own Concurrency Throttler, which does not suffer from the head of line blocking of the brute force approach. We also ended up using this Throttler when interacting with the external service in other parts of the program, not just for ingestion.
The Concurrency Throttler supports two basic methods, plus a constructor that lets you set an upper limit on max concurrency:
ControlledConcurrentTaskExecutor(int maxConcurrency);
Task EnqueueAsync(Task task);
Task WaitAllTasksAsync();
Under the hood we use a semaphore to control how many concurrent tasks exist at any given point. When we enqueue a task, the semaphore blocks only if there is no space left among the ongoing concurrent tasks; if there is space, we proceed without blocking. Once we are unblocked (the semaphore has space, and it decrements under the hood), we wrap the task in a callback lambda so that upon its completion it increments the semaphore, signaling the blocked callers. This way we can do:
await EnqueueAsync(fooBar);
The above call will block until there is capacity; once there is, it enqueues the task and proceeds immediately.
Internally we store all tasks in a List<Task> on which we can perform an "await all". The WaitAllTasksAsync operation also takes care of thread safety and clears out completed tasks, so you can reuse the Throttler as needed.
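Putting it together, usage looks roughly like this (IngestFileAsync is once more a hypothetical stand-in; the important part is that because the loop awaits EnqueueAsync, it stops creating new tasks while we are at capacity):

using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottledIngestion
{
    // Hypothetical stand-in for our real ingestion API call.
    private static Task IngestFileAsync(string path) => Task.Delay(100);

    public static async Task IngestAllAsync(IEnumerable<string> files)
    {
        var executor = new ControlledConcurrentTaskExecutor(maxConcurrency: 10);

        foreach (var file in files)
        {
            // EnqueueAsync returns only once a slot is free, so the loop
            // stops creating new ingestion tasks while we are at capacity.
            await executor.EnqueueAsync(IngestFileAsync(file));
        }

        // Wait for every enqueued request to complete; the executor can
        // then be reused for the next round of work.
        await executor.WaitAllTasksAsync();
    }
}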
With just 69 lines of code we had the ability to control the max concurrency, the backoff policy could now introduce appropriate jitter into the calls being made, and requests were completing in under 5 seconds. By slowing the rate at which we made requests, we reduced the program's total execution time from ~15 minutes to ~2 minutes.
A rough version of the Concurrency Throttler in C# can be seen below:
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/*
 * Wrapper around controlled concurrency / throttling on task execution.
 * Throttles the maximum ongoing tasks to the maxConcurrency
 * set during construction of the object.
 */
public class ControlledConcurrentTaskExecutor
{
    // Since we don't need system-wide control we can use the lightweight
    // SemaphoreSlim instead of an actual Semaphore.
    private readonly SemaphoreSlim _semaphore;
    private readonly List<Task> _runningTasks;
    private readonly object _lock = new object();

    public ControlledConcurrentTaskExecutor(int maxConcurrency)
    {
        this._semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        this._runningTasks = new List<Task>();
    }

    // Blocks when we have no capacity, but lets you continue the second we do!
    // Note: the Task passed in is typically already running ("hot"), so the
    // throttling comes from the caller's loop awaiting this method and only
    // creating the next task once a slot is free.
    public async Task EnqueueAsync(Task task)
    {
        // Block if we have no capacity.
        await this._semaphore.WaitAsync();

        // Wrap the task just so we can ensure the semaphore is released.
        Task runningTask = Task.Run(async () =>
        {
            try
            {
                await task;
            }
            finally
            {
                this._semaphore.Release();
            }
        });

        // Thread safety when updating the list.
        lock (this._lock)
        {
            this._runningTasks.Add(runningTask);
        }
    }

    public async Task WaitAllTasksAsync()
    {
        List<Task> currentTasks;

        // Defensive programming approach to ensure that the list used for
        // waiting on tasks remains stable during the waiting process.
        lock (this._lock)
        {
            currentTasks = new List<Task>(this._runningTasks);
        }

        await Task.WhenAll(currentTasks);

        // Thread safety when clearing the list.
        lock (this._lock)
        {
            this._runningTasks.RemoveAll(task => task.IsCompleted);
        }
    }
}