...
When using the ZeroMQ binding, a high-performance, interoperable serialization library such as Protocol Buffers is recommended. Producers encode each event and publish it as a byte array; consumers receive the byte array and decode it. The following excerpt from the FastQuote sample shows a consumer that receives a byte array and decodes it as a Protocol Buffers message:
```java
public class Gateway {
    ...
    @Consumer(value = "providerChannel", sequence = 1)
    public void onPrice(byte[] serialized) {
        try {
            PriceProtos.Price protoPrice = PriceProtos.Price.parseFrom(serialized);
            double value = protoPrice.hasBidPrice() ? protoPrice.getBidPrice() : protoPrice.getAskPrice();
            Price price = new Price(protoPrice.getSymbol(), value, correlationId);
            pricingService.marginAndSend(price);
        } catch (InvalidProtocolBufferException e) {
            monitor.error(e);
        }
    }
}
```
The following illustrates sending an encoded protocol buffer to a channel:
```java
@Composite
public class ProviderComponent implements Runnable {
    ...
    @Producer
    protected ProviderChannel providerChannel;

    public void run() {
        while (!shutdown.get()) {
            // Build a price message with the Protocol Buffers builder
            PriceProtos.Price.Builder builder = PriceProtos.Price.newBuilder();
            builder.setVenueId(0);
            builder.setType(PriceProtos.Price.Type.SPOT).setBidPrice(10).setBidSize(100);
            int modulo = sequence % NUMBER_CURRENCY_PAIRS;
            generatePrice(builder, modulo);
            generateSize(builder, modulo);
            // Wrap the sequence counter before it overflows
            if (sequence == Integer.MAX_VALUE) {
                sequence = 0;
            } else {
                sequence++;
            }
            System.out.println("Sending: " + builder.getBidSize() + "@" + builder.getBidPrice());
            // Encode the message and publish the resulting byte array
            providerChannel.send(builder.build().toByteArray());
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop publishing
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
```
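The excerpts above do not show the channel interface that the producer calls. As a rough sketch only, assuming the interface declares a single method that accepts the encoded bytes, the handoff between producer and consumer could look like the following. `PriceChannel`, `InMemoryChannel`, and `subscribe` are hypothetical names invented for this illustration; they are not part of Fabric3 or the FastQuote sample, and the in-memory class merely stands in for the binding.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: names and shapes are assumptions, not the Fabric3 or FastQuote API.
class ByteChannelSketch {

    // Assumed shape of a producer-side channel interface: one method taking the encoded event.
    interface PriceChannel {
        void send(byte[] message);
    }

    // In-memory stand-in for the binding: hands each payload to every subscriber unchanged.
    static final class InMemoryChannel implements PriceChannel {
        private final List<Consumer<byte[]>> subscribers = new ArrayList<>();

        void subscribe(Consumer<byte[]> subscriber) {
            subscribers.add(subscriber);
        }

        @Override
        public void send(byte[] message) {
            for (Consumer<byte[]> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryChannel channel = new InMemoryChannel();
        List<String> received = new ArrayList<>();
        // The consumer side sees only bytes and is responsible for decoding them.
        channel.subscribe(bytes -> received.add(new String(bytes, StandardCharsets.UTF_8)));
        // The producer side encodes before sending; a UTF-8 string stands in for a real message.
        channel.send("EURUSD".getBytes(StandardCharsets.UTF_8));
        System.out.println(received);
    }
}
```

The point of the sketch is that neither side shares a Java type with the other: the contract between producer and consumer is the byte layout, which is what makes a language-neutral encoding usable.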
Note that serializable Java types are also supported for sending and receiving events but are strongly discouraged as they are not interoperable and incur a significant performance penalty.
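The size overhead of Java serialization can be seen in a small stand-alone comparison. The sketch below is illustrative only: `PriceEvent`, `javaSerialize`, `compactEncode`, and `compactDecode` are hypothetical names, and the hand-written `DataOutputStream` encoding merely stands in for a schema-based format such as Protocol Buffers so that the example runs without external libraries. It compares payload sizes and does not measure throughput.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical illustration; none of these names come from Fabric3 or FastQuote.
class SerializationComparison {

    // Hypothetical event type standing in for a price update.
    static final class PriceEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        final String symbol;
        final double value;

        PriceEvent(String symbol, double value) {
            this.symbol = symbol;
            this.value = value;
        }
    }

    // Java serialization: the stream carries class metadata alongside the field values.
    static byte[] javaSerialize(PriceEvent event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(event);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Stand-in for a schema-based encoder: only the field values are written.
    static byte[] compactEncode(PriceEvent event) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(event.symbol);
            out.writeDouble(event.value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order they were written.
    static PriceEvent compactDecode(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            String symbol = in.readUTF();
            double value = in.readDouble();
            return new PriceEvent(symbol, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PriceEvent event = new PriceEvent("EURUSD", 1.0842);
        System.out.println("Java serialization: " + javaSerialize(event).length + " bytes");
        System.out.println("Compact encoding:   " + compactEncode(event).length + " bytes");
    }
}
```

The Java-serialized payload is larger because it embeds a class descriptor, and it can only be read by a JVM that has a compatible `PriceEvent` class on its classpath, which is the interoperability concern noted above.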