...
The following excerpt shows how this channel binding is configured using XML:
Code Block | ||||
---|---|---|---|---|
| ||||
<composite name="ChannelComposite" ...>
   <channel name="TestChannel">
      <f3:binding.zeromq/>
   </channel>
</composite> |
Consuming External Event Streams
The ZeroMQ binding can be used to connect a channel to an external event stream published by one or more external ZeroMQ publishers.
To bind a channel to an external source, the ZeroMQ binding is configured with the IP address and port of the event source socket. If more than one source publishes events, multiple addresses can be specified as a space-delimited list:
Code Block | ||||
---|---|---|---|---|
| ||||
<composite ...>
   <channel name="TestChannel">
      <f3:binding.zeromq addresses="192.0.2.1:12001 192.0.2.2:12001"/>
   </channel>
</composite> |
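To make the space-delimited address format concrete, the following is a minimal sketch of how such a list could be split into host and port pairs. It is not Fabric3's parsing code: `AddressListParser` and its `parse` method are hypothetical names introduced for illustration, and only JDK classes are used.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Fabric3): parses a space-delimited "host:port" list.
final class AddressListParser {

    static List<InetSocketAddress> parse(String addresses) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String token : addresses.trim().split("\\s+")) {
            if (token.isEmpty()) {
                continue; // blank input yields an empty list
            }
            int colon = token.lastIndexOf(':');
            if (colon <= 0 || colon == token.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but found: " + token);
            }
            String host = token.substring(0, colon);
            int port = Integer.parseInt(token.substring(colon + 1));
            // createUnresolved skips the DNS lookup and rejects out-of-range ports
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same value as the addresses attribute in the excerpt above
        for (InetSocketAddress address : parse("192.0.2.1:12001 192.0.2.2:12001")) {
            System.out.println(address.getHostString() + " port " + address.getPort());
        }
    }
}
```

Each entry in the list corresponds to one external publisher socket, so a malformed entry is rejected early instead of being silently dropped.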
Publishing to External Event Streams
The ZeroMQ binding can also be used to publish events to external ZeroMQ subscribers.
To publish events to external subscribers, an IP address and port are specified as part of the ZeroMQ binding configuration:
Code Block | ||||
---|---|---|---|---|
| ||||
<composite ...>
   <channel name="TestChannel">
      <f3:binding.zeromq addresses="192.0.2.1:12001"/>
   </channel>
</composite> |
Serializing Message Payloads
When using the ZeroMQ binding, it is recommended to use a high-performance, interoperable serialization library such as Protocol Buffers. This is done by having producers publish a byte array and consumers receive a byte array. The following excerpt is taken from the FastQuote sample and shows a consumer that receives a byte array and decodes it as a protocol buffer:
Code Block | ||
---|---|---|
| ||
public class Gateway {
    ...
    @Consumer(value = "providerChannel", sequence = 1)
    public void onPrice(byte[] serialized) {
        try {
            PriceProtos.Price protoPrice = PriceProtos.Price.parseFrom(serialized);
            double value = protoPrice.hasBidPrice() ? protoPrice.getBidPrice() : protoPrice.getAskPrice();
            Price price = new Price(protoPrice.getSymbol(), value, correlationId);
            pricingService.marginAndSend(price);
        } catch (InvalidProtocolBufferException e) {
            monitor.error(e);
        }
    }
} |
The following illustrates sending an encoded protocol buffer to a channel:
Code Block | ||
---|---|---|
| ||
@Composite
public class ProviderComponent implements Runnable {
    ...
    @Producer
    protected ProviderChanel providerChannel;
    public void run() {
        while (!shutdown.get()) {
            PriceProtos.Price.Builder builder = PriceProtos.Price.newBuilder();
            builder.setVenueId(0);
            builder.setType(PriceProtos.Price.Type.SPOT).setBidPrice(10).setBidSize(100);
            int modulo = sequence % NUMBER_CURRENCY_PAIRS;
            generatePrice(builder, modulo);
            generateSize(builder, modulo);
            if (sequence == Integer.MAX_VALUE) {
                sequence = 0;
            } else {
                sequence++;
            }
            System.out.println("Sending: " + builder.getBidSize() + "@" + builder.getBidPrice());
            providerChannel.send(builder.build().toByteArray());
            try {
                Thread.sleep(waitTime);
            } catch (InterruptedException e) {
                Thread.interrupted();
            }
        }
    }
} |
Note that serializable Java types are also supported for sending and receiving events but are strongly discouraged as they are not interoperable and incur a significant performance penalty.
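The byte-array contract between producers and consumers can be illustrated without any serialization library. The sketch below is a stand-in that uses `java.nio.ByteBuffer` in place of Protocol Buffers so that it stays self-contained. `PriceCodec` and its field layout (a length-prefixed symbol followed by a double) are assumptions made for illustration and are not taken from the FastQuote sample.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec (not from the FastQuote sample): stands in for Protocol Buffers.
final class PriceCodec {

    // Producer side: turn the event into the byte[] that is sent to the channel.
    static byte[] encode(String symbol, double bidPrice) {
        byte[] symbolBytes = symbol.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + symbolBytes.length + Double.BYTES);
        buffer.putInt(symbolBytes.length).put(symbolBytes).putDouble(bidPrice);
        return buffer.array();
    }

    // Consumer side: rebuild the event from the byte[] received from the channel.
    static String decode(byte[] serialized) {
        ByteBuffer buffer = ByteBuffer.wrap(serialized);
        byte[] symbolBytes = new byte[buffer.getInt()];
        buffer.get(symbolBytes);
        double bidPrice = buffer.getDouble();
        return new String(symbolBytes, StandardCharsets.UTF_8) + "@" + bidPrice;
    }

    public static void main(String[] args) {
        byte[] wire = encode("EURUSD", 1.25);
        System.out.println(wire.length + " bytes, decoded as " + decode(wire));
    }
}
```

Because only bytes cross the channel, a subscriber written in another language can read the same payload as long as it agrees on the layout, which is the interoperability that a schema-based library such as Protocol Buffers provides in a more robust form.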
Accessing Headers
A consumer can access a header (ZeroMQ frame) by declaring a two-dimensional byte array as a parameter type on the consumer method:
Code Block | ||
---|---|---|
| ||
public class Gateway {
...
@Consumer
public void onPrice(byte[] header, byte[] serialized) {
...
}
} |
This is useful when receiving messages from external ZeroMQ publishers that send multi-frame messages.
Performance Considerations
When using the ZeroMQ binding with channels, it is strongly recommended to use ring buffer channels as described in the next section. Ring buffer channels provide low contention and allow business logic to be isolated on a thread that is separate from IO operations. With default channels (that is, when ring buffer channels are not used), ZeroMQ consumers are invoked on the same thread as the socket read. In some situations, this may be the desired behavior and may perform adequately.
In addition, it is highly recommended to use the bytecode proxy generation extension (Maven coordinates: org.fabric3:fabric3-bytecode-proxy) to avoid the overhead and object creation associated with JDK proxies.
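The thread isolation that ring buffer channels provide can be illustrated with plain JDK classes. The sketch below is not Fabric3's ring buffer implementation: it substitutes a bounded `ArrayBlockingQueue` to show the general idea of an IO thread handing events to a separate thread that runs the business logic. `HandOffDemo` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo (not Fabric3 code): a bounded queue stands in for a ring buffer.
final class HandOffDemo {
    private static final byte[] POISON = new byte[0]; // sentinel that stops the worker

    // Publishes eventCount events and returns the name of the thread that handled each one.
    static List<String> run(int eventCount) {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1024);
        List<String> handlerThreads = new ArrayList<>();

        Thread worker = new Thread(() -> {
            try {
                // Business logic runs here, off the IO thread
                for (byte[] event = queue.take(); event != POISON; event = queue.take()) {
                    handlerThreads.add(Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "business-logic");
        worker.start();

        try {
            // The calling thread plays the role of the socket-reading IO thread
            for (int i = 0; i < eventCount; i++) {
                queue.put(new byte[] {(byte) i});
            }
            queue.put(POISON);
            worker.join(); // join makes the worker's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handlerThreads;
    }

    public static void main(String[] args) {
        System.out.println("Handled on: " + run(3));
    }
}
```

With a default channel the equivalent of the loop body would run directly on the thread that reads the socket, so slow business logic would delay subsequent reads.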
Port Allocation
The ZeroMQ binding attempts to acquire ports for sockets from the block configured for a particular runtime or zone. For information on how to configure a port block, see Port Allocation.