.NET Разработчик
6.56K subscribers
443 photos
3 videos
14 files
2.12K links
Дневник сертифицированного .NET разработчика. Заметки, советы, новости из мира .NET и C#.

Для связи: @SBenzenko

Поддержать канал:
- https://boosty.to/netdeveloperdiary
- https://patreon.com/user?u=52551826
- https://pay.cloudtips.ru/p/70df3b3b
Download Telegram
День 1231. #ЗаметкиНаПолях #AsyncTips
Блокирующие Стеки и Множества

Задача
Требуется коммуникационный канал для передачи сообщений или данных из одного потока в другой, но вы не хотите, чтобы этот канал использовал семантику FIFO.

Решение
Тип .NET BlockingCollection<T> по умолчанию работает как блокирующая очередь, но он также может работать как любая другая коллекция «производитель/потребитель». По сути, это обёртка для потокобезопасной коллекции, реализующей IProducerConsumerCollection<T>.

Таким образом, вы можете создать BlockingCollection<T> с семантикой LIFO или семантикой неупорядоченного множества:
var blockStack = new BlockingCollection<int>(
new ConcurrentStack<int>());
var blockBag = new BlockingCollection<int>(
new ConcurrentBag<int>());

Важно учитывать, что с упорядочением элементов связаны некоторые условия гонки. Если код-производитель отработает до любого кода-потребителя, порядок элементов будет таким же, как у стека:
// Код-производитель
blockStack.Add(7);
blockStack.Add(13);
blockStack.CompleteAdding();

// Код-потребитель
// Выводит "13", затем "7".
foreach (int item in blockStack.GetConsumingEnumerable())
Console.WriteLine(item);

Если код-производитель и код-потребитель выполняются в разных потоках (как это обычно бывает), потребитель всегда получает следующим тот элемент, который был добавлен последним. Например, производитель добавляет 7, потребитель получает 7, затем производитель добавляет 13, потребитель получает 13. Потребитель не ожидает вызова CompleteAdding перед тем, как вернуть первый элемент.

Всё, чтобы было сказано о регулировке применительно к блокирующим очередям, также применимо к блокирующим стекам или множествам. Если ваши производители работают быстрее потребителей, и вы хотите ограничить использование памяти блокирующим стеком/очередью, используйте регулировку (о ней в будущих постах).

Здесь для кода-потребителя используется GetConsumingEnumerable. Это самый распространённый сценарий. Также существует метод Take, который позволяет потребителю получить только один элемент (вместо потребления всех элементов).

Подробнее о потокобезопасных коллекциях см. 1, 2, 3

Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 9.
👍7
День 1246. #ЗаметкиНаПолях #AsyncTips
Асинхронные очереди

Задача
Требуется коммуникационный канал для передачи сообщений или данных из одной части кода в другую по принципу FIFO без блокирования потоков.
Например, один фрагмент кода может загружать данные, которые отправляются по каналу по мере загрузки; при этом UI-поток получает данные и выводит их.

Решение
Требуется очередь с асинхронным API. В базовом фреймворке .NET такого типа нет, но в NuGet есть пара возможных решений.

1. System.Threading.Channels
Библиотека для асинхронных коллекций «производитель/потребитель» с акцентом на быстродействие. Производители записывают элементы в канал вызовом WriteAsync, а когда завершают производство элементов, один из них вызывает Complete для уведомления канала о том, что в дальнейшем элементов больше не будет:
var queue = Channel.CreateUnbounded<int>();
// Код-производитель
var writer = queue.Writer;
await writer.WriteAsync(7);
await writer.WriteAsync(13);
writer.Complete();

// Код-потребитель
// Выводит "7", затем "13".
var reader = queue.Reader;
await foreach (int value in reader.ReadAllAsync())
Console.WriteLine(value);

Код-потребитель использует асинхронные потоки.

2. System.Threading.Tasks.Dataflow
BufferBlock<T> из библиотеки TPL Dataflow имеет много общего с каналом:
var queue = new BufferBlock<int>();
// Код-производитель
await queue.SendAsync(7);
await queue.SendAsync(13);
queue.Complete();

// Код-потребитель.
// Выводит "7", затем "13".
while (await queue.OutputAvailableAsync())
Console.WriteLine(await queue.ReceiveAsync());

Код-потребитель использует метод OutputAvailableAsync, который на самом деле полезен только с одним потребителем. Если потребителей несколько, может случиться, что OutputAvailableAsync вернет true для нескольких потребителей, хотя элемент только один. Если очередь завершена, то ReceiveAsync выдаст исключение InvalidOperationException. Таким образом, для нескольких потребителей код будет выглядеть так:
while (true)
{
int item;
try
{
item = await queue.ReceiveAsync();
}
catch (InvalidOperationException)
{
break;
}
Console.WriteLine(item);
}

Библиотека Channels больше подходит для асинхронных очередей «производитель/потребитель» там, где это возможно. Помимо регулировки для случаев, когда производители работают быстрее потребителей, поддерживаются несколько режимов выборки (об этом в следующих постах). Однако, если логика вашего приложения может быть выражена в виде «конвейера», через который проходят данные, TPL Dataflow может быть более логичным кандидатом.

См. также
- очереди «производитель/потребитель» с блокирующей семантикой вместо асинхронной.

Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 9.
👍5
День 1253. #ЗаметкиНаПолях #AsyncTips
Регулировка очередей

Задача
Имеется очередь «производитель/потребитель», но производители могут работать быстрее потребителей, что может привести к неэффективному использованию памяти. Также вам хотелось бы сохранить все элементы в очереди, а следовательно, требуется механизм регулировки производителей.

Решение
Если элементы производятся быстрее, чем потребители могут потреблять их, очередь придётся отрегулировать. Для этого можно задать максимальное количество элементов. Когда очередь будет «заполнена», она блокирует производителей, пока в очереди не появится свободное место.

Регулировка может выполняться посредством создания ограниченного канала (вместо неограниченного). Так как каналы асинхронны, производители будут регулироваться асинхронно:
var queue = Channel.CreateBounded<int>(1);
var writer = queue.Writer;
// Эта запись завершается немедленно.
await writer.WriteAsync(7);

// Эта запись (асинхронно) ожидает удаления 7
// перед тем как вставить в очередь 13.
await writer.WriteAsync(13);
writer.Complete();

Тип BufferBlock<T> также имеет встроенную поддержку регулировки, следует задать параметр BoundedCapacity:
var queue = new BufferBlock<int>(
new DataflowBlockOptions
{
BoundedCapacity = 1
});

// Эта отправка завершается немедленно.
await queue.SendAsync(7);

// Эта отправка (асинхронно) ожидает удаления 7
// перед тем как ставить в очередь 13.
await queue.SendAsync(13);
queue.Complete();

Производитель в этом фрагменте кода использует асинхронный метод SendAsync.

Блокирующие очереди «производитель/потребитель» также поддерживают регулировку. Вы можете использовать тип BlockingCollection<T> для регулировки количества элементов, для чего при создании передается соответствующее значение:
var queue = new BlockingCollection<int>(boundedCapacity: 1);
// Это добавление завершается немедленно.
queue.Add(7);
// Это добавление ожидает удаления 7
// перед тем, как добавлять 13.
queue.Add(13);
queue.CompleteAdding();

Итого
Регулировка необходима в том случае, если производители работают быстрее потребителей. Один из сценариев, которые необходимо рассмотреть: могут ли производители работать быстрее потребителей, если ваше приложение будет работать на другом оборудовании? Обычно некоторая регулировка потребуется для того, чтобы гарантировать нормальную работу на будущем оборудовании и/или облачных платформах, которые нередко более ограничены в ресурсах, чем машины разработчиков.

Регулировка замедляет работу производителей, блокируя их, чтобы потребители гарантированно могли обработать все элементы без излишних затрат памяти. Если обрабатывать каждый элемент не обязательно, можно использовать выборку вместо регулировки (об этом в будущих постах).

См. также
- Асинхронные очереди
- Блокирующие очереди

Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 9.
👍3
День 1267. #ЗаметкиНаПолях #AsyncTips
Выборка в очередях

Задача
Есть очередь «производитель/потребитель», но производители могут работать быстрее потребителей, что может привести к неэффективному использованию памяти. Сохранять все элементы из очереди не обязательно; необходимо отфильтровать элементы очереди так, чтобы более медленные потребители могли ограничиться обработкой самых важных элементов.

Решение
Библиотека Channels предоставляет самые простые средства применения выборки к элементам ввода. Типичный пример — всегда брать последние n элементов с потерей самых старых элементов при заполнении очереди:
var queue = Channel.CreateBounded<int>(
new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.DropOldest,
});
var writer = queue.Writer;
// Операция записи завершается немедленно.
await writer.WriteAsync(7);
// Операция записи тоже завершается немедленно.
// Элемент 7 теряется, если только он не был
// немедленно извлечен потребителем.
await writer.WriteAsync(13);

Это самый простой механизм контроля входных потоков и предотвращения «затопления» потребителей.

Есть и другие режимы BoundedChannelFullMode. Например, если вы хотите, чтобы самые старые элементы сохранялись, можно при заполнении канала терять новые элементы:
var queue = Channel.CreateBounded<int>(
new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.DropWrite,
});
var writer = queue.Writer;
// Операция записи завершается немедленно
await writer.WriteAsync(7);
// Операция записи тоже завершается немедленно
// Элемент 13 теряется, если только элемент 7 не был
// немедленно извлечен потребителем.
await writer.WriteAsync(13);

Пояснение
Библиотека Channels отлично подходит для простой выборки. Во многих ситуациях полезен режим BoundedChannelFullMode.DropOldest. Более сложная выборка должна выполняться самими потребителями.
Если выборка должна выполняться по времени (например, «только 10 элементов в секунду»), используйте System.Reactive. В System.Reactive предусмотрены естественные операторы для работы со временем.

См. также
- Асинхронные очереди
- Блокирующие очереди
- Регулировка очередей

Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 9.
👍5
День 1279. #ЗаметкиНаПолях #AsyncTips
Связанные Запросы на Отмену

Подробнее про токены отмены см. «Скоординированная отмена».

Задача: в коде присутствует промежуточный уровень, который должен реагировать на запросы на отмену «сверху», а также выдавать собственные запросы на отмену на следующий уровень.

Решение
В .NET предусмотрена встроенная поддержка этого сценария в виде связанных токенов отмены. Источник токена отмены может быть создан связанным с одним (или несколькими) существующими токенами. Когда вы создаёте источник связанного токена отмены, полученный токен будет отменяться как при непосредственной отмене его, так и при отмене любого из связанных токенов.

Следующий пример выполняет асинхронный запрос HTTP. Токен ct, переданный методу GetWithTimeoutAsync, представляет отмену, запрошенную конечным пользователем, а сам метод также применяет тайм-аут к запросу:
async Task<HttpResponseMessage>
GetWithTimeoutAsync(
HttpClient client,
string url,
CancellationToken ct)
{
using var cts =
CancellationTokenSource
.CreateLinkedTokenSource(ct);

cts.CancelAfter(TimeSpan.FromSeconds(2));

var combinedToken = cts.Token;

return await client.GetAsync(url, combinedToken);
}

Полученный токен combinedToken отменяется либо когда пользователь отменяет существующий токен ct, либо при отмене связанного источника вызовом CancelAfter.

Хотя в примере выше используется только один токен ct, метод CreateLinkedTokenSource может получать любое количество токенов отмены в своих параметрах. Это позволяет вам создать один объединённый токен, на базе которого можно реализовать собственную логику отмены.

Например, ASP.NET предоставляет токен отмены, представляющий отключение пользователя. Код обработчика может создать связанный токен, который реагирует либо на отключение пользователя, либо на свои причины отмены (например, тайм-аут).

Помните о сроке существования источника связанного токена отмены. Предыдущий пример является наиболее типичным: один или несколько токенов отмены передаются методу, который связывает их и передаёт как комбинированный токен. Обратите внимание на то, что в примере используется команда using, которая гарантирует, что источник связанного токена отмены будет освобожден, когда операция будет завершена (а комбинированный токен перестанет использоваться). Подумайте, что произойдёт, если код не освободит источник связанного токена отмены: может оказаться, что метод GetWithTimeoutAsync будет вызван несколько раз с одним (долгосрочным) существующим токеном; в этом случае код будет создавать новый источник связанного токена при каждом вызове. Даже после того, как запросы HTTP завершатся (и ничто не будет использовать комбинированный токен), эти связанные источники будут оставаться присоединенными к существующему токену. Чтобы предотвратить подобные утечки памяти, освобождайте источник связанного токена отмены, когда комбинированный токен перестаёт быть нужным.

Источник: Стивен Клири “Конкурентность в C#”. 2-е межд. изд. — СПб.: Питер, 2020. Глава 10.
👍4