Getting Java Event Notification Right

Home  >>  Common  >>  Getting Java Event Notification Right

Getting Java Event Notification Right

On March 11, 2015, Posted by , In Common,Spotlight, By ,, , With 6 Comments

Implementing the observer pattern to provide Java event notification seems to be a straight forward thing to do. However, there are some pitfalls one easily can run into. Here comes an explanation of common mistakes I carelessly have produced myself on various occasions…

Java Event Notification

Let’s start with a simple bean StateHolder that encapsulates a private int field state with appropriate accessors:

public class StateHolder {

  private int state;

  public int getState() {
    return state;
  }

  public void setState( int state ) {
    this.state = state;
  }
}

Consider that we have decided our bean should broadcast the news of state changes to registered observers. No problem at all! A convenient event and listener definition is easy to create…

// change event to broadcast
public class StateEvent {

  public final int oldState;
  public final int newState;

  StateEvent( int oldState, int newState ) {
    this.oldState = oldState;
    this.newState = newState;
  }
}

// observer interface
public interface StateListener {
  void stateChanged( StateEvent event );
}

…next we need to be able to register StatListeners at StateHolder instances…

public class StateHolder {

  private final Set<StateListener> listeners = new HashSet<>();

  [...]
     
  public void addStateListener( StateListener listener ) {
    listeners.add( listener );
  }

  public void removeStateListener( StateListener listener ) {
    listeners.remove( listener );
  }
}

… and last but not least StateHolder#setState have to be adjusted to trigger the actual notification on state changes:

public void setState( int state ) {
  int oldState = this.state;
  this.state = state;
  if( oldState != state ) {
    broadcast( new StateEvent( oldState, state ) );
  }
}

private void broadcast( StateEvent stateEvent ) {
  for( StateListener listener : listeners ) {
    listener.stateChanged( stateEvent );
  }
}

Bingo! That’s all there is. Being professionals we might even have implemented this test driven and feel comfortable with our thorough code coverage and the green bar. And anyway isn’t this what we have learned from tutorials on the web?

So here comes the bad news: The solution is flawed…

Concurrent Modification

Given the StateHolder above one can easily run into a ConcurrentModificationException, even if used within single thread confinement only. But who is causing it and why does it occur?

java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
	at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
	at com.codeaffine.events.StateProvider.broadcast(StateProvider.java:60)
	at com.codeaffine.events.StateProvider.setState(StateProvider.java:55)
	at com.codeaffine.events.StateProvider.main(StateProvider.java:122)

A look at the stacktrace uncovers that the exception is thrown by an Iterator of the HashMap we use. Only that we did not use any iterators in our code, or did we? Well, we did. The for each construct in broadcast is based on Iterable and therefore gets transformed into an iterator loop at compile time.

Because of this a listener removing itself from the StateHolder instance during event notification might cause the ConcurrentModificationException. So instead of working on the original data structure one solution could be to iterate over a snapshot of listeners.

By doing so listener removal cannot interfere with the broadcast mechanism anymore (but note that notification semantics changes also slightly, since such a removal is not reflected by the snapshot while broadcast gets executed):

private void broadcast( StateEvent stateEvent ) {
  Set<StateListener> snapshot = new HashSet<>( listeners );
  for( StateListener listener : snapshot ) {
    listener.stateChanged( stateEvent );
  }
}

But what if StateHolder is meant to be used within a multi threaded context?

Synchronization

To be able to use StateHolder within a multi-threaded environment it has to be thread safe. This can be achieved quite easily. Adding synchronized to each method of our class should do the trick, right?

public class StateHolder {
  public synchronized void addStateListener( StateListener listener ) {  [...]
  public synchronized void removeStateListener( StateListener listener ) {  [...]
  public synchronized int getState() {  [...]
  public synchronized void setState( int state ) {  [...]

Now read/write access to a StateHolder instance is guarded by its intrinsic lock. This makes the public methods atomic and ensures correct state visibility for different threads. Mission accomplished!

See also  JUnit 5 - A First Look at the Next Generation of JUnit

Not quite… although the implementation is thread safe, it bears the risk to dead lock applications that use it.

Think about the following situation: Thread A changes the state of StateHolder S. During the notification of the listeners of S Thread B tries to access S and gets blocked. If B holds a synchronization lock on an object that is about to be notified by one of the listeners of S, we run into a deadlock.

That is why we need to narrow down synchronization to state access and broadcast the event outside of the guarded passages:

public class StateHolder {

  private final Set<StateListener> listeners = new HashSet<>();
  private int state;

  public void addStateListener( StateListener listener ) {
    synchronized( listeners ) {
      listeners.add( listener );
    }
  }

  public void removeStateListener( StateListener listener ) {
    synchronized( listeners ) {
      listeners.remove( listener );
    }
  }

  public int getState() {
    synchronized( listeners ) {
      return state;
    }
  }

  public void setState( int state ) {
    int oldState = this.state;
    synchronized( listeners ) {
      this.state = state;
    }
    if( oldState != state ) {
      broadcast( new StateEvent( oldState, state ) );
    }
  }

  private void broadcast( StateEvent stateEvent ) {
    Set<StateListener> snapshot;
    synchronized( listeners ) {
      snapshot = new HashSet<>( listeners );
    }
    for( StateListener listener : snapshot ) {
      listener.stateChanged( stateEvent );
    }
  }
}

Update 2015/03/14: While this code does not run into a deadlock, it still is not correct as the discussion started by Rafal Lyzwa seems to reveal. There are situations where notifications might be swallowed or propagate wrong values as the value of oldState is unpredictable (see Java Concurrency in Practice, Chapter 3.1 Visibility).

The following version of StateHolder#setState(int) shows how this can be solved by reading this.state within the guarded block. Another option could be to declare the state field as volatile:

public void setState( int state ) {
  StateEvent event;
  synchronized( listeners ) {
    event = new StateEvent( this.state, state );
    this.state = state;
  }
  if( event.oldState != event.newState ) {
    broadcast( event );
  }
}

The listing now shows an implementation evolved from the previous snippets providing proper (but somewhat old fashioned) synchronization using the Set instance as internal lock. Listener notification takes place outside of the guarded block and therefore avoids a circular wait.

Note: Due to the concurrent nature of the system, the solution does not guarantee that change notifications reach a listener in the order they occured. If more accuracy about the actual state value on observer side is needed, consider to provide the StateHolder as source of your event object.

If event ordering is crucial one could think of a thread safe FIFO structure to buffer events along with the according listener snapshot in the guarded block of setState. A separate thread could fire the actual event notifications from an unguarded block as long as the FIFO structure is not empty (Producer-Consumer-Pattern). This should ensure chronological order without risking a dead lock. I say should since I never tried this solution by myself..

Given the sematics of the previous implementation, composing our class using thread safe classes like CopyOnWriteArraySet and AtomicInteger makes the solution less verbose:

public class StateHolder {

  private final Set<StateListener> listeners = new CopyOnWriteArraySet<>();
  private final AtomicInteger state = new AtomicInteger();

  public void addStateListener( StateListener listener ) {
    listeners.add( listener );
  }

  public void removeStateListener( StateListener listener ) {
    listeners.remove( listener );
  }

  public int getState() {
    return state.get();
  }

  public void setState( int newState ) {
    StateEvent event = new StateEvent( state.getAndSet( newState ), newState );
    if( event.oldState != event.newState ) {
      broadcast( event );
    }
  }

  private void broadcast( StateEvent stateEvent ) {
    for( StateListener listener : listeners ) {
      listener.stateChanged( stateEvent );
    }
  }
}

Since CopyOnWriteArraySet and AtomicInteger are thread safe we do not have the need for guarded blocks anymore. But wait a moment! Didn’t we just learn to use a snapshot for broadcasting instead of looping over an hidden iterator of the origin set?

See also  Testing with JUnit: E-Book Xmas Offering

It might be a bit confusing, but an Iterator provided by CopyOnWriteArraySet is already a snapshot. CopyOnWriteXXX collections were invented particularly for such use cases – efficient if small sized, optimized for frequent iteration with rarely changing content. Which means our code is safe.

With Java 8 the broadcast method could be stripped down even more using Iterable#forEach in conjunction with lambdas. The code of course stays safe as iteration is also performed on a snapshot:

private void broadcast( StateEvent stateEvent ) {
  listeners.forEach( listener -> listener.stateChanged( stateEvent ) );
}

Exception Handling

The last section of this post discusses how to handle broken listeners that throw unexpected RuntimeExceptions. Although I usually opt strictly for a fail-fast approach, it might be inappropriate in this case to let such exceptions pass unhandled. Given in particular that the implementation is probably used in a multi-threading environment.

A broken listener harms the system in two ways. First, it prevents notification of those observers assorted after our bogey. Second, it can harm the calling thread which may not be prepared to deal with the problem. Summarized it can lead to multiple, sneaking malfunctions of which the initial cause might be hard to track down.

Hence it might be useful to shield each notification within a try-catch block:

private void broadcast( StateEvent stateEvent ) {
  listeners.forEach( listener -> notifySafely( stateEvent, listener ) );
}

private void notifySafely( StateEvent stateEvent, StateListener listener ) {
  try {
    listener.stateChanged( stateEvent );
  } catch( RuntimeException unexpected ) {
    // appropriate exception handling goes here...
  }
}

Conclusion

As shown in the sections above Java event notification has a few nuts and bolts one has to keep in mind. Ensure to iterate over a snapshot of the listener collection during event notification, keep event notification out of synchronized blocks and notify listeners safely if appropriate.

Hopefully, I was able to work out the subtleties in a comprehensible way and did not mess up, in particular, the concurrency sections. In case, you find some mistakes or have an additional wisdom to share, feel free to use the commenting sections below.

Title Image: © Depositphotos.com/sumkinn
Frank Appel
Follow me
Latest posts by Frank Appel (see all)

6 Comments so far:

  1. Leonard Brünings says:

    You should use AtomicInteger.getAndSet(newValue), otherwise your code won’t necessarily fire the correct events.

    • Frank Appel says:

      Although I think it should be save with respect to the new/old value relation as StateEvent is made immutable with final fields and therefore the values should be published correctly, I consider it an improvement to write setState as shown in the following listing. However this does not ensure that current state of StateHolder and the value StateEvent#newState are in sync when the event is thrown (see grey infobox above). Thanks for the hint:


      public void setState( int state ) {
      int oldState = this.state.getAndSet( state );
      if( oldState != state ) {
      broadcast( new StateEvent( oldState, state ) );
      }
      }

      Changed the code accordingly.

  2. Rafal Lyzwa says:

    Hello,

    The versions of StateHolder with narrowed synchronization and atomic integer are not 100% correct, since there might be cases of changes in the state without an event being sent. It happens when first of the threads changing state is suspended just after entering method and before executing first instruction.

    Let’s assume we have 2 points in time, T1 and T2, where T1 < T2. Let’s also assume that initial state is 0.
    At T1, thread A calls setState(5). At T2, thread B calls setState(0).
    Since T1 5
    5 -> 0

    However, it might happen that the listeners will be called only once, for 0 -> 5 change.

    Here is the example of execution of setState method for the StateHolder version with narrowed down synchronization:

    A: [enter setState, state parameter = 5]
    B: [enter setState, state parameter = 0]
    B: int oldState = this.state; // oldState = 0
    A: int oldState = this.state; // oldState = 0
    A: this.state = state; // this.state = 5;
    B: this.state = state; // this.state = 0;
    B: if (oldState != state) { // oldState = 0, state = 0, following block not executed
    B: [exit setState]
    A: if (oldState != state) { // oldState = 0, state = 5, following block executed
    A: broadcast(new StateEvent(oldState, state)); // 0 -> 5 change published
    B: [exit setState]

    As you can see, 5 -> 0 change is not published.

    • Frank Appel says:

      Hell, seems you are right – there might be a synchronization problem. But it is a bit more subtle. Looking at your example I think it is ok that the event 5 -> 0 is not published. Why? Because it never happened.

      You are expecting the event notification decision to be based on the state given at T1 and T2, assuming that those mark the atomic boundaries of the StateHolder state. However this is not the case.
      Entering setState(int) does not mark the atomic state-change boundaries, the synchronized block does. Which means in your example thread B simply overtakes thread A and therefore changes 0 to 0. Hence it is correct that no event is thrown. But…

      Determination of oldState is done by reading this.state outside of the guarded block. Which means the visibility of state changes of another thread are not guaranteed at all. Hence the value of oldState is completely unpredictable from the concurrency point of view (see Java Concurrency in Practice, 3.1 Visibility).

      The version with AtomicInteger is correct, since its atomic getAndSet(int) method is used (again thanks to Leonard Brünings above), which guarantees the state value gets properly published to other threads.

      So you just found another pitfall one (meaning me…) easily can run into.I guess this means this.state should either be declared volatile or the guarding block should be written somewhat like the following snippet:


      public void setState( int state ) {
      int oldState;
      synchronized( listeners ) {
      oldState = this.state;
      this.state = state;
      }
      if( oldState != state ) {
      broadcast( new StateEvent( oldState, state ) );
      }
      }

      Thanks for bringing this up, I will update the post accordingly

  3. Ayush Mathur says:

    Hi Frank,
    I have a situation where I’m receiving the values from devices in my REST services (deployed on wildfly) which currently is storing them in database (SQL Server 2012). As per the new requirement, I now have to create a system which should notify the external client if the device state has been modified.
    The client code which I have implemented will be used as library by the external client developers who actually should be notified of the value change.
    Could you please help me out and point me into the right direction.

    Thanks in advance :-)

    • Frank Appel says:

      Hi Ayush, thanks for reading this post. However, I believe that your request is a bit out of the scope of the article’s content. And, since I’m pretty busy right now please forgive me that I can’t give you any more specific advice as the one I’ve already given in the explanations above. Regards Frank Appel