
Producer/Consumer with data order

 
Piotr Nowakowski
Greenhorn
Posts: 11
Hi,

I have a question about the Producer/Consumer problem.

Let's assume I have a file with lots of data.
Each line of the file is defined as a pair
<Type>;<Value>
The task is to read the file line by line using one or more threads (producers) and to process the data with multiple threads (consumers).
The catch is that lines of the same type must be processed in the order in which they appear in the file.

Example:
A;1
A;2
B;10
C;100
B;11
A;3
...

And we need to ensure that A;1 is processed before A;2.
We can, however, process B;10 before these two lines (A;1 and A;2).

My first solution was to create one thread (a producer). The producer was responsible for reading the file line by line and putting each pair into a BlockingQueue in the order it occurs in the file.
I also created two additional BlockingQueues that provided data to two consumer threads. To achieve the proper order, I created a ConcurrentHashMap storing <String, Integer> pairs (meaning that data of type X should go to the Y-th queue). To choose the queue for a new type, I used an AtomicInteger counter that determines the queue into which the data is put.

For the example above, the scenario could be:
1) first thread reads "A;1", counter is 0, the mapping is not in the map, so I add it and increment the counter (the mapping is: A->0)
1.1) first thread adds "A;1" to the first output queue
2) second thread reads "A;2", counter is 1, but the mapping is available (A->0), so I don't need to change the counter
2.1) second thread adds "A;2" to the first output queue
3) first thread reads "B;10", counter is 1, the mapping is not in the map, so I add it and decrement the counter (the mapping is: B->1)
3.1) first thread adds "B;10" to the second output queue
4) second thread reads "C;100", counter is 0, the mapping is not in the map, so I add it and increment the counter (the mapping is: C->0)
4.1) second thread adds "C;100" to the first output queue
...

All data of type "A" and "C" will be processed by consumer1, and all data of type "B" will be processed by consumer2.
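A minimal Java sketch of the type-to-queue mapping used in the first solution, assuming two output queues. `TypeRouter` and `queueFor` are hypothetical names, and taking an ever-increasing `AtomicInteger` modulo the queue count stands in for the increment/decrement of the counter. `ConcurrentHashMap.computeIfAbsent` makes the "look up, and insert if missing" step atomic, so two threads meeting a new type at the same moment still agree on one queue. It does not, however, stop a second thread from enqueueing "A;2" before "A;1".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper: assigns each type to an output queue index the first time
 * the type is seen (round-robin), then reuses that index for every later line of that type.
 */
final class TypeRouter {
    private final Map<String, Integer> queueForType = new ConcurrentHashMap<>();
    private final AtomicInteger next = new AtomicInteger();
    private final int queueCount;

    TypeRouter(int queueCount) {
        this.queueCount = queueCount;
    }

    int queueFor(String type) {
        // computeIfAbsent runs the mapping function at most once per key on a
        // ConcurrentHashMap, so a type can never end up mapped to two queues.
        return queueForType.computeIfAbsent(type,
                t -> Math.floorMod(next.getAndIncrement(), queueCount));
    }
}
```

With two queues, `queueFor("A")`, `queueFor("B")` and `queueFor("C")` return 0, 1 and 0, matching the walkthrough above.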

The solution behaves well in the scenario described above, but in the real world the second thread can put the line "A;2" into the queue before the first thread puts the line "A;1".

My second solution was a modification of the first one: I created only one thread, responsible for splitting the data. The solution works, but I wonder whether it is efficient.
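The second solution (one thread doing all the splitting) could look roughly like this. It is a sketch, not the poster's actual code: the class name, the `POISON` end-of-input marker and the use of one plain `Thread` per consumer are assumptions. Because only the dispatcher touches the mapping, it can be a plain `HashMap`, and because each `BlockingQueue` is FIFO, every consumer sees its types in file order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch: one dispatcher thread, one queue and one thread per consumer. */
final class SingleDispatcher {
    // Hypothetical end-of-input marker, compared by reference so a real "EOF" line is not mistaken for it.
    static final String POISON = new String("EOF");

    /** Returns the lines each consumer processed, in the order it processed them. */
    static List<List<String>> run(List<String> lines, int consumerCount) throws InterruptedException {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        List<List<String>> processed = new ArrayList<>();
        List<Thread> consumers = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            List<String> out = new ArrayList<>();
            Thread consumer = new Thread(() -> {
                try {
                    String line;
                    while ((line = queue.take()) != POISON) { // reference comparison on purpose
                        out.add(line); // stand-in for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            queues.add(queue);
            processed.add(out);
            consumers.add(consumer);
            consumer.start();
        }

        // Only this thread reads and writes the mapping, so a plain HashMap and int are enough.
        Map<String, Integer> queueForType = new HashMap<>();
        int next = 0;
        for (String line : lines) {
            String type = line.substring(0, line.indexOf(';'));
            Integer index = queueForType.get(type);
            if (index == null) {
                index = next;
                next = (next + 1) % consumerCount;
                queueForType.put(type, index);
            }
            queues.get(index).put(line); // FIFO per queue keeps per-type order
        }
        for (BlockingQueue<String> queue : queues) {
            queue.put(POISON);
        }
        for (Thread consumer : consumers) {
            consumer.join(); // join() also makes each consumer's list safely visible here
        }
        return processed;
    }
}
```

Whether this is efficient enough mostly depends on how expensive consumption is compared with the dispatcher's per-line work (a `substring`, a map lookup and a `put`).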

How can I implement such a producer/consumer scheme and preserve the order for each type?
 
Anayonkar Shivalkar
Bartender
Posts: 1558
Eclipse IDE Java Linux
Hello Piotr,

Welcome to CodeRanch!

I also like your second approach. What I would do is:

1) Use only a single thread to read the file (i.e. a single producer). This might not be very efficient (by the way, is it absolutely necessary to have multiple producers?), but it avoids the situation where the producer hands out jobs out of order (i.e. A;2 before A;1).
2) Then, as you mentioned, there will be a map (or some other data structure). With its help, I would use multiple SingleThreadExecutor objects, each handling all lines of one type (e.g. in the example above there will be 3 such executors: for A, B and C).
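The two steps above might be sketched as follows, assuming the lines are already available as a `List<String>`; `ExecutorPerType` is a hypothetical name. `Executors.newSingleThreadExecutor()` runs its tasks one at a time in submission order, so a single producer submitting lines in file order keeps the per-type order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: a single reader plus one SingleThreadExecutor per type. */
final class ExecutorPerType {
    static Map<String, List<String>> run(List<String> lines) throws InterruptedException {
        Map<String, ExecutorService> executors = new LinkedHashMap<>();
        Map<String, List<String>> processed = new LinkedHashMap<>();
        for (String line : lines) { // this loop plays the single producer
            String type = line.substring(0, line.indexOf(';'));
            // One single-threaded executor per type: tasks run one at a time, in submission order.
            ExecutorService executor =
                    executors.computeIfAbsent(type, t -> Executors.newSingleThreadExecutor());
            List<String> out = processed.computeIfAbsent(
                    type, t -> Collections.synchronizedList(new ArrayList<>()));
            executor.execute(() -> out.add(line)); // stand-in for real processing
        }
        for (ExecutorService executor : executors.values()) {
            executor.shutdown(); // already-submitted tasks still run
        }
        for (ExecutorService executor : executors.values()) {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("executor did not finish in time");
            }
        }
        return processed;
    }
}
```

Note that this creates one thread per distinct type, which is fine for A, B and C but may not scale to thousands of types.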

Another approach could be to read the whole file into a data structure and then create multiple producers and consumers (just like the consumers, we'd have 3 producers: for A, B and C).
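A rough sketch of the read-everything-first approach, assuming the whole file fits in memory (for example, loaded with `java.nio.file.Files.readAllLines`). Here one task per type on a parallel stream stands in for the per-type producer/consumer pairs; `GroupThenProcess` and the value-joining "processing" are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Sketch: load everything, group by type, then handle each group as one sequential task. */
final class GroupThenProcess {
    static Map<String, List<String>> groupByType(List<String> lines) {
        // groupingBy with toList() keeps the file order inside each group.
        return lines.stream().collect(Collectors.groupingBy(
                line -> line.substring(0, line.indexOf(';')),
                LinkedHashMap::new,
                Collectors.toList()));
    }

    static Map<String, String> processAll(List<String> lines) {
        Map<String, String> results = new ConcurrentHashMap<>();
        // Groups run in parallel; lines within one group are handled sequentially, in order.
        groupByType(lines).entrySet().parallelStream().forEach(group -> {
            StringJoiner values = new StringJoiner(",");
            for (String line : group.getValue()) {
                values.add(line.substring(line.indexOf(';') + 1)); // stand-in for real processing
            }
            results.put(group.getKey(), values.toString());
        });
        return results;
    }
}
```

For the example file, `processAll` maps A to "1,2,3", B to "10,11" and C to "100".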

I hope this helps.
 
Piotr Nowakowski
Greenhorn
Posts: 11
Anayonkar, thanks for your answer.

And what if we make the problem more interesting and assume we have an infinite number of incoming lines (data)?
Let's say the input is a BlockingQueue that is constantly being filled, and we want to process the data while preserving the order described in my first post.
One thread that takes the data and splits it into n BlockingQueues (one per consumer), according to some additional structure (e.g. a map), works, but maybe there is a more efficient solution.
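For an unbounded input, one option (a swapped-in technique, not something proposed in this thread) is to drop the map and derive the consumer index from the type's hash, so memory does not grow with the number of distinct types. A sketch with hypothetical names and a `POISON` marker for shutdown:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
 * Sketch: one dispatcher draining an unbounded input queue into n consumer queues.
 * Instead of a map, the consumer index is derived from the type's hash code.
 */
final class HashDispatcher implements Runnable {
    // Hypothetical shutdown marker, compared by reference.
    static final String POISON = new String("EOF");

    private final BlockingQueue<String> input;
    private final List<BlockingQueue<String>> outputs;

    HashDispatcher(BlockingQueue<String> input, List<BlockingQueue<String>> outputs) {
        this.input = input;
        this.outputs = outputs;
    }

    /** The same type always yields the same index, so one consumer sees all lines of that type. */
    static int partition(String type, int consumerCount) {
        return Math.floorMod(type.hashCode(), consumerCount);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String line = input.take();
                if (line == POISON) { // forward shutdown to every consumer
                    for (BlockingQueue<String> out : outputs) {
                        out.put(POISON);
                    }
                    return;
                }
                String type = line.substring(0, line.indexOf(';'));
                outputs.get(partition(type, outputs.size())).put(line);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The trade-off is that load balance depends on how the hashes of the actual types spread over the n queues, whereas the round-robin map spreads distinct types evenly in order of first appearance.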
 
Chris Hurst
Ranch Hand
Posts: 443
C++ Eclipse IDE Java
Given a large number of data types, you want the consumers to come from a pool of worker threads that serves the far larger number of per-type queues. A new entry would wake up a worker, which would then acquire the right to work on the given queue. I've seen this pattern before, applied to similar problems where a thread per queue was required.
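The worker-pool idea above can be sketched like this: each type gets its own pending queue, and a worker from a shared pool must "acquire" that queue (here, by flipping an `AtomicBoolean` with `compareAndSet`) before draining it, so at most one worker processes a given type at any time. The class and method names are hypothetical, the producer is assumed to submit lines of one type from a single thread, and the handler is assumed not to throw.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Sketch: per-type pending queues served by a shared pool, one owner per queue at a time. */
final class KeyOrderedDispatcher {
    private static final class TypeQueue {
        final Queue<String> pending = new ConcurrentLinkedQueue<>();
        final AtomicBoolean owned = new AtomicBoolean(); // true while a worker holds this queue
    }

    private final Map<String, TypeQueue> queues = new ConcurrentHashMap<>();
    private final Executor pool;
    private final Consumer<String> handler; // assumed not to throw

    KeyOrderedDispatcher(Executor pool, Consumer<String> handler) {
        this.pool = pool;
        this.handler = handler;
    }

    /** Called by the producer, in input order. */
    void submit(String line) {
        String type = line.substring(0, line.indexOf(';'));
        TypeQueue queue = queues.computeIfAbsent(type, t -> new TypeQueue());
        queue.pending.add(line);
        tryAcquire(queue);
    }

    private void tryAcquire(TypeQueue queue) {
        // A new entry wakes a worker only if no other worker currently owns this type.
        if (queue.owned.compareAndSet(false, true)) {
            pool.execute(() -> drain(queue));
        }
    }

    private void drain(TypeQueue queue) {
        String line;
        while ((line = queue.pending.poll()) != null) {
            handler.accept(line); // lines of one type are handled one at a time, FIFO
        }
        queue.owned.set(false);
        // A line may have been added after the last poll() while we still held the flag.
        if (!queue.pending.isEmpty()) {
            tryAcquire(queue);
        }
    }
}
```

One caveat: a busy type keeps its worker until its queue is empty; draining a bounded batch and then re-submitting would be fairer to other types.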

If you want a higher-performance solution, I suggest you google the Disruptor pattern, which, for instance, uses a circular buffer rather than a blocking queue.

A simple overview ...

Disruptor

There are a few implementations out there already that should support workflows similar to the one you're after, and, from memory, at least one is open source.
 