Fabric3 supports low-latency, high-performance communication between clients and services, and between producers and consumers (pub/sub), via a binding built on the ZeroMQ library. The following messaging patterns are supported:

  • Non-blocking queue-based messaging
  • Non-blocking pub/sub messaging
  • Request-reply messaging
  • Callbacks

For performance reasons, it is recommended that services use non-blocking (asynchronous) ZeroMQ messaging, that is, either one-way service invocations or channels. If a service needs to return a response to a client, callbacks will generally scale better than blocking request-response messaging.
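The difference between the two styles can be sketched in plain Java. This is only an in-memory illustration, not Fabric3 or SCA API usage: `QuoteService`, `QuoteCallback`, `InMemoryQuoteService` and `CallbackDemo` are hypothetical names, and a single-threaded executor stands in for the remote transport.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback contract: the service replies by invoking it later.
interface QuoteCallback {
    void onQuote(String symbol, double price);
}

// Hypothetical one-way contract: the method returns immediately and has no return value.
interface QuoteService {
    void requestQuote(String symbol, QuoteCallback callback);
}

// In-memory stand-in; the executor thread plays the role of the remote transport.
class InMemoryQuoteService implements QuoteService, AutoCloseable {
    private final ExecutorService transport = Executors.newSingleThreadExecutor();

    @Override
    public void requestQuote(String symbol, QuoteCallback callback) {
        // The caller is not blocked; the reply is delivered on another thread.
        transport.submit(() -> callback.onQuote(symbol, 1.25));
    }

    @Override
    public void close() {
        // Already submitted replies are still delivered after shutdown().
        transport.shutdown();
    }
}

class CallbackDemo {
    // Sends one request and waits (for demonstration only) until the callback fires.
    static String run() {
        AtomicReference<String> reply = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try (InMemoryQuoteService service = new InMemoryQuoteService()) {
            service.requestQuote("EURUSD", (symbol, price) -> {
                reply.set(symbol + "=" + price);
                done.countDown();
            });
            // A real client would carry on with other work here instead of waiting.
            done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reply.get();
    }
}
```

The calling thread is free as soon as `requestQuote` returns, which is the property that lets callback-style contracts scale better than a call that holds a thread until the response arrives.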

...

Pub/Sub

...

Communications

ZeroMQ can be used to bind a channel so that producers and consumers can connect to it remotely. In the following example, ProducerComposite and ConsumerComposite are deployed to different zones (they could also be deployed to the same zone). Fabric3 will transparently set up ZeroMQ pub/sub sockets and propagate the socket addresses so that the publisher and consumer can connect remotely:

...

The following excerpt shows how this channel binding is configured using XML:

<composite name="ChannelComposite" ...>

   <channel name="TestChannel">
      <f3:binding.zeromq/>
   </channel>

</composite>

Consuming External Event Streams

The ZeroMQ binding can be used to connect a channel to an external event stream published by one or more external ZeroMQ publishers.

To bind a channel to an external source, the ZeroMQ binding is configured with the IP address and port of the event source socket. If more than one source publishes events, multiple addresses can be specified as a space-delimited list:

 

<composite ...>

   <channel name="TestChannel">
      <f3:binding.zeromq addresses="192.0.2.1:12001 192.0.2.2:12001"/>
   </channel>

</composite>
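Because the addresses attribute is a space-delimited list of host:port pairs, a small check can catch malformed entries before deployment. The following is a minimal sketch of such a check; `ZeroMQAddresses` is a hypothetical helper and does not reflect how Fabric3 itself parses the attribute.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for sanity-checking an addresses attribute value.
class ZeroMQAddresses {
    // Splits a space-delimited "host:port" list into unresolved socket addresses.
    static List<InetSocketAddress> parse(String addresses) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String token : addresses.trim().split("\\s+")) {
            int colon = token.lastIndexOf(':');
            if (colon <= 0 || colon == token.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but found: " + token);
            }
            String host = token.substring(0, colon);
            int port = Integer.parseInt(token.substring(colon + 1));
            // createUnresolved skips the DNS lookup but still rejects out-of-range ports.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }
}
```

Calling `ZeroMQAddresses.parse("192.0.2.1:12001 192.0.2.2:12001")` yields two entries, one per event source.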

Publishing to External Event Streams

The ZeroMQ binding can also be used to publish events to external ZeroMQ subscribers.

To publish events to external subscribers, an IP address and port are specified as part of the ZeroMQ binding configuration:

<composite ...>

   <channel name="TestChannel">
      <f3:binding.zeromq addresses="192.0.2.1:12001"/>
   </channel>

</composite>

Using ZeroMQ as the SCA Binding

Typically, ZeroMQ is used as the default remote transport when no binding is configured (the "default" binding is called the SCA binding, or "binding.sca"). When used in this fashion, Fabric3 will automatically provision ZeroMQ sockets and propagate socket address information across clusters so that remote clients can connect. In addition, if socket address information changes (e.g. a runtime joins a cluster or drops out), the change is automatically propagated and applied across clusters.

Assuming the ZeroMQ profile is installed in a runtime, the following example demonstrates how ZeroMQ can be used as the SCA binding:

<composite name="ClientComposite" ...>

    <component name="TestProducer">
       <implementation.java class="..."/>
       <reference name="service" target="TestService"/>
    </component>

</composite>

<composite name="ServiceComposite" ...>

    <component name="TestService">
       <implementation.java class="..."/>
    </component>

</composite>

Notice there is no binding configuration. If the ClientComposite and ServiceComposite are deployed to different zones, Fabric3 will create the appropriate ZeroMQ socket type to wire the components. The socket type chosen depends on the service contract. For example, a service contract with @OneWay operations results in a ZeroMQ non-blocking socket (ZMQ.PUSH). However, if the service contract has request-reply operations, a ZeroMQ request-response (ZMQ.XREQ) socket is created. Similarly, if the service interface requires a callback, Fabric3 will create a callback socket for replies.

Configuration

The following ZeroMQ values may be configured as attributes on the <f3:zeromq.binding> element contained in the controller and single-VM configurations (systemConfig.xml):

  • high.water
  • multicastRate
  • multicastRecovery
  • sendBuffer
  • receiveBuffer

The following ZeroMQ values may be configured as attributes on the <f3:zeromq.binding> element contained in the participant and single-VM configurations:

...

Serializing Message Payloads

When using the ZeroMQ binding, it is recommended to use a high-performance, interoperable serialization library such as Protocol Buffers. This is done by having producers publish a byte array and consumers receive a byte array. The following excerpt is taken from the FastQuote sample and shows a consumer that receives a byte array and decodes it as a protocol buffer:

public class Gateway {
    ...

    @Consumer(value = "providerChannel", sequence = 1)
    public void onPrice(byte[] serialized) {
        try {
            PriceProtos.Price protoPrice = PriceProtos.Price.parseFrom(serialized);
            double value = protoPrice.hasBidPrice() ? protoPrice.getBidPrice() : protoPrice.getAskPrice();
            Price price = new Price(protoPrice.getSymbol(), value, correlationId);
            pricingService.marginAndSend(price);
        } catch (InvalidProtocolBufferException e) {
            monitor.error(e);
        }
    }
}

The following illustrates sending an encoded protocol buffer to a channel:

@Composite
public class ProviderComponent implements Runnable {
    ...

    @Producer
    protected ProviderChanel providerChannel;

    public void run() {
        while (!shutdown.get()) {
            PriceProtos.Price.Builder builder = PriceProtos.Price.newBuilder();
            builder.setVenueId(0);
            builder.setType(PriceProtos.Price.Type.SPOT).setBidPrice(10).setBidSize(100);
            int modulo = sequence % NUMBER_CURRENCY_PAIRS;
            generatePrice(builder, modulo);
            generateSize(builder, modulo);
            if (sequence == Integer.MAX_VALUE) {
                sequence = 0;
            } else {
                sequence++;
            }
            System.out.println("Sending: " + builder.getBidSize() + "@" + builder.getBidPrice());
            providerChannel.send(builder.build().toByteArray());
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
    }
}

Note that serializable Java types are also supported for sending and receiving events but are strongly discouraged as they are not interoperable and incur a significant performance penalty.
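The size overhead of Java serialization is easy to observe with the JDK alone. The sketch below compares a Java-serialized event with a hand-rolled binary layout; `SamplePrice` and `PayloadSizes` are hypothetical, and the hand-rolled encoding is only a stand-in used to keep the example dependency-free. It is not Protocol Buffers and is not interoperable in the way a protocol buffer is.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical event type used only for this comparison.
class SamplePrice implements Serializable {
    private static final long serialVersionUID = 1L;
    final String symbol;
    final double value;

    SamplePrice(String symbol, double value) {
        this.symbol = symbol;
        this.value = value;
    }
}

class PayloadSizes {
    // Encodes the event with standard Java serialization (class metadata included).
    static byte[] javaSerialized(SamplePrice price) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(price);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Hand-rolled compact layout: [symbol length][symbol bytes][value].
    static byte[] compact(SamplePrice price) {
        byte[] symbol = price.symbol.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + symbol.length + Double.BYTES);
        buffer.putInt(symbol.length).put(symbol).putDouble(price.value);
        return buffer.array();
    }

    // Reads the compact layout back into an event.
    static SamplePrice decode(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        byte[] symbol = new byte[buffer.getInt()];
        buffer.get(symbol);
        return new SamplePrice(new String(symbol, StandardCharsets.UTF_8), buffer.getDouble());
    }
}
```

For a `SamplePrice("EURUSD", 1.25)` the compact form is 18 bytes, while the Java-serialized form is several times larger because it carries the class descriptor along with the field values.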

Accessing Headers

It is possible for a consumer to access a header (ZeroMQ frame) by declaring a two-dimensional byte array as a parameter type on the consumer method:

public class Gateway {
    ...
 
    @Consumer
    public void onPrice(byte[] header, byte[] serialized) {
       ...
    }
} 

This is useful when receiving messages from external ZeroMQ publishers that send multi-frame messages.
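As a rough illustration of what a multi-frame message looks like to application code, the sketch below models the frames as a two-dimensional byte array and picks out the header and payload. `MultiFrameMessage` is a hypothetical helper, and the convention that the first frame is the header and the last frame is the payload is an assumption of this sketch, not something the binding prescribes.

```java
// Hypothetical helper; frame positions are an assumption of this sketch.
class MultiFrameMessage {
    // Returns the first frame, treated here as the header.
    static byte[] header(byte[][] frames) {
        requireFrames(frames);
        return frames[0];
    }

    // Returns the last frame, treated here as the serialized payload.
    static byte[] payload(byte[][] frames) {
        requireFrames(frames);
        return frames[frames.length - 1];
    }

    // Rejects messages that do not carry both a header and a payload frame.
    private static void requireFrames(byte[][] frames) {
        if (frames == null || frames.length < 2) {
            throw new IllegalArgumentException("Expected at least a header frame and a payload frame");
        }
    }
}
```

An external publisher that prefixes each event with a topic frame would, under this assumption, surface the topic through `header(frames)` and the encoded event through `payload(frames)`.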

Performance Considerations

When using the ZeroMQ binding with channels, it is strongly recommended to use ring buffer channels as described in the next section. Ring buffer channels provide low contention and allow business logic to be isolated on a thread that is separate from IO operations. If default channels are used instead of ring buffer channels, ZeroMQ consumers will be invoked on the same thread as the socket read. In some situations, this may be the desired result and perform adequately.
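The thread isolation described above can be sketched with the JDK alone. The example below uses a bounded `ArrayBlockingQueue` as a simplified stand-in for a ring buffer (it is not Fabric3's ring buffer channel implementation), and `HandOffDemo` and the thread names are hypothetical. It shows the socket-reading thread handing an event off so that the business logic runs elsewhere.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class HandOffDemo {
    // Returns the name of the "IO" thread and of the thread that ran the business logic.
    static String[] run() {
        // Bounded buffer standing in for the ring buffer between IO and business logic.
        BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(1024);
        String[] threads = new String[2];

        Thread worker = new Thread(() -> {
            try {
                byte[] event = buffer.poll(1, TimeUnit.SECONDS);
                if (event != null) {
                    // Business logic would process the event here, off the IO thread.
                    threads[1] = Thread.currentThread().getName();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "business-logic");

        Thread io = new Thread(() -> {
            // Simulates the socket read: enqueue the event and return immediately.
            threads[0] = Thread.currentThread().getName();
            buffer.offer(new byte[] {42});
        }, "socket-reader");

        worker.start();
        io.start();
        try {
            io.join();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }
}
```

With a default channel the equivalent of the worker's processing would happen inside the `socket-reader` thread, so slow consumer logic would delay the next socket read.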

In addition, it is highly recommended to use the bytecode proxy generation extension (Maven coordinates: org.fabric3:fabric3-bytecode-proxy) to avoid the overhead and object creation associated with JDK proxies.

Port Allocation

The ZeroMQ binding attempts to acquire ports for sockets from the block configured for a particular runtime or zone. For information on how to configure a port block, see Port Allocation.

Multi-Homed Configuration

To force ZeroMQ to bind sockets to a particular address on a multi-homed machine, use the host.address attribute on the <f3:runtime> element in the participant or single-VM systemConfig. For details, see Multi-Homed Configuration.