Ring Buffer (Disruptor) Channels
<h1>Overview</h1>
<p>Fabric3 provides support for ring buffer channels based on the <a href="http://lmax-exchange.github.io/disruptor/">LMAX Disruptor</a>. Ring buffer channels are particularly useful in latency-sensitive applications because they reduce contention and unnecessary object allocation. The default channel implementation uses the runtime <code>ExecutorService</code> to schedule asynchronous event dispatch on a pooled thread. Placing work on the <code>ExecutorService</code> pool can result in contention and object creation, which can be prohibitive in the most demanding high-performance applications.</p>
<p>Ring buffer channels are designed to avoid contention by placing events on a circular buffer implementation, in this case the LMAX Disruptor. Consumers running on different threads receive events from the buffer.</p>
<p><img class="confluence-embedded-image" loading="lazy" src="https://fabric3.atlassian.net/wiki/download/attachments/1212494/Slide8.jpg?version=1&modificationDate=1374525431274&cacheVersion=1&api=v2" data-image-src="https://fabric3.atlassian.net/wiki/download/attachments/1212494/Slide8.jpg?version=1&modificationDate=1374525431274&cacheVersion=1&api=v2" data-unresolved-comment-count="0" data-linked-resource-id="1474569" data-linked-resource-version="1" data-linked-resource-type="attachment" data-linked-resource-default-alias="Slide8.jpg" data-base-url="https://fabric3.atlassian.net/wiki" data-linked-resource-content-type="image/jpeg" data-linked-resource-container-id="1212494" data-linked-resource-container-version="14" data-media-id="2f656bdc-14b4-489e-83e6-82742e9ebfaa" data-media-type="file" title="fabric3 > Ring Buffer (Disruptor) Channels > Slide8.jpg" data-location="fabric3 > Ring Buffer (Disruptor) Channels > Slide8.jpg" data-image-height="405" data-image-width="720"></p>
<p>Using Disruptor-based channels is simple. 
All that is required is to specify the <code>ring.buffer</code> channel type, as shown below:</p>
<table class="wysiwyg-macro" data-macro-name="code" data-macro-parameters="language=html/xml" data-macro-schema-version="1" style="background-image: url(https://fabric3.atlassian.net/wiki/plugins/servlet/confluence/placeholder/macro-heading?definition=e2NvZGU6bGFuZ3VhZ2U9aHRtbC94bWx9&locale=en_GB&version=2); background-repeat: no-repeat;" data-macro-body-type="PLAIN_TEXT"><tr><td class="wysiwyg-macro-body"><pre>&lt;composite ...&gt;
    &lt;channel name="OrderChannel" type="ring.buffer"/&gt;
&lt;/composite&gt;
</pre></td></tr></table>
<p>No special coding is required for producers and consumers:</p>
<table class="wysiwyg-macro" data-macro-name="code" data-macro-parameters="language=html/xml" data-macro-schema-version="1" style="background-image: url(https://fabric3.atlassian.net/wiki/plugins/servlet/confluence/placeholder/macro-heading?definition=e2NvZGU6bGFuZ3VhZ2U9aHRtbC94bWx9&locale=en_GB&version=2); background-repeat: no-repeat;" data-macro-body-type="PLAIN_TEXT"><tr><td class="wysiwyg-macro-body"><pre>public class OrderPlacer ... {

    // Producer proxy for the ring buffer channel declared in the composite
    @Producer(target="OrderChannel")
    protected OrderChannel channel;

    public void placeOrder(Order order) {
        channel.publish(order);
    }
}
</pre></td></tr></table>
<table class="wysiwyg-macro" data-macro-name="code" data-macro-parameters="language=html/xml" data-macro-schema-version="1" style="background-image: url(https://fabric3.atlassian.net/wiki/plugins/servlet/confluence/placeholder/macro-heading?definition=e2NvZGU6bGFuZ3VhZ2U9aHRtbC94bWx9&locale=en_GB&version=2); background-repeat: no-repeat;" data-macro-body-type="PLAIN_TEXT"><tr><td class="wysiwyg-macro-body"><pre>public class OrderTaker ... {

    // Called for each event published to OrderChannel
    @Consumer(source="OrderChannel")
    public void onOrder(Order order) {
        ....
    }
}
</pre></td></tr></table>
<h1>Configuration</h1>
<p>Ring buffer channels have the following configuration options:</p>
<ul><li><code>ring.size</code> - The number of elements (slots) to create in the ring buffer.</li><li><code>wait.strategy</code> - The consumer wait strategy. Valid values are <code>LOCKING</code>, <code>YIELDING</code>, <code>SLEEPING</code>, <code>BACKOFF</code>, <code>SPIN</code>, and <code>TIMEOUT</code>.</li><li><code>blocking.timeout</code> - The consumer blocking timeout in nanoseconds.</li><li><code>spin.timeout</code> - The consumer spin timeout in nanoseconds.</li><li><code>yield.timeout</code> - The consumer yield timeout in nanoseconds.</li><li><code>phased.blocking.type</code> - The phased blocking type. Valid values are <code>LOCK</code> and <code>SLEEP</code>.</li></ul>
<p>For more information on these values, refer to the <a href="http://lmax-exchange.github.io/disruptor/">LMAX Disruptor</a> site.</p>
<h1>Bindings</h1>
<p>Disruptor-based channels may be combined with messaging technologies such as <a class="confluence-link" href="/wiki/spaces/FABRIC/pages/524334/The+ZeroMQ+Binding" data-linked-resource-id="524334" data-linked-resource-version="24" data-linked-resource-type="page" data-linked-resource-default-alias="The ZeroMQ Binding" data-base-url="https://fabric3.atlassian.net/wiki">The ZeroMQ Binding</a> to create high-performance distributed services. The FastQuote sample application demonstrates using ring buffer channels in conjunction with ZeroMQ and Google Protocol Buffers in a trading application that achieves microsecond processing times. For more details, see the respective binding sections.</p>
<h1>Performance Tuning</h1>
<p>By default, Fabric3 uses JDK proxies to create producer proxies. JDK proxies are slower than handwritten code and allocate objects during invocation (for example, an array to hold parameter values). Fabric3 provides an optional extension that uses bytecode generation to create proxies. 
This results in proxies that are as fast as handwritten code and do not allocate objects during invocation. To enable bytecode generation, the <em>fabric3-bytecode-proxy</em> module must be installed in the runtime. Its dependency coordinates are <code>org.fabric3:fabric3-bytecode-proxy</code>.</p>
<h1>Advanced Features</h1>
<p>Ring buffer channels support a number of advanced features, including ordered consumers, worker pools, and batching. The <em>channels</em> sample application provides examples of these advanced features.</p>
<h3>Ordered Consumers</h3>
<p>The <code>@Consumer</code> annotation has a <code>sequence</code> attribute, which specifies the order in which consumers receive messages. For example, if a channel has three consumers and one of them must be called first, that consumer must specify a lower sequence number than the others (the default sequence number is 0):</p>
<table class="wysiwyg-macro" data-macro-name="code" data-macro-parameters="language=java" data-macro-schema-version="1" style="background-image: url(https://fabric3.atlassian.net/wiki/plugins/servlet/confluence/placeholder/macro-heading?definition=e2NvZGU6bGFuZ3VhZ2U9amF2YX0&locale=en_GB&version=2); background-repeat: no-repeat;" data-macro-body-type="PLAIN_TEXT"><tr><td class="wysiwyg-macro-body"><pre>public class Deserializer {

    // Lower sequence number: receives each event first
    @Consumer(sequence = 0)
    public void onOrder(ChannelEvent event) {
        ...
    }
}

public class OrderTaker {

    // Higher sequence number: receives each event after Deserializer
    @Consumer(sequence = 1)
    public void onOrder(ChannelEvent event) {
        ...
    }
}
</pre></td></tr></table>
<p>Specifying a sequence is useful for tasks such as deserialization, replication, and journaling. In the above example, a deserializer is called before the <code>OrderTaker</code> consumer. 
Notice that the parameter type is <code>org.fabric3.api.ChannelEvent</code>, which is a specialized type:</p>
<table class="wysiwyg-macro" data-macro-name="code" data-macro-parameters="language=java" data-macro-schema-version="1" style="background-image: url(https://fabric3.atlassian.net/wiki/plugins/servlet/confluence/placeholder/macro-heading?definition=e2NvZGU6bGFuZ3VhZ2U9amF2YX0&locale=en_GB&version=2); background-repeat: no-repeat;" data-macro-body-type="PLAIN_TEXT"><tr><td class="wysiwyg-macro-body"><pre>public interface ChannelEvent {

    /**
     * Returns the raw event.
     *
     * @return the event
     */
    &lt;T&gt; T getEvent(Class&lt;T&gt; type);

    /**
     * Returns the parsed event if applicable; otherwise null.
     *
     * @return the parsed event or null
     */
    &lt;T&gt; T getParsed(Class&lt;T&gt; type);

    /**
     * Sets the parsed event.
     *
     * @param parsed the event
     */
    void setParsed(Object parsed);

    /**
     * Returns true if the event is an end of batch.
     *
     * @return true if the event is an end of batch.
     */
    boolean isEndOfBatch();

    /**
     * Returns the event sequence number or -1 if not defined.
     *
     * @return the sequence number or -1
     */
    long getSequence();
}</pre></td></tr></table>
<p>If a consumer (such as a deserializer) needs to modify the event for subsequent consumers, it must accept the <code>ChannelEvent</code> type. As shown above, that type provides methods for reading the raw event and for setting a parsed (or modified) value.</p>
<h3>Worker Pools</h3>
<p>Sometimes it is beneficial to have a pool of consumers that accept messages from a ring buffer channel. The <code>ChannelEvent</code> type provides access to the message sequence number, which can be used to determine whether a worker should process or ignore the request (effectively creating a worker pool). 
This can be done using a modulo operation:</p>
<table class="wysiwyg-macro" data-macro-name="code" data-macro-parameters="language=java" data-macro-schema-version="1" style="background-image: url(https://fabric3.atlassian.net/wiki/plugins/servlet/confluence/placeholder/macro-heading?definition=e2NvZGU6bGFuZ3VhZ2U9amF2YX0&locale=en_GB&version=2); background-repeat: no-repeat;" data-macro-body-type="PLAIN_TEXT"><tr><td class="wysiwyg-macro-body"><pre>public class PooledWorker {

    @Monitor
    protected SystemMonitor monitor;

    @Property
    protected int ordinal;

    @Property
    protected int numberOfConsumers;

    @Consumer
    public void onEvent(ChannelEvent event) {
        if ((event.getSequence() % numberOfConsumers) != ordinal) {
            // ignore the event if it is not for this consumer
            return;
        }
        String message = event.getEvent(String.class);
        monitor.process(ordinal, message);
    }
}</pre></td></tr></table>
<h3>Batching</h3>
<p>In many situations, batching provides significant performance gains when writing to a database, a file, or a network socket. The <code>ChannelEvent</code> type provides an <code>isEndOfBatch</code> method, which can be used to determine when the end of a batch has been received so that a flush operation can be issued.</p>
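<p>The batching approach described above can be sketched as follows. This is a minimal illustration under stated assumptions, not Fabric3 code: the <code>ChannelEvent</code> interface here is a local stand-in that declares only the two methods the sketch uses, <code>SimpleEvent</code> and <code>BatchingWriter</code> are hypothetical names, and the flush merely records the batch in memory where a real consumer would commit to a database or write to a socket. In a Fabric3 component, <code>onEvent</code> would carry the <code>@Consumer</code> annotation and receive <code>org.fabric3.api.ChannelEvent</code>.</p>

```java
import java.util.ArrayList;
import java.util.List;

/** Local stand-in declaring only the two ChannelEvent methods this sketch uses. */
interface ChannelEvent {
    <T> T getEvent(Class<T> type);

    boolean isEndOfBatch();
}

/** Hypothetical event implementation, used only to drive the sketch. */
final class SimpleEvent implements ChannelEvent {
    private final Object payload;
    private final boolean endOfBatch;

    SimpleEvent(Object payload, boolean endOfBatch) {
        this.payload = payload;
        this.endOfBatch = endOfBatch;
    }

    @Override
    public <T> T getEvent(Class<T> type) {
        return type.cast(payload);
    }

    @Override
    public boolean isEndOfBatch() {
        return endOfBatch;
    }
}

/** Hypothetical consumer that buffers events and flushes once per batch. */
class BatchingWriter {
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    // Assumption: in a Fabric3 component this method would be annotated with @Consumer.
    public void onEvent(ChannelEvent event) {
        pending.add(event.getEvent(String.class));
        if (event.isEndOfBatch()) {
            flush();
        }
    }

    // Stand-in for a single expensive write (database commit, file sync, socket send).
    private void flush() {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }

    public List<List<String>> getFlushed() {
        return flushed;
    }

    public int getPendingCount() {
        return pending.size();
    }
}
```

<p>Passing the writer three events with only the third marked as end of batch results in one flush containing all three items, so the cost of the write is paid once per batch rather than once per event.</p>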