Back to: LINQ Tutorial For Beginners and Professionals
Advanced PLINQ Features with Examples
In this article, I will discuss Advanced PLINQ Features with Examples. Please read our previous article discussing Partitioning Strategies for Data Parallelism in PLINQ with Examples. At the end of this article, you will understand the following pointers:
- Custom Aggregation with PLINQ
- Using PLINQ with Custom Types and Collections
- PLINQ and Asynchronous Programming Integration
- How to Cancel the PLINQ Query Execution using Cancellation Token
- Parallel Query Execution Planning in PLINQ
Custom Aggregation with PLINQ
Custom aggregation in PLINQ refers to the process of aggregating data in a custom manner while using PLINQ’s parallel processing capabilities. This can be useful when you have large datasets and need to perform complex aggregations that the standard query operators don’t directly support.
Implementing Custom Aggregation in PLINQ
To implement custom aggregation in PLINQ, you can use the Aggregate operator. The Aggregate operator has several overloads, but to perform a custom aggregation, you typically use the one that allows you to specify a seed value, an accumulator function, and a final result selector. This allows you to define precisely how the aggregation should be carried out.
Here’s a step-by-step guide on how to perform a custom aggregation using PLINQ:
- Convert to Parallel Query: Start by converting your data source into a parallel query using the AsParallel method.
- Apply the Aggregate Operator: Use the Aggregate operator to specify your custom aggregation logic. You’ll need to provide:
-
-
-
- A seed value is the initial accumulator state.
- An accumulator function which updates the accumulator state for each element.
- A combiner function which combines two accumulator states. This is crucial for parallel processing, as it defines how to merge results from different partitions of the data.
- Optionally, a result selector that can transform the final accumulator state into the desired result form.
-
-
- Query Execution: PLINQ queries are executed lazily, meaning that the query execution is deferred until the result is actually iterated or forced to evaluate. To trigger the execution, you may need to enumerate the result or convert it to a list or an array.
Custom Aggregation Example with PLINQ
Suppose we have a collection of sales data where each record is an object that contains Amount and Category properties. We aim to find the total sales amount for each category, utilizing parallel processing to speed up the computation.
The example below demonstrates how to use the Aggregate method in PLINQ for this purpose. We will use a custom aggregation function that maintains a dictionary to track the sum of sales amounts for each category. This approach shows how to implement more complex aggregations that go beyond simple sums or averages.
using System; using System.Collections.Generic; using System.Linq; namespace PLINQDemo { class Program { static void Main(string[] args) { // Example list of sales records List<SaleRecord> sales = new List<SaleRecord> { new SaleRecord { Category = "Electronics", Amount = 1000 }, new SaleRecord { Category = "Books", Amount = 500 }, new SaleRecord { Category = "Electronics", Amount = 1500 }, new SaleRecord { Category = "Clothing", Amount = 700 }, // Add more records as needed }; // Perform the aggregation var categoryTotals = sales .AsParallel() .Aggregate( // Seed factory function to initialize the local total for each thread () => new Dictionary<string, decimal>(), // Accumulator function to sum amounts by category locally on each thread (localTotals, sale) => { if (!localTotals.ContainsKey(sale.Category)) localTotals[sale.Category] = 0; localTotals[sale.Category] += sale.Amount; return localTotals; }, // Combiner function to merge local totals from each thread (mainTotals, localTotals) => { foreach (var localTotal in localTotals) { if (!mainTotals.ContainsKey(localTotal.Key)) mainTotals[localTotal.Key] = 0; mainTotals[localTotal.Key] += localTotal.Value; } return mainTotals; }, // Final result selector, could be used to transform the result but just returns it here finalTotals => finalTotals ); // Display the results foreach (var categoryTotal in categoryTotals) { Console.WriteLine($"Category: {categoryTotal.Key}, Total Sales: {categoryTotal.Value}"); } Console.ReadKey(); } } public class SaleRecord { public string Category { get; set; } public decimal Amount { get; set; } } }
Understanding the LINQ Query:
The LINQ Query is performing an advanced aggregation operation on a collection of sales data using PLINQ (Parallel LINQ), aiming to calculate the total sales amount for each category in parallel. Let’s break down each part of the Aggregate method call to understand how it works:
AsParallel(): This method is called on the sales collection to enable parallel processing. It partitions the collection into segments that can be processed concurrently across multiple threads, aiming to utilize multiple processor cores for faster execution.
Seed Factory Function.
(() => new Dictionary<string, decimal>()): This is a function that initializes the local total for each thread. Each thread starts with its own local dictionary, where keys are categories and values are the total sales amounts for those categories. This ensures that each thread works on its own separate instance, avoiding concurrency issues.
Accumulator Function:
(localTotals, sale) => { if (!localTotals.ContainsKey(sale.Category)) localTotals[sale.Category] = 0; localTotals[sale.Category] += sale.Amount; return localTotals; }
This function processes each sale record in the segment of the collection assigned to a thread. It checks if the localTotals dictionary already has an entry for the sale category. If not, it initializes it to 0. Then, it adds the sale amount to the total for that category. This step is performed locally within each thread, reducing the need for synchronization.
Combiner Function:
(mainTotals, localTotals) => { foreach (var localTotal in localTotals) { if (!mainTotals.ContainsKey(localTotal.Key)) mainTotals[localTotal.Key] = 0; mainTotals[localTotal.Key] += localTotal.Value; } return mainTotals; }
Once all threads have processed their assigned segments, this function merges the local totals from each thread into a main total. It iterates over each key-value pair in the local totals dictionary. For each category, it checks if it’s already in the main totals dictionary; if not, it initializes it. Then, it adds the local total to the main total. This step consolidates the results from all threads into a single dictionary.
Final Result Selector
(finalTotals => finalTotals): This part of the code is relatively straightforward in this example; it simply returns the final aggregated totals without any additional transformation. However, this function can be used to transform the aggregated result into a different shape or format if needed. When you run the above code, you will get the following output:
This example demonstrates several key concepts:
- Parallel Processing: The aggregation is performed in parallel, with each thread working on a portion of the data.
- Custom Aggregation Logic: Using a dictionary as the accumulator for summing values by category.
- Thread Safety: This approach is thread-safe since each thread starts with its own local dictionary (seed), and dictionaries are merged in the combiner function.
- Performance: This approach can significantly improve performance for large datasets by leveraging multiple cores.
Using PLINQ with Custom Types and Collections
Using PLINQ (Parallel LINQ) with custom types and collections can significantly enhance the performance of your .NET applications by enabling parallel processing of data. PLINQ is a part of the Task Parallel Library (TPL) and is designed to make it easier to write parallel queries that take advantage of multicore processors. Here’s how you can use PLINQ with custom types and collections:
Defining Custom Types
First, define your custom types as you normally would. Here’s a simple example of a custom type:
public class Person { public string Name { get; set; } public int Age { get; set; } }
Creating a Collection of Custom Types
Next, create a collection of your custom type. For PLINQ, you can use any collection that implements IEnumerable<T>, such as List<T>, Array, or your own custom collection.
List<Person> people = new List<Person> { new Person { Name = "Alice", Age = 30 }, new Person { Name = "Bob", Age = 25 }, new Person { Name = "Charlie", Age = 35 } };
Using PLINQ with Custom Collections
To use PLINQ with your custom collection, call the AsParallel method on the collection and then apply your LINQ query. Here’s an example where we filter and order the collection in parallel:
var youngPeople = people.AsParallel() .Where(person => person.Age < 35) .OrderBy(person => person.Age) .ToList(); foreach (var person in youngPeople) { Console.WriteLine($"{person.Name} is {person.Age} years old."); }
Handling Exceptions
Parallel queries can throw an AggregateException if one or more operations fail. You should handle this exception and inspect its InnerExceptions property to understand the specific errors that occurred.
try { // PLINQ query } catch (AggregateException e) { foreach (var inner in e.InnerExceptions) { Console.WriteLine(inner.Message); } }
Controlling Parallelism
PLINQ automatically determines the degree of parallelism. However, you can control it using the WithDegreeOfParallelism method if needed.
var youngPeople = people.AsParallel() .WithDegreeOfParallelism(2) // Limit to 2 cores .Where(person => person.Age < 35) .OrderBy(person => person.Age) .ToList();
The complete example code is given below:
using System; using System.Collections.Generic; using System.Linq; namespace PLINQDemo { class Program { static void Main(string[] args) { try { List<Person> people = new List<Person> { new Person { Name = "Alice", Age = 30 }, new Person { Name = "Bob", Age = 25 }, new Person { Name = "Charlie", Age = 35 }, new Person { Name = "Sara", Age = 30 }, new Person { Name = "Pam", Age = 36 }, new Person { Name = "James", Age = 45 } }; var youngPeople = people.AsParallel() .WithDegreeOfParallelism(2) // Limit to 2 cores .Where(person => person.Age < 35) .OrderBy(person => person.Age) .ToList(); foreach (var person in youngPeople) { Console.WriteLine($"{person.Name} is {person.Age} years old."); } } catch (AggregateException e) { foreach (var inner in e.InnerExceptions) { Console.WriteLine(inner.Message); } } Console.ReadKey(); } } public class Person { public string Name { get; set; } public int Age { get; set; } } }
When you run the above code, you will get the following output:
PLINQ and Asynchronous Programming Integration
Asynchronous programming is a model that enables non-blocking operations. It allows a program to initiate a potentially long-running task and proceed without waiting for the task to finish. This is useful for I/O-bound operations or any long-duration tasks such as network requests, file operations, or database transactions, ensuring the application remains responsive.
Integrating PLINQ with Asynchronous Programming
Integrating PLINQ with asynchronous programming involves running PLINQ queries so they do not block the main thread, thus keeping your application responsive. Here’s how you can achieve this:
Asynchronous Queries with PLINQ: You can execute PLINQ queries asynchronously by wrapping them in a task. This allows the PLINQ query to run on a separate thread, enabling the main thread to continue executing without waiting for the query to complete.
var query = from item in dataSource.AsParallel() where SomePredicate(item) select SomeTransformation(item); Task<List<YourType>> task = Task.Run(() => query.ToList());
In this example, Task.Run is used to execute the PLINQ query asynchronously. dataSource.AsParallel() is what enables PLINQ to process the data in parallel.
Awaiting the Results: Since the PLINQ operation is wrapped in a task, you can await it using an asynchronous method. This allows your application to be responsive, waiting for the PLINQ query to complete without blocking the main thread.
List<YourType> result = await task;
Combining Asynchronous Operations with PLINQ: In scenarios where your data processing involves both asynchronous operations and parallel queries, you can mix async/await with PLINQ to optimize performance. For instance, you might fetch data asynchronously from a database or web service and then use PLINQ to process that data in parallel.
var data = await FetchDataAsync(); // Asynchronous data fetch var result = data.AsParallel().Where(predicate).Select(transformation).ToList();
PLINQ and Asynchronous Programming Integration Example
Let’s create a simple example to demonstrate the integration of PLINQ with asynchronous programming. This example will asynchronously fetch a list of numbers, perform a parallel query to process these numbers (e.g., filtering and transforming), and then display the results. For simplicity, we’ll simulate the asynchronous data fetching and focus on integrating async/await with PLINQ.
The following example demonstrates combining PLINQ for parallel processing with asynchronous programming to keep applications responsive, especially when dealing with I/O operations and CPU-bound processing tasks.
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace PLINQDemo { class Program { static async Task Main(string[] args) { try { // Asynchronously fetch data List<int> numbers = await FetchDataAsync(); // Use PLINQ to process data in parallel and asynchronously var processedNumbers = await Task.Run(() => numbers.AsParallel() .Where(n => IsPrime(n)) // For example, filter prime numbers .Select(n => n * 2) // For example, double each number .ToList()); // Display the results Console.WriteLine("Processed Numbers:"); processedNumbers.ForEach(Console.WriteLine); } catch (AggregateException e) { foreach (var inner in e.InnerExceptions) { Console.WriteLine(inner.Message); } } Console.ReadKey(); } static async Task<List<int>> FetchDataAsync() { // Simulate an asynchronous data fetch operation return await Task.Run(() => { var numbers = Enumerable.Range(1, 100000).ToList(); return numbers; }); } static bool IsPrime(int number) { if (number <= 1) return false; if (number == 2) return true; if (number % 2 == 0) return false; var boundary = (int)Math.Floor(Math.Sqrt(number)); for (int i = 3; i <= boundary; i += 2) { if (number % i == 0) return false; } return true; } } }
Explanation
- FetchDataAsync: This method simulates an asynchronous operation to fetch data. Here, it generates a list of integers from 1 to 100,000. In a real-world scenario, this could be replaced with a call to an asynchronous API to fetch data from a database or a web service.
- Main Method: This is the application’s entry point. It awaits the FetchDataAsync method to fetch data asynchronously. Then, it uses Task.Run to execute a PLINQ query in parallel, which filters prime numbers and doubles them. The PLINQ query is wrapped in Task.Run to ensure it runs asynchronously, preventing the main thread from blocking.
- IsPrime: A simple method to check if a number is prime. This is used in the PLINQ query to filter prime numbers.
How to Cancel the PLINQ Query Execution using Cancellation Token
To cancel a PLINQ (Parallel LINQ) query execution using a CancellationToken, we need to follow these steps. This technique is useful when you want to provide an option to stop a long-running PLINQ query based on some condition, such as user input or a specific timeout. Open the Program.cs file and write your PLINQ query using cancellation support. Here’s an example of how to do it:
using System; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace PLINQDemo { class Program { static void Main(string[] args) { // Create a cancellation token source CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); // Simulate a condition to cancel, for example, after 3 seconds Task.Run(() => { Thread.Sleep(3000); // Wait for 3 seconds cancellationTokenSource.Cancel(); Console.WriteLine("Cancellation Requested."); }); try { // Sample data var numbers = Enumerable.Range(1, 100); // PLINQ query with cancellation token var query = numbers.AsParallel() .WithCancellation(cancellationTokenSource.Token) .Select(n => { // Simulating some work Thread.Sleep(500); return n * n; }); // Execute the query foreach (var n in query) { Console.WriteLine($"Processing number: {n}"); } } catch (OperationCanceledException) { Console.WriteLine("Operation was canceled."); } catch (Exception ex) { Console.WriteLine($"An exception occurred: {ex.Message}"); } Console.ReadKey(); } } }
Explanation:
- CancellationTokenSource: Used to signal cancellation. You can call Cancel on this object to trigger the cancellation.
- Task.Run: Simulates a condition that the operation should be canceled after some time (3 seconds in this example). This could be based on user input or any other logic in a real application.
- WithCancellation: This method passes the CancellationToken to the PLINQ query, making it responsive to cancellation requests.
After running the application, it will start processing the numbers. After 3 seconds, the cancellation request will be triggered, and you will see the message “Cancellation requested.” followed by “Operation was canceled.” This indicates that the PLINQ query has been successfully canceled.
Parallel Query Execution Planning in PLINQ
Parallel LINQ (PLINQ) is a parallel implementation of LINQ to Objects, part of the .NET framework, which allows for parallel execution of queries on collections that implement IEnumerable<T>. This parallel execution can lead to significant performance improvements for certain types of queries and data sizes, especially on systems with multiple cores or processors. PLINQ automatically partitions the data source, schedules the work across multiple threads, and handles thread synchronization for you.
Key Concepts in PLINQ Parallel Query Execution Planning:
- Automatic Parallelization: PLINQ attempts to parallelize queries automatically by dividing the source data into partitions and processing these partitions in parallel across multiple threads.
- AsOrdered and AsUnordered: By default, PLINQ processes data in an unordered fashion to maximize performance. If order matters, you can use AsOrdered to preserve the order of the source sequence at the cost of potential performance penalties. AsUnordered can be used to remove order preservation and possibly improve performance.
- WithDegreeOfParallelism: This method allows you to specify the number of processors to use for the query. It’s useful for controlling how much parallelism is applied, especially when limiting resource utilization or working with IO-bound operations.
- WithExecutionMode: PLINQ supports two execution modes – default and forced parallelism. By default, PLINQ evaluates whether parallelization will likely offer a performance benefit and may choose to run the query sequentially. Forced parallelism (WithExecutionMode(ParallelExecutionMode.ForceParallelism)) instructs PLINQ to parallelize the query regardless.
- Aggregation and Merge Options: PLINQ supports various ways to aggregate results from parallel operations. It also provides merge options (WithMergeOptions) to control how results from parallel tasks are combined. For example, you can specify whether to prefer auto-buffering or fully streaming results.
- Cancellation and Exceptions: You can provide a CancellationToken to PLINQ queries to support cancellation. PLINQ queries can also aggregate exceptions encountered in parallel tasks and throw them as an AggregateException, allowing your code to handle multiple exceptions from parallel tasks.
- Partitioning Strategies: PLINQ uses partitioning strategies to divide the source data into segments that can be processed in parallel. It supports range partitioning (useful for arrays and lists where the size is known) and chunk partitioning (useful for enumerables where the size is not known upfront).
- Diagnostics and Debugging: PLINQ integrates with Visual Studio’s parallel diagnostics tools, allowing developers to visualize and debug parallel queries effectively. Using the System.Diagnostics namespace developers can also include custom logging and performance tracking in their PLINQ queries.
Parallel Query Execution Planning Example in PLINQ
Let’s walk through an example to illustrate how you might plan and execute a parallel query using PLINQ in a C# application. This example will demonstrate how to use PLINQ to process a large collection of data in parallel, including how to apply some of the key concepts mentioned earlier, like ordering, degree of parallelism, and handling exceptions.
Imagine we have a large collection of integers and want to perform a computationally intensive operation on each integer. We aim to square each number, but only if the number is even. We also want to preserve the order of the original collection in the final result. The following example demonstrates how PLINQ can be used to efficiently process large data sets in parallel while managing order, degree of parallelism, and exceptions.
using System; using System.Linq; namespace PLINQDemo { class Program { static void Main(string[] args) { try { //Define the Data Source var numbers = Enumerable.Range(1, 100); // A large collection of integers // Apply PLINQ with Ordering and Parallelism var squaredEvenNumbers = numbers.AsParallel() .AsOrdered() // Preserve the order of the original collection .WithDegreeOfParallelism(4) // Limit to 4 cores .Where(n => n % 2 == 0) // Filter even numbers .Select(n => n * n); // Square each number //Execution foreach (var result in squaredEvenNumbers) { Console.WriteLine(result); } } //Exception Handling catch (AggregateException e) { foreach (var inner in e.InnerExceptions) { Console.WriteLine(inner.Message); } } Console.ReadKey(); } } }
Explanation
- AsParallel: This method parallelizes the execution of the query over the source collection.
- AsOrdered: Ensures that the output respects the order of the input collection. This is important if the operation or the results need to be in the same order as the original collection.
- WithDegreeOfParallelism: Limits the number of concurrent tasks to 4. This is useful for controlling resource utilization, especially on systems with many processors or when running alongside other resource-intensive applications.
- Where: Filters the collection to include only even numbers.
- Select: Applies a square operation to each filtered number.
In the next article, I will discuss the Performance Considerations and Best Practices of LINQ with Examples. In this article, I explain Advanced PLINQ Features with Examples. I hope you enjoy this article on Advanced PLINQ Features with Examples.