Recently I'm working on the WinUI 3 version of Pixeval, as I'm implementing the downloader, a problem arises into my mind: The concurrency degree(aka parallelism) must be limited or the computer resources maybe exhausted, which means I need a producer-consumer structure to manage the queue of the download task. a BlockingCollection<T> is good except it is synchronized and I believe it's not a good idea to create a long-running Task<T> on a dedicated thread just to polling the task queue and download them whenever they are possible, THREADS ARE VALUEABLE RESOURCES, we should avoid the abuse of them.
  The good thing is, a download mission is typically an IO-Bound task which means they can be put on the Overlapped IO to asynchronized without the overhead of creating a new thread just to waiting for its completion, the only question left is how to implement an asynchronous producer-consumer queue.
  The first thing that comes into my mind is System.Threading.Channels, they are out-of-the-box async producer-consumer models with reliable performance, but since what I'm demanding is not only a producer consumer model, but also requires to maintain a fixed length buffer(the concurrency degree), and, the Channel<T> itself maybe ineligible for this mission.
  Before we take a step into the realm of Task Awaiters, let's clear our mind first: what we need is an atomic counter, bearing the count of the working tasks, it can be up to the max concurrency degree or down to 0, this can be easily implemented by using Interlocked.Increment(ref Int32) and Interlocked.Decrement(ref Int32); and a suspendable loop who shall schedule the tasks continuously, when the working task counter reaches its maximum value, the loop will be suspended, and when the counter drops the loop will be resumed.
  Apparently, our demand requires the cooperation between multiple procedures: one to put tasks to the Channel<T>, the other one, in contrast, is responsible for polling the tasks out in a suspendable and non-blocking loop.
  To write a procedure to put tasks into the channel is quite easy, just call the ChannelWriter<T>.WriteAsync(T) method:

void QueueTask(IDownloadTask task)
{
    _channel.Writer.WriteAsync(task);
}

and the second part will be far more challenging, as we all know the common read pattern for Channel<T> is something like (you can find numbers of its variations at Introduction to System.Threading.Channels):

while (await _channel.Reader.WaitToReadAsync())
{
    while (_channel.Reader.TryRead(out var item))
    {
        Download(item);
    }
}

as we have stated before, the inner loop needs to be suspended when the counter hits its maximum value, and resumes when it drops; but how to do this, exactly?
  A TaskCompletionSource<T> maybe good, except it can only be used for one time, you cannot revert its state after the completion, so we have to alter the whole object in the loop with something like:

while (await _channel.Reader.WaitToReadAsync())
{
    while (_channel.Reader.TryRead(out var item))
    {
        await taskCompletionSource.Task;
        taskCompletionSource = new TaskCompletionSource();
        Download(item);
    }
}

Not so bad! Except the huge object allocation when the download tasks might be massive, to avoid this, we need to recall "how the await works".
  The await doesn't rely on Task<T>, many of us may have forgotten it. it relies on a duck-typing system that allows any object with a GetAwaiter method which returns an object who implements INotifyCompletion and maintains IsCompleted, OnCompleted(Action), and GetResult() to be awaited, which gives us chance to custom one and reuse it——yes, although task awaiters are mostly one-time consumables, they can still be reused to achieve our goal. Since out task awaiter won't contain a value, it will looks like:

public class ReenterableAwaiter : INotifyCompletion
{
    public bool IsCompleted { get; set; }

    public void OnCompleted(Action continuation)
    {
        // ...
    }

    public void GetResult()
    {
        // ...
    }

    public ReenterableAwaiter GetAawaiter()
    {
        return this;
    }
}

Since the awaiter is designed to be used multiple times, the continuations must be stored in somewhere so that we can invoke it later when the awaiter receives signals to complete(just like SetResult in TaskCompletionSource), and so do exceptions, we also need two methods to signal the awaiter to complete successfully and unsuccessfully respectively, and a reset method is required beyond all we've mentioned because we must reset all of its states to initial to reuse it again. The class now should be like:

public class ReenterableAwaiter : INotifyCompletion
{
    private Action? _continuation;
    private Exception? _exception;
    private bool _continueOnCapturedContext = true; // whether the continuation should be posted to the captured SynchronizationContext

    public bool IsCompleted { get; set; }

    public void Reset()
    {
        IsCompleted = false; // Set the awaiter to non-completed
        _continuation = null;
        _exception = null;
    }

    public void OnCompleted(Action continuation)
    {
        // Stores the continuation
        // If your awaiter is intended to be used across multiple
        // task boundaries, you can use a thread-safe collection
        // to hold all the continuations
        _continuation = continuation;
    }

    public void GetResult()
    {
        if (_exception is not null)
        {
            throw _exception;
        }
    }

    // Signals the awaiter to complete successfully
    public void SetResult()
    {
        if (!IsCompleted)
        {
            IsCompleted = true;
            CompleteInternal();
        }
    }

    // Signals the awaiter to complete unsuccessfully
    public void SetException(Exception exception)
    {
        if (!IsCompleted)
        {
            IsCompleted = true;
            _exception = exception;
            CompleteInternal();
        }
    }

    // Queue the continuation to SynchronizationContext.Current
    // if _continueOnCapturedContext is true, otherwise schedule
    // it on the default TaskScheduler
    private void CompleteInternal()
    {
        if (_continuation is null)
        {
            return;
        }

        if (_continueOnCapturedContext && SynchronizationContext.Current is { } context)
        {
            context.Post(cont => (cont as Action)?.Invoke(), _continuation);
        }
        else
        {
            Task.Factory.StartNew(_continuation, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        }
    }

    public ReenterableAwaiter ConfigureAwait(bool continueOnCapturedContext)
    {
        _continueOnCapturedContext = continueOnCapturedContext;
        return this;
    }

    public ReenterableAwaiter GetAwaiter()
    {
        return this;
    }
}

Now, we can create a suspendable loop using this ReenterableAwaiter:

async void PollTask()
{
    while (await _channel.Reader.WaitToReadAsync())
    {
        while (_channel.Reader.TryRead(out var item))
        {
            await _reenterableAwaiter; // _reenterableAwaiter is a class field
            Download(item);
        }
    }
}

Notice how we "instrumented" a suspension point in the loop at line 7.

and our Download(IDownloadTask) method who will increase and decrease the counter at task queued/completed, and it is also responsible for notifying the _reenterableAwaiter when counter decreases:

async void Download(IDownloadTask task)
{
    // Suppose _counter is the working task counter and MaxParallelism
    // is the maximum concurrency degree
    if (Interlocked.Increment(ref _counter) == MaxParallelism)
    {
        _reenterableAwaiter.Reset();
    }
    // Perform the download...
    Interlocked.Decrement(ref _counter);
    // notice the PollTask that we're ready for new tasks
    _reenterableAwaiter.SetResult();
}

We Choose to Go to the Moon