
Friday, February 5, 2021

Java 8 Parallel Streams - Custom Thread Pools Examples

1. Introduction


In this tutorial, you'll learn how to use custom thread pools in Java 8 to process bulk data with the parallel streams API.

Parallel streams can work well in concurrent environments. They can improve on the performance of sequential streams, at the cost of multi-threading overhead.

The main focus of this article is one of the biggest limitations of the Stream API, along with examples of how you can use parallel streams with custom thread pools.





2. Java 8 Parallel Streams


First, let us see how to create Parallel Streams from a Collection.

To make a stream that can be processed by multiple cores of the processor, you just need to call the parallelStream() method.

package com.javaprogramto.java8.streams.parallel.streams;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ParallelStreamCreation {

    public static void main(String[] args) {

        List<Integer> intList = Arrays.asList(10, 20, 30, 40, 50);

        Stream<Integer> parallelStream = intList.parallelStream();

        parallelStream.forEach(value -> System.out.println(value));
    }
}

Output:

30
40
50
20
10

As you can see, the values are printed in no particular order. Different threads process different parts of the list, so the order can change from run to run.
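If the order matters, forEachOrdered() can be used instead of forEach(). The sketch below is only an illustration: the class name OrderedParallelPrint and the helper orderedCopy() are made up for this example and are not part of the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OrderedParallelPrint {

    // Hypothetical helper: copies the values in encounter order
    // even though the stream itself is parallel.
    static List<Integer> orderedCopy(List<Integer> source) {
        List<Integer> result = new ArrayList<>();
        // forEachOrdered handles one element at a time, in encounter order,
        // so a plain ArrayList is safe to fill here.
        source.parallelStream().forEachOrdered(result::add);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(10, 20, 30, 40, 50);

        // No ordering guarantee: the printed order may change between runs.
        intList.parallelStream().forEach(System.out::println);

        // Encounter order is preserved: 10, 20, 30, 40, 50.
        orderedCopy(intList).forEach(System.out::println);
    }
}
```

Keep in mind that forEachOrdered() can give up part of the benefit of running in parallel, so use it only when the order is really needed.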

Internally, it uses the Spliterator and StreamSupport classes to run in parallel.

The default processing is done with ForkJoinPool.commonPool(), which is shared by the entire application. If you have lots of parallel streams running at the same time, you may see degraded performance and delays in processing.
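To see the shared pool at work, you can print the common pool's parallelism and record which threads run a parallel stream. This is a minimal sketch; the class CommonPoolInspection and the method workerThreadNames() are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

class CommonPoolInspection {

    // Hypothetical helper: collects the names of all threads
    // that processed at least one element of the parallel stream.
    static Set<String> workerThreadNames(List<Integer> values) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        values.parallelStream()
              .forEach(v -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // The size of the shared pool. It can be changed at JVM start-up with
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism=<n>
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());

        // Typically lists the calling thread plus common pool workers;
        // the exact names depend on the machine and the run.
        System.out.println(workerThreadNames(Arrays.asList(10, 20, 30, 40, 50)));
    }
}
```

Every parallel stream in the application draws from the same set of worker threads, which is why one slow stream can hold up the others.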

3. Using Custom Thread Pool


As a result, the above approach uses the common ForkJoinPool for all parallel streams.

If you have many parallel streams running at the same time and some of them take longer than expected, for example due to network slowness, those tasks may block threads from the common pool. This slows down the other tasks, which then take longer to complete.

In these cases, it is good to combine parallel streams with a custom thread pool.

Look at the program below. It creates a ForkJoinPool with 5 threads and, inside a task submitted to that pool, creates a new parallel stream to find the sum of all numbers in the given range.


package com.javaprogramto.java8.streams.parallel.streams;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolParallelStreams {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        parallelStreamProcess();
    }

    private static void parallelStreamProcess() throws ExecutionException, InterruptedException {

        int start = 1;
        int end = 10000;

        List<Integer> intList = IntStream.rangeClosed(start, end).boxed()
                .collect(Collectors.toList());
        System.out.println(intList.size());

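        ForkJoinPool newCustomThreadPool = new ForkJoinPool(5);
        int actualTotal;
        try {
            // Submit the parallel stream as a task of the custom pool and wait for the result.
            actualTotal = newCustomThreadPool.submit(
                    () -> {
                        return intList.parallelStream().reduce(0, Integer::sum);
                    }).get();
        } finally {
            // Release the pool's threads once the work is done.
            newCustomThreadPool.shutdown();
        }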

        System.out.println("actualTotal " + actualTotal);

    }

}

Output:

10000
actualTotal 50005000

Actually, the above program is not an ideal solution, even though I have seen many websites recommending it. It relies on an implementation detail: a parallel stream started from inside a ForkJoinPool task runs its subtasks in that pool rather than in the common pool. The Stream API documentation does not promise this behaviour, so it may not hold on every Java version or runtime.

So, if you are running multiple parallel streams, be careful with Stream.parallel(): by default they all share the common pool, so one slow stream can delay the results of the others.
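If you want to check which threads actually execute a stream that is submitted this way, one option is to record the worker thread names. Below is a minimal sketch; the class CustomPoolThreadCheck and its method are made-up names for illustration. On the JDK versions I know of, the names belong to the custom pool (ForkJoinPool-N-worker-M), but this comes from how the implementation works and is not documented by the Stream API.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CustomPoolThreadCheck {

    // Hypothetical helper: runs a parallel stream from inside a custom pool
    // and returns the names of the threads that processed the elements.
    static Set<String> threadNamesInCustomPool(int parallelism) {
        List<Integer> intList = IntStream.rangeClosed(1, 10000).boxed()
                .collect(Collectors.toList());
        Set<String> names = ConcurrentHashMap.newKeySet();

        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() is used instead of get() only to avoid
            // checked exceptions in this short sketch.
            pool.submit(() -> intList.parallelStream()
                    .forEach(v -> names.add(Thread.currentThread().getName())))
                .join();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // Print the recorded names and inspect which pool they belong to.
        threadNamesInCustomPool(5).forEach(System.out::println);
    }
}
```

Running a check like this on your own JDK is safer than assuming the behaviour, because nothing in the API contract fixes it.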

Here, we have taken a pool size of 5, but you can change it as per your CPU configuration. If you have more cores, you can fine-tune the size based on the other tasks running on the machine.
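One simple way to pick the size is to start from the number of available processors. The sketch below assumes that starting point; the class PoolSizing and the method sumInPool() are hypothetical names, and the right size for a real application depends on what else runs on the machine.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class PoolSizing {

    // Hypothetical helper: sums the range [start, end] with a parallel
    // stream that is started from a pool of the given size.
    static int sumInPool(int parallelism, int start, int end) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<Integer> task =
                    () -> IntStream.rangeClosed(start, end).parallel().sum();
            // join() waits for the result without checked exceptions.
            return pool.submit(task).join();
        } finally {
            // Always release the pool's threads.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Assumption: one worker per available processor as a starting point.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Pool size: " + cores);
        System.out.println("actualTotal " + sumInPool(cores, 1, 10000));
    }
}
```

If the tasks mostly wait on the network rather than use the CPU, a different size may work better, so measure before settling on a number.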

If you have only one parallel stream, you can run it with a limited pool size.

For now, a parallel stream cannot take a ForkJoinPool as input to limit the number of parallel processes; that would need a future Java update.

4. Conclusion


In this article, you've seen how to create parallel streams with the Java Stream API, and that parallel streams use a common thread pool shared through ForkJoinPool. Because this pool is shared by all other parallel streams, be careful with Stream.parallel(); you can limit the number of threads by using the second approach, a custom ForkJoinPool. Keep in mind that the second approach also has some disadvantages.

Otherwise, wait for a new parallel stream API in an official Java release from Oracle.




All the code shown in this article is available on GitHub.

You can download the project directly and run it locally without any errors.



If you have any queries, please post them in the comment section.

