← All posts

Parallel.ForEachAsync wasn't enough. So we built our own.

The four things every batched-I/O workload needs — scoped DI, retry policy, per-partition timeout, and partial-batch acknowledgement — none of which the BCL gives you in one place. Here's why we shipped Kanject.Core.Parallel.

We had a Lambda processing 5,000 SQS messages a minute. Each batch needed four things:

  1. A fresh DI scope per worker, so EF and DynamoDB clients didn’t leak state across messages.
  2. Retries on transient failures, with exponential backoff.
  3. A per-batch timeout, so one slow message wouldn’t consume the whole Lambda’s runtime.
  4. Partial-batch acknowledgement back to SQS, so successful messages got committed and only the failures rode through to the dead-letter queue.

Every BCL parallel primitive solves three of those four. None solves all four. So we built one that does.

The four-quadrant problem

Here’s why each of the obvious answers falls short:

  • Task.WhenAll(records.Select(...)) — unbounded fan-out, no backpressure, allocates a closure per record, no scope-per-worker. One slow record holds up every aggregate metric.
  • Parallel.ForEachAsync — better, but the body sees one item at a time. Batched I/O like BatchWriteItem or S3 multi-object delete is impossible without re-batching by hand. No retry, no timeout, no per-call IServiceScope.
  • TPL Dataflow — heavyweight, awkward DI integration, ergonomics designed for pipeline composition rather than fan-out-and-aggregate.
  • DIY Channel<T> worker pool — every team rewrites the same scheduling, retry, and aggregation code, slightly differently, slightly buggily.

Declare the parallel intent on the method

The trick was to do for parallelism what we’d already done for recurring: put it on the method, not in startup wiring.

public sealed partial class OrderProcessor(IOrderClient client)
{
    [Parallel(MaxDegreeOfParallelism = 8, RetryCount = 1, RetryBackoffMs = 250)]
    public ValueTask ProcessAsync(Order order, CancellationToken ct)
        => client.ProcessAsync(order, ct);
}

// Generated extension method:
var result = await processor.ProcessAsyncParallelAsync(orders, cancellationToken: ct);

That’s it. The source generator detects the parameter shape (single T for per-item dispatch, ReadOnlyMemory<T> for chunk dispatch), emits an {Method}ParallelAsync over five source shapes (ReadOnlyMemory<T>, T[], IReadOnlyList<T>, IEnumerable<T>, IAsyncEnumerable<T>), and forwards your additional method parameters verbatim through tuple state to keep static lambdas free of captures.

The killer feature: SQS partial-batch hand-off

Here’s the pattern that pays for the entire library:

public sealed partial class OrderQueueConsumer : AbstractQueueConsumer<Order>
{
    [Parallel(MaxDegreeOfParallelism = 8, RetryCount = 1, RetryBackoffMs = 250)]
    private async ValueTask<bool> ProcessAsync(Order order, CancellationToken ct)
    {
        await OrderService.HandleAsync(order, ct);
        return true;
    }

    protected override async Task ConsumeAsync(
        List<MessageContext<Order>> messageContexts,
        CancellationToken cancellationToken)
    {
        var inputs = messageContexts.Select(m => m.Message).ToArray();

        var result = await ProcessAsyncParallelOutcomesAsync(
            inputs, cancellationToken: cancellationToken);

        // Successes → AcknowledgeAsync(messageContext)
        // Failures  → Response.BatchItemFailures
        await result.AcknowledgeOrFailAsync(messageContexts, Response, AcknowledgeAsync);
    }
}

{Method}ParallelOutcomesAsync returns a ParallelItemResult<TInput, TResult> where each entry carries the full per-item story — Index, Input, Value, Exception, Succeeded, TimedOut, Cancelled, RetryCount — in input order. We bridge that directly into the Lambda partial-batch response. Successes get acknowledged. Failures contribute a BatchItemFailure keyed by MessageContext.MessageId. SQS sees the right answer for each individual message in the batch.

The whole consumer-loop boilerplate goes from forty lines of manual try/catch and per-message commit logic to the shape above.

Chunk mode for batched I/O

Per-item dispatch is one shape. The other shape is when the body wants the whole partition slice at once — for BatchWriteItem, S3 multi-delete, or a SQL MERGE of N rows.

public sealed partial class OrderFlusher(IDb db)
{
    [Parallel(
        ChunkingMode = ParallelChunkingMode.FixedChunkSize,
        ChunkSize = 25,                       // DynamoDB BatchWriteItem limit
        MaxDegreeOfParallelism = 4,
        ChunkTimeoutMs = 10_000,
        StopOnException = false)]
    public ValueTask<int> FlushAsync(ReadOnlyMemory<Order> chunk, CancellationToken ct)
        => db.BatchInsertAsync(chunk, ct);
}

The slice is a ReadOnlyMemory<Order> view over the source array — no copy, no allocation. The engine handles the chunking, the timeout, and the per-chunk retry. Your body just runs the I/O.

What we explicitly didn’t do

  • No Parallel.ForEachAsync underneath. The BCL primitive doesn’t expose chunking, doesn’t carry retry/timeout policy, and doesn’t let us inject per-partition scope. We reimplemented from primitives for full control.
  • No SemaphoreSlim when partition count fits within MaxDOP. Each partition is its own Task and races freely. The semaphore only appears on the FixedChunkSize path with large inputs where we genuinely need to throttle.
  • No closures in the hot path. The chunk-mode void overload routes through tuple state so the static lambda has zero captures. This is the same trick RecurringLoop uses for its VoidValue adapter — it survives Stephen Toub’s allocation profiler.
  • No reduce delegate in v1. Callers fold result.Values themselves. Adding a Reduce would force a generic TAccumulator parameter on every overload. We’ll add a sibling ReduceAsync family in the next phase if the demand is real.

Compile-time safety

A Roslyn analyzer surfaces generator limitations at author time, so you never wonder why an extension method didn’t appear:

IDTriggered when
KANPAR001Annotated method doesn’t accept either a single T or a ReadOnlyMemory<T> partition
KANPAR002MaxDegreeOfParallelism < 0
KANPAR005RetryBackoffMs > 0 with RetryCount = 0 (no retries to back off from)
KANPAR011Instance [Parallel] method on a non-partial containing type
KANPAR012Unsupported return type

Ten more cover the rest. If the analyzer is happy, the generated code is too.

Why we shipped this

We could have lived without it. We did, for a long time. We had three different teams each maintaining their own version of “spawn N workers, retry transient failures, time out the slow ones, return per-item outcomes.” Three versions, slightly different, slightly buggy.

Now there’s one. It compiles into your assembly, generates the right typed signatures, and the call site is await whatever.YourMethodParallelAsync(items, ct: ct).

If you’re shipping [Parallel]-able workloads on the Kanject stack, this is in Kanject.Core.Annotations. If you’re not on Kanject yet but you want the pattern, the principle ports cleanly: declare the parallel intent on the method, generate the typed extension, route partial outcomes back to whatever wants them.

Cadence belongs on the method. Parallelism belongs on the method. The call site should be one line.

More field notes from the team.

Engineering March 18, 2026

How we ship source generators that survive refactors

Lessons from two years of building Roslyn generators that don't silently break when downstream code moves.

Engineering February 22, 2026

Cadence belongs on the method, not in startup

Why we built drift-corrected, attribute-driven recurring tasks with three call-site overloads — and why we don't think you should reach for Timer, PeriodicTimer, or BackgroundService directly anymore.

Engineering February 5, 2026

From Console.WriteLine to ILogger without changing a single call site

Developers love Console.WriteLine. Production hates it. We built a compile-time severity inference + interceptor system so the same call sites route through ILogger when you ship — zero refactoring.