using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; namespace System.Collections.Generic { /// /// Represents a first-in, first-out collection of objects. /// /// Specifies the type of elements in the queue. public class AsyncQueue { #region Fields private readonly Queue queue; private TaskCompletionSource dequeueTcs = new(); private TaskCompletionSource availableTcs = new(); #endregion Fields #region Constructors /// /// Initializes a new instance of the class that is empty and has the default initial capacity. /// [ExcludeFromCodeCoverage] public AsyncQueue() { queue = new Queue(); } /// /// Initializes a new instance of the System.Collections.Generic.Queue`1 class that /// contains elements copied from the specified collection and has sufficient capacity /// to accommodate the number of elements copied. /// /// The collection whose elements are copied to the new . /// is null. [ExcludeFromCodeCoverage] public AsyncQueue(IEnumerable collection) { queue = new Queue(); Enqueue(collection); } /// /// Initializes a new instance of the class that is empty and has the specified initial capacity. /// /// The initial number of elements that the can contain. /// is less than zero. [ExcludeFromCodeCoverage] public AsyncQueue(int capacity) { queue = new Queue(capacity); } #endregion Constructors #region Properties /// /// Gets the number of elements contained in the . /// [ExcludeFromCodeCoverage] public int Count { get { lock (queue) { return queue.Count; } } } #endregion Properties #region Queue implementation /// /// Removes all objects from the . /// [ExcludeFromCodeCoverage] public void Clear() { lock (queue) { queue.Clear(); } } /// /// Determines whether an element is in the . /// /// The object to locate in the . The value can be null for reference types. /// true if item is found in the ; otherwise, false. [ExcludeFromCodeCoverage] public bool Contains(T item) { lock (queue) { return queue.Contains(item); } } /// /// Copies the elements to an existing one-dimensional , starting at the specified array index. /// /// The one-dimensional that is the destination of the elements copied from . The must have zero-based indexing. /// The zero-based index in array at which copying begins. /// is null. /// is less than zero. /// The number of elements in the source is greater than the available space from to the end of the destination array. [ExcludeFromCodeCoverage] public void CopyTo(T[] array, int arrayIndex) { lock (queue) { queue.CopyTo(array, arrayIndex); } } /// /// Removes and returns the object at the beginning of the . /// /// The object that is removed from the beginning of the . /// The is empty. [ExcludeFromCodeCoverage] public T Dequeue() { lock (queue) { return queue.Dequeue(); } } /// /// Adds an object to the end of the . /// /// The object to add to the . The value can be null for reference types. public void Enqueue(T item) { lock (queue) { queue.Enqueue(item); SetToken(dequeueTcs); SetToken(availableTcs); } } /// /// Returns the object at the beginning of the without removing it. /// /// The object at the beginning of the . /// The is empty. [ExcludeFromCodeCoverage] public T Peek() { lock (queue) { return queue.Peek(); } } /// /// Copies the elements to a new array. /// /// A new array containing elements copied from the . [ExcludeFromCodeCoverage] public T[] ToArray() { lock (queue) { return queue.ToArray(); } } /// /// Sets the capacity to the actual number of elements in the , if that number is less than 90 percent of current capacity. /// [ExcludeFromCodeCoverage] public void TrimExcess() { lock (queue) { queue.TrimExcess(); } } #endregion Queue implementation #region Async implementation /// /// Removes and returns all available objects at the beginning of the . /// /// The maximum number of objects to return. Zero means no limit. /// A cancellation token used to propagate notification that this operation should be canceled. /// The objects that are removed from the beginning of the . public async Task DequeueAvailableAsync(int maxCount = 0, CancellationToken cancellationToken = default) { while (true) { TaskCompletionSource internalDequeueTcs; lock (queue) { if (queue.Count > 0) { int count = queue.Count; if (maxCount > 0 && count > maxCount) count = maxCount; var items = new T[count]; for (int i = 0; i < count; i++) items[i] = queue.Dequeue(); return items; } internalDequeueTcs = ResetToken(ref dequeueTcs); } await WaitAsync(internalDequeueTcs, cancellationToken); } } /// /// Removes and returns objects at the beginning of the . /// /// The number of objects to return. /// A cancellation token used to propagate notification that this operation should be canceled. /// The objects that are removed from the beginning of the . public async Task DequeueManyAsync(int count, CancellationToken cancellationToken = default) { if (count < 0) throw new ArgumentOutOfRangeException(nameof(count)); while (true) { TaskCompletionSource internalDequeueTcs; lock (queue) { if (count <= queue.Count) { var items = new T[count]; for (int i = 0; i < count; i++) items[i] = queue.Dequeue(); return items; } internalDequeueTcs = ResetToken(ref dequeueTcs); } await WaitAsync(internalDequeueTcs, cancellationToken); } } /// /// Removes and returns the object at the beginning of the . /// /// A cancellation token used to propagate notification that this operation should be canceled. /// The object that is removed from the beginning of the . public async Task DequeueAsync(CancellationToken cancellationToken = default) { while (true) { TaskCompletionSource internalDequeueTcs; lock (queue) { if (queue.Count > 0) return queue.Dequeue(); internalDequeueTcs = ResetToken(ref dequeueTcs); } await WaitAsync(internalDequeueTcs, cancellationToken); } } /// /// Waits asynchonously until at least one object is available in the . /// /// A cancellation token used to propagate notification that this operation should be canceled. /// An awaitable task. public async Task WaitAsync(CancellationToken cancellationToken = default) { TaskCompletionSource internalAvailableTcs; lock (queue) { if (queue.Count > 0) return; internalAvailableTcs = ResetToken(ref availableTcs); } await WaitAsync(internalAvailableTcs, cancellationToken); } #endregion Async implementation #region Additional features /// /// Removes the object at the beginning of the , and copies it to the parameter. /// /// The removed object. /// true if the object is successfully removed; false if the is empty. public bool TryDequeue(out T result) { try { result = Dequeue(); return true; } catch { result = default; return false; } } /// /// Returns a value that indicates whether there is an object at the beginning of /// the , and if one is present, copies it to the /// parameter. The object is not removed from the . /// /// If present, the object at the beginning of the ; otherwise, the default value of . /// true if there is an object at the beginning of the ; false if the is empty. public bool TryPeek(out T result) { try { result = Peek(); return true; } catch { result = default; return false; } } /// /// Removes the first occurrence of a specific object from the . /// /// The object to remove from the . The value can be null for reference types. /// true if item is successfully removed; otherwise, false. This method also returns false if item was not found in the . public bool Remove(T item) { lock (queue) { var copy = new Queue(queue); queue.Clear(); bool found = false; int count = copy.Count; for (int i = 0; i < count; i++) { var element = copy.Dequeue(); if (found) { queue.Enqueue(element); continue; } if ((element == null && item == null) || element?.Equals(item) == true) { found = true; continue; } queue.Enqueue(element); } return found; } } /// /// Adds objects to the end of the . /// /// The objects to add to the . public void Enqueue(IEnumerable collection) { lock (queue) { bool hasElements = false; foreach (var element in collection) { hasElements = true; queue.Enqueue(element); } if (hasElements) { SetToken(dequeueTcs); SetToken(availableTcs); } } } #endregion Additional features #region Helper [ExcludeFromCodeCoverage] private static void SetToken(TaskCompletionSource tcs) { tcs.TrySetResult(true); } [ExcludeFromCodeCoverage] private static TaskCompletionSource ResetToken(ref TaskCompletionSource tcs) { if (tcs.Task.IsCompleted) { tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } return tcs; } [ExcludeFromCodeCoverage] private static async Task WaitAsync(TaskCompletionSource tcs, CancellationToken cancellationToken) { if (await Task.WhenAny(tcs.Task, Task.Delay(-1, cancellationToken)) == tcs.Task) { await tcs.Task; return; } cancellationToken.ThrowIfCancellationRequested(); } #endregion Helper } }