Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Ring buffer channels are designed to avoid contention by placing events on a circular buffer implementation, in this case the LMAX Disruptor. Consumers running on different threads receive events from the buffer.

Image Added

Using Disruptor-based channels is simple. All that needs to be done is to specify the ring.buffer channel type as shown below: 

Code Block
languagehtml/xml
<composite ...>
   <channel name="OrderChannel" type="ring.buffer"/>
</composite> 

...

Code Block
languagehtml/xml
public void class OrderPlacer ... {
 
   @Producer(target="OrderChannel")
   protected OrderChannel channel;
 
   public void placeOrder(Order order) {
      channel.publish(order);
   }
 
} 
Code Block
languagehtml/xml
public void class OrderTaker ... {
 
  @Consumer(source="OrderChannel")
  public void onOrder(Order order) {
      ....
  }
 
} 

...

By default, Fabric3 uses JDK proxies to create producer proxies. JDK proxies are less performant than handwritten code and allocate objects during invocation (for example, an array to create parameter values). Fabric3 provides an optional extension that uses bytecode generation to create proxies. This results in proxies that are as fast as handwritten code and do not allocate objects during invocation. To enable bytecode generation, the fabric3-bytecode-proxy module must be installed in the runtime. Its maven dependency coordinates are org.codehaus.fabric3:fabric3-bytecode-proxy.

...

The @Consumer annotation contains the sequence attribute. A sequence may be used to specify the order in which a consumer should receive messages. For example, if a channel has three consumers and one of the consumers must be called first, it must specify a lower sequence number than the others (the default sequence number is 0):

 

Code Block
languagejava
public class Deserializer {

   @Consumer (sequence = 0)
   public onOrder(ChannelEvent event) {
      ...
   }  
} 
 
public class OrderTaker {
 
   @Consumer (sequence = 1)
   public onOrder(ChannelEvent event) {
      ...
   }  
}  

Specifying a sequence is useful to perform tasks such as deserializing, replication, and journaling. In the above example, a deserializer is called before the OrderTaker consumer. Notice that the parameter type is org.fabric3.api.ChannelEvent, which is a specialized type:

Code Block
languagejava
public interface ChannelEvent {
    /**
     * Returns the raw event.
     *
     * @return the event
     */
    <T> T getEvent(Class<T> type);
    /**
     * Returns the parsed event if applicable; otherwise null.
     *
     * @return the parsed event or null
     */
    <T> T getParsed(Class<T> type);
    /**
     * Sets the parsed event.
     *
     * @param parsed the event
     */
    void setParsed(Object parsed);
    /**
     * Returns true if the event is an end of batch.
     *
     * @return true if the event is an end of batch.
     */
    boolean isEndOfBatch();
    /**
     * Returns the event sequence number or -1 if not defined.
     *
     * @return the sequence number or -1
     */
    long getSequence();
}

 

If one consumer needs to modify the event for subsequent consumers (such as a deserializer), they must accept type ChannelEvent. As shown above, that type contains fields for reading the raw event and setting a parsed (or modified value). 

...

Sometimes it is beneficial to have a pool of consumers that accept messages from a ring buffer channel. The ChannelEvent type provides access to the message sequence number, which can be used to determine whether a worker should process or ignore the request (effectively creating a worker pool). This can be done using a modulo operation: 

Code Block
languagejava
public class PooledWorker {
    @Monitor
    protected SystemMonitor monitor;

    @Property
    protected int ordinal;

    @Property
    protected int numberOfConsumers;

    @Consumer
    public void onEvent(ChannelEvent event) {
        if ((event.getSequence() % numberOfConsumers) != ordinal) {
            // ignore the event if it is not for this consumer
            return;
        }
        String message = event.getEvent(String.class);
        monitor.process(ordinal, message);
    }
}

...