Getting Java Event Notification Right
Implementing the observer pattern to provide Java event notification seems to be a straightforward thing to do. However, there are some pitfalls one can easily run into. Here is an explanation of common mistakes I have carelessly made myself on various occasions…
Java Event Notification
Let’s start with a simple bean StateHolder that encapsulates a private int field state with appropriate accessors:
public class StateHolder {

  private int state;

  public int getState() {
    return state;
  }

  public void setState( int state ) {
    this.state = state;
  }
}
Consider that we have decided our bean should broadcast the news of state changes to registered observers. No problem at all! A convenient event and listener definition is easy to create…
// change event to broadcast
public class StateEvent {

  public final int oldState;
  public final int newState;

  StateEvent( int oldState, int newState ) {
    this.oldState = oldState;
    this.newState = newState;
  }
}

// observer interface
public interface StateListener {
  void stateChanged( StateEvent event );
}
…next we need to be able to register StateListener instances at StateHolder instances…
public class StateHolder {

  private final Set<StateListener> listeners = new HashSet<>();

  [...]

  public void addStateListener( StateListener listener ) {
    listeners.add( listener );
  }

  public void removeStateListener( StateListener listener ) {
    listeners.remove( listener );
  }
}
… and last but not least StateHolder#setState has to be adjusted to trigger the actual notification on state changes:
public void setState( int state ) {
  int oldState = this.state;
  this.state = state;
  if( oldState != state ) {
    broadcast( new StateEvent( oldState, state ) );
  }
}

private void broadcast( StateEvent stateEvent ) {
  for( StateListener listener : listeners ) {
    listener.stateChanged( stateEvent );
  }
}
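To see the pieces working together, here is a minimal single-file sketch that registers a listener and feeds a few values into the holder. The wrapper class NotificationDemo and the helper observedTransitions are assumptions made for illustration; the notification logic mirrors the snippets above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical wrapper class, only used to keep the sketch in a single file.
class NotificationDemo {

  static class StateEvent {
    final int oldState;
    final int newState;

    StateEvent( int oldState, int newState ) {
      this.oldState = oldState;
      this.newState = newState;
    }
  }

  interface StateListener {
    void stateChanged( StateEvent event );
  }

  static class StateHolder {
    private final Set<StateListener> listeners = new HashSet<>();
    private int state;

    void addStateListener( StateListener listener ) {
      listeners.add( listener );
    }

    void setState( int state ) {
      int oldState = this.state;
      this.state = state;
      if( oldState != state ) {
        broadcast( new StateEvent( oldState, state ) );
      }
    }

    private void broadcast( StateEvent stateEvent ) {
      for( StateListener listener : listeners ) {
        listener.stateChanged( stateEvent );
      }
    }
  }

  // Records the transitions a listener observes for the given sequence of setState calls.
  static List<String> observedTransitions( int... newStates ) {
    List<String> transitions = new ArrayList<>();
    StateHolder holder = new StateHolder();
    holder.addStateListener( event -> transitions.add( event.oldState + " -> " + event.newState ) );
    for( int newState : newStates ) {
      holder.setState( newState );
    }
    return transitions;
  }
}
```

Calling NotificationDemo.observedTransitions( 5, 5, 7 ) reports the transitions 0 -> 5 and 5 -> 7; setting the value 5 a second time triggers no event.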
Bingo! That’s all there is. Being professionals we might even have implemented this test driven and feel comfortable with our thorough code coverage and the green bar. And anyway isn’t this what we have learned from tutorials on the web?
So here comes the bad news: The solution is flawed…
Concurrent Modification
Given the StateHolder above one can easily run into a ConcurrentModificationException, even if it is used within single thread confinement only. But what is causing it and why does it occur?
java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
    at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
    at com.codeaffine.events.StateProvider.broadcast(StateProvider.java:60)
    at com.codeaffine.events.StateProvider.setState(StateProvider.java:55)
    at com.codeaffine.events.StateProvider.main(StateProvider.java:122)
A look at the stack trace reveals that the exception is thrown by an Iterator of the HashMap that backs the HashSet we use. But we did not use any iterators in our code, or did we? Well, we did. The for each construct in broadcast is based on Iterable and therefore gets transformed into an iterator loop at compile time.
Because of this a listener removing itself from the StateHolder instance during event notification might cause the ConcurrentModificationException. So instead of working on the original data structure one solution could be to iterate over a snapshot of listeners.
By doing so listener removal cannot interfere with the broadcast mechanism anymore (but note that the notification semantics also change slightly, since such a removal is not reflected by the snapshot while broadcast gets executed):
private void broadcast( StateEvent stateEvent ) {
  Set<StateListener> snapshot = new HashSet<>( listeners );
  for( StateListener listener : snapshot ) {
    listener.stateChanged( stateEvent );
  }
}
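The difference between the two broadcast variants can be reproduced with a small sketch. The class SelfRemovalDemo and its simplified Listener interface are hypothetical stand-ins, not the article's types. Every listener removes itself on notification, so that the failure does not depend on the iteration order of the HashSet.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class, not part of the original StateHolder.
class SelfRemovalDemo {

  interface Listener {
    void changed();
  }

  final Set<Listener> listeners = new HashSet<>();
  int notified;

  // Iterates the live set: a listener that removes itself invalidates the iterator.
  void broadcastDirectly() {
    for( Listener listener : listeners ) {
      listener.changed();
    }
  }

  // Iterates a copy: removals only affect the original set.
  void broadcastOnSnapshot() {
    for( Listener listener : new HashSet<>( listeners ) ) {
      listener.changed();
    }
  }

  // Registers listeners that unregister themselves on their first notification.
  void addSelfRemovingListeners( int count ) {
    for( int i = 0; i < count; i++ ) {
      listeners.add( new Listener() {
        @Override
        public void changed() {
          notified++;
          listeners.remove( this );
        }
      } );
    }
  }

  // True if iterating the live set fails once a listener has removed itself.
  static boolean directBroadcastFails() {
    SelfRemovalDemo demo = new SelfRemovalDemo();
    demo.addSelfRemovingListeners( 3 );
    try {
      demo.broadcastDirectly();
      return false;
    } catch( ConcurrentModificationException expected ) {
      return true;
    }
  }

  // Number of listeners reached when broadcasting on a snapshot.
  static int snapshotBroadcastNotifications() {
    SelfRemovalDemo demo = new SelfRemovalDemo();
    demo.addSelfRemovingListeners( 3 );
    demo.broadcastOnSnapshot();
    return demo.notified;
  }
}
```

With three self-removing listeners the direct loop fails as soon as the iterator moves on after the first removal, while the snapshot loop reaches all three.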
But what if StateHolder is meant to be used within a multi-threaded context?
Synchronization
To be able to use StateHolder within a multi-threaded environment it has to be thread safe. This can be achieved quite easily. Adding synchronized to each method of our class should do the trick, right?
public class StateHolder {
  public synchronized void addStateListener( StateListener listener ) { [...]
  public synchronized void removeStateListener( StateListener listener ) { [...]
  public synchronized int getState() { [...]
  public synchronized void setState( int state ) { [...]
Now read/write access to a StateHolder instance is guarded by its intrinsic lock. This makes the public methods atomic and ensures correct state visibility for different threads. Mission accomplished!
Not quite… although the implementation is thread safe, it bears the risk of deadlocking applications that use it.
Think about the following situation: Thread A changes the state of StateHolder S. During the notification of the listeners of S, Thread B tries to access S and gets blocked. If B holds a synchronization lock on an object that one of the listeners of S needs to access during notification, we run into a deadlock: A waits for the lock held by B, while B waits for the lock on S held by A.
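The circular wait can be made visible without actually hanging the JVM. The following sketch is an assumption-laden stand-in: it swaps the intrinsic locks for ReentrantLock instances so that both threads can give up after a timeout, and all names (CircularWaitSketch, holderLock, otherLock) are hypothetical. With real synchronized blocks both threads would wait forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; ReentrantLock replaces 'synchronized' only to allow a timeout.
class CircularWaitSketch {

  // Returns true if both threads fail to get the lock owned by the other one.
  static boolean bothThreadsBlockEachOther() {
    ReentrantLock holderLock = new ReentrantLock(); // stands in for the intrinsic lock of S
    ReentrantLock otherLock = new ReentrantLock();  // owned by B, needed by a listener of S
    CountDownLatch locksTaken = new CountDownLatch( 2 );
    CountDownLatch attemptsDone = new CountDownLatch( 2 );
    AtomicBoolean aBlocked = new AtomicBoolean();
    AtomicBoolean bBlocked = new AtomicBoolean();

    // Thread A: owns S and notifies a listener that needs the other lock.
    Thread a = new Thread( () -> attempt( holderLock, otherLock, locksTaken, attemptsDone, aBlocked ) );
    // Thread B: owns the other lock and tries to access S.
    Thread b = new Thread( () -> attempt( otherLock, holderLock, locksTaken, attemptsDone, bBlocked ) );
    a.start();
    b.start();
    try {
      a.join();
      b.join();
    } catch( InterruptedException e ) {
      Thread.currentThread().interrupt();
      return false;
    }
    return aBlocked.get() && bBlocked.get();
  }

  // Takes 'owned', then tries to take 'wanted' while the other thread still owns it.
  private static void attempt( ReentrantLock owned, ReentrantLock wanted,
                               CountDownLatch locksTaken, CountDownLatch attemptsDone,
                               AtomicBoolean blocked ) {
    owned.lock();
    try {
      locksTaken.countDown();
      locksTaken.await(); // make sure both locks are taken before anyone tries the other one
      boolean acquired = wanted.tryLock( 100, TimeUnit.MILLISECONDS );
      blocked.set( !acquired );
      if( acquired ) {
        wanted.unlock();
      }
      attemptsDone.countDown();
      attemptsDone.await(); // keep the own lock until both attempts are finished
    } catch( InterruptedException e ) {
      Thread.currentThread().interrupt();
    } finally {
      owned.unlock();
    }
  }
}
```

Both attempts time out because each thread keeps its own lock while asking for the other one, which is exactly the situation a fully synchronized StateHolder can create.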
That is why we need to narrow down synchronization to state access and broadcast the event outside of the guarded passages:
public class StateHolder {

  private final Set<StateListener> listeners = new HashSet<>();
  private int state;

  public void addStateListener( StateListener listener ) {
    synchronized( listeners ) {
      listeners.add( listener );
    }
  }

  public void removeStateListener( StateListener listener ) {
    synchronized( listeners ) {
      listeners.remove( listener );
    }
  }

  public int getState() {
    synchronized( listeners ) {
      return state;
    }
  }

  public void setState( int state ) {
    int oldState = this.state;
    synchronized( listeners ) {
      this.state = state;
    }
    if( oldState != state ) {
      broadcast( new StateEvent( oldState, state ) );
    }
  }

  private void broadcast( StateEvent stateEvent ) {
    Set<StateListener> snapshot;
    synchronized( listeners ) {
      snapshot = new HashSet<>( listeners );
    }
    for( StateListener listener : snapshot ) {
      listener.stateChanged( stateEvent );
    }
  }
}
Update 2015/03/14: While this code does not run into a deadlock, it still is not correct, as the discussion started by Rafal Lyzwa seems to reveal. There are situations where notifications might be swallowed or propagate wrong values, because oldState is read outside of the guarded block and its value is therefore unpredictable (see Java Concurrency in Practice, Chapter 3.1 Visibility).
Declaring the state field as volatile would address visibility, but on its own it would not make reading the old value and writing the new one a single atomic step. The following version of StateHolder#setState(int) shows how the problem can be solved by reading this.state within the guarded block:
public void setState( int state ) {
  StateEvent event;
  synchronized( listeners ) {
    event = new StateEvent( this.state, state );
    this.state = state;
  }
  if( event.oldState != event.newState ) {
    broadcast( event );
  }
}
The listing now shows an implementation evolved from the previous snippets providing proper (but somewhat old fashioned) synchronization using the Set instance as internal lock. Listener notification takes place outside of the guarded block and therefore avoids a circular wait.
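Whether a listener really runs outside of the guarded block can be checked with Thread.holdsLock. The following sketch contrasts a fully synchronized setter with a narrowed one. LockScopeDemo and its simplified Listener interface are hypothetical, and for brevity the demo locks on this instead of the listener set.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class; it uses 'this' as lock instead of the listener set for brevity.
class LockScopeDemo {

  interface Listener {
    void stateChanged( int oldState, int newState );
  }

  private final Set<Listener> listeners = new HashSet<>();
  private int state;

  synchronized void addListener( Listener listener ) {
    listeners.add( listener );
  }

  // Variant 1: the whole method is guarded, so listeners run while the lock is held.
  synchronized void setStateHoldingLock( int newState ) {
    int oldState = state;
    state = newState;
    if( oldState != newState ) {
      for( Listener listener : new HashSet<>( listeners ) ) {
        listener.stateChanged( oldState, newState );
      }
    }
  }

  // Variant 2: only state access and the snapshot are guarded, notification happens afterwards.
  void setStateReleasingLock( int newState ) {
    int oldState;
    Set<Listener> snapshot;
    synchronized( this ) {
      oldState = state;
      state = newState;
      snapshot = new HashSet<>( listeners );
    }
    if( oldState != newState ) {
      for( Listener listener : snapshot ) {
        listener.stateChanged( oldState, newState );
      }
    }
  }

  // Reports whether a listener gets called while the calling thread owns the holder's lock.
  static boolean listenerRunsUnderLock( boolean narrowed ) {
    LockScopeDemo holder = new LockScopeDemo();
    boolean[] lockHeld = new boolean[ 1 ];
    holder.addListener( ( oldState, newState ) -> lockHeld[ 0 ] = Thread.holdsLock( holder ) );
    if( narrowed ) {
      holder.setStateReleasingLock( 1 );
    } else {
      holder.setStateHoldingLock( 1 );
    }
    return lockHeld[ 0 ];
  }
}
```

In the fully synchronized variant the listener observes that the lock is held, in the narrowed variant it does not, so foreign code can no longer take part in a circular wait on the holder's lock.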
StateHolder
as source of your event object.
If event ordering is crucial one could think of a thread safe FIFO structure to buffer events along with the corresponding listener snapshot in the guarded block of setState. A separate thread could fire the actual event notifications from an unguarded block as long as the FIFO structure is not empty (Producer-Consumer-Pattern). This should ensure chronological order without risking a deadlock. I say should since I never tried this solution myself.
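A rough sketch of this idea could look as follows. It is untested beyond the small check at the end and rests on several assumptions: a single-thread executor plays the role of both the FIFO structure and the consumer thread, and the names OrderedStateHolder, Listener and eventsFormUnbrokenChain are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of ordered notification via a single dispatcher thread.
class OrderedStateHolder {

  interface Listener {
    void stateChanged( int oldState, int newState );
  }

  private final List<Listener> listeners = new CopyOnWriteArrayList<>();
  // The executor's internal queue serves as FIFO buffer, its single thread as consumer.
  private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();
  private final Object lock = new Object();
  private int state;

  void addListener( Listener listener ) {
    listeners.add( listener );
  }

  void setState( int newState ) {
    synchronized( lock ) {
      int oldState = state;
      if( oldState == newState ) {
        return;
      }
      state = newState;
      List<Listener> snapshot = new ArrayList<>( listeners );
      // Only enqueues the notification; listeners are called later on the dispatcher thread.
      dispatcher.execute( () -> snapshot.forEach( l -> l.stateChanged( oldState, newState ) ) );
    }
  }

  // Stops accepting changes and waits until the queued notifications have been delivered.
  boolean shutdown( long timeoutMillis ) throws InterruptedException {
    dispatcher.shutdown();
    return dispatcher.awaitTermination( timeoutMillis, TimeUnit.MILLISECONDS );
  }

  // Lets several threads change the state and checks that each event starts where the last ended.
  static boolean eventsFormUnbrokenChain( int threadCount, int changesPerThread ) {
    OrderedStateHolder holder = new OrderedStateHolder();
    List<int[]> events = Collections.synchronizedList( new ArrayList<>() );
    holder.addListener( ( oldState, newState ) -> events.add( new int[] { oldState, newState } ) );
    List<Thread> threads = new ArrayList<>();
    for( int t = 0; t < threadCount; t++ ) {
      int offset = t * changesPerThread;
      Thread worker = new Thread( () -> {
        for( int i = 0; i < changesPerThread; i++ ) {
          holder.setState( offset + i );
        }
      } );
      threads.add( worker );
      worker.start();
    }
    try {
      for( Thread worker : threads ) {
        worker.join();
      }
      if( !holder.shutdown( 5000 ) ) {
        return false;
      }
    } catch( InterruptedException e ) {
      Thread.currentThread().interrupt();
      return false;
    }
    int expectedOld = 0;
    for( int[] event : events ) {
      if( event[ 0 ] != expectedOld ) {
        return false;
      }
      expectedOld = event[ 1 ];
    }
    return true;
  }
}
```

Because events are enqueued inside the guarded block, the queue order matches the order of the state changes, and the dispatcher thread delivers them one after another. The price is asynchronous delivery: setState returns before listeners have been notified.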
Given the semantics of the previous implementation, composing our class using thread safe classes like CopyOnWriteArraySet and AtomicInteger makes the solution less verbose:
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;

public class StateHolder {

  private final Set<StateListener> listeners = new CopyOnWriteArraySet<>();
  private final AtomicInteger state = new AtomicInteger();

  public void addStateListener( StateListener listener ) {
    listeners.add( listener );
  }

  public void removeStateListener( StateListener listener ) {
    listeners.remove( listener );
  }

  public int getState() {
    return state.get();
  }

  public void setState( int newState ) {
    // getAndSet swaps the value atomically and returns the previous one
    StateEvent event = new StateEvent( state.getAndSet( newState ), newState );
    if( event.oldState != event.newState ) {
      broadcast( event );
    }
  }

  private void broadcast( StateEvent stateEvent ) {
    for( StateListener listener : listeners ) {
      listener.stateChanged( stateEvent );
    }
  }
}
Since CopyOnWriteArraySet and AtomicInteger are thread safe we do not need guarded blocks anymore. But wait a moment! Didn’t we just learn to use a snapshot for broadcasting instead of looping over a hidden iterator of the original set?
It might be a bit confusing, but an Iterator provided by CopyOnWriteArraySet is already a snapshot. CopyOnWriteXXX collections were invented particularly for such use cases – efficient if small sized, optimized for frequent iteration with rarely changing content. Which means our code is safe.
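The snapshot behavior is easy to observe in isolation. The following sketch uses a hypothetical helper class SnapshotIterationDemo that removes every element while looping over the set.

```java
import java.util.Set;

// Hypothetical helper, used only to illustrate snapshot iteration.
class SnapshotIterationDemo {

  // Removes every element while iterating and returns how many elements the loop visited.
  static int visitAndRemoveAll( Set<String> names ) {
    int visited = 0;
    for( String name : names ) {
      names.remove( name ); // changes the set, but not the array the iterator walks over
      visited++;
    }
    return visited;
  }
}
```

Passing a CopyOnWriteArraySet with three entries visits all three and leaves the set empty, without any ConcurrentModificationException. Under the same conditions a HashSet would fail once the loop continues after the first removal.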
With Java 8 the broadcast method could be stripped down even more using Iterable#forEach in conjunction with lambdas. The code of course stays safe as iteration is also performed on a snapshot:
private void broadcast( StateEvent stateEvent ) {
  listeners.forEach( listener -> listener.stateChanged( stateEvent ) );
}
Exception Handling
The last section of this post discusses how to handle broken listeners that throw unexpected RuntimeExceptions. Although I usually opt strictly for a fail-fast approach, it might be inappropriate in this case to let such exceptions pass unhandled, in particular given that the implementation is probably used in a multi-threaded environment.
A broken listener harms the system in two ways. First, it prevents notification of the observers that would have been notified after the broken one. Second, it can harm the calling thread, which may not be prepared to deal with the problem. In summary, it can lead to multiple, creeping malfunctions whose initial cause might be hard to track down.
Hence it might be useful to shield each notification within a try-catch block:
private void broadcast( StateEvent stateEvent ) {
  listeners.forEach( listener -> notifySafely( stateEvent, listener ) );
}

private void notifySafely( StateEvent stateEvent, StateListener listener ) {
  try {
    listener.stateChanged( stateEvent );
  } catch( RuntimeException unexpected ) {
    // appropriate exception handling goes here...
  }
}
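What appropriate handling means depends on the application. As one possible sketch, the hypothetical SafeBroadcaster below simply records the failure and carries on, so that a broken listener cannot keep the others from being notified. The class, its simplified Listener interface and the getFailures accessor are assumptions for illustration; logging would be an equally valid choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical broadcaster; collecting failures in a list is just one possible strategy.
class SafeBroadcaster {

  interface Listener {
    void stateChanged( int newState );
  }

  private final List<Listener> listeners = new CopyOnWriteArrayList<>();
  private final List<RuntimeException> failures = new CopyOnWriteArrayList<>();

  void addListener( Listener listener ) {
    listeners.add( listener );
  }

  void broadcast( int newState ) {
    listeners.forEach( listener -> notifySafely( newState, listener ) );
  }

  private void notifySafely( int newState, Listener listener ) {
    try {
      listener.stateChanged( newState );
    } catch( RuntimeException unexpected ) {
      // Assumption: remember the problem and continue with the remaining listeners.
      failures.add( unexpected );
    }
  }

  // Returns a copy of the exceptions thrown by listeners so far.
  List<RuntimeException> getFailures() {
    return new ArrayList<>( failures );
  }
}
```

A listener registered after a throwing one still receives the event, and the caller of broadcast never sees the exception.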
Conclusion
As shown in the sections above, Java event notification has a few nuts and bolts one has to keep in mind. Make sure to iterate over a snapshot of the listener collection during event notification, keep event notification out of synchronized blocks and notify listeners safely if appropriate.
Hopefully, I was able to work out the subtleties in a comprehensible way and did not mess up the concurrency sections in particular. In case you find some mistakes or have additional wisdom to share, feel free to use the comment section below.
You should use AtomicInteger.getAndSet(newValue), otherwise your code won’t necessarily fire the correct events.
Although I think it should be safe with respect to the new/old value relation, as StateEvent is made immutable with final fields and therefore the values should be published correctly, I consider it an improvement to write setState as shown in the following listing. However, this does not ensure that the current state of StateHolder and the value StateEvent#newState are in sync when the event is fired (see grey infobox above). Thanks for the hint:
public void setState( int state ) {
  int oldState = this.state.getAndSet( state );
  if( oldState != state ) {
    broadcast( new StateEvent( oldState, state ) );
  }
}
Changed the code accordingly.
Hello,
The versions of StateHolder with narrowed synchronization and atomic integer are not 100% correct, since there might be cases of changes in the state without an event being sent. It happens when the first of the threads changing the state is suspended just after entering the method and before executing the first instruction.
Let’s assume we have 2 points in time, T1 and T2, where T1 < T2. Let’s also assume that initial state is 0.
At T1, thread A calls setState(5). At T2, thread B calls setState(0).
Since T1 < T2, I would expect the listeners to be called twice, for the following changes:
0 -> 5
5 -> 0
However, it might happen that the listeners will be called only once, for 0 -> 5 change.
Here is the example of execution of setState method for the StateHolder version with narrowed down synchronization:
A: [enter setState, state parameter = 5]
B: [enter setState, state parameter = 0]
B: int oldState = this.state; // oldState = 0
A: int oldState = this.state; // oldState = 0
A: this.state = state; // this.state = 5;
B: this.state = state; // this.state = 0;
B: if (oldState != state) { // oldState = 0, state = 0, following block not executed
B: [exit setState]
A: if (oldState != state) { // oldState = 0, state = 5, following block executed
A: broadcast(new StateEvent(oldState, state)); // 0 -> 5 change published
A: [exit setState]
As you can see, 5 -> 0 change is not published.
Well, it seems you are right – there might be a synchronization problem. But it is a bit more subtle. Looking at your example I think it is ok that the event 5 -> 0 is not published. Why? Because it never happened.
You are expecting the event notification decision to be based on the state given at T1 and T2, assuming that those mark the atomic boundaries of the StateHolder state. However, this is not the case.
Entering setState(int) does not mark the atomic state-change boundaries, the synchronized block does. Which means in your example thread B simply overtakes thread A and therefore changes 0 to 0. Hence it is correct that no event is fired. But…
Determination of oldState is done by reading this.state outside of the guarded block. Which means the visibility of state changes made by another thread is not guaranteed at all. Hence the value of oldState is completely unpredictable from the concurrency point of view (see Java Concurrency in Practice, 3.1 Visibility).
The version with AtomicInteger is correct, since its atomic getAndSet(int) method is used (again thanks to Leonard Brünings above), which guarantees the state value gets properly published to other threads.
So you just found another pitfall one (meaning me…) can easily run into.
I guess this means this.state should either be declared volatile or the guarding block should be written somewhat like the following snippet:
public void setState( int state ) {
  int oldState;
  synchronized( listeners ) {
    oldState = this.state;
    this.state = state;
  }
  if( oldState != state ) {
    broadcast( new StateEvent( oldState, state ) );
  }
}
Thanks for bringing this up, I will update the post accordingly
Hi Frank,
I have a situation where I’m receiving the values from devices in my REST services (deployed on wildfly) which currently is storing them in database (SQL Server 2012). As per the new requirement, I now have to create a system which should notify the external client if the device state has been modified.
The client code which I have implemented will be used as library by the external client developers who actually should be notified of the value change.
Could you please help me out and point me into the right direction.
Thanks in advance :-)
Hi Ayush, thanks for reading this post. However, I believe that your request is a bit out of the scope of the article’s content. And, since I’m pretty busy right now please forgive me that I can’t give you any more specific advice as the one I’ve already given in the explanations above. Regards Frank Appel