diff --git a/AMWD.Common/Utilities/AsyncQueue.cs b/AMWD.Common/Utilities/AsyncQueue.cs
new file mode 100644
index 0000000..ce70024
--- /dev/null
+++ b/AMWD.Common/Utilities/AsyncQueue.cs
@@ -0,0 +1,437 @@
+using System.Diagnostics.CodeAnalysis;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Collections.Generic
+{
+ ///
+ /// Represents a first-in, first-out collection of objects.
+ ///
+ /// Specifies the type of elements in the queue.
+ public class AsyncQueue
+ {
+ #region Fields
+
+ private readonly Queue queue;
+
+ private TaskCompletionSource dequeueTcs = new();
+ private TaskCompletionSource availableTcs = new();
+
+ #endregion Fields
+
+ #region Constructors
+
+ ///
+ /// Initializes a new instance of the class that is empty and has the default initial capacity.
+ ///
+ [ExcludeFromCodeCoverage]
+ public AsyncQueue()
+ {
+ queue = new Queue();
+ }
+
+ ///
+ /// Initializes a new instance of the System.Collections.Generic.Queue`1 class that
+ /// contains elements copied from the specified collection and has sufficient capacity
+ /// to accommodate the number of elements copied.
+ ///
+ /// The collection whose elements are copied to the new .
+ /// is null.
+ [ExcludeFromCodeCoverage]
+ public AsyncQueue(IEnumerable collection)
+ {
+ queue = new Queue();
+ Enqueue(collection);
+ }
+
+ ///
+ /// Initializes a new instance of the class that is empty and has the specified initial capacity.
+ ///
+ /// The initial number of elements that the can contain.
+ /// is less than zero.
+ [ExcludeFromCodeCoverage]
+ public AsyncQueue(int capacity)
+ {
+ queue = new Queue(capacity);
+ }
+
+ #endregion Constructors
+
+ #region Properties
+
+ ///
+ /// Gets the number of elements contained in the .
+ ///
+ [ExcludeFromCodeCoverage]
+ public int Count
+ {
+ get
+ {
+ lock (queue)
+ {
+ return queue.Count;
+ }
+ }
+ }
+
+ #endregion Properties
+
+ #region Queue implementation
+
+ ///
+ /// Removes all objects from the .
+ ///
+ [ExcludeFromCodeCoverage]
+ public void Clear()
+ {
+ lock (queue)
+ {
+ queue.Clear();
+ }
+ }
+
+ ///
+ /// Determines whether an element is in the .
+ ///
+ /// The object to locate in the . The value can be null for reference types.
+ /// true if item is found in the ; otherwise, false.
+ [ExcludeFromCodeCoverage]
+ public bool Contains(T item)
+ {
+ lock (queue)
+ {
+ return queue.Contains(item);
+ }
+ }
+
+ ///
+ /// Copies the elements to an existing one-dimensional , starting at the specified array index.
+ ///
+ /// The one-dimensional that is the destination of the elements copied from . The must have zero-based indexing.
+ /// The zero-based index in array at which copying begins.
+ /// is null.
+ /// is less than zero.
+ /// The number of elements in the source is greater than the available space from to the end of the destination array.
+ [ExcludeFromCodeCoverage]
+ public void CopyTo(T[] array, int arrayIndex)
+ {
+ lock (queue)
+ {
+ queue.CopyTo(array, arrayIndex);
+ }
+ }
+
+ ///
+ /// Removes and returns the object at the beginning of the .
+ ///
+ /// The object that is removed from the beginning of the .
+ /// The is empty.
+ [ExcludeFromCodeCoverage]
+ public T Dequeue()
+ {
+ lock (queue)
+ {
+ return queue.Dequeue();
+ }
+ }
+
+ ///
+ /// Adds an object to the end of the .
+ ///
+ /// The object to add to the . The value can be null for reference types.
+ public void Enqueue(T item)
+ {
+ lock (queue)
+ {
+ queue.Enqueue(item);
+ SetToken(dequeueTcs);
+ SetToken(availableTcs);
+ }
+ }
+
+ ///
+ /// Returns the object at the beginning of the without removing it.
+ ///
+ /// The object at the beginning of the .
+ /// The is empty.
+ [ExcludeFromCodeCoverage]
+ public T Peek()
+ {
+ lock (queue)
+ {
+ return queue.Peek();
+ }
+ }
+
+ ///
+ /// Copies the elements to a new array.
+ ///
+ /// A new array containing elements copied from the .
+ [ExcludeFromCodeCoverage]
+ public T[] ToArray()
+ {
+ lock (queue)
+ {
+ return queue.ToArray();
+ }
+ }
+
+ ///
+ /// Sets the capacity to the actual number of elements in the , if that number is less than 90 percent of current capacity.
+ ///
+ [ExcludeFromCodeCoverage]
+ public void TrimExcess()
+ {
+ lock (queue)
+ {
+ queue.TrimExcess();
+ }
+ }
+
+ #endregion Queue implementation
+
+ #region Async implementation
+
+ ///
+ /// Removes and returns all available objects at the beginning of the .
+ ///
+ /// The maximum number of objects to return. Zero means no limit.
+ /// A cancellation token used to propagate notification that this operation should be canceled.
+ /// The objects that are removed from the beginning of the .
+ public async Task DequeueAvailableAsync(int maxCount = 0, CancellationToken cancellationToken = default)
+ {
+ while (true)
+ {
+ TaskCompletionSource internalDequeueTcs;
+ lock (queue)
+ {
+ if (queue.Count > 0)
+ {
+ int count = queue.Count;
+ if (maxCount > 0 && count > maxCount)
+ count = maxCount;
+
+ var items = new T[count];
+ for (int i = 0; i < count; i++)
+ items[i] = queue.Dequeue();
+
+ return items;
+ }
+ internalDequeueTcs = ResetToken(ref dequeueTcs);
+ }
+
+ await WaitAsync(internalDequeueTcs, cancellationToken);
+ }
+ }
+
+ ///
+ /// Removes and returns objects at the beginning of the .
+ ///
+ /// The number of objects to return.
+ /// A cancellation token used to propagate notification that this operation should be canceled.
+ /// The objects that are removed from the beginning of the .
+ public async Task DequeueManyAsync(int count, CancellationToken cancellationToken = default)
+ {
+ if (count < 0)
+ throw new ArgumentOutOfRangeException(nameof(count));
+
+ while (true)
+ {
+ TaskCompletionSource internalDequeueTcs;
+ lock (queue)
+ {
+ if (count <= queue.Count)
+ {
+ var items = new T[count];
+ for (int i = 0; i < count; i++)
+ items[i] = queue.Dequeue();
+
+ return items;
+ }
+ internalDequeueTcs = ResetToken(ref dequeueTcs);
+ }
+
+ await WaitAsync(internalDequeueTcs, cancellationToken);
+ }
+ }
+
+ ///
+ /// Removes and returns the object at the beginning of the .
+ ///
+ /// A cancellation token used to propagate notification that this operation should be canceled.
+ /// The object that is removed from the beginning of the .
+ public async Task DequeueAsync(CancellationToken cancellationToken = default)
+ {
+ while (true)
+ {
+ TaskCompletionSource internalDequeueTcs;
+ lock (queue)
+ {
+ if (queue.Count > 0)
+ return queue.Dequeue();
+
+ internalDequeueTcs = ResetToken(ref dequeueTcs);
+ }
+
+ await WaitAsync(internalDequeueTcs, cancellationToken);
+ }
+ }
+
+ ///
+ /// Waits asynchonously until at least one object is available in the .
+ ///
+ /// A cancellation token used to propagate notification that this operation should be canceled.
+ /// An awaitable task.
+ public async Task WaitAsync(CancellationToken cancellationToken = default)
+ {
+ TaskCompletionSource internalAvailableTcs;
+ lock (queue)
+ {
+ if (queue.Count > 0)
+ return;
+
+ internalAvailableTcs = ResetToken(ref availableTcs);
+ }
+
+ await WaitAsync(internalAvailableTcs, cancellationToken);
+ }
+
+ #endregion Async implementation
+
+ #region Additional features
+
+ ///
+ /// Removes the object at the beginning of the , and copies it to the parameter.
+ ///
+ /// The removed object.
+ /// true if the object is successfully removed; false if the is empty.
+ public bool TryDequeue(out T result)
+ {
+ try
+ {
+ result = Dequeue();
+ return true;
+ }
+ catch
+ {
+ result = default;
+ return false;
+ }
+ }
+
+ ///
+ /// Returns a value that indicates whether there is an object at the beginning of
+ /// the , and if one is present, copies it to the
+ /// parameter. The object is not removed from the .
+ ///
+ /// If present, the object at the beginning of the ; otherwise, the default value of .
+ /// true if there is an object at the beginning of the ; false if the is empty.
+ public bool TryPeek(out T result)
+ {
+ try
+ {
+ result = Peek();
+ return true;
+ }
+ catch
+ {
+ result = default;
+ return false;
+ }
+ }
+
+ ///
+ /// Removes the first occurrence of a specific object from the .
+ ///
+ /// The object to remove from the . The value can be null for reference types.
+ /// true if item is successfully removed; otherwise, false. This method also returns false if item was not found in the .
+ public bool Remove(T item)
+ {
+ lock (queue)
+ {
+ var copy = new Queue(queue);
+ queue.Clear();
+
+ bool found = false;
+ int count = copy.Count;
+ for (int i = 0; i < count; i++)
+ {
+ var element = copy.Dequeue();
+ if (found)
+ {
+ queue.Enqueue(element);
+ continue;
+ }
+
+ if ((element == null && item == null) || element?.Equals(item) == true)
+ {
+ found = true;
+ continue;
+ }
+
+ queue.Enqueue(element);
+ }
+
+ return found;
+ }
+ }
+
+ ///
+ /// Adds objects to the end of the .
+ ///
+ /// The objects to add to the .
+ public void Enqueue(IEnumerable collection)
+ {
+ lock (queue)
+ {
+ bool hasElements = false;
+ foreach (var element in collection)
+ {
+ hasElements = true;
+ queue.Enqueue(element);
+ }
+
+ if (hasElements)
+ {
+ SetToken(dequeueTcs);
+ SetToken(availableTcs);
+ }
+ }
+ }
+
+ #endregion Additional features
+
+ #region Helper
+
+ [ExcludeFromCodeCoverage]
+ private static void SetToken(TaskCompletionSource tcs)
+ {
+ tcs.TrySetResult(true);
+ }
+
+ [ExcludeFromCodeCoverage]
+ private static TaskCompletionSource ResetToken(ref TaskCompletionSource tcs)
+ {
+ if (tcs.Task.IsCompleted)
+ {
+ tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ }
+
+ return tcs;
+ }
+
+ [ExcludeFromCodeCoverage]
+ private static async Task WaitAsync(TaskCompletionSource tcs, CancellationToken cancellationToken)
+ {
+ if (await Task.WhenAny(tcs.Task, Task.Delay(-1, cancellationToken)) == tcs.Task)
+ {
+ await tcs.Task;
+ return;
+ }
+
+ cancellationToken.ThrowIfCancellationRequested();
+ }
+
+ #endregion Helper
+ }
+}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 96dbdb6..3e6c7b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,7 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased](https://git.am-wd.de/AM.WD/common/compare/v1.8.1...master) - 0000-00-00
-_nothing changed yet_
+### Added
+- `AsyncQueue`
+
## [v1.8.1](https://git.am-wd.de/AM.WD/common/compare/v1.8.0...v1.8.1) - 2022-08-07
diff --git a/UnitTests/Common/Utilities/AsyncQueueTests.cs b/UnitTests/Common/Utilities/AsyncQueueTests.cs
new file mode 100644
index 0000000..4391413
--- /dev/null
+++ b/UnitTests/Common/Utilities/AsyncQueueTests.cs
@@ -0,0 +1,355 @@
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Threading.Tasks;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Moq;
+
+namespace UnitTests.Common.Utilities
+{
+ [TestClass]
+ [System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverage]
+ public class AsyncQueueTests
+ {
+ private Queue internalQueue;
+
+ private TestElement queueElement1;
+ private TestElement queueElement2;
+ private TestElement queueElement3;
+
+ [TestInitialize]
+ public void InitializeTest()
+ {
+ queueElement1 = new TestElement
+ {
+ Number = 111,
+ Text = "one"
+ };
+ queueElement2 = new TestElement
+ {
+ Number = 222,
+ Text = "two"
+ };
+ queueElement3 = new TestElement
+ {
+ Number = 333,
+ Text = "three"
+ };
+
+ internalQueue = new Queue();
+ internalQueue.Enqueue(queueElement1);
+ internalQueue.Enqueue(queueElement2);
+ internalQueue.Enqueue(queueElement3);
+ }
+
+ [TestMethod]
+ public void ShouldEnqueueItem()
+ {
+ // arrange
+ var element = new TestElement { Number = 1, Text = "Hello" };
+
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ // act
+ queue.Enqueue(element);
+
+ // assert
+ Assert.AreEqual(1, internalQueue.Count);
+ Assert.AreEqual(internalQueue.Count, queue.Count);
+ }
+
+ [TestMethod]
+ public void ShouldEnqueueItemAndResetAvailableToken()
+ {
+ // arrange
+ var element = new TestElement { Number = 1, Text = "Hello" };
+ bool available = false;
+
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ // act
+ var task = Task.Run(async () =>
+ {
+ await queue.WaitAsync();
+ available = true;
+ });
+ queue.Enqueue(element);
+ task.Wait();
+
+ // assert
+ Assert.IsTrue(available);
+ }
+
+ [TestMethod]
+ public void ShouldEnqueueItemAndResetDequeueToken()
+ {
+ // arrange
+ var element = new TestElement { Number = 1, Text = "Hello" };
+ TestElement callback = null;
+
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ // act
+ var task = Task.Run(async () =>
+ {
+ callback = await queue.DequeueAsync();
+ });
+ queue.Enqueue(element);
+ task.Wait();
+
+ // assert
+ Assert.IsNotNull(callback);
+ Assert.AreEqual(element, callback);
+ }
+
+ [TestMethod]
+ public void ShouldEnqueueMultipleItems()
+ {
+ // arrange
+ var elements = new TestElement[]
+ {
+ new TestElement { Number = 1, Text = "Hello" },
+ new TestElement { Number = 2, Text = "World" },
+ };
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ // act
+ queue.Enqueue(elements);
+
+ // assert
+ Assert.AreEqual(2, internalQueue.Count);
+ Assert.AreEqual(queue.Count, internalQueue.Count);
+ }
+
+ [TestMethod]
+ public void ShouldPeekAValue()
+ {
+ // arrange
+ var queue = GetQueue();
+
+ // act
+ bool isSuccess = queue.TryPeek(out var item);
+
+ // assert
+ Assert.IsTrue(isSuccess);
+ Assert.IsNotNull(item);
+ Assert.AreEqual(queueElement1, item);
+ Assert.AreEqual(3, queue.Count);
+ }
+
+ [TestMethod]
+ public void ShouldNotPeekAValue()
+ {
+ // arrange
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ // act
+ bool isSuccess = queue.TryPeek(out var item);
+
+ // assert
+ Assert.IsFalse(isSuccess);
+ Assert.IsNull(item);
+ }
+
+ [TestMethod]
+ public void ShouldDequeueAValue()
+ {
+ // arrange
+ var queue = GetQueue();
+
+ // act
+ bool isSuccess = queue.TryDequeue(out var item);
+
+ // assert
+ Assert.IsTrue(isSuccess);
+ Assert.IsNotNull(item);
+ Assert.AreEqual(queueElement1, item);
+ Assert.AreEqual(2, queue.Count);
+ }
+
+ [TestMethod]
+ public void ShouldNotDequeueAValue()
+ {
+ // arrange
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ // act
+ bool isSuccess = queue.TryDequeue(out var item);
+
+ // assert
+ Assert.IsFalse(isSuccess);
+ Assert.IsNull(item);
+ }
+
+ [TestMethod]
+ public void ShouldRemoveAValue()
+ {
+ // arrange
+ var queue = GetQueue();
+
+ // act
+ queue.Remove(queueElement2);
+ var item1 = queue.Dequeue();
+ var item2 = queue.Dequeue();
+
+ // assert
+ Assert.AreEqual(0, queue.Count);
+ Assert.AreEqual(queueElement1, item1);
+ Assert.AreEqual(queueElement3, item2);
+ }
+
+ [TestMethod]
+ public void ShouldNotRemoveAValue()
+ {
+ // arrange
+ var queue = GetQueue();
+
+ // act
+ queue.Remove(null);
+ var item1 = queue.Dequeue();
+ var item2 = queue.Dequeue();
+ var item3 = queue.Dequeue();
+
+ // assert
+ Assert.AreEqual(0, queue.Count);
+ Assert.AreEqual(queueElement1, item1);
+ Assert.AreEqual(queueElement2, item2);
+ Assert.AreEqual(queueElement3, item3);
+ }
+
+ [TestMethod]
+ public async Task ShouldAwaitOneDequeue()
+ {
+ // arrange
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ var task = Task.Run(async () =>
+ {
+ await Task.Delay(1000);
+ queue.Enqueue(new[] { queueElement1, queueElement2, queueElement3 });
+ });
+
+ // act
+ var item = await queue.DequeueAsync();
+
+ // assert
+ Assert.AreEqual(2, queue.Count);
+ Assert.IsNotNull(item);
+ Assert.AreEqual(queueElement1, item);
+ }
+
+ [TestMethod]
+ public async Task ShouldAwaitManyDequeue()
+ {
+ // arrange
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ var task = Task.Run(async () =>
+ {
+ await Task.Delay(1000);
+ queue.Enqueue(new[] { queueElement1, queueElement2, queueElement3 });
+ });
+
+ // act
+ var items = await queue.DequeueManyAsync(2);
+
+ // assert
+ Assert.AreEqual(1, queue.Count);
+ Assert.IsNotNull(items);
+ Assert.AreEqual(2, items.Length);
+ Assert.AreEqual(queueElement1, items[0]);
+ Assert.AreEqual(queueElement2, items[1]);
+ }
+
+ [TestMethod]
+ public async Task ShouldAwaitAllDequeue()
+ {
+ // arrange
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ var task = Task.Run(async () =>
+ {
+ await Task.Delay(1000);
+ queue.Enqueue(new[] { queueElement1, queueElement2, queueElement3 });
+ });
+
+ // act
+ var items = await queue.DequeueAvailableAsync();
+
+ // assert
+ Assert.AreEqual(0, queue.Count);
+ Assert.IsNotNull(items);
+ Assert.AreEqual(3, items.Length);
+ Assert.AreEqual(queueElement1, items[0]);
+ Assert.AreEqual(queueElement2, items[1]);
+ Assert.AreEqual(queueElement3, items[2]);
+ }
+
+ [TestMethod]
+ public async Task ShouldAwaitAvailableDequeue()
+ {
+ // arrange
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ var task = Task.Run(async () =>
+ {
+ await Task.Delay(1000);
+ queue.Enqueue(new[] { queueElement1, queueElement2, queueElement3 });
+ });
+
+ // act
+ var items = await queue.DequeueAvailableAsync(2);
+
+ // assert
+ Assert.AreEqual(1, queue.Count);
+ Assert.IsNotNull(items);
+ Assert.AreEqual(2, items.Length);
+ Assert.AreEqual(queueElement1, items[0]);
+ Assert.AreEqual(queueElement2, items[1]);
+ }
+
+ [TestMethod]
+ [ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public async Task ShouldThrowArumentOutOfRangeException()
+ {
+ // arrange
+ internalQueue.Clear();
+ var queue = GetQueue();
+
+ // act
+ await queue.DequeueManyAsync(-2);
+
+ // assert - ArgumentOutOfRangeException expected
+ Assert.Fail();
+ }
+
+
+
+ private AsyncQueue GetQueue()
+ {
+ var asyncQueue = new AsyncQueue();
+
+ var field = asyncQueue.GetType().GetField("queue", BindingFlags.Instance | BindingFlags.NonPublic);
+ field.SetValue(asyncQueue, internalQueue);
+
+ return asyncQueue;
+ }
+
+ private class TestElement
+ {
+ public int Number { get; set; }
+
+ public string Text { get; set; }
+ }
+ }
+}