День 1231. #ЗаметкиНаПолях #AsyncTips
Блокирующие Стеки и Множества
Задача
Требуется коммуникационный канал для передачи сообщений или данных из одного потока в другой, но вы не хотите, чтобы этот канал использовал семантику FIFO.
Решение
Тип .NET BlockingCollection<T> по умолчанию работает как блокирующая очередь, но он также может работать как любая другая коллекция «производитель/потребитель». По сути, это обёртка для потокобезопасной коллекции, реализующей
Таким образом, вы можете создать
Всё, чтобы было сказано о регулировке применительно к блокирующим очередям, также применимо к блокирующим стекам или множествам. Если ваши производители работают быстрее потребителей, и вы хотите ограничить использование памяти блокирующим стеком/очередью, используйте регулировку (о ней в будущих постах).
Здесь для кода-потребителя используется
Подробнее о потокобезопасных коллекциях см. 1, 2, 3
Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 9.
Блокирующие Стеки и Множества
Задача
Требуется коммуникационный канал для передачи сообщений или данных из одного потока в другой, но вы не хотите, чтобы этот канал использовал семантику FIFO.
Решение
Тип .NET BlockingCollection<T> по умолчанию работает как блокирующая очередь, но он также может работать как любая другая коллекция «производитель/потребитель». По сути, это обёртка для потокобезопасной коллекции, реализующей
IProducerConsumerCollection<T>.Таким образом, вы можете создать
BlockingCollection<T> с семантикой LIFO или семантикой неупорядоченного множества:var blockStack = new BlockingCollection<int>(Важно учитывать, что с упорядочением элементов связаны некоторые условия гонки. Если код-производитель отработает до любого кода-потребителя, порядок элементов будет таким же, как у стека:
new ConcurrentStack<int>());
var blockBag = new BlockingCollection<int>(
new ConcurrentBag<int>());
// Код-производительЕсли код-производитель и код-потребитель выполняются в разных потоках (как это обычно бывает), потребитель всегда получает следующим тот элемент, который был добавлен последним. Например, производитель добавляет
blockStack.Add(7);
blockStack.Add(13);
blockStack.CompleteAdding();
// Код-потребитель
// Выводит "13", затем "7".
foreach (int item in blockStack.GetConsumingEnumerable())
Console.WriteLine(item);
7, потребитель получает 7, затем производитель добавляет 13, потребитель получает 13. Потребитель не ожидает вызова CompleteAdding перед тем, как вернуть первый элемент.Всё, чтобы было сказано о регулировке применительно к блокирующим очередям, также применимо к блокирующим стекам или множествам. Если ваши производители работают быстрее потребителей, и вы хотите ограничить использование памяти блокирующим стеком/очередью, используйте регулировку (о ней в будущих постах).
Здесь для кода-потребителя используется
GetConsumingEnumerable. Это самый распространённый сценарий. Также существует метод Take, который позволяет потребителю получить только один элемент (вместо потребления всех элементов).Подробнее о потокобезопасных коллекциях см. 1, 2, 3
Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 9.
👍7
День 1246. #ЗаметкиНаПолях #AsyncTips
Асинхронные очереди
Задача
Требуется коммуникационный канал для передачи сообщений или данных из одной части кода в другую по принципу FIFO без блокирования потоков.
Например, один фрагмент кода может загружать данные, которые отправляются по каналу по мере загрузки; при этом UI-поток получает данные и выводит их.
Решение
Требуется очередь с асинхронным API. В базовом фреймворке .NET такого типа нет, но в NuGet есть пара возможных решений.
1. System.Threading.Channels
Библиотека для асинхронных коллекций «производитель/потребитель» с акцентом на быстродействие. Производители записывают элементы в канал вызовом
2. System.Threading.Tasks.Dataflow
См. также
- очереди «производитель/потребитель» с блокирующей семантикой вместо асинхронной.
Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 9.
Асинхронные очереди
Задача
Требуется коммуникационный канал для передачи сообщений или данных из одной части кода в другую по принципу FIFO без блокирования потоков.
Например, один фрагмент кода может загружать данные, которые отправляются по каналу по мере загрузки; при этом UI-поток получает данные и выводит их.
Решение
Требуется очередь с асинхронным API. В базовом фреймворке .NET такого типа нет, но в NuGet есть пара возможных решений.
1. System.Threading.Channels
Библиотека для асинхронных коллекций «производитель/потребитель» с акцентом на быстродействие. Производители записывают элементы в канал вызовом
WriteAsync, а когда завершают производство элементов, один из них вызывает Complete для уведомления канала о том, что в дальнейшем элементов больше не будет:var queue = Channel.CreateUnbounded<int>();Код-потребитель использует асинхронные потоки.
// Код-производитель
var writer = queue.Writer;
await writer.WriteAsync(7);
await writer.WriteAsync(13);
writer.Complete();
// Код-потребитель
// Выводит "7", затем "13".
var reader = queue.Reader;
await foreach (int value in reader.ReadAllAsync())
Console.WriteLine(value);
2. System.Threading.Tasks.Dataflow
BufferBlock<T> из библиотеки TPL Dataflow имеет много общего с каналом:var queue = new BufferBlock<int>();Код-потребитель использует метод
// Код-производитель
await queue.SendAsync(7);
await queue.SendAsync(13);
queue.Complete();
// Код-потребитель.
// Выводит "7", затем "13".
while (await queue.OutputAvailableAsync())
Console.WriteLine(await queue.ReceiveAsync());
OutputAvailableAsync, который на самом деле полезен только с одним потребителем. Если потребителей несколько, может случиться, что OutputAvailableAsync вернет true для нескольких потребителей, хотя элемент только один. Если очередь завершена, то ReceiveAsync выдаст исключение InvalidOperationException. Таким образом, для нескольких потребителей код будет выглядеть так:while (true)Библиотека Channels больше подходит для асинхронных очередей «производитель/потребитель» там, где это возможно. Помимо регулировки для случаев, когда производители работают быстрее потребителей, поддерживаются несколько режимов выборки (об этом в следующих постах). Однако, если логика вашего приложения может быть выражена в виде «конвейера», через который проходят данные, TPL Dataflow может быть более логичным кандидатом.
{
int item;
try
{
item = await queue.ReceiveAsync();
}
catch (InvalidOperationException)
{
break;
}
Console.WriteLine(item);
}
См. также
- очереди «производитель/потребитель» с блокирующей семантикой вместо асинхронной.
Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 9.
👍5