# `Parallel.ForEachAsync` wasn't enough. So we built our own.

Every batched-I/O workload needs four things — scoped DI, retry policy, per-partition timeout, and partial-batch acknowledgement — and the BCL gives you none of them in one place. Here's why we shipped `Kanject.Core.Parallel`.
We had a Lambda processing 5,000 SQS messages a minute. Each batch needed four things:
- A fresh DI scope per worker, so EF and DynamoDB clients didn’t leak state across messages.
- Retries on transient failures, with exponential backoff.
- A per-batch timeout, so one slow message wouldn’t consume the whole Lambda’s runtime.
- Partial-batch acknowledgement back to SQS, so successful messages got committed and only the failures rode through to the dead-letter queue.
Every BCL parallel primitive solves three of those four. None solves all four. So we built one that does.
## The four-quadrant problem

Here’s why each of the obvious answers falls short:

- `Task.WhenAll(records.Select(...))` — unbounded fan-out, no backpressure, allocates a closure per record, no scope-per-worker. One slow record holds up every aggregate metric.
- `Parallel.ForEachAsync` — better, but the body sees one item at a time. Batched I/O like `BatchWriteItem` or S3 multi-object delete is impossible without re-batching by hand. No retry, no timeout, no per-call `IServiceScope`.
- TPL Dataflow — heavyweight, awkward DI integration, ergonomics designed for pipeline composition rather than fan-out-and-aggregate.
- DIY `Channel<T>` worker pool — every team rewrites the same scheduling, retry, and aggregation code, slightly differently, slightly buggily.
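That last bullet deserves an illustration. Here is a hedged sketch of the worker pool each team kept rewriting (the queue depth, worker count, and retry numbers are illustrative, not any team's actual code):

```csharp
// The recurring DIY shape: a bounded Channel<T>, N readers, retry and timeout
// hand-woven into the loop. Each copy of this drifted slightly.
var channel = Channel.CreateBounded<Order>(256);
var workers = Enumerable.Range(0, 8).Select(async _ =>
{
    await foreach (var order in channel.Reader.ReadAllAsync(ct))
    {
        for (var attempt = 0; ; attempt++)
        {
            using var timeout = CancellationTokenSource.CreateLinkedTokenSource(ct);
            timeout.CancelAfter(TimeSpan.FromSeconds(10));   // per-message timeout
            try { await ProcessAsync(order, timeout.Token); break; }
            catch (Exception) when (attempt < 1)
            {
                await Task.Delay(250 * (attempt + 1), ct);   // ad-hoc backoff
            }
        }
    }
}).ToArray();
// ...plus the producer loop, result aggregation, and scope management not shown.
```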
## Declare the parallel intent on the method
The trick was to do for parallelism what we’d already done for recurring work: declare it on the method, not in startup wiring.
```csharp
public sealed partial class OrderProcessor(IOrderClient client)
{
    [Parallel(MaxDegreeOfParallelism = 8, RetryCount = 1, RetryBackoffMs = 250)]
    public ValueTask ProcessAsync(Order order, CancellationToken ct)
        => client.ProcessAsync(order, ct);
}

// Generated extension method:
var result = await processor.ProcessAsyncParallelAsync(orders, cancellationToken: ct);
```
That’s it. The source generator detects the parameter shape (a single `T` for per-item dispatch, `ReadOnlyMemory<T>` for chunk dispatch), emits a `{Method}ParallelAsync` over five source shapes (`ReadOnlyMemory<T>`, `T[]`, `IReadOnlyList<T>`, `IEnumerable<T>`, `IAsyncEnumerable<T>`), and forwards your additional method parameters verbatim through tuple state to keep the static lambdas free of captures.
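That tuple-state forwarding is the allocation story. A hedged sketch of the pattern (`RunItem` and the tuple shape are illustrative, not the actual generated code):

```csharp
// Illustrative only: extra parameters ride in a tuple passed as explicit
// state, so the worker lambda can be 'static' and allocate no closure.
static async ValueTask RunItem<TState>(
    Func<TState, CancellationToken, ValueTask> body, TState state, CancellationToken ct)
    => await body(state, ct);

// Shape of the call emitted for ProcessAsync(Order, CancellationToken):
await RunItem(
    static (s, token) => s.processor.ProcessAsync(s.order, token),
    (processor, order),   // tuple state instead of captured locals
    ct);
```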
## The killer feature: SQS partial-batch hand-off
Here’s the pattern that pays for the entire library:
```csharp
public sealed partial class OrderQueueConsumer : AbstractQueueConsumer<Order>
{
    [Parallel(MaxDegreeOfParallelism = 8, RetryCount = 1, RetryBackoffMs = 250)]
    private async ValueTask<bool> ProcessAsync(Order order, CancellationToken ct)
    {
        await OrderService.HandleAsync(order, ct);
        return true;
    }

    protected override async Task ConsumeAsync(
        List<MessageContext<Order>> messageContexts,
        CancellationToken cancellationToken)
    {
        var inputs = messageContexts.Select(m => m.Message).ToArray();
        var result = await ProcessAsyncParallelOutcomesAsync(
            inputs, cancellationToken: cancellationToken);

        // Successes → AcknowledgeAsync(messageContext)
        // Failures  → Response.BatchItemFailures
        await result.AcknowledgeOrFailAsync(messageContexts, Response, AcknowledgeAsync);
    }
}
```
`{Method}ParallelOutcomesAsync` returns a `ParallelItemResult<TInput, TResult>` where each entry carries the full per-item story — `Index`, `Input`, `Value`, `Exception`, `Succeeded`, `TimedOut`, `Cancelled`, `RetryCount` — in input order. We bridge that directly into the Lambda partial-batch response. Successes get acknowledged. Failures contribute a `BatchItemFailure` keyed by `MessageContext.MessageId`. SQS sees the right answer for each individual message in the batch.
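Under the hood the hand-off is just a loop over the outcomes. A hedged sketch of what `AcknowledgeOrFailAsync` amounts to (the `Outcomes` property name and the `BatchItemFailure` initializer shape here are assumptions for illustration):

```csharp
// Conceptual sketch: outcomes come back in input order, so outcome.Index
// lines up with messageContexts[outcome.Index].
foreach (var outcome in result.Outcomes)
{
    if (outcome.Succeeded)
        await AcknowledgeAsync(messageContexts[outcome.Index]);
    else
        Response.BatchItemFailures.Add(
            new BatchItemFailure { ItemIdentifier = messageContexts[outcome.Index].MessageId });
}
```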
The whole consumer-loop boilerplate goes from forty lines of manual try/catch and per-message commit logic to the shape above.
## Chunk mode for batched I/O
Per-item dispatch is one shape. The other shape is when the body wants the whole partition slice at once — for `BatchWriteItem`, S3 multi-delete, or a SQL `MERGE` of N rows.
```csharp
public sealed partial class OrderFlusher(IDb db)
{
    [Parallel(
        ChunkingMode = ParallelChunkingMode.FixedChunkSize,
        ChunkSize = 25,                 // DynamoDB BatchWriteItem limit
        MaxDegreeOfParallelism = 4,
        ChunkTimeoutMs = 10_000,
        StopOnException = false)]
    public ValueTask<int> FlushAsync(ReadOnlyMemory<Order> chunk, CancellationToken ct)
        => db.BatchInsertAsync(chunk, ct);
}
```
The slice is a `ReadOnlyMemory<Order>` view over the source array — no copy, no allocation. The engine handles the chunking, the timeout, and the per-chunk retry. Your body just runs the I/O.
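The chunking itself is nothing exotic. A hedged sketch of the slicing (the helper name is illustrative):

```csharp
// ReadOnlyMemory<T>.Slice returns a view over the same backing array,
// so fixed-size chunking costs no copies and no per-chunk allocations.
static IEnumerable<ReadOnlyMemory<T>> Chunks<T>(ReadOnlyMemory<T> source, int size)
{
    for (var offset = 0; offset < source.Length; offset += size)
        yield return source.Slice(offset, Math.Min(size, source.Length - offset));
}
```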
## What we explicitly didn’t do
- No `Parallel.ForEachAsync` underneath. The BCL primitive doesn’t expose chunking, doesn’t carry retry/timeout policy, and doesn’t let us inject a per-partition scope. We reimplemented from primitives for full control.
- No `SemaphoreSlim` when the partition count fits within `MaxDOP`. Each partition is its own `Task` and races freely. The semaphore only appears on the `FixedChunkSize` path with large inputs, where we genuinely need to throttle.
- No closures in the hot path. The chunk-mode void overload routes through tuple state so the static lambda has zero captures. This is the same trick `RecurringLoop` uses for its `VoidValue` adapter — it survives Stephen Toub’s allocation profiler.
- No reduce delegate in v1. Callers fold `result.Values` themselves. Adding a `Reduce` would force a generic `TAccumulator` parameter on every overload. We’ll add a sibling `ReduceAsync` family in the next phase if the demand is real.
## Compile-time safety
A Roslyn analyzer surfaces generator limitations at author time, so you never wonder why an extension method didn’t appear:
| ID | Triggered when |
|---|---|
| `KANPAR001` | Annotated method doesn’t accept either a single `T` or a `ReadOnlyMemory<T>` partition |
| `KANPAR002` | `MaxDegreeOfParallelism` < 0 |
| `KANPAR005` | `RetryBackoffMs` > 0 with `RetryCount` = 0 (no retries to back off from) |
| `KANPAR011` | Instance `[Parallel]` method on a non-partial containing type |
| `KANPAR012` | Unsupported return type |
Ten more cover the rest. If the analyzer is happy, the generated code is too.
## Why we shipped this
We could have lived without it. We did, for a long time. We had three different teams each maintaining their own version of “spawn N workers, retry transient failures, time out the slow ones, return per-item outcomes.” Three versions, slightly different, slightly buggy.
Now there’s one. It compiles into your assembly, generates the right typed signatures, and the call site is `await whatever.YourMethodParallelAsync(items, cancellationToken: ct)`.
If you’re shipping `[Parallel]`-able workloads on the Kanject stack, this is in `Kanject.Core.Annotations`. If you’re not on Kanject yet but you want the pattern, the principle ports cleanly: declare the parallel intent on the method, generate the typed extension, route partial outcomes back to whatever wants them.
Cadence belongs on the method. Parallelism belongs on the method. The call site should be one line.