
Implementing a thread-safe publish/subscribe mechanism

 
Federico Minarelli
Greenhorn
Posts: 29
Hi everybody!
I am trying to figure out the best approach to implementing a publish/subscribe mechanism, which should work more or less like this:
Several subscribers (one thread per subscriber) register their interest in a given topic T and a given kind of message M by calling initListenerForTopic(T topic) followed by waitNewMessage(T topic). These threads block until a publisher (i.e., a separate thread) publishes a message with the specified topic T.

I don't like my solution at all, mainly because it is not really thread safe.

I used latches to block the subscriber threads. Here is what it looks like:



The most evident problem is marked by ***: the desired behavior is that, as soon as the latch opens, the message retrieved by getMessage() is the one set by the call to setMessage() that caused the latch to open. However, this is not guaranteed. In fact, if setMessage() is called twice in quick succession, the subscriber may retrieve the second message instead of the first. In other words, what I marked with *** should happen atomically, and it does not.
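To make the race concrete, here is a minimal sketch of how such an overwrite can happen. It is not the original listing: the class layout, the String message type, and the await() helper are assumptions made only for illustration. The only names taken from the thread are setMessage(), getMessage(), and LatchGuardedMessage.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical holder: one latch plus one mutable message field.
class LatchGuardedMessage {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String message;

    void setMessage(String m) {
        message = m;        // step 1: store the message
        latch.countDown();  // step 2: open the latch (a no-op once it is open)
    }

    void await() throws InterruptedException {
        latch.await();      // blocks until the latch has been opened
    }

    String getMessage() {
        return message;     // returns whatever was stored most recently
    }
}

class LatchRaceDemo {
    // Forces the unlucky ordering: two publishes happen before the subscriber reads.
    static String run() {
        LatchGuardedMessage holder = new LatchGuardedMessage();
        holder.setMessage("first");   // opens the latch
        holder.setMessage("second");  // overwrites before any subscriber has read
        try {
            holder.await();           // latch is already open, returns at once
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder.getMessage();   // the message that opened the latch is gone
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the waiting step and the reading step are separate calls, so nothing ties the value read to the publish that released the waiter.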
I am quite sure there are much better ways to achieve this behavior. What would you advise?

Thanks a lot!
Bye!
 
Steve Luke
Bartender
Posts: 4181
One thought is to replace the LatchGuardedMessage with a BlockingQueue. When you wait for a message you use either the take() or the poll(...) methods to get the next message out of the queue. When a new message comes in you add it to the queue with put(...). That way, if two messages come in consecutively the waiter will be sure to get the first message first, and not lose the second message.
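A minimal sketch of that suggestion, assuming String messages and a single subscriber thread (the QueueDemo class and its run() method are made up for the example):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueDemo {
    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        // Subscriber: take() blocks until a message is available.
        Thread subscriber = new Thread(() -> {
            try {
                received.add(queue.take());
                received.add(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        subscriber.start();

        try {
            // Publisher: two consecutive messages are queued, not overwritten.
            queue.put("first");
            queue.put("second");
            subscriber.join(); // join() makes the subscriber's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the queue is FIFO, the subscriber sees the messages in the order they were put, and the second one stays queued until the subscriber asks for it.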
 
Federico Minarelli
Greenhorn
Posts: 29
Steve Luke wrote: One thought is to replace the LatchGuardedMessage with a BlockingQueue. When you wait for a message you use either the take() or the poll(...) methods to get the next message out of the queue. When a new message comes in you add it to the queue with put(...). That way, if two messages come in consecutively the waiter will be sure to get the first message first, and not lose the second message.


Hi Steve, and thanks for your reply! I thought about BlockingQueue, but in the end I used latches because I am dealing with multiple subscribers: a poll() would deliver the message only to the first registered subscriber. Do you know how I could solve this?

Thanks again! Bye!
 
Steve Luke
Bartender
Posts: 4181
In that case I would probably follow the strategy used by FutureTask. I would create a non-reusable object (call it FutureMessage) which holds both the Lock/Condition and the result. When a new waiter comes in, it sees the current FutureMessage and waits on its Lock/Condition. When that condition is met, it gets that FutureMessage instance's message. An intermediate message can't interfere, since it would be working on a different instance of FutureMessage, and so on a different lock and a different message. Something maybe like this:
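As a rough sketch of the idea described above (an illustration, not the code originally posted): FutureMessage is the name used in the post, while TopicSlot, pending(), and the demo class are hypothetical, and keeping the current instance in an AtomicReference is just one way to hand out "the current FutureMessage".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// One-shot holder: its own lock, its own condition, its own result.
class FutureMessage<M> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition arrived = lock.newCondition();
    private M message;
    private boolean done;

    void set(M m) {
        lock.lock();
        try {
            message = m;
            done = true;
            arrived.signalAll(); // wake every waiter on this instance
        } finally {
            lock.unlock();
        }
    }

    M await() throws InterruptedException {
        lock.lock();
        try {
            while (!done) {      // loop guards against spurious wakeups
                arrived.await();
            }
            return message;
        } finally {
            lock.unlock();
        }
    }
}

// Hypothetical per-topic slot pointing at the not-yet-completed instance.
class TopicSlot<M> {
    private final AtomicReference<FutureMessage<M>> current =
            new AtomicReference<>(new FutureMessage<M>());

    FutureMessage<M> pending() {
        return current.get();
    }

    void publish(M m) {
        // Swap in a fresh instance first, then complete the old one,
        // so a later publish can never touch what earlier waiters hold.
        FutureMessage<M> old = current.getAndSet(new FutureMessage<M>());
        old.set(m);
    }
}

class FutureMessageDemo {
    static List<String> run() {
        TopicSlot<String> slot = new TopicSlot<>();
        FutureMessage<String> pending = slot.pending(); // both subscribers share it
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        Runnable subscriber = () -> {
            try {
                received.add(pending.await());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread a = new Thread(subscriber);
        Thread b = new Thread(subscriber);
        a.start();
        b.start();
        slot.publish("first");
        slot.publish("second"); // completes a different instance
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both subscribers receive "first" even though "second" is published immediately afterwards, because the second publish completes a different FutureMessage instance. A subscriber that wants the following message simply asks the slot for the pending instance again.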

 
Henry Wong
author
Sheriff
Posts: 23283
Federico Minarelli wrote: Hi Steve, and thanks for your reply! I thought about BlockingQueue, but in the end I used latches because I am dealing with multiple subscribers: a poll() would deliver the message only to the first registered subscriber. Do you know how I could solve this?


I think the blocking queue is still the better option here: just have one blocking queue per subscriber. Adding a subscriber is simply a matter of creating a new blocking queue for that subscriber. The only downside is that the publisher has to publish the message to all the subscribers, but in return, you get...

Much less locking needed, as the publisher is only doing shallow copies. Fire and forget on the publisher side: no need to track the subscribers and the latches. No dependency between the subscribers: no need to go at the rate of the slowest subscriber. Etc. The benefits seem to outweigh the issue, IMO.
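A minimal sketch of the one-queue-per-subscriber idea, under some assumptions: TopicBroker, subscribe(), and publish() are hypothetical names, the queues are unbounded LinkedBlockingQueues, and the broker keeps the list of queues per topic so the publisher can reach them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical broker: every subscriber of a topic owns a private queue.
class TopicBroker<T, M> {
    private final ConcurrentMap<T, List<BlockingQueue<M>>> queues =
            new ConcurrentHashMap<>();

    // Adding a subscriber is just creating (and registering) a new queue.
    BlockingQueue<M> subscribe(T topic) {
        BlockingQueue<M> queue = new LinkedBlockingQueue<>();
        queues.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(queue);
        return queue;
    }

    // The publisher hands the same message reference to every queue.
    void publish(T topic, M message) {
        for (BlockingQueue<M> queue
                : queues.getOrDefault(topic, Collections.emptyList())) {
            queue.add(message); // unbounded queue, so this never blocks
        }
    }
}

class BrokerDemo {
    static List<String> run() {
        TopicBroker<String, String> broker = new TopicBroker<>();
        BlockingQueue<String> a = broker.subscribe("news");
        BlockingQueue<String> b = broker.subscribe("news");

        broker.publish("news", "first");
        broker.publish("news", "second");

        // A real subscriber thread would block in take(); poll() is used
        // here only because the messages are already queued.
        List<String> seen = new ArrayList<>();
        seen.add(a.poll());
        seen.add(a.poll());
        seen.add(b.poll());
        seen.add(b.poll());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each subscriber drains its own queue at its own pace, so a slow subscriber only grows its own backlog. With unbounded queues that backlog can grow without limit, so a bounded queue with offer(...) may be worth considering if memory is a concern.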

Henry
 
Federico Minarelli
Greenhorn
Posts: 29
Thanks a lot, Steve!