From 6e6e9d975a14200eee8d50fea1373454f7b69bfa Mon Sep 17 00:00:00 2001 From: Andreas Mueller Date: Sat, 13 Aug 2022 15:22:49 +0200 Subject: [PATCH] Added AsyncQueue --- AMWD.Common/Utilities/AsyncQueue.cs | 437 ++++++++++++++++++ CHANGELOG.md | 4 +- UnitTests/Common/Utilities/AsyncQueueTests.cs | 355 ++++++++++++++ 3 files changed, 795 insertions(+), 1 deletion(-) create mode 100644 AMWD.Common/Utilities/AsyncQueue.cs create mode 100644 UnitTests/Common/Utilities/AsyncQueueTests.cs diff --git a/AMWD.Common/Utilities/AsyncQueue.cs b/AMWD.Common/Utilities/AsyncQueue.cs new file mode 100644 index 0000000..ce70024 --- /dev/null +++ b/AMWD.Common/Utilities/AsyncQueue.cs @@ -0,0 +1,437 @@ +using System.Diagnostics.CodeAnalysis; +using System.Threading; +using System.Threading.Tasks; + +namespace System.Collections.Generic +{ + /// + /// Represents a first-in, first-out collection of objects. + /// + /// Specifies the type of elements in the queue. + public class AsyncQueue + { + #region Fields + + private readonly Queue queue; + + private TaskCompletionSource dequeueTcs = new(); + private TaskCompletionSource availableTcs = new(); + + #endregion Fields + + #region Constructors + + /// + /// Initializes a new instance of the class that is empty and has the default initial capacity. + /// + [ExcludeFromCodeCoverage] + public AsyncQueue() + { + queue = new Queue(); + } + + /// + /// Initializes a new instance of the System.Collections.Generic.Queue`1 class that + /// contains elements copied from the specified collection and has sufficient capacity + /// to accommodate the number of elements copied. + /// + /// The collection whose elements are copied to the new . + /// is null. + [ExcludeFromCodeCoverage] + public AsyncQueue(IEnumerable collection) + { + queue = new Queue(); + Enqueue(collection); + } + + /// + /// Initializes a new instance of the class that is empty and has the specified initial capacity. + /// + /// The initial number of elements that the can contain. + /// is less than zero. + [ExcludeFromCodeCoverage] + public AsyncQueue(int capacity) + { + queue = new Queue(capacity); + } + + #endregion Constructors + + #region Properties + + /// + /// Gets the number of elements contained in the . + /// + [ExcludeFromCodeCoverage] + public int Count + { + get + { + lock (queue) + { + return queue.Count; + } + } + } + + #endregion Properties + + #region Queue implementation + + /// + /// Removes all objects from the . + /// + [ExcludeFromCodeCoverage] + public void Clear() + { + lock (queue) + { + queue.Clear(); + } + } + + /// + /// Determines whether an element is in the . + /// + /// The object to locate in the . The value can be null for reference types. + /// true if item is found in the ; otherwise, false. + [ExcludeFromCodeCoverage] + public bool Contains(T item) + { + lock (queue) + { + return queue.Contains(item); + } + } + + /// + /// Copies the elements to an existing one-dimensional , starting at the specified array index. + /// + /// The one-dimensional that is the destination of the elements copied from . The must have zero-based indexing. + /// The zero-based index in array at which copying begins. + /// is null. + /// is less than zero. + /// The number of elements in the source is greater than the available space from to the end of the destination array. + [ExcludeFromCodeCoverage] + public void CopyTo(T[] array, int arrayIndex) + { + lock (queue) + { + queue.CopyTo(array, arrayIndex); + } + } + + /// + /// Removes and returns the object at the beginning of the . + /// + /// The object that is removed from the beginning of the . + /// The is empty. + [ExcludeFromCodeCoverage] + public T Dequeue() + { + lock (queue) + { + return queue.Dequeue(); + } + } + + /// + /// Adds an object to the end of the . + /// + /// The object to add to the . The value can be null for reference types. + public void Enqueue(T item) + { + lock (queue) + { + queue.Enqueue(item); + SetToken(dequeueTcs); + SetToken(availableTcs); + } + } + + /// + /// Returns the object at the beginning of the without removing it. + /// + /// The object at the beginning of the . + /// The is empty. + [ExcludeFromCodeCoverage] + public T Peek() + { + lock (queue) + { + return queue.Peek(); + } + } + + /// + /// Copies the elements to a new array. + /// + /// A new array containing elements copied from the . + [ExcludeFromCodeCoverage] + public T[] ToArray() + { + lock (queue) + { + return queue.ToArray(); + } + } + + /// + /// Sets the capacity to the actual number of elements in the , if that number is less than 90 percent of current capacity. + /// + [ExcludeFromCodeCoverage] + public void TrimExcess() + { + lock (queue) + { + queue.TrimExcess(); + } + } + + #endregion Queue implementation + + #region Async implementation + + /// + /// Removes and returns all available objects at the beginning of the . + /// + /// The maximum number of objects to return. Zero means no limit. + /// A cancellation token used to propagate notification that this operation should be canceled. + /// The objects that are removed from the beginning of the . + public async Task DequeueAvailableAsync(int maxCount = 0, CancellationToken cancellationToken = default) + { + while (true) + { + TaskCompletionSource internalDequeueTcs; + lock (queue) + { + if (queue.Count > 0) + { + int count = queue.Count; + if (maxCount > 0 && count > maxCount) + count = maxCount; + + var items = new T[count]; + for (int i = 0; i < count; i++) + items[i] = queue.Dequeue(); + + return items; + } + internalDequeueTcs = ResetToken(ref dequeueTcs); + } + + await WaitAsync(internalDequeueTcs, cancellationToken); + } + } + + /// + /// Removes and returns objects at the beginning of the . + /// + /// The number of objects to return. + /// A cancellation token used to propagate notification that this operation should be canceled. + /// The objects that are removed from the beginning of the . + public async Task DequeueManyAsync(int count, CancellationToken cancellationToken = default) + { + if (count < 0) + throw new ArgumentOutOfRangeException(nameof(count)); + + while (true) + { + TaskCompletionSource internalDequeueTcs; + lock (queue) + { + if (count <= queue.Count) + { + var items = new T[count]; + for (int i = 0; i < count; i++) + items[i] = queue.Dequeue(); + + return items; + } + internalDequeueTcs = ResetToken(ref dequeueTcs); + } + + await WaitAsync(internalDequeueTcs, cancellationToken); + } + } + + /// + /// Removes and returns the object at the beginning of the . + /// + /// A cancellation token used to propagate notification that this operation should be canceled. + /// The object that is removed from the beginning of the . + public async Task DequeueAsync(CancellationToken cancellationToken = default) + { + while (true) + { + TaskCompletionSource internalDequeueTcs; + lock (queue) + { + if (queue.Count > 0) + return queue.Dequeue(); + + internalDequeueTcs = ResetToken(ref dequeueTcs); + } + + await WaitAsync(internalDequeueTcs, cancellationToken); + } + } + + /// + /// Waits asynchonously until at least one object is available in the . + /// + /// A cancellation token used to propagate notification that this operation should be canceled. + /// An awaitable task. + public async Task WaitAsync(CancellationToken cancellationToken = default) + { + TaskCompletionSource internalAvailableTcs; + lock (queue) + { + if (queue.Count > 0) + return; + + internalAvailableTcs = ResetToken(ref availableTcs); + } + + await WaitAsync(internalAvailableTcs, cancellationToken); + } + + #endregion Async implementation + + #region Additional features + + /// + /// Removes the object at the beginning of the , and copies it to the parameter. + /// + /// The removed object. + /// true if the object is successfully removed; false if the is empty. + public bool TryDequeue(out T result) + { + try + { + result = Dequeue(); + return true; + } + catch + { + result = default; + return false; + } + } + + /// + /// Returns a value that indicates whether there is an object at the beginning of + /// the , and if one is present, copies it to the + /// parameter. The object is not removed from the . + /// + /// If present, the object at the beginning of the ; otherwise, the default value of . + /// true if there is an object at the beginning of the ; false if the is empty. + public bool TryPeek(out T result) + { + try + { + result = Peek(); + return true; + } + catch + { + result = default; + return false; + } + } + + /// + /// Removes the first occurrence of a specific object from the . + /// + /// The object to remove from the . The value can be null for reference types. + /// true if item is successfully removed; otherwise, false. This method also returns false if item was not found in the . + public bool Remove(T item) + { + lock (queue) + { + var copy = new Queue(queue); + queue.Clear(); + + bool found = false; + int count = copy.Count; + for (int i = 0; i < count; i++) + { + var element = copy.Dequeue(); + if (found) + { + queue.Enqueue(element); + continue; + } + + if ((element == null && item == null) || element?.Equals(item) == true) + { + found = true; + continue; + } + + queue.Enqueue(element); + } + + return found; + } + } + + /// + /// Adds objects to the end of the . + /// + /// The objects to add to the . + public void Enqueue(IEnumerable collection) + { + lock (queue) + { + bool hasElements = false; + foreach (var element in collection) + { + hasElements = true; + queue.Enqueue(element); + } + + if (hasElements) + { + SetToken(dequeueTcs); + SetToken(availableTcs); + } + } + } + + #endregion Additional features + + #region Helper + + [ExcludeFromCodeCoverage] + private static void SetToken(TaskCompletionSource tcs) + { + tcs.TrySetResult(true); + } + + [ExcludeFromCodeCoverage] + private static TaskCompletionSource ResetToken(ref TaskCompletionSource tcs) + { + if (tcs.Task.IsCompleted) + { + tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + return tcs; + } + + [ExcludeFromCodeCoverage] + private static async Task WaitAsync(TaskCompletionSource tcs, CancellationToken cancellationToken) + { + if (await Task.WhenAny(tcs.Task, Task.Delay(-1, cancellationToken)) == tcs.Task) + { + await tcs.Task; + return; + } + + cancellationToken.ThrowIfCancellationRequested(); + } + + #endregion Helper + } +} diff --git a/CHANGELOG.md b/CHANGELOG.md index 96dbdb6..3e6c7b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased](https://git.am-wd.de/AM.WD/common/compare/v1.8.1...master) - 0000-00-00 -_nothing changed yet_ +### Added +- `AsyncQueue` + ## [v1.8.1](https://git.am-wd.de/AM.WD/common/compare/v1.8.0...v1.8.1) - 2022-08-07 diff --git a/UnitTests/Common/Utilities/AsyncQueueTests.cs b/UnitTests/Common/Utilities/AsyncQueueTests.cs new file mode 100644 index 0000000..4391413 --- /dev/null +++ b/UnitTests/Common/Utilities/AsyncQueueTests.cs @@ -0,0 +1,355 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; + +namespace UnitTests.Common.Utilities +{ + [TestClass] + [System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage] + public class AsyncQueueTests + { + private Queue internalQueue; + + private TestElement queueElement1; + private TestElement queueElement2; + private TestElement queueElement3; + + [TestInitialize] + public void InitializeTest() + { + queueElement1 = new TestElement + { + Number = 111, + Text = "one" + }; + queueElement2 = new TestElement + { + Number = 222, + Text = "two" + }; + queueElement3 = new TestElement + { + Number = 333, + Text = "three" + }; + + internalQueue = new Queue(); + internalQueue.Enqueue(queueElement1); + internalQueue.Enqueue(queueElement2); + internalQueue.Enqueue(queueElement3); + } + + [TestMethod] + public void ShouldEnqueueItem() + { + // arrange + var element = new TestElement { Number = 1, Text = "Hello" }; + + internalQueue.Clear(); + var queue = GetQueue(); + + // act + queue.Enqueue(element); + + // assert + Assert.AreEqual(1, internalQueue.Count); + Assert.AreEqual(internalQueue.Count, queue.Count); + } + + [TestMethod] + public void ShouldEnqueueItemAndResetAvailableToken() + { + // arrange + var element = new TestElement { Number = 1, Text = "Hello" }; + bool available = false; + + internalQueue.Clear(); + var queue = GetQueue(); + + // act + var task = Task.Run(async () => + { + await queue.WaitAsync(); + available = true; + }); + queue.Enqueue(element); + task.Wait(); + + // assert + Assert.IsTrue(available); + } + + [TestMethod] + public void ShouldEnqueueItemAndResetDequeueToken() + { + // arrange + var element = new TestElement { Number = 1, Text = "Hello" }; + TestElement callback = null; + + internalQueue.Clear(); + var queue = GetQueue(); + + // act + var task = Task.Run(async () => + { + callback = await queue.DequeueAsync(); + }); + queue.Enqueue(element); + task.Wait(); + + // assert + Assert.IsNotNull(callback); + Assert.AreEqual(element, callback); + } + + [TestMethod] + public void ShouldEnqueueMultipleItems() + { + // arrange + var elements = new TestElement[] + { + new TestElement { Number = 1, Text = "Hello" }, + new TestElement { Number = 2, Text = "World" }, + }; + internalQueue.Clear(); + var queue = GetQueue(); + + // act + queue.Enqueue(elements); + + // assert + Assert.AreEqual(2, internalQueue.Count); + Assert.AreEqual(queue.Count, internalQueue.Count); + } + + [TestMethod] + public void ShouldPeekAValue() + { + // arrange + var queue = GetQueue(); + + // act + bool isSuccess = queue.TryPeek(out var item); + + // assert + Assert.IsTrue(isSuccess); + Assert.IsNotNull(item); + Assert.AreEqual(queueElement1, item); + Assert.AreEqual(3, queue.Count); + } + + [TestMethod] + public void ShouldNotPeekAValue() + { + // arrange + internalQueue.Clear(); + var queue = GetQueue(); + + // act + bool isSuccess = queue.TryPeek(out var item); + + // assert + Assert.IsFalse(isSuccess); + Assert.IsNull(item); + } + + [TestMethod] + public void ShouldDequeueAValue() + { + // arrange + var queue = GetQueue(); + + // act + bool isSuccess = queue.TryDequeue(out var item); + + // assert + Assert.IsTrue(isSuccess); + Assert.IsNotNull(item); + Assert.AreEqual(queueElement1, item); + Assert.AreEqual(2, queue.Count); + } + + [TestMethod] + public void ShouldNotDequeueAValue() + { + // arrange + internalQueue.Clear(); + var queue = GetQueue(); + + // act + bool isSuccess = queue.TryDequeue(out var item); + + // assert + Assert.IsFalse(isSuccess); + Assert.IsNull(item); + } + + [TestMethod] + public void ShouldRemoveAValue() + { + // arrange + var queue = GetQueue(); + + // act + queue.Remove(queueElement2); + var item1 = queue.Dequeue(); + var item2 = queue.Dequeue(); + + // assert + Assert.AreEqual(0, queue.Count); + Assert.AreEqual(queueElement1, item1); + Assert.AreEqual(queueElement3, item2); + } + + [TestMethod] + public void ShouldNotRemoveAValue() + { + // arrange + var queue = GetQueue(); + + // act + queue.Remove(null); + var item1 = queue.Dequeue(); + var item2 = queue.Dequeue(); + var item3 = queue.Dequeue(); + + // assert + Assert.AreEqual(0, queue.Count); + Assert.AreEqual(queueElement1, item1); + Assert.AreEqual(queueElement2, item2); + Assert.AreEqual(queueElement3, item3); + } + + [TestMethod] + public async Task ShouldAwaitOneDequeue() + { + // arrange + internalQueue.Clear(); + var queue = GetQueue(); + + var task = Task.Run(async () => + { + await Task.Delay(1000); + queue.Enqueue(new[] { queueElement1, queueElement2, queueElement3 }); + }); + + // act + var item = await queue.DequeueAsync(); + + // assert + Assert.AreEqual(2, queue.Count); + Assert.IsNotNull(item); + Assert.AreEqual(queueElement1, item); + } + + [TestMethod] + public async Task ShouldAwaitManyDequeue() + { + // arrange + internalQueue.Clear(); + var queue = GetQueue(); + + var task = Task.Run(async () => + { + await Task.Delay(1000); + queue.Enqueue(new[] { queueElement1, queueElement2, queueElement3 }); + }); + + // act + var items = await queue.DequeueManyAsync(2); + + // assert + Assert.AreEqual(1, queue.Count); + Assert.IsNotNull(items); + Assert.AreEqual(2, items.Length); + Assert.AreEqual(queueElement1, items[0]); + Assert.AreEqual(queueElement2, items[1]); + } + + [TestMethod] + public async Task ShouldAwaitAllDequeue() + { + // arrange + internalQueue.Clear(); + var queue = GetQueue(); + + var task = Task.Run(async () => + { + await Task.Delay(1000); + queue.Enqueue(new[] { queueElement1, queueElement2, queueElement3 }); + }); + + // act + var items = await queue.DequeueAvailableAsync(); + + // assert + Assert.AreEqual(0, queue.Count); + Assert.IsNotNull(items); + Assert.AreEqual(3, items.Length); + Assert.AreEqual(queueElement1, items[0]); + Assert.AreEqual(queueElement2, items[1]); + Assert.AreEqual(queueElement3, items[2]); + } + + [TestMethod] + public async Task ShouldAwaitAvailableDequeue() + { + // arrange + internalQueue.Clear(); + var queue = GetQueue(); + + var task = Task.Run(async () => + { + await Task.Delay(1000); + queue.Enqueue(new[] { queueElement1, queueElement2, queueElement3 }); + }); + + // act + var items = await queue.DequeueAvailableAsync(2); + + // assert + Assert.AreEqual(1, queue.Count); + Assert.IsNotNull(items); + Assert.AreEqual(2, items.Length); + Assert.AreEqual(queueElement1, items[0]); + Assert.AreEqual(queueElement2, items[1]); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentOutOfRangeException))] + public async Task ShouldThrowArumentOutOfRangeException() + { + // arrange + internalQueue.Clear(); + var queue = GetQueue(); + + // act + await queue.DequeueManyAsync(-2); + + // assert - ArgumentOutOfRangeException expected + Assert.Fail(); + } + + + + private AsyncQueue GetQueue() + { + var asyncQueue = new AsyncQueue(); + + var field = asyncQueue.GetType().GetField("queue", BindingFlags.Instance | BindingFlags.NonPublic); + field.SetValue(asyncQueue, internalQueue); + + return asyncQueue; + } + + private class TestElement + { + public int Number { get; set; } + + public string Text { get; set; } + } + } +}