...

The following excerpt shows how this channel binding is configured using XML:

Code Block
xml
<composite name="ProducerComposite" ...>

   <component name="TestProducer">
      <implementation.java class="..."/>
      <producer name="channel" target="TestChannel"/>
   </component>
</composite>

<composite name="ChannelComposite"...>

   <channel name="TestChannel">
      <f3:binding.zeromq/>
   </channel>

</composite>

<composite name="ConsumerComposite" ...>

   <component name="TestConsumer">
      <implementation.java class="..."/>
      <consumer name="channel" source="TestChannel"/>
   </component>

</composite>

Consuming External Event Streams

The ZeroMQ binding can be used to connect a channel to an external event stream published by one or more external ZeroMQ publishers.

To bind a channel to an external source, the ZeroMQ binding is configured with the IP address and port of the event source socket. If more than one source publishes events, multiple addresses can be specified as a space-delimited list:

...

Code Block
xml
<composite ...>

   <channel name="TestChannel">
      <f3:binding.zeromq addresses="192.0.2.1:12001 192.0.2.2:12001"/>
   </channel>

</composite>
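
To make the format of the addresses attribute concrete, the following sketch parses a space-delimited list of host:port entries. It is an illustration only and does not reflect how the binding itself reads the attribute; the AddressListParser class and its parse method are hypothetical names introduced for this example.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Fabric3 or the ZeroMQ binding.
class AddressListParser {

    // Splits a space-delimited list such as "192.0.2.1:12001 192.0.2.2:12001"
    // into unresolved socket addresses (no DNS lookup is performed).
    static List<InetSocketAddress> parse(String addresses) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : addresses.trim().split("\\s+")) {
            if (entry.isEmpty()) {
                continue; // an empty attribute value yields an empty list
            }
            int separator = entry.lastIndexOf(':');
            if (separator <= 0 || separator == entry.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but found: " + entry);
            }
            String host = entry.substring(0, separator);
            int port = Integer.parseInt(entry.substring(separator + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("192.0.2.1:12001 192.0.2.2:12001")) {
            System.out.println(address.getHostString() + " port " + address.getPort());
        }
    }
}
```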

Publishing to External Event Streams

The ZeroMQ binding can also be used to publish events to external ZeroMQ subscribers.

To publish events to external subscribers, an IP address and port are specified as part of the ZeroMQ binding configuration:

Code Block
xml
<composite ...>

   <channel name="TestChannel">
      <f3:binding.zeromq addresses="192.0.2.1:12001"/>
   </channel>

</composite>


Serializing Message Payloads

When using the ZeroMQ binding, it is recommended to use a high-performance, interoperable serialization library such as Protocol Buffers. This is done by having producers publish a byte array and consumers receive a byte array. The following excerpt is taken from the FastQuote sample and shows a consumer that receives a byte array and decodes it as a protocol buffer:

Code Block
java
public class Gateway {
    ...
 
    @Consumer(value = "providerChannel", sequence = 1)
    public void onPrice(byte[] serialized) {
        try {
            PriceProtos.Price protoPrice = PriceProtos.Price.parseFrom(serialized);
            double value = protoPrice.hasBidPrice() ? protoPrice.getBidPrice() : protoPrice.getAskPrice();
            Price price = new Price(protoPrice.getSymbol(), value, correlationId);
            pricingService.marginAndSend(price);
        } catch (InvalidProtocolBufferException e) {
            monitor.error(e);
        }
    }
} 

The following illustrates sending an encoded protocol buffer to a channel:

Code Block
java
@Composite
public class ProviderComponent implements Runnable {
    ...
 
    @Producer
    protected ProviderChannel providerChannel;

    public void run() {
        while (!shutdown.get()) {
            PriceProtos.Price.Builder builder = PriceProtos.Price.newBuilder();
            builder.setVenueId(0);
            builder.setType(PriceProtos.Price.Type.SPOT).setBidPrice(10).setBidSize(100);
            int modulo = sequence % NUMBER_CURRENCY_PAIRS;
            generatePrice(builder, modulo);
            generateSize(builder, modulo);
            if (sequence == Integer.MAX_VALUE) {
                sequence = 0;
            } else {
                sequence++;
            }
            System.out.println("Sending: " + builder.getBidSize() + "@" + builder.getBidPrice());
            providerChannel.send(builder.build().toByteArray());
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
    }
}

Note that serializable Java types are also supported for sending and receiving events but are strongly discouraged as they are not interoperable and incur a significant performance penalty.
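
As a rough illustration of the overhead, the sketch below compares the encoded size of one small event under Java serialization and under a hand-written field encoding. The PriceSnapshot type is hypothetical, and the DataOutputStream encoding is only a stand-in for a schema-based encoder such as Protocol Buffers, not an equivalent of it. Size is just one component of the cost, so treat this as an indication rather than a benchmark.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class PayloadSizeComparison {

    // Hypothetical event type used only for this comparison.
    static class PriceSnapshot implements Serializable {
        private static final long serialVersionUID = 1L;
        final String symbol;
        final double bid;

        PriceSnapshot(String symbol, double bid) {
            this.symbol = symbol;
            this.bid = bid;
        }
    }

    // Encodes the event with standard Java serialization, which also writes
    // a stream header and a class descriptor.
    static byte[] javaSerialized(PriceSnapshot price) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(price);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Writes only the field values: a length-prefixed string and an 8-byte double.
    static byte[] handEncoded(PriceSnapshot price) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(price.symbol);
            out.writeDouble(price.bid);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        PriceSnapshot price = new PriceSnapshot("EURUSD", 1.0842);
        System.out.println("Java serialization: " + javaSerialized(price).length + " bytes");
        System.out.println("Hand-encoded fields: " + handEncoded(price).length + " bytes");
    }
}
```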

Accessing Headers

It is possible for a consumer to access a header (ZeroMQ frame) by declaring a two-dimensional byte array as a parameter type on the consumer method:

Code Block
java
public class Gateway {
    ...
 
    @Consumer
    public void onPrice(byte[] header, byte[] serialized) {
       ...
    }
} 

This is useful when receiving messages from external ZeroMQ publishers that send multi-frame messages.
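
For readers unfamiliar with multi-frame messages, the following sketch models one as an array of frames and separates the leading frames from the final one. The layout (header frames first, payload last) is an assumption made for this example, not something the binding is documented to guarantee, and FrameSketch is a hypothetical class that does not use the ZeroMQ or Fabric3 APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical model of a multi-frame message; plain Java only.
class FrameSketch {

    // Assumption for this sketch: every frame except the last is a header frame.
    static byte[][] headers(byte[][] frames) {
        return Arrays.copyOfRange(frames, 0, Math.max(frames.length - 1, 0));
    }

    // Assumption for this sketch: the last frame carries the serialized payload.
    static byte[] body(byte[][] frames) {
        if (frames.length == 0) {
            throw new IllegalArgumentException("message has no frames");
        }
        return frames[frames.length - 1];
    }

    public static void main(String[] args) {
        byte[][] message = {
            "prices.spot".getBytes(StandardCharsets.UTF_8),
            "payload".getBytes(StandardCharsets.UTF_8)
        };
        System.out.println("header frames: " + headers(message).length);
        System.out.println("body: " + new String(body(message), StandardCharsets.UTF_8));
    }
}
```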

Performance Considerations

When using the ZeroMQ binding with channels, it is strongly recommended to use ring buffer channels as described in the next section. Ring buffer channels provide low contention and allow business logic to be isolated on a thread that is separate from IO operations. If ring buffer channels are not used (i.e., default channels are used), ZeroMQ consumers are invoked on the same thread that performs the socket read. In some situations, this is the desired behavior and performs adequately.
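
The threading difference can be sketched in plain Java. The example below hands an event from the reading thread to a separate worker thread through a bounded queue. Note that ArrayBlockingQueue is a lock-based queue used here only as a stand-in to show the thread separation; it is not a ring buffer and does not represent how Fabric3 ring buffer channels are implemented. HandoffSketch and its method are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of isolating business logic from the IO thread.
class HandoffSketch {

    // The calling thread plays the role of the socket reader: it only enqueues.
    // A separate worker thread dequeues the event and runs the "business logic".
    static String handOff(byte[] payload) {
        BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(1024);
        BlockingQueue<String> processedBy = new ArrayBlockingQueue<>(1);

        Thread worker = new Thread(() -> {
            try {
                byte[] event = buffer.take();
                processedBy.put(Thread.currentThread().getName()
                        + " handled " + event.length + " bytes");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "business-logic");
        worker.setDaemon(true);
        worker.start();

        try {
            buffer.put(payload);
            // Returns null if the worker does not report within five seconds.
            return processedBy.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("read on " + Thread.currentThread().getName());
        System.out.println(handOff(new byte[] {1, 2, 3}));
    }
}
```

With a default channel, by contrast, the equivalent of the worker's code would run directly on the reading thread, so slow business logic would delay the next socket read.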

In addition, it is highly recommended to use the bytecode proxy generation extension (Maven coordinates: org.fabric3:fabric3-bytecode-proxy) to avoid the overhead and object creation associated with JDK proxies.

Port Allocation

The ZeroMQ binding attempts to acquire ports for sockets from the block configured for a particular runtime or zone. For information on how to configure a port block, see Port Allocation.