MODULE 5: ASYNCHRONOUS PROGRAMMING - Complete Guide
52. Task and Task (The Foundation)
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
public class TaskBasics
{
// TASK - Represents an asynchronous operation
// TASK<T> - Represents an async operation that returns a value
public void CreatingTasks()
{
// 1. Task.Run - Queue work to thread pool
Task task1 = Task.Run(() =>
{
Console.WriteLine("Task running on thread pool");
});
// 2. Task.Factory.StartNew - More control
Task task2 = Task.Factory.StartNew(() =>
{
Console.WriteLine("Factory task");
});
// 3. Task with return value
Task<int> task3 = Task.Run(() =>
{
Thread.Sleep(100);
return 42;
});
// 4. Task from result (already completed)
Task<int> completedTask = Task.FromResult(100);
// 5. Task that never completes (for testing)
Task neverComplete = Task.Delay(Timeout.Infinite);
// 6. Task that throws exception
Task faultedTask = Task.FromException(new InvalidOperationException("Failed"));
}
// TASK STATUS AND PROPERTIES
public void TaskProperties()
{
Task task = Task.Run(() =>
{
Thread.Sleep(100);
});
// Status before start
Console.WriteLine($"Status: {task.Status}"); // WaitingForActivation or RanToCompletion
// Wait for completion
task.Wait(); // Blocks (avoid in UI!)
// After completion
Console.WriteLine($"IsCompleted: {task.IsCompleted}");
Console.WriteLine($"IsFaulted: {task.IsFaulted}");
Console.WriteLine($"IsCanceled: {task.IsCanceled}");
// For Task<T>
Task<int> taskT = Task.Run(() => 42);
taskT.Wait();
Console.WriteLine($"Result: {taskT.Result}"); // Blocks if not complete
// Better to use await instead of .Result or .Wait()
}
// WAITING FOR TASKS
public async Task WaitingForTasks()
{
var task1 = Task.Run(() => { Thread.Sleep(1000); return 1; });
var task2 = Task.Run(() => { Thread.Sleep(500); return 2; });
var task3 = Task.Run(() => { Thread.Sleep(1500); return 3; });
// Wait for all tasks to complete
Task.WaitAll(task1, task2, task3); // Blocks
// Wait for any task to complete
Task.WaitAny(task1, task2, task3); // Blocks until one completes
// Async versions (preferred)
await Task.WhenAll(task1, task2, task3); // Non-blocking
await Task.WhenAny(task1, task2, task3); // Non-blocking
// Get results after WhenAll
int[] results = await Task.WhenAll(task1, task2, task3);
// Process as they complete
var tasks = new[] { task1, task2, task3 };
foreach (var task in tasks)
{
int result = await task;
Console.WriteLine($"Completed: {result}");
}
}
// TASK EXCEPTION HANDLING
public async Task TaskExceptions()
{
// Exception in task
Task task = Task.Run(() =>
{
throw new InvalidOperationException("Something went wrong");
});
try
{
await task;
}
catch (InvalidOperationException ex)
{
Console.WriteLine($"Caught: {ex.Message}");
}
// Multiple exceptions (AggregateException)
Task task1 = Task.Run(() => throw new Exception("Error 1"));
Task task2 = Task.Run(() => throw new Exception("Error 2"));
try
{
await Task.WhenAll(task1, task2);
}
catch (AggregateException ex)
{
foreach (var innerEx in ex.InnerExceptions)
{
Console.WriteLine($"Inner: {innerEx.Message}");
}
}
catch (Exception ex)
{
// WhenAll throws first exception by default
Console.WriteLine($"First error: {ex.Message}");
}
// Check faulted tasks
var faultedTask = Task.Run(() => throw new Exception("Failed"));
await Task.Delay(10);
if (faultedTask.IsFaulted)
{
Console.WriteLine($"Task faulted: {faultedTask.Exception?.Message}");
}
}
// TASK COMPLETION SOURCE (Manual task control)
public class TaskCompletionSourceDemo
{
public Task<int> WaitForResultAsync()
{
var tcs = new TaskCompletionSource<int>();
// Simulate some event-based operation
Timer timer = null;
timer = new Timer(_ =>
{
tcs.SetResult(42);
timer.Dispose();
}, null, 1000, Timeout.Infinite);
return tcs.Task;
}
public Task<string> WaitForEventAsync()
{
var tcs = new TaskCompletionSource<string>();
// Could be triggered by an event
void OnDataReceived(string data)
{
tcs.SetResult(data);
}
// Simulate data received after delay
Task.Run(async () =>
{
await Task.Delay(500);
OnDataReceived("Hello from event");
});
return tcs.Task;
}
public async Task UseManualTask()
{
// Create task that completes when condition is met
var tcs = new TaskCompletionSource<bool>();
// Some condition
bool conditionMet = false;
// Simulate condition becoming true
Task.Run(async () =>
{
await Task.Delay(2000);
conditionMet = true;
tcs.SetResult(true);
});
await tcs.Task;
Console.WriteLine("Condition met!");
}
}
// TASK CANCELLATION (Basic)
public async Task TaskCancellationBasics()
{
// Create cancellation token source
using var cts = new CancellationTokenSource();
// Task that checks for cancellation
Task longRunningTask = Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
// Check for cancellation
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("Cancellation requested!");
cts.Token.ThrowIfCancellationRequested();
}
Thread.Sleep(100);
Console.WriteLine($"Working... {i}");
}
}, cts.Token);
// Cancel after 1 second
await Task.Delay(1000);
cts.Cancel();
try
{
await longRunningTask;
}
catch (OperationCanceledException)
{
Console.WriteLine("Task was cancelled");
}
}
// TASK SCHEDULERS (Control where tasks run)
public void TaskSchedulers()
{
// Default scheduler (thread pool)
Task task1 = Task.Run(() => { });
// Current synchronization context (UI thread)
TaskScheduler uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
// Custom scheduler (limited concurrency)
var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(2);
Task.Factory.StartNew(() => { }, CancellationToken.None,
TaskCreationOptions.None, limitedScheduler);
}
// Custom Task Scheduler (Limited concurrency)
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
private readonly int _maxConcurrencyLevel;
private int _runningTasks = 0;
public LimitedConcurrencyLevelTaskScheduler(int maxConcurrencyLevel)
{
_maxConcurrencyLevel = maxConcurrencyLevel;
}
protected override IEnumerable<Task> GetScheduledTasks()
{
lock (_tasks)
{
return _tasks.ToArray();
}
}
protected override void QueueTask(Task task)
{
lock (_tasks)
{
_tasks.AddLast(task);
if (_runningTasks < _maxConcurrencyLevel)
{
_runningTasks++;
NotifyThreadPoolOfPendingWork();
}
}
}
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
Task taskToRun;
lock (_tasks)
{
taskToRun = _tasks.First?.Value;
if (taskToRun != null)
_tasks.RemoveFirst();
}
if (taskToRun != null)
{
TryExecuteTask(taskToRun);
lock (_tasks)
{
_runningTasks--;
if (_tasks.Count > 0 && _runningTasks < _maxConcurrencyLevel)
{
_runningTasks++;
NotifyThreadPoolOfPendingWork();
}
}
}
}, null);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
public override int MaximumConcurrencyLevel => _maxConcurrencyLevel;
}
}
// REAL-WORLD EXAMPLE - Data loading service
public class DataLoader
{
private readonly HttpClient _httpClient = new HttpClient();
private readonly CacheService _cache = new CacheService();
public async Task<string> LoadDataAsync(string url, CancellationToken cancellationToken = default)
{
// Check cache first
if (_cache.TryGet(url, out string cachedData))
{
return cachedData;
}
// Load from network
try
{
var response = await _httpClient.GetAsync(url, cancellationToken);
response.EnsureSuccessStatusCode();
var data = await response.Content.ReadAsStringAsync(cancellationToken);
// Cache result
_cache.Set(url, data);
return data;
}
catch (OperationCanceledException)
{
Console.WriteLine("Loading cancelled");
throw;
}
catch (HttpRequestException ex)
{
Console.WriteLine($"Network error: {ex.Message}");
throw;
}
}
public async Task<string[]> LoadMultipleAsync(string[] urls)
{
// Load all in parallel
var tasks = urls.Select(url => LoadDataAsync(url));
return await Task.WhenAll(tasks);
}
public async IAsyncEnumerable<string> LoadStreamAsync(string[] urls)
{
foreach (var url in urls)
{
yield return await LoadDataAsync(url);
}
}
}
public class CacheService
{
private Dictionary<string, string> _cache = new();
public bool TryGet(string key, out string value)
{
return _cache.TryGetValue(key, out value);
}
public void Set(string key, string value)
{
_cache[key] = value;
}
}
53. Async/Await Pattern (Deep Dive)
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public class AsyncAwaitDeepDive
{
// BASIC ASYNC METHOD
public async Task<string> GetDataAsync()
{
Console.WriteLine($"Starting - Thread: {Thread.CurrentThread.ManagedThreadId}");
// await returns control to caller
string result = await Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine($"Inside Task - Thread: {Thread.CurrentThread.ManagedThreadId}");
return "Hello Async";
});
// Continues after await (may be on different thread)
Console.WriteLine($"Continuing - Thread: {Thread.CurrentThread.ManagedThreadId}");
return result;
}
// ASYNC METHOD WITH MULTIPLE AWAITS
public async Task<string> ProcessDataAsync()
{
// First operation
string data = await DownloadDataAsync();
// Second operation (depends on first)
string processed = await ProcessDataAsync(data);
// Third operation
string result = await SaveDataAsync(processed);
return result;
}
private Task<string> DownloadDataAsync() => Task.FromResult("raw data");
private Task<string> ProcessDataAsync(string data) => Task.FromResult($"processed {data}");
private Task<string> SaveDataAsync(string data) => Task.FromResult($"saved {data}");
// STATE MACHINE (What the compiler generates)
// The compiler transforms async method into state machine
[AsyncStateMachine(typeof(MyStateMachine))]
public async Task MyAsyncMethod()
{
int x = 10;
await Task.Delay(100);
int y = 20;
Console.WriteLine(x + y);
}
// What compiler generates (simplified)
public class MyStateMachine : IAsyncStateMachine
{
private int _state;
private TaskAwaiter _awaiter;
public int x;
public int y;
public void MoveNext()
{
switch (_state)
{
case 0:
x = 10;
_awaiter = Task.Delay(100).GetAwaiter();
if (!_awaiter.IsCompleted)
{
_state = 1;
// Register continuation
return;
}
goto case 1;
case 1:
_awaiter.GetResult();
y = 20;
Console.WriteLine(x + y);
break;
}
}
public void SetStateMachine(IAsyncStateMachine stateMachine) { }
}
// AWAITER PATTERN (Custom awaitable)
public class CustomAwaitable
{
public CustomAwaiter GetAwaiter() => new CustomAwaiter();
}
public class CustomAwaiter : INotifyCompletion
{
public bool IsCompleted => false;
public void OnCompleted(Action continuation)
{
// Execute continuation after some condition
Task.Run(() =>
{
Thread.Sleep(1000);
continuation();
});
}
public void GetResult() { }
}
public async Task UseCustomAwaitable()
{
await new CustomAwaitable();
Console.WriteLine("After custom await");
}
// ASYNC VOID (Avoid except event handlers)
public async void ButtonClickHandler(object sender, EventArgs e)
{
// ONLY use async void for event handlers
try
{
await SaveDataAsync();
}
catch (Exception ex)
{
// Exceptions go to SynchronizationContext
Console.WriteLine($"Error: {ex.Message}");
}
}
// ASYNC VOID PROBLEMS
public async Task AsyncVoidProblems()
{
// DON'T DO THIS
async void BadMethod()
{
throw new InvalidOperationException("This exception is lost!");
}
BadMethod(); // Exception can't be caught!
// DO THIS INSTEAD
async Task GoodMethod()
{
throw new InvalidOperationException("This can be caught");
}
await GoodMethod(); // Exception caught here
}
// VALUETASK (For performance-critical async)
public async ValueTask<int> GetCachedValueAsync()
{
// Use ValueTask when result is often cached or synchronous
if (_cache.TryGetValue("key", out int cached))
{
return cached; // No allocation!
}
// Otherwise async path
int result = await FetchFromDatabaseAsync();
return result;
}
private Dictionary<string, int> _cache = new();
private async Task<int> FetchFromDatabaseAsync()
{
await Task.Delay(100);
return 42;
}
// RETURNING TASK FROM NON-ASYNC METHOD
public Task<int> GetValueAsync()
{
// No async keyword, but returns Task
if (_cache.TryGetValue("value", out int cached))
{
return Task.FromResult(cached); // Already completed
}
// Otherwise start async operation
return GetValueFromDatabaseAsync();
}
private async Task<int> GetValueFromDatabaseAsync()
{
await Task.Delay(100);
return 42;
}
// ASYNC METHOD OVERLOADING
public async Task<string> GetDataAsync()
{
return await GetDataAsync(CancellationToken.None);
}
public async Task<string> GetDataAsync(CancellationToken cancellationToken)
{
// Implementation
await Task.Delay(100, cancellationToken);
return "data";
}
// ASYNC DISPOSABLE (IAsyncDisposable)
public class AsyncResource : IAsyncDisposable
{
public async ValueTask DisposeAsync()
{
await CleanupAsync();
}
private Task CleanupAsync() => Task.CompletedTask;
}
public async Task UseAsyncDisposable()
{
await using var resource = new AsyncResource();
// Use resource
} // DisposeAsync called automatically
}
// REAL-WORLD EXAMPLES
// 1. UI Application (WPF/MAUI)
public class UIService
{
// Use ConfigureAwait to avoid deadlocks in UI
public async Task LoadDataAsync()
{
// In UI app, await returns to UI thread
var data = await Task.Run(() => FetchData());
// This runs on UI thread - safe to update UI
UpdateUI(data);
}
private string FetchData()
{
Thread.Sleep(1000);
return "Data";
}
private void UpdateUI(string data) { }
// For library code - use ConfigureAwait(false)
public async Task<string> LibraryMethodAsync()
{
// ConfigureAwait(false) means don't capture context
var data = await Task.Run(() => FetchData()).ConfigureAwait(false);
// This runs on thread pool thread (not original context)
return ProcessData(data);
}
private string ProcessData(string data) => data.ToUpper();
}
// 2. ASP.NET Core (No need for ConfigureAwait in modern apps)
public class ControllerService
{
// In ASP.NET Core, no synchronization context
public async Task<string> HandleRequestAsync()
{
// ConfigureAwait not needed (no context to capture)
var data = await GetDataAsync();
var processed = await ProcessDataAsync(data);
return processed;
}
private Task<string> GetDataAsync() => Task.FromResult("data");
private Task<string> ProcessDataAsync(string data) => Task.FromResult($"processed {data}");
}
// 3. Progress Reporting
public class ProgressReporter
{
public async Task ProcessItemsAsync(IProgress<int> progress)
{
for (int i = 0; i <= 100; i += 10)
{
await Task.Delay(100);
progress?.Report(i);
}
}
public async Task UseProgress()
{
var progress = new Progress<int>(percent =>
{
Console.WriteLine($"Progress: {percent}%");
});
await ProcessItemsAsync(progress);
}
}
// 4. Retry Pattern
public class RetryHelper
{
public static async Task<T> RetryAsync<T>(
Func<Task<T>> operation,
int maxRetries = 3,
int delayMs = 1000,
CancellationToken cancellationToken = default)
{
int retryCount = 0;
while (true)
{
try
{
return await operation();
}
catch (Exception ex) when (retryCount < maxRetries)
{
retryCount++;
Console.WriteLine($"Attempt {retryCount} failed: {ex.Message}. Retrying...");
await Task.Delay(delayMs * retryCount, cancellationToken);
}
}
}
}
// 5. Timeout Pattern
public class TimeoutHelper
{
public static async Task<T> WithTimeoutAsync<T>(
Task<T> task,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var timeoutTask = Task.Delay(timeout, timeoutCts.Token);
var completedTask = await Task.WhenAny(task, timeoutTask);
if (completedTask == timeoutTask)
{
throw new TimeoutException($"Operation timed out after {timeout}");
}
timeoutCts.Cancel(); // Cancel timeout task
return await task; // Propagate any exceptions
}
public static async Task WithTimeoutAsync(
Task task,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var timeoutTask = Task.Delay(timeout, timeoutCts.Token);
var completedTask = await Task.WhenAny(task, timeoutTask);
if (completedTask == timeoutTask)
{
throw new TimeoutException($"Operation timed out after {timeout}");
}
timeoutCts.Cancel();
await task;
}
}
54. Cancellation Tokens (Graceful Shutdown)
using System;
using System.Threading;
using System.Threading.Tasks;
public class CancellationTokensDemo
{
// BASIC CANCELLATION
public async Task BasicCancellation()
{
using var cts = new CancellationTokenSource();
// Start long-running operation
var task = LongRunningOperationAsync(cts.Token);
// Cancel after 2 seconds
await Task.Delay(2000);
cts.Cancel();
try
{
await task;
}
catch (OperationCanceledException)
{
Console.WriteLine("Operation cancelled");
}
}
private async Task LongRunningOperationAsync(CancellationToken token)
{
for (int i = 0; i < 100; i++)
{
// Check for cancellation
token.ThrowIfCancellationRequested();
await Task.Delay(100);
Console.WriteLine($"Progress: {i}");
}
}
// CANCELLATION WITH LINKED SOURCES
public async Task LinkedCancellation()
{
using var cts1 = new CancellationTokenSource();
using var cts2 = new CancellationTokenSource();
// Cancel if either token cancels
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
cts1.Token, cts2.Token);
// Start operation with linked token
var task = OperationWithCancellationAsync(linkedCts.Token);
// Either of these will cancel the operation
// cts1.Cancel();
// cts2.Cancel();
// Timeout after 5 seconds
linkedCts.CancelAfter(5000);
try
{
await task;
}
catch (OperationCanceledException)
{
Console.WriteLine("Operation cancelled by timeout or user");
}
}
// CANCELLATION IN PARALLEL OPERATIONS
public async Task ParallelCancellation()
{
using var cts = new CancellationTokenSource();
cts.CancelAfter(3000);
var token = cts.Token;
// Run multiple tasks with same cancellation token
var tasks = new[]
{
ProcessItemAsync(1, token),
ProcessItemAsync(2, token),
ProcessItemAsync(3, token),
ProcessItemAsync(4, token)
};
try
{
await Task.WhenAll(tasks);
}
catch (OperationCanceledException)
{
Console.WriteLine("Some tasks were cancelled");
// Check which tasks completed successfully
foreach (var task in tasks)
{
if (task.IsCompletedSuccessfully)
{
Console.WriteLine("Task completed before cancellation");
}
else if (task.IsCanceled)
{
Console.WriteLine("Task was cancelled");
}
else if (task.IsFaulted)
{
Console.WriteLine("Task faulted: {task.Exception}");
}
}
}
}
private async Task ProcessItemAsync(int id, CancellationToken token)
{
for (int i = 0; i < 10; i++)
{
token.ThrowIfCancellationRequested();
await Task.Delay(500);
Console.WriteLine($"Item {id}, step {i}");
}
}
// CANCELLATION WITH PROGRESS
public async Task CancellationWithProgress()
{
using var cts = new CancellationTokenSource();
var progress = new Progress<int>(p => Console.WriteLine($"Progress: {p}%"));
var task = ProcessWithProgressAsync(progress, cts.Token);
// Cancel after 3 seconds
await Task.Delay(3000);
cts.Cancel();
try
{
await task;
}
catch (OperationCanceledException)
{
Console.WriteLine("Processing cancelled");
}
}
private async Task ProcessWithProgressAsync(IProgress<int> progress, CancellationToken token)
{
for (int i = 0; i <= 100; i += 10)
{
token.ThrowIfCancellationRequested();
await Task.Delay(200);
progress?.Report(i);
}
}
// CANCELLATION TOKEN REGISTRATION
public async Task CancellationRegistration()
{
using var cts = new CancellationTokenSource();
var token = cts.Token;
// Register callback when cancellation requested
using (token.Register(() => Console.WriteLine("Cancellation requested!")))
{
// Start operation
var task = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await Task.Delay(500);
Console.WriteLine($"Working... {i}");
}
}, token);
await Task.Delay(2000);
cts.Cancel();
try
{
await task;
}
catch (OperationCanceledException)
{
Console.WriteLine("Task cancelled");
}
}
}
// CANCELLATION IN ASYNC STREAMS (IAsyncEnumerable)
public async Task CancellationInAsyncStreams()
{
using var cts = new CancellationTokenSource();
cts.CancelAfter(3000);
await foreach (var item in GenerateSequenceAsync(cts.Token))
{
Console.WriteLine($"Received: {item}");
}
}
private async IAsyncEnumerable<int> GenerateSequenceAsync([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken token = default)
{
for (int i = 0; i < 100; i++)
{
token.ThrowIfCancellationRequested();
await Task.Delay(200);
yield return i;
}
}
// CANCELLATION IN HTTP REQUESTS
public async Task CancellableHttpRequest()
{
using var httpClient = new HttpClient();
using var cts = new CancellationTokenSource();
// Cancel after 5 seconds
cts.CancelAfter(5000);
try
{
var response = await httpClient.GetAsync("https://api.example.com/data", cts.Token);
var content = await response.Content.ReadAsStringAsync(cts.Token);
Console.WriteLine($"Received {content.Length} characters");
}
catch (TaskCanceledException)
{
Console.WriteLine("Request timed out or was cancelled");
}
catch (OperationCanceledException)
{
Console.WriteLine("Operation was cancelled");
}
}
}
// REAL-WORLD EXAMPLE - Service with graceful shutdown
public class BackgroundService : IHostedService
{
private readonly ILogger _logger;
private CancellationTokenSource _cts;
private Task _executingTask;
public BackgroundService(ILogger logger)
{
_logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_executingTask = ExecuteAsync(_cts.Token);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_executingTask == null) return;
try
{
// Signal cancellation to running task
_cts.Cancel();
}
finally
{
// Wait for task to complete (with timeout)
await Task.WhenAny(_executingTask, Task.Delay(5000, cancellationToken));
}
}
private async Task ExecuteAsync(CancellationToken token)
{
_logger.LogInformation("Service starting");
try
{
while (!token.IsCancellationRequested)
{
await ProcessBatchAsync(token);
await Task.Delay(1000, token);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Service stopping gracefully");
}
catch (Exception ex)
{
_logger.LogError(ex, "Service error");
throw;
}
}
private async Task ProcessBatchAsync(CancellationToken token)
{
// Process data
await Task.Delay(500, token);
_logger.LogDebug("Batch processed");
}
}
public interface IHostedService
{
Task StartAsync(CancellationToken cancellationToken);
Task StopAsync(CancellationToken cancellationToken);
}
public interface ILogger
{
void LogInformation(string message);
void LogDebug(string message);
void LogError(Exception ex, string message);
}
55. Progress Reporting (IProgress)
using System;
using System.Threading;
using System.Threading.Tasks;
public class ProgressReportingDemo
{
// BASIC PROGRESS REPORTING
public async Task BasicProgressReporting()
{
// Create progress reporter
var progress = new Progress<int>(percent =>
{
Console.WriteLine($"Progress: {percent}%");
UpdateUI(percent);
});
// Start operation with progress
await LongOperationAsync(progress);
}
private async Task LongOperationAsync(IProgress<int> progress)
{
for (int i = 0; i <= 100; i += 10)
{
await Task.Delay(500);
progress.Report(i);
}
}
// COMPLEX PROGRESS (Custom type)
public class OperationProgress
{
public int PercentComplete { get; set; }
public string CurrentStep { get; set; }
public int ItemsProcessed { get; set; }
public int TotalItems { get; set; }
public TimeSpan ElapsedTime { get; set; }
public override string ToString()
=> $"{PercentComplete}% - {CurrentStep} ({ItemsProcessed}/{TotalItems}) - {ElapsedTime}";
}
public async Task ComplexProgressReporting()
{
var progress = new Progress<OperationProgress>(p =>
{
Console.Clear();
Console.WriteLine($"Progress: {p.PercentComplete}%");
Console.WriteLine($"Step: {p.CurrentStep}");
Console.WriteLine($"Items: {p.ItemsProcessed}/{p.TotalItems}");
Console.WriteLine($"Elapsed: {p.ElapsedTime}");
});
await ProcessLargeDatasetAsync(1000, progress);
}
private async Task ProcessLargeDatasetAsync(int itemCount, IProgress<OperationProgress> progress)
{
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
for (int i = 0; i <= itemCount; i++)
{
// Simulate processing
await Task.Delay(10);
if (i % 100 == 0 || i == itemCount)
{
progress.Report(new OperationProgress
{
PercentComplete = (int)((double)i / itemCount * 100),
CurrentStep = $"Processing item {i}",
ItemsProcessed = i,
TotalItems = itemCount,
ElapsedTime = stopwatch.Elapsed
});
}
}
progress.Report(new OperationProgress
{
PercentComplete = 100,
CurrentStep = "Complete",
ItemsProcessed = itemCount,
TotalItems = itemCount,
ElapsedTime = stopwatch.Elapsed
});
}
// PROGRESS WITH CANCELLATION
public async Task ProgressWithCancellation()
{
using var cts = new CancellationTokenSource();
var progress = new Progress<int>(p => Console.WriteLine($"Progress: {p}%"));
// Cancel after 3 seconds
cts.CancelAfter(3000);
try
{
await ProcessWithProgressAsync(progress, cts.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("Operation cancelled");
}
}
private async Task ProcessWithProgressAsync(IProgress<int> progress, CancellationToken token)
{
for (int i = 0; i <= 100; i += 5)
{
token.ThrowIfCancellationRequested();
await Task.Delay(200, token);
progress.Report(i);
}
}
// MULTI-STEP PROGRESS
public async Task MultiStepProgress()
{
var progress = new Progress<string>(step => Console.WriteLine($"Step: {step}"));
await MultiStepOperationAsync(progress);
}
private async Task MultiStepOperationAsync(IProgress<string> progress)
{
progress.Report("Initializing...");
await Task.Delay(500);
progress.Report("Loading data...");
await Task.Delay(1000);
progress.Report("Processing data...");
await Task.Delay(1500);
progress.Report("Validating results...");
await Task.Delay(800);
progress.Report("Saving to database...");
await Task.Delay(1200);
progress.Report("Complete!");
}
// PROGRESS IN PARALLEL OPERATIONS
public async Task ParallelProgressReporting()
{
var overallProgress = new Progress<int>(p => Console.WriteLine($"Overall: {p}%"));
var fileProgress = new Progress<int>(p => Console.WriteLine($" File: {p}%"));
await ProcessFilesInParallelAsync(overallProgress, fileProgress);
}
private async Task ProcessFilesInParallelAsync(IProgress<int> overall, IProgress<int> file)
{
string[] files = { "file1.dat", "file2.dat", "file3.dat", "file4.dat", "file5.dat" };
var tasks = files.Select(file => ProcessSingleFileAsync(file, file));
await Task.WhenAll(tasks);
overall.Report(100);
}
private async Task ProcessSingleFileAsync(string filename, IProgress<int> progress)
{
for (int i = 0; i <= 100; i += 20)
{
await Task.Delay(100);
progress.Report(i);
}
}
// UI UPDATE (Thread-safe progress reporting)
public class UIProgressReporter
{
// In WPF/MAUI/WinForms, Progress<T> captures synchronization context
public async Task UpdateUIProgress()
{
// Progress automatically marshals to UI thread
var progress = new Progress<int>(percent =>
{
// This runs on UI thread
// progressBar.Value = percent;
// statusLabel.Text = $"{percent}%";
Console.WriteLine($"UI Updated: {percent}%");
});
await BackgroundOperationAsync(progress);
}
private async Task BackgroundOperationAsync(IProgress<int> progress)
{
await Task.Run(() =>
{
for (int i = 0; i <= 100; i++)
{
Thread.Sleep(50);
progress.Report(i);
}
});
}
}
// REAL-WORLD EXAMPLE - File processor with progress
public class FileProcessor
{
public async Task ProcessFilesAsync(
string[] filePaths,
IProgress<(string FileName, int Percent, string Status)> progress,
CancellationToken cancellationToken = default)
{
int totalFiles = filePaths.Length;
for (int i = 0; i < totalFiles; i++)
{
cancellationToken.ThrowIfCancellationRequested();
var file = filePaths[i];
progress.Report((file, 0, "Starting"));
await Task.Run(() =>
{
// Simulate file processing
for (int p = 0; p <= 100; p += 10)
{
Thread.Sleep(50);
progress.Report((file, p, $"Processing: {p}%"));
}
}, cancellationToken);
progress.Report((file, 100, "Complete"));
int overallPercent = (int)((double)(i + 1) / totalFiles * 100);
progress.Report(($"Overall", overallPercent, $"Done: {i + 1}/{totalFiles}"));
}
}
}
}
// Helper methods
partial class ProgressReportingDemo
{
private void UpdateUI(int percent)
{
// Simulate UI update
Console.WriteLine($"UI: {percent}%");
}
}
56. ConfigureAwait (Context Control)
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms; // Example UI context
public class ConfigureAwaitDemo
{
// WHAT IS SYNCHRONIZATION CONTEXT?
public void SynchronizationContextExplanation()
{
// Different contexts:
// - UI apps (WPF/WinForms): UI context
// - ASP.NET (old): Request context
// - Console apps: Default (null) context
// - ASP.NET Core: No context (ConfigureAwait not needed)
var context = SynchronizationContext.Current;
Console.WriteLine($"Current context: {context?.GetType().Name ?? "null"}");
}
// DEFAULT BEHAVIOR (Captures context)
public async Task DefaultContextCapture()
{
// Start on original context (e.g., UI thread)
Console.WriteLine($"Before await: Thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000); // Captures context
// Returns to original context (may be same or different thread)
Console.WriteLine($"After await: Thread {Thread.CurrentThread.ManagedThreadId}");
// In UI app, this runs on UI thread
// UpdateUI(); // Safe to update UI
}
// CONFIGUREAWAIT(FALSE) - Don't capture context
public async Task NoContextCapture()
{
// Start on any context
Console.WriteLine($"Before await: Thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000).ConfigureAwait(false); // Don't capture context
// Runs on ANY thread (usually thread pool)
Console.WriteLine($"After await: Thread {Thread.CurrentThread.ManagedThreadId}");
// Can't update UI here!
}
// UI APPLICATION PATTERN
public class UIPattern
{
// UI button click handler
public async void ButtonClickHandler()
{
// On UI thread
ShowLoadingSpinner();
try
{
// CPU-bound work - move to background
var data = await Task.Run(() => HeavyComputation()).ConfigureAwait(false);
// Back on UI thread? NO - ConfigureAwait false means thread pool
// Need to marshal back to UI
// Option 1: Don't use ConfigureAwait for last part
// Option 2: Use dispatcher
await UpdateUIAsync(data);
}
finally
{
HideLoadingSpinner();
}
}
// Better approach: Use ConfigureAwait strategically
public async Task<string> ProcessDataAsync()
{
// Library code - no UI update needed
var data = await FetchFromDatabaseAsync().ConfigureAwait(false);
var processed = await ProcessDataAsync(data).ConfigureAwait(false);
return processed;
}
// UI-specific code - keep context
public async Task LoadDataForUIAsync()
{
// Keep context for UI updates
ShowLoading(); // UI thread
// Background work - don't need context
var data = await FetchDataAsync().ConfigureAwait(false);
// Back on UI thread for update
UpdateUI(data); // Runs on UI thread because we didn't use ConfigureAwait on last await
}
private string HeavyComputation() => "result";
private Task UpdateUIAsync(string data) => Task.CompletedTask;
private void ShowLoadingSpinner() { }
private void HideLoadingSpinner() { }
private Task<string> FetchFromDatabaseAsync() => Task.FromResult("data");
private Task<string> ProcessDataAsync(string data) => Task.FromResult($"processed {data}");
private void ShowLoading() { }
private Task<string> FetchDataAsync() => Task.FromResult("data");
private void UpdateUI(string data) { }
}
// LIBRARY BEST PRACTICES
public class LibraryBestPractices
{
// For reusable libraries, always use ConfigureAwait(false)
public async Task<string> LibraryMethodAsync()
{
// No need to capture context in library
var result = await InternalOperationAsync().ConfigureAwait(false);
return result;
}
// When you have multiple awaits, use ConfigureAwait on all
public async Task<int> ProcessItemsAsync(string[] items)
{
var results = new List<int>();
foreach (var item in items)
{
// Use ConfigureAwait for each await
var value = await ProcessItemAsync(item).ConfigureAwait(false);
results.Add(value);
}
return results.Sum();
}
private Task<string> InternalOperationAsync() => Task.FromResult("result");
private Task<int> ProcessItemAsync(string item) => Task.FromResult(item.Length);
}
// ASP.NET CORE (Modern) vs OLD ASP.NET
public class AspNetComparison
{
// Old ASP.NET (Framework) - Has SynchronizationContext
public async Task<ActionResult> OldAspNetAction()
{
// Has request context
var data = await GetDataAsync();
// Returns to original request context
return View(data);
}
// ASP.NET Core - NO SynchronizationContext
public async Task<IActionResult> AspNetCoreAction()
{
// No context to capture
var data = await GetDataAsync(); // ConfigureAwait not needed
return View(data);
}
}
// DEADLOCK SCENARIO (Why ConfigureAwait matters)
public class DeadlockScenario
{
// THIS CAUSES DEADLOCK IN UI/CONTEXT-SENSITIVE APPS
public void ThisWillDeadlock()
{
// In UI app, this deadlocks!
var task = GetDataAsync();
var result = task.Result; // Blocks UI thread
}
// This works
public async Task ThisWorks()
{
var result = await GetDataAsync(); // Doesn't block
}
private async Task<string> GetDataAsync()
{
// This attempts to marshal back to original context
await Task.Delay(1000);
return "data";
}
// FIX: Use ConfigureAwait(false)
private async Task<string> FixedGetDataAsync()
{
await Task.Delay(1000).ConfigureAwait(false);
return "data";
// Now task.Result won't deadlock because no context capture
}
}
// RULES OF THUMB
public class ConfigureAwaitRules
{
// ✅ DO: Use ConfigureAwait(false) in library code
public async Task LibraryCodeAsync()
{
await OperationAsync().ConfigureAwait(false);
await AnotherOperationAsync().ConfigureAwait(false);
}
// ✅ DO: Use ConfigureAwait(false) for background operations
public async Task BackgroundWorkAsync()
{
await Task.Run(() => Compute()).ConfigureAwait(false);
}
// ❌ DON'T: Use ConfigureAwait(false) if you need context after
public async Task UIOperationAsync()
{
var data = await GetDataAsync(); // Keep context
UpdateUI(data); // Need UI thread
}
// ✅ DO: Use ConfigureAwait(false) for CPU-bound work, then marshal back
public async Task MixedOperationAsync()
{
// Background work (no context needed)
var result = await Task.Run(() => HeavyCompute()).ConfigureAwait(false);
// Back to UI thread using dispatcher
await Task.Run(() => UpdateUI(result));
}
// ✅ DO: In ASP.NET Core, ConfigureAwait is optional (no context)
public async Task<string> AspNetCoreMethodAsync()
{
// ConfigureAwait not needed but doesn't hurt
var data = await GetDataAsync().ConfigureAwait(false);
return data;
}
private Task OperationAsync() => Task.CompletedTask;
private Task AnotherOperationAsync() => Task.CompletedTask;
private void Compute() { }
private int HeavyCompute() => 42;
private Task<string> GetDataAsync() => Task.FromResult("data");
private void UpdateUI(string data) { }
}
// CONFIGUREAWAIT WITH VALUETASK
public async ValueTask<int> ValueTaskWithConfigureAwait()
{
// Same rules apply
var result = await GetValueAsync().ConfigureAwait(false);
return result;
}
private ValueTask<int> GetValueAsync() => new ValueTask<int>(42);
}
// Helper classes for ASP.NET examples
public class ActionResult { }
public class ViewResult : ActionResult { }
public interface IActionResult { }
public class ViewResult2 : IActionResult { }
57. Async Streams (IAsyncEnumerable - C# 8)
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
public class AsyncStreamsDemo
{
// BASIC ASYNC STREAM
public async Task BasicAsyncStream()
{
// Consume async stream
await foreach (var number in GenerateNumbersAsync(10))
{
Console.WriteLine($"Received: {number}");
}
}
// Producing async stream
private async IAsyncEnumerable<int> GenerateNumbersAsync(int count)
{
for (int i = 0; i < count; i++)
{
await Task.Delay(100); // Async work
yield return i; // Yield asynchronously
}
}
// ASYNC STREAM WITH CANCELLATION
public async Task AsyncStreamWithCancellation()
{
using var cts = new CancellationTokenSource();
cts.CancelAfter(3000);
try
{
await foreach (var item in GenerateWithCancellationAsync(cts.Token))
{
Console.WriteLine($"Item: {item}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Stream cancelled");
}
}
private async IAsyncEnumerable<int> GenerateWithCancellationAsync(
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken token = default)
{
for (int i = 0; i < 100; i++)
{
token.ThrowIfCancellationRequested();
await Task.Delay(200, token);
yield return i;
}
}
// REAL-WORLD: READING LARGE FILE LINE BY LINE
public async Task ProcessLargeFileAsync(string filePath)
{
await foreach (var line in ReadLinesAsync(filePath))
{
if (string.IsNullOrWhiteSpace(line)) continue;
var processed = ProcessLine(line);
await SaveProcessedLineAsync(processed);
}
}
private async IAsyncEnumerable<string> ReadLinesAsync(string filePath)
{
using var reader = new StreamReader(filePath);
string line;
while ((line = await reader.ReadLineAsync()) != null)
{
yield return line;
}
}
// REAL-WORLD: PAGINATED API CALLS
public class ApiClient
{
public async IAsyncEnumerable<User> GetUsersAsync()
{
int page = 1;
bool hasMore = true;
while (hasMore)
{
var response = await FetchPageAsync(page);
foreach (var user in response.Users)
{
yield return user;
}
hasMore = response.HasMore;
page++;
}
}
private async Task<PageResponse> FetchPageAsync(int page)
{
// Simulate API call
await Task.Delay(100);
return new PageResponse
{
Users = new[] { new User { Id = page, Name = $"User {page}" } },
HasMore = page < 10
};
}
}
// REAL-WORLD: DATABASE QUERY STREAMING
public class DatabaseStreamer
{
public async IAsyncEnumerable<Record> StreamRecordsAsync(string sql)
{
using var connection = new SqlConnection("connectionString");
await connection.OpenAsync();
using var command = new SqlCommand(sql, connection);
using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
yield return new Record
{
Id = reader.GetInt32(0),
Name = reader.GetString(1),
Value = reader.GetDecimal(2)
};
}
}
}
// TRANSFORMING ASYNC STREAMS
public async Task TransformStreams()
{
var numbers = GenerateNumbersAsync(100);
// Filter
await foreach (var even in numbers.WhereAwait(async n => await IsEvenAsync(n)))
{
Console.WriteLine($"Even: {even}");
}
// Select (transform)
await foreach (var squared in numbers.SelectAwait(async n => await SquareAsync(n)))
{
Console.WriteLine($"Squared: {squared}");
}
}
// COMBINING MULTIPLE STREAMS
public async Task CombineStreams()
{
var stream1 = GenerateNumbersAsync(10);
var stream2 = GenerateNumbersAsync(10, 100);
// Merge two streams
await foreach (var item in MergeAsync(stream1, stream2))
{
Console.WriteLine($"Merged: {item}");
}
}
private async IAsyncEnumerable<T> MergeAsync<T>(
IAsyncEnumerable<T> first,
IAsyncEnumerable<T> second)
{
await foreach (var item in first)
{
yield return item;
}
await foreach (var item in second)
{
yield return item;
}
}
private async IAsyncEnumerable<int> GenerateNumbersAsync(int count, int start = 0)
{
for (int i = start; i < start + count; i++)
{
await Task.Delay(10);
yield return i;
}
}
// EXTENSION METHODS FOR ASYNC STREAMS
public static class AsyncEnumerableExtensions
{
public static async IAsyncEnumerable<T> WhereAwait<T>(
this IAsyncEnumerable<T> source,
Func<T, Task<bool>> predicate)
{
await foreach (var item in source)
{
if (await predicate(item))
{
yield return item;
}
}
}
public static async IAsyncEnumerable<TResult> SelectAwait<TSource, TResult>(
this IAsyncEnumerable<TSource> source,
Func<TSource, Task<TResult>> selector)
{
await foreach (var item in source)
{
yield return await selector(item);
}
}
public static async Task<List<T>> ToListAsync<T>(this IAsyncEnumerable<T> source)
{
var list = new List<T>();
await foreach (var item in source)
{
list.Add(item);
}
return list;
}
}
// HELPER METHODS
private string ProcessLine(string line) => line.ToUpper();
private Task SaveProcessedLineAsync(string line) => Task.CompletedTask;
private Task<bool> IsEvenAsync(int n) => Task.FromResult(n % 2 == 0);
private Task<int> SquareAsync(int n) => Task.FromResult(n * n);
}
// Helper classes
public class User
{
public int Id { get; set; }
public string Name { get; set; }
}
public class PageResponse
{
public User[] Users { get; set; }
public bool HasMore { get; set; }
}
public class Record
{
public int Id { get; set; }
public string Name { get; set; }
public decimal Value { get; set; }
}
public class SqlConnection : IDisposable
{
public SqlConnection(string connectionString) { }
public Task OpenAsync() => Task.CompletedTask;
public void Dispose() { }
}
public class SqlCommand : IDisposable
{
public SqlCommand(string sql, SqlConnection connection) { }
public Task<SqlDataReader> ExecuteReaderAsync() => Task.FromResult(new SqlDataReader());
public void Dispose() { }
}
public class SqlDataReader : IAsyncDisposable
{
public Task<bool> ReadAsync() => Task.FromResult(false);
public int GetInt32(int ordinal) => 0;
public string GetString(int ordinal) => "";
public decimal GetDecimal(int ordinal) => 0;
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
58. Parallel Programming (Parallel.For, Parallel.ForEach)
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class ParallelProgrammingDemo
{
// PARALLEL.FOR
public void ParallelForDemo()
{
// Simple parallel for loop
Parallel.For(0, 100, i =>
{
Console.WriteLine($"Processing {i} on thread {Thread.CurrentThread.ManagedThreadId}");
ProcessItem(i);
});
// Parallel for with options
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 4, // Max 4 threads
CancellationToken = CancellationToken.None
};
Parallel.For(0, 1000, options, i =>
{
ProcessItem(i);
});
// Parallel for with state (for early termination)
Parallel.For(0, 1000, (i, state) =>
{
if (i == 500)
{
state.Break(); // Stop processing further items
return;
}
ProcessItem(i);
});
}
// PARALLEL.FOREACH
public void ParallelForEachDemo()
{
var items = Enumerable.Range(0, 1000).ToList();
// Simple parallel foreach
Parallel.ForEach(items, item =>
{
ProcessItem(item);
});
// Parallel foreach with options
var options = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(items, options, ProcessItem);
// Parallel foreach with indexed overload
Parallel.ForEach(items, (item, state, index) =>
{
Console.WriteLine($"Index {index}: {item}");
ProcessItem(item);
});
// Partitioned foreach (for better performance)
var partitioner = Partitioner.Create(items, EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, item =>
{
ProcessItem(item);
});
}
// PARALLEL WITH LOCAL STATE (For aggregations)
public long ParallelSum(int[] numbers)
{
long total = 0;
Parallel.ForEach(numbers,
() => 0L, // Local initializer
(number, state, localSum) => localSum + number, // Body
localSum => Interlocked.Add(ref total, localSum) // Finally
);
return total;
}
// REAL-WORLD: Image Processing
public void ProcessImagesParallel(string[] imagePaths)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(imagePaths, options, path =>
{
using var image = LoadImage(path);
var processed = ApplyFilters(image);
SaveImage(processed, path);
Console.WriteLine($"Processed: {Path.GetFileName(path)}");
});
}
// REAL-WORLD: Batch Processing
public async Task ProcessBatchParallelAsync(DataBatch[] batches)
{
var results = new ConcurrentBag<BatchResult>();
Parallel.ForEach(batches, batch =>
{
var result = ProcessBatch(batch);
results.Add(result);
});
await SaveResultsAsync(results);
}
// PLINQ (Parallel LINQ)
public void PlinqDemo()
{
var numbers = Enumerable.Range(1, 10000000);
// Sequential LINQ
var sequential = numbers
.Where(x => x % 2 == 0)
.Select(x => x * x)
.Sum();
// Parallel LINQ (automatic parallelization)
var parallel = numbers
.AsParallel()
.Where(x => x % 2 == 0)
.Select(x => x * x)
.Sum();
// With degree of parallelism
var parallelWithDegree = numbers
.AsParallel()
.WithDegreeOfParallelism(4)
.Where(x => x % 2 == 0)
.Select(x => x * x)
.Sum();
// Forced parallel (even for small collections)
var forced = numbers
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(x => HeavyComputation(x))
.ToList();
// Ordered parallel (maintains order)
var ordered = numbers
.AsParallel()
.AsOrdered()
.Where(x => x % 2 == 0)
.Select(x => x * x)
.ToArray();
}
// PLINQ AGGREGATION
public (double Average, int Max, int Min) PlinqAggregation(int[] numbers)
{
var result = numbers
.AsParallel()
.Aggregate(
() => (Sum: 0L, Count: 0, Max: int.MinValue, Min: int.MaxValue),
(acc, num) => (
Sum: acc.Sum + num,
Count: acc.Count + 1,
Max: Math.Max(acc.Max, num),
Min: Math.Min(acc.Min, num)
),
(acc1, acc2) => (
Sum: acc1.Sum + acc2.Sum,
Count: acc1.Count + acc2.Count,
Max: Math.Max(acc1.Max, acc2.Max),
Min: Math.Min(acc1.Min, acc2.Min)
),
final => (
Average: (double)final.Sum / final.Count,
Max: final.Max,
Min: final.Min
)
);
return result;
}
// PERFORMANCE COMPARISON
public void PerformanceComparison()
{
const int size = 100_000_000;
var numbers = Enumerable.Range(1, size).ToArray();
// Sequential
var sw = System.Diagnostics.Stopwatch.StartNew();
var sequential = numbers.Select(x => HeavyComputation(x)).Sum();
sw.Stop();
Console.WriteLine($"Sequential: {sw.ElapsedMilliseconds}ms");
// Parallel (PLINQ)
sw.Restart();
var parallel = numbers.AsParallel().Select(x => HeavyComputation(x)).Sum();
sw.Stop();
Console.WriteLine($"Parallel: {sw.ElapsedMilliseconds}ms");
// Parallel.For
sw.Restart();
long total = 0;
Parallel.ForEach(numbers, () => 0L,
(x, state, local) => local + HeavyComputation(x),
local => Interlocked.Add(ref total, local));
sw.Stop();
Console.WriteLine($"Parallel.For: {sw.ElapsedMilliseconds}ms");
}
// CAUTION: Not all operations benefit from parallelization
public void WhenNotToUseParallel()
{
// ❌ DON'T: Very small collections (overhead > benefit)
var smallList = Enumerable.Range(1, 10);
var bad = smallList.AsParallel().Select(x => x * x).ToList();
// ✅ DO: Use sequential for small collections
var good = smallList.Select(x => x * x).ToList();
// ❌ DON'T: IO-bound operations (use async instead)
var files = Directory.GetFiles(@"C:\Temp");
Parallel.ForEach(files, file =>
{
// Bad: File I/O doesn't benefit from CPU parallelism
var content = File.ReadAllText(file);
});
// ✅ DO: Use async for IO-bound
// await Task.WhenAll(files.Select(file => File.ReadAllTextAsync(file)));
// ❌ DON'T: Operations that lock frequently
var results = new List<int>();
Parallel.For(0, 1000, i =>
{
lock (results) // Frequent locking kills performance
{
results.Add(i);
}
});
// ✅ DO: Use thread-safe collections
var concurrentResults = new ConcurrentBag<int>();
Parallel.For(0, 1000, i =>
{
concurrentResults.Add(i); // Lock-free, much faster
});
}
// HELPER METHODS
private void ProcessItem(int item)
{
// Simulate work
Thread.Sleep(1);
}
private int HeavyComputation(int x)
{
// Simulate CPU-intensive work
double result = 0;
for (int i = 0; i < 1000; i++)
{
result += Math.Sqrt(x * i);
}
return (int)result;
}
// Image processing helpers
private object LoadImage(string path) => new object();
private object ApplyFilters(object image) => image;
private void SaveImage(object image, string path) { }
// Batch processing helpers
private BatchResult ProcessBatch(DataBatch batch) => new BatchResult();
private Task SaveResultsAsync(ConcurrentBag<BatchResult> results) => Task.CompletedTask;
private static class Path
{
public static string GetFileName(string path) => System.IO.Path.GetFileName(path);
}
}
public class DataBatch { }
public class BatchResult { }
59. PLINQ (Parallel LINQ - Advanced)
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PlinqAdvancedDemo
{
// PLINQ CANCELLATION
public void PlinqWithCancellation()
{
using var cts = new CancellationTokenSource();
var numbers = Enumerable.Range(1, 10000000);
// Cancel after 1 second
cts.CancelAfter(1000);
try
{
var result = numbers
.AsParallel()
.WithCancellation(cts.Token)
.Where(x => x % 2 == 0)
.Select(x => HeavyComputation(x))
.ToArray();
}
catch (OperationCanceledException)
{
Console.WriteLine("PLINQ query cancelled");
}
}
// PLINQ WITH EXCEPTIONS
public void PlinqWithExceptions()
{
var numbers = Enumerable.Range(1, 1000);
try
{
var result = numbers
.AsParallel()
.Select(x =>
{
if (x == 500)
throw new InvalidOperationException($"Error at {x}");
return x * 2;
})
.ToArray();
}
catch (AggregateException ex)
{
foreach (var innerEx in ex.InnerExceptions)
{
Console.WriteLine($"Error: {innerEx.Message}");
}
}
}
// CUSTOM PARTITIONER (For better load balancing)
public void CustomPartitioner()
{
var data = Enumerable.Range(1, 1000000);
// Range partitioner (good for simple loops)
var rangePartitioner = Partitioner.Create(0, 1000000, 10000);
Parallel.ForEach(rangePartitioner, range =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
ProcessItem(i);
}
});
// Chunk partitioner (for IEnumerable)
var chunkPartitioner = Partitioner.Create(data, EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(chunkPartitioner, item =>
{
ProcessItem(item);
});
}
// PLINQ ORDERING
public void PlinqOrdering()
{
var data = Enumerable.Range(1, 1000);
// Unordered (fastest, no preservation)
var unordered = data
.AsParallel()
.AsUnordered()
.Where(x => x % 2 == 0)
.Select(x => x * x)
.ToArray();
// Ordered (preserves original order, slower)
var ordered = data
.AsParallel()
.AsOrdered()
.Where(x => x % 2 == 0)
.Select(x => x * x)
.ToArray();
// For sequential operations after parallel
var withSequential = data
.AsParallel()
.Where(x => x % 2 == 0)
.AsSequential() // Switch back to sequential
.OrderBy(x => x)
.Select(x => x * x)
.ToList();
}
// PLINQ MERGE OPTIONS
public void PlinqMergeOptions()
{
var data = Enumerable.Range(1, 1000000);
// Auto buffered (default) - balances speed and memory
var autoBuffered = data
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.AutoBuffered)
.Select(x => HeavyComputation(x))
.ToArray();
// Fully buffered - fastest for large result sets, more memory
var fullyBuffered = data
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.FullyBuffered)
.Select(x => HeavyComputation(x))
.ToArray();
// Not buffered - lowest memory, results as they come
var notBuffered = data
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x => HeavyComputation(x))
.ToArray();
}
// PLINQ FOR SORTING
public void PlinqSorting()
{
var data = Enumerable.Range(1, 1000000).Reverse();
// Parallel sort using OrderBy
var sorted = data
.AsParallel()
.AsOrdered()
.OrderBy(x => x)
.ToArray();
// More efficient for known range
var rangePartitioner = Partitioner.Create(0, 1000000, 10000);
var result = new ConcurrentBag<int>();
Parallel.ForEach(rangePartitioner, range =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
result.Add(i);
}
});
}
// REAL-WORLD: Data Analysis Pipeline
public class DataAnalysisPipeline
{
public async Task<AnalysisResult> AnalyzeAsync(DataSet data)
{
// Parallel processing of different metrics
var tasks = new[]
{
Task.Run(() => CalculateMean(data)),
Task.Run(() => CalculateMedian(data)),
Task.Run(() => CalculateStandardDeviation(data)),
Task.Run(() => FindOutliers(data)),
Task.Run(() => CalculatePercentiles(data))
};
await Task.WhenAll(tasks);
return new AnalysisResult
{
Mean = tasks[0].Result,
Median = tasks[1].Result,
StandardDeviation = tasks[2].Result,
Outliers = tasks[3].Result,
Percentiles = tasks[4].Result
};
}
private double CalculateMean(DataSet data)
{
return data.Values
.AsParallel()
.Average();
}
private double CalculateMedian(DataSet data)
{
var sorted = data.Values
.AsParallel()
.AsOrdered()
.OrderBy(x => x)
.ToArray();
int mid = sorted.Length / 2;
return sorted.Length % 2 == 0
? (sorted[mid - 1] + sorted[mid]) / 2.0
: sorted[mid];
}
private double CalculateStandardDeviation(DataSet data)
{
var mean = CalculateMean(data);
var variance = data.Values
.AsParallel()
.Select(x => Math.Pow(x - mean, 2))
.Average();
return Math.Sqrt(variance);
}
private double[] FindOutliers(DataSet data)
{
var mean = CalculateMean(data);
var stdDev = CalculateStandardDeviation(data);
return data.Values
.AsParallel()
.Where(x => Math.Abs(x - mean) > 3 * stdDev)
.ToArray();
}
private double[] CalculatePercentiles(DataSet data)
{
var sorted = data.Values
.AsParallel()
.AsOrdered()
.OrderBy(x => x)
.ToArray();
return new[]
{
Percentile(sorted, 25),
Percentile(sorted, 50),
Percentile(sorted, 75),
Percentile(sorted, 90),
Percentile(sorted, 95)
};
}
private double Percentile(double[] sorted, int percentile)
{
double index = (percentile / 100.0) * (sorted.Length - 1);
int lower = (int)Math.Floor(index);
int upper = (int)Math.Ceiling(index);
if (lower == upper)
return sorted[lower];
return sorted[lower] + (sorted[upper] - sorted[lower]) * (index - lower);
}
}
// REAL-WORLD: Log File Analysis
public class LogAnalyzer
{
public async Task<LogStats> AnalyzeLogsAsync(string[] logFiles)
{
var stats = new LogStats();
// Process each file in parallel
var fileResults = await Task.WhenAll(
logFiles.Select(file => Task.Run(() => AnalyzeFile(file)))
);
// Aggregate results
foreach (var result in fileResults)
{
stats.TotalLines += result.TotalLines;
stats.ErrorCount += result.ErrorCount;
stats.WarningCount += result.WarningCount;
stats.InfoCount += result.InfoCount;
foreach (var error in result.Errors)
{
stats.Errors.Add(error);
}
}
return stats;
}
private FileAnalysis AnalyzeFile(string filePath)
{
var lines = File.ReadAllLines(filePath);
var errors = lines
.AsParallel()
.Where(line => line.Contains("ERROR"))
.Select(line => ParseError(line))
.ToList();
return new FileAnalysis
{
TotalLines = lines.Length,
ErrorCount = errors.Count(e => e.Level == "ERROR"),
WarningCount = lines.Count(l => l.Contains("WARNING")),
InfoCount = lines.Count(l => l.Contains("INFO")),
Errors = errors
};
}
private LogEntry ParseError(string line) => new LogEntry { Level = "ERROR", Message = line };
}
// HELPER METHODS
private int HeavyComputation(int x)
{
double result = 0;
for (int i = 0; i < 10000; i++)
{
result += Math.Sqrt(x * i);
}
return (int)result;
}
private void ProcessItem(int item) => Thread.Sleep(1);
}
// Helper classes
public class DataSet
{
public double[] Values { get; set; }
}
public class AnalysisResult
{
public double Mean { get; set; }
public double Median { get; set; }
public double StandardDeviation { get; set; }
public double[] Outliers { get; set; }
public double[] Percentiles { get; set; }
}
public class LogStats
{
public int TotalLines { get; set; }
public int ErrorCount { get; set; }
public int WarningCount { get; set; }
public int InfoCount { get; set; }
public List<LogEntry> Errors { get; set; } = new();
}
public class FileAnalysis
{
public int TotalLines { get; set; }
public int ErrorCount { get; set; }
public int WarningCount { get; set; }
public int InfoCount { get; set; }
public List<LogEntry> Errors { get; set; }
}
public class LogEntry
{
public string Level { get; set; }
public string Message { get; set; }
}
60. Concurrent Collections (Thread-Safe Data Structures)
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class ConcurrentCollectionsDemo
{
// CONCURRENTDICTIONARY<TKey, TValue>
public void ConcurrentDictionaryDemo()
{
var dict = new ConcurrentDictionary<string, int>();
// Add or update
dict.TryAdd("key1", 1);
dict["key2"] = 2; // Also works
// Get or add
int value = dict.GetOrAdd("key3", 3);
// Add or update with factory
int updated = dict.AddOrUpdate("key1",
key => 10,
(key, existing) => existing + 1);
// Remove
dict.TryRemove("key2", out int removedValue);
// Update if exists
dict.TryUpdate("key1", 100, 2); // Only updates if current value is 2
// Thread-safe enumeration
foreach (var kvp in dict)
{
// Safe to read while being modified
Console.WriteLine($"{kvp.Key}: {kvp.Value}");
}
// Get all keys/values (snapshot)
var keys = dict.Keys; // Snapshot
var values = dict.Values; // Snapshot
}
// CONCURRENTBAG<T> (Unordered, fast for adding/removing from same thread)
public void ConcurrentBagDemo()
{
var bag = new ConcurrentBag<int>();
// Add items
bag.Add(1);
bag.Add(2);
bag.Add(3);
// Try to take (remove)
if (bag.TryTake(out int item))
{
Console.WriteLine($"Removed: {item}");
}
// Peek without removing
if (bag.TryPeek(out int peeked))
{
Console.WriteLine($"Next item: {peeked}");
}
// Thread-local awareness (maintains local cache per thread)
Parallel.For(0, 1000, i =>
{
bag.Add(i);
// This thread tends to get its own items (LIFO)
if (bag.TryTake(out int localItem))
{
// Process localItem
}
});
}
// CONCURRENTQUEUE<T> (FIFO)
public async Task ConcurrentQueueDemo()
{
var queue = new ConcurrentQueue<int>();
// Producer
var producer = Task.Run(() =>
{
for (int i = 0; i < 1000; i++)
{
queue.Enqueue(i);
Thread.Sleep(1);
}
});
// Consumer
var consumer = Task.Run(() =>
{
int itemsProcessed = 0;
while (itemsProcessed < 1000)
{
if (queue.TryDequeue(out int item))
{
ProcessItem(item);
itemsProcessed++;
}
else
{
Thread.Sleep(1);
}
}
});
await Task.WhenAll(producer, consumer);
}
// CONCURRENTSTACK<T> (LIFO)
public void ConcurrentStackDemo()
{
var stack = new ConcurrentStack<int>();
// Push
stack.Push(1);
stack.Push(2);
stack.Push(3);
// Push multiple
stack.PushRange(new[] { 4, 5, 6 });
// Pop
if (stack.TryPop(out int item))
{
Console.WriteLine($"Popped: {item}");
}
// Pop multiple
var items = new int[3];
int popped = stack.TryPopRange(items);
// Peek
if (stack.TryPeek(out int peeked))
{
Console.WriteLine($"Top: {peeked}");
}
}
// BLOCKINGCOLLECTION<T> (Producer-Consumer with blocking)
public async Task BlockingCollectionDemo()
{
// Bounded collection (max 100 items)
var collection = new BlockingCollection<int>(100);
// Producer
var producer = Task.Run(() =>
{
for (int i = 0; i < 1000; i++)
{
collection.Add(i); // Blocks if full
Console.WriteLine($"Produced: {i}");
}
collection.CompleteAdding(); // Signal completion
});
// Consumer
var consumer = Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
ProcessItem(item);
}
});
await Task.WhenAll(producer, consumer);
}
// BLOCKING COLLECTION WITH BOUNDS
public async Task BlockingCollectionWithBounds()
{
using var collection = new BlockingCollection<int>(boundedCapacity: 10);
// Fast producer (will block when full)
var producer = Task.Run(async () =>
{
for (int i = 0; i < 100; i++)
{
collection.Add(i);
Console.WriteLine($"Added: {i}");
await Task.Delay(10);
}
collection.CompleteAdding();
});
// Slow consumer
var consumer = Task.Run(async () =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"Processing: {item}");
await Task.Delay(100); // Slow processing
}
});
await Task.WhenAll(producer, consumer);
}
// REAL-WORLD: Producer-Consumer Pipeline
public class ProcessingPipeline
{
private readonly BlockingCollection<WorkItem> _inputQueue;
private readonly BlockingCollection<ProcessedItem> _outputQueue;
public ProcessingPipeline(int capacity = 100)
{
_inputQueue = new BlockingCollection<WorkItem>(capacity);
_outputQueue = new BlockingCollection<ProcessedItem>(capacity);
}
public async Task StartAsync(int workerCount)
{
var workers = Enumerable.Range(0, workerCount)
.Select(_ => Task.Run(WorkerLoop))
.ToArray();
await Task.WhenAll(workers);
}
public void Enqueue(WorkItem item) => _inputQueue.Add(item);
public async Task<List<ProcessedItem>> GetResultsAsync(CancellationToken token = default)
{
var results = new List<ProcessedItem>();
while (!_outputQueue.IsCompleted && !token.IsCancellationRequested)
{
if (_outputQueue.TryTake(out var item, 100))
{
results.Add(item);
}
}
return results;
}
private async Task WorkerLoop()
{
foreach (var item in _inputQueue.GetConsumingEnumerable())
{
var processed = await ProcessItemAsync(item);
_outputQueue.Add(processed);
}
}
public void Complete() => _inputQueue.CompleteAdding();
private Task<ProcessedItem> ProcessItemAsync(WorkItem item)
{
// Simulate processing
return Task.FromResult(new ProcessedItem { Id = item.Id, Result = item.Data.ToUpper() });
}
}
// REAL-WORLD: Rate Limiter using ConcurrentQueue
public class RateLimiter
{
private readonly ConcurrentQueue<DateTime> _requestTimes = new();
private readonly int _maxRequests;
private readonly TimeSpan _timeWindow;
public RateLimiter(int maxRequests, TimeSpan timeWindow)
{
_maxRequests = maxRequests;
_timeWindow = timeWindow;
}
public bool TryAcquire()
{
CleanupOldRequests();
if (_requestTimes.Count < _maxRequests)
{
_requestTimes.Enqueue(DateTime.UtcNow);
return true;
}
return false;
}
private void CleanupOldRequests()
{
var cutoff = DateTime.UtcNow - _timeWindow;
while (_requestTimes.TryPeek(out DateTime oldest))
{
if (oldest < cutoff)
{
_requestTimes.TryDequeue(out _);
}
else
{
break;
}
}
}
}
// REAL-WORLD: LRU Cache with ConcurrentDictionary
public class LRUCache<TKey, TValue>
{
private readonly ConcurrentDictionary<TKey, (TValue Value, LinkedListNode<TKey> Node)> _cache;
private readonly LinkedList<TKey> _accessOrder;
private readonly int _capacity;
public LRUCache(int capacity)
{
_capacity = capacity;
_cache = new ConcurrentDictionary<TKey, (TValue, LinkedListNode<TKey>)>();
_accessOrder = new LinkedList<TKey>();
}
public TValue Get(TKey key)
{
if (_cache.TryGetValue(key, out var entry))
{
lock (_accessOrder)
{
_accessOrder.Remove(entry.Node);
_accessOrder.AddFirst(entry.Node);
}
return entry.Value;
}
return default;
}
public void Put(TKey key, TValue value)
{
if (_cache.TryGetValue(key, out var existing))
{
lock (_accessOrder)
{
_accessOrder.Remove(existing.Node);
var newNode = _accessOrder.AddFirst(key);
_cache[key] = (value, newNode);
}
return;
}
lock (_accessOrder)
{
if (_cache.Count >= _capacity)
{
var leastRecent = _accessOrder.Last;
if (leastRecent != null)
{
_cache.TryRemove(leastRecent.Value, out _);
_accessOrder.RemoveLast();
}
}
var node = _accessOrder.AddFirst(key);
_cache[key] = (value, node);
}
}
}
// HELPER METHODS
private void ProcessItem(int item) => Thread.Sleep(10);
// Helper classes
public class WorkItem
{
public int Id { get; set; }
public string Data { get; set; }
}
public class ProcessedItem
{
public int Id { get; set; }
public string Result { get; set; }
}
}
// SUMMARY - When to use which collection:
//
// ConcurrentDictionary: Key-value lookups (most common)
// ConcurrentBag: Unordered, high-throughput adds/removes
// ConcurrentQueue: FIFO producer-consumer
// ConcurrentStack: LIFO stack operations
// BlockingCollection: Bounded producer-consumer with blocking
Module 5 Summary
You've mastered C# Asynchronous Programming:
Core Concepts (52-53)
- ✅ Task & Task - Async operation representation
- ✅ Async/Await - Language pattern for async code
- ✅ State Machine - Compiler transforms async methods
- ✅ ValueTask - Performance optimization for cached results
Control Flow (54-56)
- ✅ CancellationToken - Graceful cancellation
- ✅ IProgress - Progress reporting
- ✅ ConfigureAwait - Context control (avoid deadlocks)
- ✅ Async void - Only for event handlers
Advanced Patterns (57-59)
- ✅ IAsyncEnumerable - Async streams (push-based)
- ✅ Parallel.For/ForEach - CPU-bound parallelization
- ✅ PLINQ - Parallel LINQ queries
- ✅ AsParallel/AsSequential - Control parallelism
Data Structures (60)
- ✅ ConcurrentDictionary - Thread-safe dictionary
- ✅ ConcurrentBag/Queue/Stack - Thread-safe collections
- ✅ BlockingCollection - Producer-consumer queue
Practice Exercises for Module 5
Exercise 1: Async Rate Limiter
// Implement async rate limiter that:
// - Limits requests per time window
// - Provides async throttle method
// - Supports burst capacity
// - Returns Task that completes when request allowed
Exercise 2: Parallel Image Processor
// Create image processor that:
// - Loads images in parallel
// - Applies filters concurrently
// - Reports progress per image
// - Supports cancellation
// - Saves results in order
Exercise 3: Async Event Aggregator
// Build event system with:
// - Publish/subscribe pattern
// - Async handlers
// - Weak references to prevent memory leaks
// - Thread-safe dispatch
// - Handler priority
Exercise 4: Real-time Data Pipeline
// Implement data pipeline with:
// - Multiple producers (IAsyncEnumerable)
// - Parallel processing stage
// - Batch aggregation
// - Ordered output
// - Backpressure handling
// - Dead letter queue for failures