using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
namespace System.Collections.Generic
{
///
/// Represents a first-in, first-out collection of objects.
///
/// Specifies the type of elements in the queue.
public class AsyncQueue
{
#region Fields
private readonly Queue queue;
private TaskCompletionSource dequeueTcs = new();
private TaskCompletionSource availableTcs = new();
#endregion Fields
#region Constructors
///
/// Initializes a new instance of the class that is empty and has the default initial capacity.
///
[ExcludeFromCodeCoverage]
public AsyncQueue()
{
queue = new Queue();
}
///
/// Initializes a new instance of the System.Collections.Generic.Queue`1 class that
/// contains elements copied from the specified collection and has sufficient capacity
/// to accommodate the number of elements copied.
///
/// The collection whose elements are copied to the new .
/// is null.
[ExcludeFromCodeCoverage]
public AsyncQueue(IEnumerable collection)
{
queue = new Queue();
Enqueue(collection);
}
///
/// Initializes a new instance of the class that is empty and has the specified initial capacity.
///
/// The initial number of elements that the can contain.
/// is less than zero.
[ExcludeFromCodeCoverage]
public AsyncQueue(int capacity)
{
queue = new Queue(capacity);
}
#endregion Constructors
#region Properties
///
/// Gets the number of elements contained in the .
///
[ExcludeFromCodeCoverage]
public int Count
{
get
{
lock (queue)
{
return queue.Count;
}
}
}
#endregion Properties
#region Queue implementation
///
/// Removes all objects from the .
///
[ExcludeFromCodeCoverage]
public void Clear()
{
lock (queue)
{
queue.Clear();
}
}
///
/// Determines whether an element is in the .
///
/// The object to locate in the . The value can be null for reference types.
/// true if item is found in the ; otherwise, false.
[ExcludeFromCodeCoverage]
public bool Contains(T item)
{
lock (queue)
{
return queue.Contains(item);
}
}
///
/// Copies the elements to an existing one-dimensional , starting at the specified array index.
///
/// The one-dimensional that is the destination of the elements copied from . The must have zero-based indexing.
/// The zero-based index in array at which copying begins.
/// is null.
/// is less than zero.
/// The number of elements in the source is greater than the available space from to the end of the destination array.
[ExcludeFromCodeCoverage]
public void CopyTo(T[] array, int arrayIndex)
{
lock (queue)
{
queue.CopyTo(array, arrayIndex);
}
}
///
/// Removes and returns the object at the beginning of the .
///
/// The object that is removed from the beginning of the .
/// The is empty.
[ExcludeFromCodeCoverage]
public T Dequeue()
{
lock (queue)
{
return queue.Dequeue();
}
}
///
/// Adds an object to the end of the .
///
/// The object to add to the . The value can be null for reference types.
public void Enqueue(T item)
{
lock (queue)
{
queue.Enqueue(item);
SetToken(dequeueTcs);
SetToken(availableTcs);
}
}
///
/// Returns the object at the beginning of the without removing it.
///
/// The object at the beginning of the .
/// The is empty.
[ExcludeFromCodeCoverage]
public T Peek()
{
lock (queue)
{
return queue.Peek();
}
}
///
/// Copies the elements to a new array.
///
/// A new array containing elements copied from the .
[ExcludeFromCodeCoverage]
public T[] ToArray()
{
lock (queue)
{
return queue.ToArray();
}
}
///
/// Sets the capacity to the actual number of elements in the , if that number is less than 90 percent of current capacity.
///
[ExcludeFromCodeCoverage]
public void TrimExcess()
{
lock (queue)
{
queue.TrimExcess();
}
}
#endregion Queue implementation
#region Async implementation
///
/// Removes and returns all available objects at the beginning of the .
///
/// The maximum number of objects to return. Zero means no limit.
/// A cancellation token used to propagate notification that this operation should be canceled.
/// The objects that are removed from the beginning of the .
public async Task DequeueAvailableAsync(int maxCount = 0, CancellationToken cancellationToken = default)
{
while (true)
{
TaskCompletionSource internalDequeueTcs;
lock (queue)
{
if (queue.Count > 0)
{
int count = queue.Count;
if (maxCount > 0 && count > maxCount)
count = maxCount;
var items = new T[count];
for (int i = 0; i < count; i++)
items[i] = queue.Dequeue();
return items;
}
internalDequeueTcs = ResetToken(ref dequeueTcs);
}
await WaitAsync(internalDequeueTcs, cancellationToken);
}
}
///
/// Removes and returns objects at the beginning of the .
///
/// The number of objects to return.
/// A cancellation token used to propagate notification that this operation should be canceled.
/// The objects that are removed from the beginning of the .
public async Task DequeueManyAsync(int count, CancellationToken cancellationToken = default)
{
if (count < 0)
throw new ArgumentOutOfRangeException(nameof(count));
while (true)
{
TaskCompletionSource internalDequeueTcs;
lock (queue)
{
if (count <= queue.Count)
{
var items = new T[count];
for (int i = 0; i < count; i++)
items[i] = queue.Dequeue();
return items;
}
internalDequeueTcs = ResetToken(ref dequeueTcs);
}
await WaitAsync(internalDequeueTcs, cancellationToken);
}
}
///
/// Removes and returns the object at the beginning of the .
///
/// A cancellation token used to propagate notification that this operation should be canceled.
/// The object that is removed from the beginning of the .
public async Task DequeueAsync(CancellationToken cancellationToken = default)
{
while (true)
{
TaskCompletionSource internalDequeueTcs;
lock (queue)
{
if (queue.Count > 0)
return queue.Dequeue();
internalDequeueTcs = ResetToken(ref dequeueTcs);
}
await WaitAsync(internalDequeueTcs, cancellationToken);
}
}
///
/// Waits asynchonously until at least one object is available in the .
///
/// A cancellation token used to propagate notification that this operation should be canceled.
/// An awaitable task.
public async Task WaitAsync(CancellationToken cancellationToken = default)
{
TaskCompletionSource internalAvailableTcs;
lock (queue)
{
if (queue.Count > 0)
return;
internalAvailableTcs = ResetToken(ref availableTcs);
}
await WaitAsync(internalAvailableTcs, cancellationToken);
}
#endregion Async implementation
#region Additional features
///
/// Removes the object at the beginning of the , and copies it to the parameter.
///
/// The removed object.
/// true if the object is successfully removed; false if the is empty.
public bool TryDequeue(out T result)
{
try
{
result = Dequeue();
return true;
}
catch
{
result = default;
return false;
}
}
///
/// Returns a value that indicates whether there is an object at the beginning of
/// the , and if one is present, copies it to the
/// parameter. The object is not removed from the .
///
/// If present, the object at the beginning of the ; otherwise, the default value of .
/// true if there is an object at the beginning of the ; false if the is empty.
public bool TryPeek(out T result)
{
try
{
result = Peek();
return true;
}
catch
{
result = default;
return false;
}
}
///
/// Removes the first occurrence of a specific object from the .
///
/// The object to remove from the . The value can be null for reference types.
/// true if item is successfully removed; otherwise, false. This method also returns false if item was not found in the .
public bool Remove(T item)
{
lock (queue)
{
var copy = new Queue(queue);
queue.Clear();
bool found = false;
int count = copy.Count;
for (int i = 0; i < count; i++)
{
var element = copy.Dequeue();
if (found)
{
queue.Enqueue(element);
continue;
}
if ((element == null && item == null) || element?.Equals(item) == true)
{
found = true;
continue;
}
queue.Enqueue(element);
}
return found;
}
}
///
/// Adds objects to the end of the .
///
/// The objects to add to the .
public void Enqueue(IEnumerable collection)
{
lock (queue)
{
bool hasElements = false;
foreach (var element in collection)
{
hasElements = true;
queue.Enqueue(element);
}
if (hasElements)
{
SetToken(dequeueTcs);
SetToken(availableTcs);
}
}
}
#endregion Additional features
#region Helper
[ExcludeFromCodeCoverage]
private static void SetToken(TaskCompletionSource tcs)
{
tcs.TrySetResult(true);
}
[ExcludeFromCodeCoverage]
private static TaskCompletionSource ResetToken(ref TaskCompletionSource tcs)
{
if (tcs.Task.IsCompleted)
{
tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
}
return tcs;
}
[ExcludeFromCodeCoverage]
private static async Task WaitAsync(TaskCompletionSource tcs, CancellationToken cancellationToken)
{
if (await Task.WhenAny(tcs.Task, Task.Delay(-1, cancellationToken)) == tcs.Task)
{
await tcs.Task;
return;
}
cancellationToken.ThrowIfCancellationRequested();
}
#endregion Helper
}
}