I have a question about the Producer/Consumer problem.
Let's assume I have a file with lots of data.
Each line of the file is defined as a pair of the form type;value, for example A;1 or B;10.
The task is to read the file line by line using one or more producer threads and to consume the data with multiple consumer threads.
The constraint is that lines of the same type must be processed in the order in which they appear in the file.
For example, we need to ensure that A;1 is processed before A;2.
We can, however, process B;10 before either of these two lines (A;1 and A;2).
My first solution was to create one thread (a producer). The producer was responsible for reading the file line by line and putting each pair into a BlockingQueue in the order in which the pairs occur in the file.
I also created two additional BlockingQueues that provided data to two consumer threads. To achieve the proper order I created a ConcurrentHashMap in which I stored pairs <String, Integer> (meaning that data of type X should go to the Y-th queue). I also created an AtomicInteger counter which determines the number of the queue into which data of a new type is put.
For the example above, the scenario could be:
1) first thread reads "A;1", counter is 0, the mapping is not available in the map, so I need to add it and increment the counter (the mapping is: A->0)
1.1) first thread adds "A;1" to the first output queue
2) second thread reads "A;2", counter is 1, but the mapping is available (A->0), so I don't need to change the counter
2.1) second thread adds "A;2" to the first output queue
3) first thread reads "B;10", counter is 1, the mapping is not available in the map, so I need to add it and decrement the counter (the mapping is: B->1)
3.1) first thread adds "B;10" to the second output queue
4) second thread reads "C;100", counter is 0, the mapping is not available in the map, so I need to add it and increment the counter (the mapping is: C->0)
4.1) second thread adds "C;100" to the first output queue
All data of types "A" and "C" will be processed by consumer1, and all data of type "B" will be processed by consumer2.
The solution behaves well in the scenario described above, but in the real world the second thread can put the line "A;2" into the output queue before the first thread puts the line "A;1" there.
My second solution was to modify the first one. I created only one thread which was responsible for splitting the data. The solution works, but I wonder if it is efficient.
How can I implement such a producer/consumer setup and preserve the order for each type?
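For reference, here is a rough sketch of the second solution (one splitter thread, n consumer queues). It is only an illustration under some assumptions: the class name SingleSplitterSketch, the POISON end marker and the run method are made up for this example, lines are assumed to be well formed (type;value), and "processing" just records the line. Because only one thread touches the map and the counter, a plain HashMap and an int are enough here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SingleSplitterSketch {
    // Hypothetical end-of-input marker; assumes no real line equals it.
    static final String POISON = "\u0000EOF";

    /** Routes lines to consumerCount queues; returns what each consumer saw, in order. */
    static List<List<String>> run(List<String> lines, int consumerCount) {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        List<List<String>> seen = new ArrayList<>();
        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            List<String> log = new ArrayList<>();
            queues.add(queue);
            seen.add(log);
            Thread consumer = new Thread(() -> {
                try {
                    // Each consumer drains exactly one queue, so per-queue order is kept.
                    for (String line = queue.take(); !line.equals(POISON); line = queue.take()) {
                        log.add(line); // stand-in for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers.add(consumer);
            consumer.start();
        }

        // The calling thread acts as the single splitter.
        Map<String, Integer> typeToQueue = new HashMap<>();
        int next = 0;
        try {
            for (String line : lines) {
                String type = line.substring(0, line.indexOf(';'));
                Integer index = typeToQueue.get(type);
                if (index == null) {
                    // New type: assign the next queue round-robin.
                    index = next;
                    next = (next + 1) % consumerCount;
                    typeToQueue.put(type, index);
                }
                queues.get(index).put(line);
            }
            for (BlockingQueue<String> queue : queues) {
                queue.put(POISON);
            }
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("A;1", "A;2", "B;10", "C;100"), 2));
    }
}
```

With the four lines from my example and two consumers, A and C end up in the first queue and B in the second, the same mapping as in the scenario above.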
I also like your second approach. What I would do is:
1) Use only a single thread to read the file (i.e. a single producer). This might not be very efficient (by the way, is it absolutely necessary to have multiple producers?), but it avoids the situation where a producer hands out jobs out of order (i.e. A;2 before A;1).
2) Then, as you mentioned, there will be a map (or some other data structure). With its help, I would go for multiple SingleThreadExecutor objects, each handling the lines of one type (e.g. in the example above there would be 3 such executors: for A, B and C).
Another approach would be to read the whole file into a data structure and then create multiple producers and consumers (just like the consumers, we'd have 3 producers: for A, B and C).
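A minimal sketch of steps 1) and 2), in case it helps. The class name PerTypeExecutorsSketch and the run method are invented for illustration, the lines are assumed to look like type;value, and "processing" a line just means recording it. The calling thread plays the single producer; each type gets its own executor from Executors.newSingleThreadExecutor(), which runs its tasks sequentially in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerTypeExecutorsSketch {
    /** Processes lines with one single-thread executor per type; returns per-type logs. */
    static Map<String, List<String>> run(List<String> lines) {
        // Only the producer (the calling thread) touches this map.
        Map<String, ExecutorService> executors = new HashMap<>();
        Map<String, List<String>> processed = new ConcurrentHashMap<>();

        for (String line : lines) {
            String type = line.substring(0, line.indexOf(';'));
            ExecutorService executor =
                    executors.computeIfAbsent(type, t -> Executors.newSingleThreadExecutor());
            List<String> log = processed.computeIfAbsent(
                    type, t -> Collections.synchronizedList(new ArrayList<String>()));
            // Tasks of one type run one after another, in the order submitted here.
            executor.execute(() -> log.add(line)); // stand-in for real processing
        }

        try {
            for (ExecutorService executor : executors.values()) {
                executor.shutdown(); // already queued tasks still run
                executor.awaitTermination(1, TimeUnit.MINUTES);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("A;1", "B;10", "A;2", "C;100", "A;3")));
    }
}
```

Note that this creates one thread per type, so it is only reasonable when the number of types is small.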
And what if we make the problem more interesting and assume we have an infinite number of incoming lines (data)?
Let's say the input is a BlockingQueue that is constantly being filled, and we want to process the data while preserving the order mentioned in my first post.
One thread that processes the data and splits it into n BlockingQueues (one per consumer), according to some additional structure (e.g. a map), works, but maybe there is a more efficient solution.
Given a large number of data types, you want the consumers to come from a pool of worker threads that serves the far larger number of per-type queues. A new entry would wake up a worker, which would acquire the right to work on the given queue. I've seen this pattern before; it had similar issues where a thread per queue was required.
If you wish to consider a higher-performance solution, I suggest you google the Disruptor pattern, which uses a circular buffer (ring buffer) rather than a blocking queue, for instance.
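Roughly, that idea could look like the sketch below. This is just one way to do it, not the code I saw: the names KeyedPoolSketch, Lane, submit and run are invented for illustration, and it assumes a single thread submits the entries so that arrival order per type is well defined. Each type has its own queue plus an AtomicBoolean flag; a pool worker may drain a queue only after winning the flag, so at most one worker handles a given type at any time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class KeyedPoolSketch {
    /** One queue per type plus a flag saying whether a worker currently owns it. */
    private static final class Lane {
        final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
        final AtomicBoolean owned = new AtomicBoolean(false);
    }

    private final ExecutorService pool;
    private final Map<String, Lane> lanes = new ConcurrentHashMap<>();

    KeyedPoolSketch(ExecutorService pool) {
        this.pool = pool;
    }

    void submit(String type, Runnable task) {
        Lane lane = lanes.computeIfAbsent(type, t -> new Lane());
        lane.tasks.add(task);
        schedule(lane);
    }

    // Hands the lane to a pool worker unless it is empty or already owned.
    private void schedule(Lane lane) {
        if (!lane.tasks.isEmpty() && lane.owned.compareAndSet(false, true)) {
            pool.execute(() -> drain(lane));
        }
    }

    private void drain(Lane lane) {
        try {
            Runnable task;
            while ((task = lane.tasks.poll()) != null) {
                task.run();
            }
        } finally {
            lane.owned.set(false);
            schedule(lane); // re-check: an entry may have arrived after the last poll
        }
    }

    /** Demo helper: processes lines on a fixed pool and returns per-type logs. */
    static Map<String, List<String>> run(List<String> lines, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        KeyedPoolSketch dispatcher = new KeyedPoolSketch(pool);
        Map<String, List<String>> processed = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(lines.size());
        for (String line : lines) {
            String type = line.substring(0, line.indexOf(';'));
            List<String> log = processed.computeIfAbsent(
                    type, t -> Collections.synchronizedList(new ArrayList<String>()));
            dispatcher.submit(type, () -> {
                log.add(line); // stand-in for real processing
                done.countDown();
            });
        }
        try {
            done.await(); // wait for every task before shutting the pool down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return processed;
    }
}
```

The number of threads is fixed by the pool size no matter how many types show up, which is the point compared with a thread per queue.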