Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

When using the ZeroMQ binding, it is recommended to use a high-performance, interoperable serialization library such as Protocol Buffers. This is done by having producers publish a byte array and consumers receive a byte array. The following excerpt is taken from the FastQuote sample and shows a consumer that receives a byte array and decodes it as a protocol buffer:

Code Block
languagejava
public class Gateway {
    ...
 
    @Consumer(value = "providerChannel", sequence = 1)
    public void onPrice(byte[] serialized) {
        try {
            PriceProtos.Price protoPrice = PriceProtos.Price.parseFrom(serialized);
            double value = protoPrice.hasBidPrice() ? protoPrice.getBidPrice() : protoPrice.getAskPrice();
            Price price = new Price(protoPrice.getSymbol(), value, correlationId);
            pricingService.marginAndSend(price);
        } catch (InvalidProtocolBufferException e) {
            monitor.error(e);
        }
    }
} 

The following illustrates sending an encoded protocol buffer to a channel:

Code Block
languagehtml/xml
@Composite
public class ProviderComponent implements Runnable {
    ...
 
    @Producer
    protected ProviderChanel providerChannel;


    public void run() {
        while (!shutdown.get()) {
            PriceProtos.Price.Builder builder = PriceProtos.Price.newBuilder();
            builder.setVenueId(0);
            builder.setType(PriceProtos.Price.Type.SPOT).setBidPrice(10).setBidSize(100);
            int modulo = sequence % NUMBER_CURRENCY_PAIRS;
            generatePrice(builder, modulo);
            generateSize(builder, modulo);
            if (sequence == Integer.MAX_VALUE) {
                sequence = 0;
            } else {
                sequence++;
            }
            System.out.println("Sending: " + builder.getBidSize() + "@" + builder.getBidPrice());
            providerChannel.send(builder.build().toByteArray());
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
    }

 

Note that serializable Java types are also supported for sending and receiving events but are strongly discouraged as they are not interoperable and incur a significant performance penalty.