Need an explanation for stream().reduce vs parallelStream().reduce output

 
H Paul
Ranch Hand
Posts: 491
1. Simple use case: get a list of class names.

2. See below Code 1.

I want to learn/understand the mechanics of several things at once.
I want to use the Stream reduce method without lambda expressions,
so I used the old way: Interface interface = new InterfaceImpl();

3. See below the log output with the sequential .stream() and the log output with .parallelStream().

In the log, I did not see any output from the combiner BinaryOperator ft with the code using .stream().
By chance/accident, not by design, I switched from the sequential stream() to parallelStream(),
and then I saw the output from the combiner BinaryOperator ft.

But with the sequential stream(), the end result is correct: I got a list of the 2 class names.
With parallelStream(), the end result is NOT correct: the 2 class names appeared twice (i.e. a list of 4).

How to make sense of this? Can someone provide an explanation for the stream().reduce vs parallelStream().reduce output?
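Code 1 itself didn't survive on this page, so here is a minimal sketch of the kind of code being described, assuming the stream's two elements are FunctionalInterface.class and Stream.class (the classes named later in this thread), with anonymous InterfaceImpl-style implementations instead of lambdas:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.Stream;

public class ReduceDemo {

    // Accumulator the "old way": Interface i = new InterfaceImpl()
    static final BiFunction<List<String>, Class<?>, List<String>> ACCUMULATOR =
            new BiFunction<List<String>, Class<?>, List<String>>() {
                @Override
                public List<String> apply(List<String> list, Class<?> clazz) {
                    list.add(clazz.getSimpleName()); // mutates the identity list (side effect)
                    return list;
                }
            };

    // Combiner: merges two partial results
    static final BinaryOperator<List<String>> COMBINER =
            new BinaryOperator<List<String>>() {
                @Override
                public List<String> apply(List<String> left, List<String> right) {
                    System.out.println("combiner called");
                    left.addAll(right);              // mutates again (side effect)
                    return left;
                }
            };

    static final List<Class<?>> CLASSES =
            Arrays.asList(FunctionalInterface.class, Stream.class);

    static List<String> sequentialReduce() {
        return CLASSES.stream().reduce(new ArrayList<>(), ACCUMULATOR, COMBINER);
    }

    static List<String> parallelReduce() {
        return CLASSES.parallelStream().reduce(new ArrayList<>(), ACCUMULATOR, COMBINER);
    }

    public static void main(String[] args) {
        System.out.println(sequentialReduce()); // [FunctionalInterface, Stream], combiner never logs
        System.out.println(parallelReduce());   // combiner logs; result usually doubled
    }
}
```

Run sequentially, this prints the two names and the combiner never logs; run in parallel, the combiner logs and the result typically contains each name twice.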
 
H Paul
Ranch Hand
Posts: 491
1. Basically, I rewrote it using lambda expressions for stream().reduce() and parallelStream().reduce().

The output is the same as I posted earlier with the non-lambda expressions:

a. stream().reduce() outputs the correct result, but the combiner is not called.
b. parallelStream().reduce() does NOT output the correct result, but the combiner is called.

2. If possible, can someone check whether the code for the identity, accumulator and combiner is correct?

Thanks.
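The lambda rewrite isn't shown above either; a sketch of what it might look like, with the same (stateful) identity, accumulator and combiner as before:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LambdaReduceDemo {

    static final List<Class<?>> CLASSES =
            Arrays.asList(FunctionalInterface.class, Stream.class);

    static List<String> reduce(boolean parallel) {
        Stream<Class<?>> stream = parallel ? CLASSES.parallelStream() : CLASSES.stream();
        return stream.reduce(
                new ArrayList<String>(),                                              // identity (a mutable list)
                (list, clazz) -> { list.add(clazz.getSimpleName()); return list; },   // accumulator
                (a, b) -> { System.out.println("combiner called"); a.addAll(b); return a; }); // combiner
    }

    public static void main(String[] args) {
        System.out.println(reduce(false)); // correct result, combiner silent
        System.out.println(reduce(true));  // combiner runs, result usually doubled
    }
}
```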



 
Stephan van Hulst
Bartender
Posts: 15737
The problem is caused because your accumulator and combiner functions are not stateless. The reduce() method requires stateless and associative functions. Stateless means that the functions are based on nothing else but the input arguments, and they cause no side-effects. Your accumulator calls List.add() and your combiner calls List.addAll(), both causing side-effects in the input arguments.

This usually doesn't cause problems in serial reductions, because the combiner is never called, and the accumulator is called once for each element in the stream. However, here's what happens in a parallel reduction:

  • The elements are split in two halves, which are both reduced in parallel.
  • The first half is reduced by adding FunctionalInterface.class to the identity list. Your identity list is now no longer empty.
  • The second half is reduced by adding Stream.class to the identity list. Because the previous reduction was stateful, the identity list is now [FunctionalInterface.class, Stream.class].
  • The reduction operation expects the results of your accumulator to be independent objects, so it now has two (supposedly independent) reduced halves which it has to combine. Essentially it performs identity.addAll(identity), which results in [FunctionalInterface.class, Stream.class, FunctionalInterface.class, Stream.class]

You can make the accumulator stateless by copying the input list, adding the element to that, and returning the new list. You can make the combiner stateless by copying the first input list, adding all elements of the second list to it, and returning the new list. Because copying the lists all the time can become very expensive, the Java designers added a mutable reduction operation: Stream.collect(). Stream.collect() does the same as Stream.reduce(), except it keeps in mind that the accumulation is reused by the different threads performing the reduction.

    A handy guideline is to use Stream.reduce() if your result is an immutable type, and Stream.collect() if your result is a mutable type.
     
    Stephan van Hulst
    Bartender
    Posts: 15737
    Here's the same operation using Stream.collect(), assuming we're NOT using Collectors.toList():
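The code for this post is missing from the page; a sketch of what a mutable reduction with the three-argument Stream.collect() (and no Collectors.toList()) might look like:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class CollectDemo {

    static final List<Class<?>> CLASSES =
            Arrays.asList(FunctionalInterface.class, Stream.class);

    static List<String> collectNames() {
        // The supplier makes a fresh container PER THREAD, so mutation is safe.
        return CLASSES.parallelStream().collect(
                ArrayList::new,                                    // supplier
                (list, clazz) -> list.add(clazz.getSimpleName()),  // accumulator (mutates its own container)
                List::addAll);                                     // combiner (merges two containers)
    }

    public static void main(String[] args) {
        System.out.println(collectNames()); // [FunctionalInterface, Stream], even in parallel
    }
}
```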
     
    H Paul
    Ranch Hand
    Posts: 491
To troubleshoot programmatically and explain the output logs that I posted earlier:

1) I use thread information.
2) I use hashCode() to see the List memory address. But this is NOT good.
3) I use identityHashCode to see the List memory address. And this is GOOD! Help from

http://stackoverflow.com/questions/18396927/how-to-print-the-address-of-an-object-if-you-have-redefined-tostring-method

1 and 3 showed only 1 List passed around (one shared List).

4)
See below code: I use three classes.
See below log: I have 2 sets of logs:
a. In the 1st set of logs: luckily, I saw 2 different threads: ForkJoinPool.commonPool-worker-1 and main.
b. In the 2nd set of logs: I saw the identityHashCode showing the List memory address correctly.
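The tracing code itself is missing from the page; as a sketch, the same observation (one shared List) can be reproduced by recording the thread name and System.identityHashCode() inside the accumulator (ReduceTrace is a hypothetical class name):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

public class ReduceTrace {

    static final List<Class<?>> CLASSES =
            Arrays.asList(FunctionalInterface.class, Stream.class);

    /** Runs the stateful parallel reduce and returns how many distinct
     *  List instances the accumulator actually saw. */
    static int distinctListsSeen() {
        Set<Integer> ids = ConcurrentHashMap.newKeySet(); // thread-safe set of identity hashes
        CLASSES.parallelStream().reduce(
                new ArrayList<String>(),
                (list, clazz) -> {
                    int id = System.identityHashCode(list); // stable per object, unlike List.hashCode()
                    ids.add(id);
                    System.out.printf("%s accumulating on list @%h%n",
                            Thread.currentThread().getName(), id);
                    list.add(clazz.getSimpleName());
                    return list;
                },
                (a, b) -> { a.addAll(b); return a; });
        return ids.size();
    }

    public static void main(String[] args) {
        // Every accumulator call mutates the SAME identity list:
        System.out.println("distinct lists seen: " + distinctListsSeen());
    }
}
```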





     
    H Paul
    Ranch Hand
    Posts: 491
Master Stephan,

1. I read your 1st post's lengthy explanation and wanted to give you a cow for it. (Unless you tell me how, master.)

What you explained confirmed what my troubleshooting code, posted to see what is happening, showed.

2. I will have a look at your next "coded" post.
     
    Stephan van Hulst
    Bartender
    Posts: 15737
    Ron McLeod gave me a cow in your name, thank you both

    In turn, have a cow for your own investigations!
     
    H Paul
    Ranch Hand
    Posts: 491
0) Thank you, Ron McLeod.

1) Following Stephan's lengthy explanation: to make parallelStream().reduce(...) work.
See below code.
2) Following Stephan's lengthy explanation: to make parallelStream().collect(Collectors.reducing(...)) work.
This is new. See below code.

3) How the imperative style follows in the functional declarative footsteps (how the OP sees Stephan's functional declarative code).
Reply to Stephan's posted code. See below.


1) Following Stephan's lengthy explanation: to make parallelStream().reduce(...) work
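The code below this heading is missing from the page; a sketch of the fix that Stephan's explanation describes (copy the input list in both the accumulator and the combiner, so neither has side effects):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.stream.Stream;

public class FixedReduceDemo {

    static final List<Class<?>> CLASSES =
            Arrays.asList(FunctionalInterface.class, Stream.class);

    static List<String> parallelReduce() {
        // Stateless accumulator: copy, add, return the new list.
        BiFunction<List<String>, Class<?>, List<String>> accumulator = (list, clazz) -> {
            List<String> copy = new ArrayList<>(list);
            copy.add(clazz.getSimpleName());
            return copy;
        };
        // Stateless combiner: copy, addAll, return the new list.
        BinaryOperator<List<String>> combiner = (a, b) -> {
            List<String> copy = new ArrayList<>(a);
            copy.addAll(b);
            return copy;
        };
        return CLASSES.parallelStream().reduce(new ArrayList<>(), accumulator, combiner);
    }

    public static void main(String[] args) {
        System.out.println(parallelReduce()); // [FunctionalInterface, Stream]
    }
}
```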



2) Following Stephan's lengthy explanation: to make parallelStream().collect(Collectors.reducing(...)) work
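This code is also missing; a sketch of what the Collectors.reducing(...) variant might look like, using an immutable identity and a merge function that never mutates its inputs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReducingCollectorDemo {

    static final List<Class<?>> CLASSES =
            Arrays.asList(FunctionalInterface.class, Stream.class);

    static List<String> collectReducing() {
        return CLASSES.parallelStream().collect(Collectors.reducing(
                Collections.<String>emptyList(),                           // identity: immutable empty list
                clazz -> Collections.singletonList(clazz.getSimpleName()), // mapper: element -> one-item list
                (a, b) -> {                                                // op: merge without mutating inputs
                    List<String> merged = new ArrayList<>(a);
                    merged.addAll(b);
                    return merged;
                }));
    }

    public static void main(String[] args) {
        System.out.println(collectReducing()); // [FunctionalInterface, Stream]
    }
}
```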



3) How the imperative style follows the functional declarative style (how the OP sees Stephan's functional declarative code)
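A sketch of the imperative counterpart: the loop plays the role of the accumulator, and no combiner is needed because a single thread fills a single list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ImperativeDemo {

    static List<String> classNames(List<Class<?>> classes) {
        List<String> result = new ArrayList<>();   // the "identity"
        for (Class<?> clazz : classes) {           // the "accumulator", one element at a time
            result.add(clazz.getSimpleName());
        }
        return result;                             // no combiner: one thread, one list
    }

    public static void main(String[] args) {
        System.out.println(classNames(Arrays.asList(FunctionalInterface.class, Stream.class)));
    }
}
```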



     
    Stephan van Hulst
    Bartender
    Posts: 15737
Note that using Stream.collect() in combination with Collectors.reducing() is just a roundabout way of calling Stream.reduce(). You're performing an immutable reduction, which is not very efficient if you're working with a mutable result.
     
    H Paul
    Ranch Hand
    Posts: 491
Yes, Master! I will follow your lengthy explanation on 2 points: performance/efficiency and readability.

(My examples are just a way to explore how things work. Thanks.)
     
    H Paul
    Ranch Hand
    Posts: 491
1. Back to the future: a question out of curiosity (not on performance, nor readability, nor best practice).

From my 1st posted log and your explanation for the sequential stream().reduce(...): the combiner is not called.

2. Why does reduce() require one (a combiner)? See above code.

When I just passed in null, it complained with a NullPointerException:
java.lang.NullPointerException
at java.util.Objects.requireNonNull(Objects.java:203)
at java.util.stream.ReduceOps.makeRef(ReduceOps.java:71)
at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:484)

So to make it happy :-D I passed an FT combiner which has no meaning: it simply returns null.
     
    Stephan van Hulst
    Bartender
    Posts: 15737
    The reduce() method doesn't know whether it's being called on a parallel stream or not. To prevent bugs from occurring somewhere later in the application when it tries to apply a null combiner to two partial reductions, it immediately throws an exception when you try to pass it null. You should strive to write programs the same way. If it doesn't make sense for your methods and constructors to accept null, they should forbid it.
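A small sketch of the eager check described here, matching the stack trace earlier in the thread: the NPE comes from Objects.requireNonNull inside reduce(), before any element is processed:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Stream;

public class NullCombinerDemo {

    static boolean throwsOnNullCombiner() {
        BiFunction<List<String>, Class<?>, List<String>> accumulator =
                (list, clazz) -> { list.add(clazz.getSimpleName()); return list; };
        try {
            Arrays.asList(FunctionalInterface.class, Stream.class)
                  .stream()
                  .reduce(new ArrayList<>(), accumulator, null); // combiner is null
            return false;
        } catch (NullPointerException expected) {
            return true; // thrown eagerly, before the stream is traversed
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE thrown immediately: " + throwsOnNullCombiner()); // true
    }
}
```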
     