Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current Restore this Version View Page History

« Previous Version 8 Next »

Pub/Sub Communications

ZeroMQ can be used to bind a channel so that producers and consumers can connect to it remotely. In the following example, ProducerComposite and ConsumerComposite are deployed to different zones (they could also be deployed to the same zone). Fabric3 will transparently setup ZeroMQ pub/sub sockets and propagate the socket addresses so that the publisher and consumer can connect remotely:

 

The following excerpt shows how this channel binding is configured:

<composite name="ProducerComposite" ...>

   <component name="TestProducer">
      <implementation.java class="..."/>
      <producer name="channel" target="TestChannel"/>
   </component>
</composite>

<composite name="ChannelComposite"...>

   <channel name="TestChannel">
      <f3:binding.zeromq/>
   </channel>

</composite>

<composite name="ConsumerComposite" ...>

   <component name="TestConsumer">
      <implementation.java class="..."/>
      <consumer name="channel" source="TestChannel"/>
   </component>

</composite>

Consuming External Event Streams

The ZeroMQ binding can be used to connect a channel to an external event stream published by one or more external ZeroMQ publishers:

To bind a channel to an external source, the ZeroMQ binding is configured with the IP address and port of the event source socket. If more than one source publishes events, multiple addresses can be specified as a space-delimited list:

 

<composite ...>

   <channel name="TestChannel">
      <f3:binding.zeromq addresses="192.0.2.1:12001 192.0.2.2:12001""/>
   </channel>

   <component name="TestConsumer">
      <implementation.java class="..."/>
      <consumer name="channel" source="TestChannel"/>
   </component>

</composite>

Publishing to External Event Streams

The ZeroMQ binding can also be used to publish events to external ZeroMQ subscribers:

To publish events to external subscribers, an IP address and port are specified as part of the ZeroMQ binding configuration:

<composite ...>

   <component name="TestProducer">
      <implementation.java class="..."/>
      <producer name="channel" target="TestChannel"/>
   </component>

   <channel name="TestChannel">
      <f3:binding.zeromq addresses="192.0.2.1:12001"/>
   </channel>

</composite>


Serializing Message Payloads

When using the ZeroMQ binding, it is recommended to use a high-performance, interoperable serialization library such as Protocol Buffers. This is done by having producers publish a byte array and consumers receive a byte array. The following excerpt is taken from the FastQuote sample and shows a consumer that receives a byte array and decodes it as a protocol buffer:

public class Gateway {
    ...
 
    @Consumer(value = "providerChannel", sequence = 1)
    public void onPrice(byte[] serialized) {
        try {
            PriceProtos.Price protoPrice = PriceProtos.Price.parseFrom(serialized);
            double value = protoPrice.hasBidPrice() ? protoPrice.getBidPrice() : protoPrice.getAskPrice();
            Price price = new Price(protoPrice.getSymbol(), value, correlationId);
            pricingService.marginAndSend(price);
        } catch (InvalidProtocolBufferException e) {
            monitor.error(e);
        }
    }
} 

The following illustrates sending an encoded protocol buffer to a channel:

@Composite
public class ProviderComponent implements Runnable {
    ...
 
    @Producer
    protected ProviderChanel providerChannel;


    public void run() {
        while (!shutdown.get()) {
            PriceProtos.Price.Builder builder = PriceProtos.Price.newBuilder();
            builder.setVenueId(0);
            builder.setType(PriceProtos.Price.Type.SPOT).setBidPrice(10).setBidSize(100);
            int modulo = sequence % NUMBER_CURRENCY_PAIRS;
            generatePrice(builder, modulo);
            generateSize(builder, modulo);
            if (sequence == Integer.MAX_VALUE) {
                sequence = 0;
            } else {
                sequence++;
            }
            System.out.println("Sending: " + builder.getBidSize() + "@" + builder.getBidPrice());
            providerChannel.send(builder.build().toByteArray());
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
    }

 

Note that serializable Java types are also supported for sending and receiving events but are strongly discouraged as they are not interoperable and incur a significant performance penalty.