Extending PLINQ with Custom Operators
In this article, I will discuss Extending PLINQ with Custom Operators with Examples. Please read our previous article discussing Debugging and Troubleshooting PLINQ Applications with Examples.
What is PLINQ with Custom Operators?
PLINQ, or Parallel LINQ, is a parallel implementation of LINQ (Language Integrated Query). It allows developers to execute data queries in parallel, taking advantage of multiple processors or cores for improved performance on large data sets. It ships with .NET in the System.Linq namespace and is built on top of the Task Parallel Library (TPL).
Using PLINQ, you can parallelize your queries by simply calling the AsParallel method on your data source, which instructs PLINQ to attempt to partition the source collection and process it in parallel. PLINQ tries to use the system resources efficiently by determining the number of threads to use and managing them to minimize overhead while maximizing performance.
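As a minimal sketch of the paragraph above, the following program counts primes once sequentially and once after calling AsParallel. The IsPrime helper, the class names, and the range size are illustrative assumptions rather than part of this tutorial's examples. WithDegreeOfParallelism is optional; it is included only to show that the number of worker threads can be capped.

```csharp
using System;
using System.Linq;

namespace PLINQDemo
{
    public static class AsParallelSketch
    {
        // Hypothetical CPU-bound helper, used only to give each element some work.
        public static bool IsPrime(int n)
        {
            if (n < 2) return false;
            for (int i = 2; (long)i * i <= n; i++)
            {
                if (n % i == 0) return false;
            }
            return true;
        }

        // Plain LINQ to Objects: one thread walks the whole range.
        public static int CountPrimesSequential(int upperBound)
        {
            return Enumerable.Range(1, upperBound).Count(IsPrime);
        }

        // Same query, but AsParallel lets PLINQ partition the range across threads.
        public static int CountPrimesParallel(int upperBound)
        {
            return Enumerable.Range(1, upperBound)
                .AsParallel()
                .WithDegreeOfParallelism(Math.Min(Environment.ProcessorCount, 4)) // optional cap
                .Count(IsPrime);
        }
    }

    class AsParallelSketchProgram
    {
        static void Main()
        {
            int sequential = AsParallelSketch.CountPrimesSequential(100000);
            int parallel = AsParallelSketch.CountPrimesParallel(100000);

            // Both approaches must agree on the answer; only the execution strategy differs.
            Console.WriteLine(sequential == parallel);
        }
    }
}
```

Whether the parallel version is actually faster depends on the machine and on how much work each element requires, so it is worth measuring before committing to it.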
Custom operators in the context of PLINQ refer to the ability to extend PLINQ with your own methods, allowing you to implement specialized parallel operations that the built-in PLINQ operators don’t directly support. Creating custom operators typically involves defining extension methods that operate on ParallelQuery<TSource> objects. Here’s a brief overview of how to create and use a custom PLINQ operator:
Defining a Custom PLINQ Operator
Custom operators are essentially extension methods. To create one, you define a static class and method.
- Extension Method: You start by defining an extension method. This method extends the ParallelQuery<T> type, which is the return type of PLINQ queries.
- Custom Logic: Inside your extension method, you implement the custom logic that will be executed in parallel. You can use existing PLINQ operators, lower-level TPL constructs like Parallel.ForEach, or a combination of both, to accomplish your task.
- Returning Results: Your custom operator should return a ParallelQuery<TResult> if it’s meant to be used in further query compositions or another appropriate type based on what the operator does (e.g., a single value, a collection, etc.).
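The difference between the two kinds of return type can be sketched as follows. Squares and SumOfSquares are hypothetical operators invented for this illustration: the first returns a ParallelQuery so the caller can keep chaining PLINQ operators, while the second returns a single value and therefore ends the query.

```csharp
using System;
using System.Linq;

namespace PLINQDemo
{
    public static class ReturnTypeSketch
    {
        // Returns ParallelQuery<long>, so further PLINQ operators can be chained after it.
        public static ParallelQuery<long> Squares(this ParallelQuery<int> source)
        {
            return source.Select(x => (long)x * x);
        }

        // Returns a single value, so it terminates the query.
        public static long SumOfSquares(this ParallelQuery<int> source)
        {
            return source.Squares().Sum();
        }
    }

    class ReturnTypeSketchProgram
    {
        static void Main()
        {
            var numbers = Enumerable.Range(1, 10).AsParallel();

            // Squares composes with the built-in Where and Sum operators.
            long evenSquares = numbers.Squares().Where(s => s % 2 == 0).Sum();
            Console.WriteLine(evenSquares);            // 220

            // SumOfSquares produces the final result directly.
            Console.WriteLine(numbers.SumOfSquares()); // 385
        }
    }
}
```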
Here’s an example of a simple custom operator that extends ParallelQuery<TSource> by wrapping the built-in Where operator:
using System;
using System.Linq;

namespace PLINQDemo
{
    public static class PlinqExtensions
    {
        public static ParallelQuery<TSource> CustomWhere<TSource>(
            this ParallelQuery<TSource> source,
            Func<TSource, bool> predicate)
        {
            return source.Where(predicate);
        }
    }
}
Understanding the Code
Public Static Class PlinqExtensions: This class is defined as public and static. The public keyword makes it accessible from other parts of your application. The static keyword means that the class cannot be instantiated; instead, it serves as a container for extension methods. The name PlinqExtensions suggests that it contains extension methods for PLINQ (Parallel LINQ).
Extension Method CustomWhere<TSource>: Defined as a public static method, which is a requirement for all extension methods. The method extends the functionality of ParallelQuery<TSource>. The “this” keyword before the first parameter (ParallelQuery<TSource> source) indicates that this method is an extension method. It allows the method to be called as if it were a method of ParallelQuery<TSource> objects.
Generic Type TSource: The method is generic, denoted by <TSource>. This means it can operate on a ParallelQuery of any type. The type TSource is a placeholder for whatever type you are querying.
Parameters:
- ParallelQuery<TSource> source: The first parameter is the collection (or sequence) on which the method operates. Because this is an extension method for ParallelQuery<TSource>, the source represents a parallel query that can be processed in parallel.
- Func<TSource, bool> predicate: The second parameter is a delegate (a reference to a method) that takes an item of type TSource and returns a bool. This delegate defines the condition each item must meet to be included in the result.
Method Body: The method body is a single statement, return source.Where(predicate);. It calls the standard Where operator provided by PLINQ and passes it the predicate. CustomWhere is therefore a thin wrapper around the existing Where method, applying the given condition to filter elements of the source sequence in parallel.
How It Works
When you call CustomWhere on a ParallelQuery<TSource>, it filters the elements of the sequence in parallel based on the specified condition (predicate). Each element for which the predicate returns true will be included in the resulting ParallelQuery<TSource>.
Using a Custom PLINQ Operator
After defining your custom operator, you can use it just like any standard PLINQ operator. The following example filters even numbers from a range of integers using the CustomWhere operator we defined.
using System;
using System.Linq;

namespace PLINQDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var numbers = Enumerable.Range(2, 100);

            var data = numbers.AsParallel().CustomWhere(x => x % 2 == 0);

            foreach (var item in data)
            {
                Console.WriteLine(item);
            }

            Console.ReadKey();
        }
    }
}
In this example, CustomWhere filters even numbers from a parallel query over the 100 integers produced by Enumerable.Range(2, 100), that is, 2 through 101. Elements are evaluated in parallel: each one is tested against the condition (x % 2 == 0) and included in the result if it passes. Because the query is unordered, the numbers may print in a different order on each run.
Real-time Example to understand PLINQ Custom Operators
Let us see a real-time example that showcases the practicality and power of PLINQ with custom operators. Imagine we’re developing a feature for a social media analytics platform that processes a large dataset of posts to identify trending hashtags in real time. This dataset is dynamic, continuously growing, and needs to be analyzed quickly to provide timely insights.
We aim to implement a custom PLINQ operator that filters posts based on a specific criterion (e.g., posts within the last 24 hours), extracts hashtags from these posts, and then aggregates the results to find the most frequently occurring hashtags. This example demonstrates how to use PLINQ’s parallel processing capabilities to handle large datasets efficiently, along with a custom operator for specialized processing logic.
Define the Post Model and Sample Data
First, define a simple Post class to represent social media posts. We’ll also create a sample dataset of posts:
using System;
using System.Collections.Generic;

namespace PLINQDemo
{
    public class Post
    {
        public DateTime Timestamp { get; set; }
        public string Content { get; set; }

        public static List<Post> GenerateSampleData()
        {
            // Sample data generation logic
            // In a real scenario, this data might come from a database or an API.
            var random = new Random();
            var samplePosts = new List<Post>();

            for (int i = 0; i < 100; i++)
            {
                var timestamp = DateTime.UtcNow.AddHours(-random.Next(0, 48));
                var content = $"This is a sample post #{i} with hashtags #example #sample #post{i % 10}";
                samplePosts.Add(new Post { Timestamp = timestamp, Content = content });
            }

            return samplePosts;
        }
    }
}
Implementing the Custom PLINQ Operator
Let’s add a custom PLINQ operator that processes the posts to find trending hashtags. The following code defines an extension method, GetTrendingHashtags, for ParallelQuery<Post>, where Post is the class defined above with its Timestamp and Content properties. The method identifies and counts the hashtags in posts whose timestamp is on or after a specified date, using parallel processing to improve performance.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

namespace PLINQDemo
{
    public static class PlinqExtensions
    {
        public static IDictionary<string, int> GetTrendingHashtags(
            this ParallelQuery<Post> posts, DateTime since)
        {
            return posts
                .Where(post => post.Timestamp >= since)
                .SelectMany(post => Regex.Matches(post.Content, "#\\w+")
                    .Cast<Match>()
                    .Select(match => match.Value.ToUpperInvariant()))
                .Aggregate(
                    () => new Dictionary<string, int>(),
                    (acc, hashtag) =>
                    {
                        if (acc.ContainsKey(hashtag))
                        {
                            acc[hashtag]++;
                        }
                        else
                        {
                            acc.Add(hashtag, 1);
                        }
                        return acc;
                    },
                    (dict1, dict2) =>
                    {
                        foreach (var kvp in dict2)
                        {
                            if (dict1.ContainsKey(kvp.Key))
                            {
                                dict1[kvp.Key] += kvp.Value;
                            }
                            else
                            {
                                dict1.Add(kvp.Key, kvp.Value);
                            }
                        }
                        return dict1;
                    },
                    dict => dict);
        }
    }
}
Understanding the above code:
Extension Method
- public static class PlinqExtensions: Defines a static class containing extension methods. Extension methods allow you to add methods to existing types without modifying them.
- public static IDictionary<string, int> GetTrendingHashtags(this ParallelQuery<Post> posts, DateTime since): This is an extension method for the ParallelQuery<Post> type. It returns a dictionary (IDictionary<string, int>) where each key is a hashtag (in uppercase), and the value is the count of how many times that hashtag appears in the posts.
Method Logic
Filter Posts by Timestamp: posts.Where(post => post.Timestamp >= since) filters the posts to include only those with a Timestamp on or after the since date.
Find Hashtags:
- .SelectMany(post => Regex.Matches(post.Content, "#\\w+")...) takes the filtered posts and extracts hashtags from their content. The C# string "#\\w+" is the regex pattern #\w+, which matches a # followed by one or more word characters (letters, digits, or underscores). SelectMany then flattens the hashtags found in all posts into a single sequence.
- .Cast<Match>() is necessary because Regex.Matches returns a MatchCollection, which is non-generic and needs to be cast to a generic sequence of Match objects to work with LINQ.
- .Select(match => match.Value.ToUpperInvariant()) converts each matched hashtag to uppercase to ensure case-insensitive comparison.
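To see the extraction steps in isolation, here is a small sketch that applies the same Regex.Matches, Cast<Match>, and Select chain to a single string. ExtractHashtags and the sample sentence are hypothetical, introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

namespace PLINQDemo
{
    public static class HashtagSketch
    {
        // Hypothetical helper: the same steps as the lambda passed to SelectMany,
        // applied to one piece of content.
        public static List<string> ExtractHashtags(string content)
        {
            return Regex.Matches(content, @"#\w+")   // verbatim string, same pattern as "#\\w+"
                .Cast<Match>()                        // MatchCollection -> IEnumerable<Match>
                .Select(match => match.Value.ToUpperInvariant())
                .ToList();
        }
    }

    class HashtagSketchProgram
    {
        static void Main()
        {
            var tags = HashtagSketch.ExtractHashtags(
                "Launch day! #DotNet #plinq_demo, see post #42");

            Console.WriteLine(string.Join(", ", tags));
            // #DOTNET, #PLINQ_DEMO, #42
        }
    }
}
```

Note that #42 is matched too, because digits are word characters. For the same reason, the "post #{i}" text in the sample data produces tokens such as #17 that are counted alongside the intended hashtags.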
Aggregate Hashtags:
- The .Aggregate(…) function is used to tally up the hashtags. It’s somewhat complex because it operates in parallel, requiring multiple steps to combine results correctly.
- () => new Dictionary<string, int>() is the seed factory. PLINQ calls it to create a new, empty dictionary for each partition of the data, so every worker thread accumulates into its own dictionary and no locking is needed.
- (acc, hashtag) => {…} is the accumulator function. For each hashtag found, it checks if the hashtag is already in the dictionary acc. If it is, it increments the count. If not, it adds the hashtag with a count of 1.
- (dict1, dict2) => {…} is the combiner function. It combines dictionaries from different threads. For each key-value pair in dict2, it adds the value to dict1 if the key exists or adds the key-value pair if the key doesn’t exist.
- The last dict => dict part is a final transformation step, which is trivial in this case and just returns the dictionary as-is.
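The four arguments are easier to follow on a smaller problem. The sketch below uses the same Aggregate overload to compute an average from a running (Sum, Count) pair. ParallelAverage is a hypothetical operator written for this illustration, and it assumes a compiler with tuple support (C# 7 or later).

```csharp
using System;
using System.Linq;

namespace PLINQDemo
{
    public static class AggregateSketch
    {
        // Hypothetical operator: averages the source using per-partition totals.
        public static double ParallelAverage(this ParallelQuery<int> source)
        {
            return source.Aggregate(
                // Seed factory: a fresh accumulator for each partition.
                () => (Sum: 0L, Count: 0),
                // Accumulator: fold one element into the partition's running totals.
                (acc, value) => (acc.Sum + value, acc.Count + 1),
                // Combiner: merge the totals of two partitions.
                (left, right) => (left.Sum + right.Sum, left.Count + right.Count),
                // Result selector: turn the merged totals into the final answer.
                total => total.Count == 0 ? 0.0 : (double)total.Sum / total.Count);
        }
    }

    class AggregateSketchProgram
    {
        static void Main()
        {
            double average = Enumerable.Range(1, 100).AsParallel().ParallelAverage();

            // The sum of 1..100 is 5050, so the average is 50.5.
            Console.WriteLine(average == 50.5);
        }
    }
}
```

Because the accumulator here is an immutable tuple, each step returns a new value. The hashtag example instead mutates a dictionary in place, which is safe only because the seed factory gives every partition its own dictionary.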
Result
The method returns a dictionary where the keys are unique hashtags (in uppercase) found in posts since the specified date, and the values are the counts of how many times each hashtag appeared. This method leverages parallel processing to efficiently process large datasets, which is particularly useful for analyzing trends in social media posts or similar large collections of text data.
Using the PLINQ Custom Operator
Finally, use the custom operator in the Main method to process the sample data and find trending hashtags:
using System;
using System.Collections.Generic;
using System.Linq;

namespace PLINQDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            List<Post> posts = Post.GenerateSampleData();

            var trendingHashtags = posts.AsParallel()
                .GetTrendingHashtags(DateTime.UtcNow.AddHours(-24))
                .OrderByDescending(kvp => kvp.Value)
                .Take(5);

            Console.WriteLine("Trending Hashtags in the last 24 hours:");
            foreach (var hashtag in trendingHashtags)
            {
                Console.WriteLine($"{hashtag.Key}: {hashtag.Value} times");
            }

            Console.ReadKey();
        }
    }
}
Output: the exact counts differ from run to run because the sample timestamps are random. #EXAMPLE and #SAMPLE occur in every sample post, so they normally head the list.
This example demonstrates using PLINQ with a custom operator to process a large dataset of social media posts in parallel, efficiently analyzing and aggregating data to identify trending hashtags. By leveraging PLINQ’s parallel processing capabilities alongside custom logic tailored to specific analytical needs, you can achieve significant performance improvements for data-intensive operations, making it ideal for real-time analytics scenarios.
Example: Processing a large dataset of sales records
Let’s dive into a more practical example to understand the concept of extending PLINQ with custom operators. We’ll create a .NET application that simulates a real-world scenario: processing a large dataset of sales records to find records that meet certain criteria, using a custom PLINQ operator for filtering based on multiple conditions. This will demonstrate how custom operators can be utilized for complex parallel queries, enhancing performance and readability.
Imagine we have a large dataset of sales records, each record containing the sales amount, date, and region. We aim to filter records for sales exceeding a certain amount within a specific date range and region, showcasing how a custom PLINQ operator can simplify and optimize this common data processing task.
Define the Sales Record Model
Define a simple model for our sales records:
using System;

namespace PLINQDemo
{
    public class SaleRecord
    {
        public decimal Amount { get; set; }
        public DateTime Date { get; set; }
        public string Region { get; set; }
    }
}
Implement the Custom PLINQ Operator
We’ll add a custom operator called FilterSalesRecords that encapsulates the filtering logic based on our criteria:
using System;
using System.Linq;

namespace PLINQDemo
{
    public static class PlinqExtensions
    {
        public static ParallelQuery<SaleRecord> FilterSalesRecords(
            this ParallelQuery<SaleRecord> source,
            decimal minAmount,
            DateTime startDate,
            DateTime endDate,
            string region)
        {
            // The static string.Equals is used so that a record with a null Region
            // is simply filtered out instead of throwing NullReferenceException.
            return source.Where(record =>
                record.Amount >= minAmount &&
                record.Date >= startDate &&
                record.Date <= endDate &&
                string.Equals(record.Region, region, StringComparison.OrdinalIgnoreCase));
        }
    }
}
Using the PLINQ Custom Operator
Finally, the custom operator is used in the main method. Let’s create some sample sales records and apply our custom operator to filter them:
using System;
using System.Collections.Generic;
using System.Linq;

namespace PLINQDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var salesData = new List<SaleRecord>
            {
                // Populate with sample data
                new SaleRecord { Amount = 500m, Date = new DateTime(2023, 1, 10), Region = "North America" },
                new SaleRecord { Amount = 1500m, Date = new DateTime(2023, 1, 15), Region = "Europe" },
                // Add more records as needed...
            };

            var filteredSales = salesData
                .AsParallel()
                .FilterSalesRecords(1000m, new DateTime(2023, 1, 1), new DateTime(2023, 12, 31), "Europe")
                .ToList();

            foreach (var sale in filteredSales)
            {
                Console.WriteLine($"Sale of {sale.Amount} on {sale.Date.ToShortDateString()} in {sale.Region}");
            }

            Console.ReadKey();
        }
    }
}
Execute the application, and the output will display the sales records that match our criteria, filtered in parallel by our custom PLINQ operator. With the two sample records above, only the 1500 sale in Europe on 15 January 2023 satisfies all four conditions.
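Because FilterSalesRecords returns a ParallelQuery<SaleRecord>, it can be followed by other PLINQ operators. The sketch below repeats the model and the filter so that it compiles on its own, then adds TotalByMonth, a hypothetical second operator invented for this illustration, which groups the filtered records by month and sums their amounts. The sample records are also made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

namespace PLINQDemo
{
    public class SaleRecord
    {
        public decimal Amount { get; set; }
        public DateTime Date { get; set; }
        public string Region { get; set; }
    }

    public static class SalesSketch
    {
        // Same filter as in the article, repeated so this sketch is self-contained.
        public static ParallelQuery<SaleRecord> FilterSalesRecords(
            this ParallelQuery<SaleRecord> source,
            decimal minAmount, DateTime startDate, DateTime endDate, string region)
        {
            return source.Where(record =>
                record.Amount >= minAmount &&
                record.Date >= startDate &&
                record.Date <= endDate &&
                string.Equals(record.Region, region, StringComparison.OrdinalIgnoreCase));
        }

        // Hypothetical follow-up operator: total sales amount per calendar month.
        public static Dictionary<int, decimal> TotalByMonth(this ParallelQuery<SaleRecord> source)
        {
            return source
                .GroupBy(record => record.Date.Month)
                .ToDictionary(group => group.Key, group => group.Sum(r => r.Amount));
        }
    }

    class SalesSketchProgram
    {
        static void Main()
        {
            var salesData = new List<SaleRecord>
            {
                new SaleRecord { Amount = 1500m, Date = new DateTime(2023, 1, 15), Region = "Europe" },
                new SaleRecord { Amount = 2000m, Date = new DateTime(2023, 1, 20), Region = "Europe" },
                new SaleRecord { Amount = 1200m, Date = new DateTime(2023, 2, 3),  Region = "Europe" },
                new SaleRecord { Amount = 500m,  Date = new DateTime(2023, 1, 2),  Region = "Europe" },
                new SaleRecord { Amount = 5000m, Date = new DateTime(2023, 1, 5),  Region = "North America" },
            };

            // The two custom operators chain like built-in ones.
            Dictionary<int, decimal> totals = salesData
                .AsParallel()
                .FilterSalesRecords(1000m, new DateTime(2023, 1, 1), new DateTime(2023, 12, 31), "Europe")
                .TotalByMonth();

            // January: 1500 + 2000 = 3500, February: 1200.
            foreach (var month in totals.Keys.OrderBy(m => m))
            {
                Console.WriteLine($"Month {month}: {totals[month]}");
            }
        }
    }
}
```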
When to Use PLINQ with Custom Operators?
Using PLINQ, especially with custom operators, requires careful consideration to ensure you benefit from parallel execution without introducing unexpected behavior or performance degradation. Here are several factors to weigh when deciding whether to use PLINQ with custom operators:
Data Size and Complexity
- Large Data Sets: PLINQ is most beneficial when working with large collections of data where the overhead of parallelization is justified by the performance gains from executing operations in parallel.
- Complex Operations: If the operations on the data are CPU-intensive, parallel execution can significantly reduce the total execution time.
Environment
- Multi-core Processors: Ensure the target environment has multiple cores to take advantage of parallel execution. PLINQ’s benefits are minimized or negated on single-core processors.
Custom Operators and Thread Safety
- Thread Safety: Custom operators used in PLINQ queries must be thread-safe since they can be executed simultaneously from multiple threads. This includes any external resources or data structures they access.
- Statelessness: Ideally, custom operators should be stateless or ensure that any state is managed thread-safely to prevent race conditions or data corruption.
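The following sketch contrasts three ways of counting even numbers inside a parallel query. All method names are hypothetical. The first version updates a shared variable without synchronization and is shown only as an example of what to avoid, the second makes the shared update atomic, and the third avoids shared state altogether.

```csharp
using System;
using System.Linq;
using System.Threading;

namespace PLINQDemo
{
    public static class ThreadSafetySketch
    {
        // UNSAFE, for illustration only: count++ is a read-modify-write on shared
        // state, so concurrent threads can lose updates.
        public static int CountEvensUnsafe(this ParallelQuery<int> source)
        {
            int count = 0;
            source.ForAll(x => { if (x % 2 == 0) count++; });
            return count;
        }

        // Safe: Interlocked.Increment makes each update atomic.
        public static int CountEvensInterlocked(this ParallelQuery<int> source)
        {
            int count = 0;
            source.ForAll(x => { if (x % 2 == 0) Interlocked.Increment(ref count); });
            return count;
        }

        // Preferred: no shared state, PLINQ combines per-partition counts itself.
        public static int CountEvensStateless(this ParallelQuery<int> source)
        {
            return source.Count(x => x % 2 == 0);
        }
    }

    class ThreadSafetySketchProgram
    {
        static void Main()
        {
            var numbers = Enumerable.Range(1, 1000000).AsParallel();

            // The unsafe result can vary between runs and may be less than 500000.
            Console.WriteLine(numbers.CountEvensUnsafe());

            // These two are always 500000.
            Console.WriteLine(numbers.CountEvensInterlocked());
            Console.WriteLine(numbers.CountEvensStateless());
        }
    }
}
```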
Ordering and Side Effects
- Order-sensitive Operations: PLINQ does not guarantee the order of operation execution or results. Consider this carefully if your custom operators or the overall query requires a specific order. PLINQ offers the AsOrdered() method to preserve order, but it can reduce parallel efficiency.
- Side Effects: Avoid or carefully manage side effects within custom operators in a parallel query, as the timing and order of these side effects can be unpredictable.
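As a small sketch of the ordering point, the program below runs a hypothetical MultiplesOf operator with and without AsOrdered. The operator is built only from Where, so it keeps whichever ordering mode the caller selected before invoking it.

```csharp
using System;
using System.Linq;

namespace PLINQDemo
{
    public static class OrderingSketch
    {
        // Hypothetical operator: keeps elements divisible by the given divisor.
        public static ParallelQuery<int> MultiplesOf(this ParallelQuery<int> source, int divisor)
        {
            return source.Where(x => x % divisor == 0);
        }
    }

    class OrderingSketchProgram
    {
        static void Main()
        {
            var numbers = Enumerable.Range(1, 20);

            // No ordering requested: the same six values, in no guaranteed order.
            int[] unordered = numbers.AsParallel().MultiplesOf(3).ToArray();

            // AsOrdered() before the operator: results follow the source order.
            int[] ordered = numbers.AsParallel().AsOrdered().MultiplesOf(3).ToArray();

            Console.WriteLine(string.Join(", ", ordered)); // 3, 6, 9, 12, 15, 18
            Console.WriteLine(unordered.Length);           // 6
        }
    }
}
```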
Considerations for Writing Custom PLINQ Operators
- Efficiency: Ensure your custom operator does not negate the benefits of parallel execution by introducing inefficient synchronization or data structures.
- Thread Safety: Ensure any shared data or resources accessed within the operator are thread-safe.
- Composability: Design your operators to be composable with other PLINQ operators.
- Cancellation and Exceptions: Respect PLINQ’s cancellation tokens and handle exceptions appropriately to integrate well with PLINQ’s error-handling mechanisms.
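One way to approach the last point is sketched below. CheckedSquares and Run are hypothetical names. The operator itself only composes built-in operators, so a token attached by the caller through WithCancellation is honored without extra code, and an exception thrown inside the delegate reaches the caller wrapped in an AggregateException.

```csharp
using System;
using System.Linq;
using System.Threading;

namespace PLINQDemo
{
    public static class CancellationSketch
    {
        // Hypothetical operator: squares each value and rejects negative input.
        public static ParallelQuery<int> CheckedSquares(this ParallelQuery<int> source)
        {
            return source.Select(x =>
            {
                if (x < 0) throw new ArgumentOutOfRangeException(nameof(x), "negative input");
                return x * x;
            });
        }

        // Runs the query and reports how it ended.
        public static string Run(int[] input, CancellationToken token)
        {
            try
            {
                input.AsParallel().WithCancellation(token).CheckedSquares().ToArray();
                return "completed";
            }
            catch (OperationCanceledException)
            {
                // Raised by PLINQ when the caller's token is canceled.
                return "canceled";
            }
            catch (AggregateException)
            {
                // Exceptions thrown inside query delegates arrive wrapped in AggregateException.
                return "failed";
            }
        }
    }

    class CancellationSketchProgram
    {
        static void Main()
        {
            Console.WriteLine(CancellationSketch.Run(new[] { 1, 2, 3 }, CancellationToken.None));  // completed
            Console.WriteLine(CancellationSketch.Run(new[] { 1, -2, 3 }, CancellationToken.None)); // failed

            using (var cts = new CancellationTokenSource())
            {
                cts.Cancel(); // cancel before the query starts
                Console.WriteLine(CancellationSketch.Run(new[] { 1, 2, 3 }, cts.Token));           // canceled
            }
        }
    }
}
```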
In the next article, I will discuss PLINQ and Memory Management Considerations with Examples. In this article, I explained Extending PLINQ with Custom Operators with Examples. I hope you enjoyed this article.