A JMS provider can be used as the transport for one-way and request-response operations. A minimal one-way configuration is shown below:
Code Block |
---|
|
@Component
public class Client ... {
@JMS(value = @JMSConfiguration(destination = "ServiceQueue"))
protected Service service;
public String process() {
// ...
service.onReceive(message);
// ...
}
}
@Component
@JMS(value = @JMSConfiguration(destination = "ServiceQueue"))
public class ServiceImpl implements Service {
public void onReceive(String message) {
// ...
}
}
public interface Service {
@OneWay
void onReceive(String message);
} |
In XML, JMS bindings are configured as follows:
Code Block |
---|
|
<component name="Client">
    <implementation.java class="..."/>
    <reference name="service">
        <binding.jms>
            <destination jndiName="ServiceQueue"/>
        </binding.jms>
    </reference>
</component>
<component name="Service">
    <implementation.java class="..."/>
    <service>
        <binding.jms>
            <destination jndiName="ServiceQueue"/>
        </binding.jms>
    </service>
</component>
|
The above configuration uses the "ServiceQueue" queue to propagate messages. Depending on the JMS provider, it may also be necessary to specify a connection factory name (see below).
...
Configuring request-response operations involves specifying a separate response queue in addition to the forward queue:
Code Block |
---|
|
@Component
public class Client ... {
@JMS(value = @JMSConfiguration(destination = "ServiceQueue", responseDestination="ResponseQueue"))
protected Service service;
public void process() {
// ...
String response = service.onReceive(message);
// ...
}
}
@Component
@JMS(value = @JMSConfiguration(destination = "ServiceQueue", responseDestination="ResponseQueue"))
public class ServiceImpl implements Service {
public String onReceive(String message) {
// ...
return response;
}
}
public interface Service {
String onReceive(String message);
} |
And in XML:
Code Block |
---|
|
<component name="Client">
    <implementation.java class="..."/>
    <reference name="service">
        <binding.jms>
            <destination name="ServiceQueue"/>
            <response>
                <destination jndiName="ResponseQueue"/>
            </response>
        </binding.jms>
    </reference>
</component>
<component name="Service">
    <implementation.java class="..."/>
    <service>
        <binding.jms>
            <destination name="ServiceQueue"/>
            <response>
                <destination jndiName="ResponseQueue"/>
            </response>
        </binding.jms>
    </service>
</component>
|
...
While JMS is an asynchronous model, it is important to note that the client component will block on request-response operations until a response is received. In some cases, this is the desired behavior. In other situations, such as long-running interactions, looser coupling is required where the client can continue processing without waiting for a response to be returned. Callbacks can be used to provide responses at some later point in time. Configuring callbacks involves specifying a callback queue:
Code Block |
---|
|
@Component
public class CallbackClientImpl implements Client, ConsumerCallback {
@JMS(value = @JMSConfiguration(destination = "ServiceQueue"), callback = @JMSConfiguration(destination = "CallbackQueue"))
protected Service service;
public void invoke(String message) {
service.onReceive(message);
}
public void onResponse(String message) {
// ...
}
}
public interface ConsumerCallback {
@OneWay
void onResponse(String message);
}
@Component
@JMS(value = @JMSConfiguration(destination = "ServiceQueue"), callback = @JMSConfiguration(destination = "CallbackQueue"))
public class ServiceImpl implements Service {
@Callback
protected ConsumerCallback callback;
public void onReceive(String message) {
callback.onResponse(message);
}
}
public interface Service {
@OneWay
void onReceive(String message);
} |
And in XML:
Code Block |
---|
|
<component name="CallbackClient">
    <implementation.java class="..."/>
    <reference name="service">
        <binding.jms>
            <destination name="ServiceQueue"/>
        </binding.jms>
        <callback>
            <binding.jms>
                <destination name="CallbackQueue"/>
            </binding.jms>
        </callback>
    </reference>
</component>
<component name="CallbackService">
    <implementation.java class="..."/>
    <service>
        <binding.jms>
            <destination name="ServiceQueue"/>
        </binding.jms>
        <callback>
            <binding.jms>
                <destination name="CallbackQueue"/>
            </binding.jms>
        </callback>
    </service>
</component>
|
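To make the control flow of a callback easier to follow, here is a minimal plain-Java sketch that does not use Fabric3 or a JMS provider at all. The two in-memory queues stand in for the forward and callback queues, and the names `CallbackFlowSketch`, `Task`, `startDaemon`, and `run` are hypothetical. A latch holds the reply back until the client has moved on, purely so that the recorded order is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class CallbackFlowSketch {

    interface ConsumerCallback {
        void onResponse(String message);
    }

    /** Hypothetical helper type: a task that may block on a queue. */
    interface Task {
        void run() throws InterruptedException;
    }

    static void startDaemon(Task task) {
        Thread thread = new Thread(() -> {
            try {
                task.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /** Returns the events in the order in which they were recorded. */
    static List<String> run() {
        // In-memory stand-ins for the forward queue and the callback queue.
        BlockingQueue<String> serviceQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch clientMovedOn = new CountDownLatch(1);
        CountDownLatch callbackDelivered = new CountDownLatch(1);

        ConsumerCallback callback = message -> {
            events.add("callback: " + message);
            callbackDelivered.countDown();
        };

        // Service side: consume the request, then reply on the callback queue.
        startDaemon(() -> {
            String request = serviceQueue.take();
            clientMovedOn.await(); // held back only to make the ordering deterministic
            callbackQueue.put("processed " + request);
        });
        // Client-side listener: delivers callback messages to onResponse().
        startDaemon(() -> callback.onResponse(callbackQueue.take()));

        // Client: the one-way send returns immediately, so work continues.
        serviceQueue.add("order-1");
        events.add("client continues");
        clientMovedOn.countDown();

        try {
            callbackDelivered.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The client records its own progress before the callback arrives, which is the looser coupling that the callback configuration above is meant to provide.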
...
For simplicity, the previous examples assumed that the JMS provider did not require a connection factory to be specified. However, most JMS providers do require one, which is configured using the connectionFactory element:
Code Block |
---|
|
<component name="CallbackClient">
<implementation.java class="..."/>
<reference name="service">
<binding.jms>
<connectionFactory jndiName="TheConnectionFactory"/>
<destination name="serviceQueue"/>
</binding.jms>
</reference>
</component>
|
...
Fabric3 supports XA transacted messaging. This is useful when a message must be reliably sent in conjunction with a database update. To enable transacted messaging, use the transactedOneWay intent on the JMS binding:
Code Block |
---|
|
<component name="TheComponent">
<implementation.java class="..."/>
<reference name="service">
<binding.jms requires="transactedOneWay">
<destination jndiName="TheQueue"/>
</binding.jms>
</reference>
</component>
|
The above will enqueue a message transactionally with the message provider. The component implementation is as follows:
Code Block |
---|
|
public class TheComponent implements ... {
@Reference
protected Service service;
public void operation() {
Message message = ...
service.invoke(message);
}
}
|
If the above implementation also needed to update a database using Hibernate in the same transaction as the message enqueue, it could be modified as follows to use the @ManagedTransaction annotation:
Code Block |
---|
|
@ManagedTransaction
public class TheComponent implements ... {
    @PersistenceContext(unitName = "employee")
    Session session;
    @Reference
    protected Service service;
    public void operation() {
        Message message = ...
        long id = message.getId();
        // Load and update the entity in the same transaction as the enqueue
        Entity entity = session.find(Entity.class, id);
        entity.update(message.getUpdate());
        service.invoke(message);
    }
}
|
Transacted Messaging and Request-Response
In a word: don't try it. Transacted messaging will not work with request-response operations. To see why, consider the following:
Code Block |
---|
|
@ManagedTransaction
public class TheComponent implements ... {
@Reference
protected Service service;
public void operation() {
Message message = ...
Response response = service.invoke(message);
}
}
|
The forward transaction will not commit until after the response is received and the operation() method has returned. However, the outgoing message will not be enqueued until the transaction commits. This means that the message cannot be sent while the client is waiting, and consequently a response will never be received. The result will be a message timeout.
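The circular wait described above can be reproduced without a JMS provider. The following is a minimal sketch in plain Java, assuming a simplified local "session" that buffers sends until commit. It is not XA and not the Fabric3 API, and `TransactedSession` and `exchange` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class TransactedRequestReplySketch {

    /** Hypothetical stand-in for a transacted session: sends are held back until commit(). */
    static final class TransactedSession {
        private final BlockingQueue<String> destination;
        private final List<String> pending = new ArrayList<>();

        TransactedSession(BlockingQueue<String> destination) {
            this.destination = destination;
        }

        void send(String message) {
            pending.add(message); // not yet visible to consumers
        }

        void commit() {
            destination.addAll(pending); // messages are enqueued only now
            pending.clear();
        }
    }

    /**
     * Sends one request and waits for the reply.
     * Returns the reply, or null if none arrived within the timeout.
     */
    static String exchange(boolean commitBeforeWaiting, long timeoutMillis) {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();

        // The "service": answers the first request that actually reaches the queue.
        Thread service = new Thread(() -> {
            try {
                responseQueue.put("reply to " + requestQueue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        service.setDaemon(true);
        service.start();

        TransactedSession session = new TransactedSession(requestQueue);
        session.send("request");
        try {
            if (commitBeforeWaiting) {
                session.commit();
            }
            String response = responseQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!commitBeforeWaiting) {
                // Mirrors a transaction that commits only after operation() returns.
                session.commit();
            }
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("commit after waiting:  " + exchange(false, 200));
        System.out.println("commit before waiting: " + exchange(true, 2000));
    }
}
```

In this model, `exchange(false, ...)` times out and returns `null` because the request only reaches the queue after the wait has ended, while `exchange(true, ...)` returns the reply.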
Receiving Responses with Transacted Messaging
If you need to receive a response and require transacted messaging, use a callback. The following shows how this is configured:
Code Block |
---|
|
<component name="TheComponent">
    <implementation.java class="..."/>
    <reference name="service">
        <binding.jms requires="transactedOneWay">
            <destination jndiName="TheQueue"/>
        </binding.jms>
        <callback>
            <binding.jms requires="transactedOneWay">
                <destination jndiName="TheCallbackQueue"/>
            </binding.jms>
        </callback>
    </reference>
</component>
|
...
Service Auto Scaling
Fabric3 supports autoscaling, where the number of message listeners for a service endpoint is dynamically resized based on workload. As load increases, Fabric3 will create more queue listeners. Conversely, as load decreases, listeners will be removed, thereby freeing kernel threads to perform other work.
The following autoscaling attributes can be configured per endpoint as attributes on binding.jms: idle.limit, transaction.timeout, receive.timeout, max.messages, recovery.interval, max.receivers, and min.receivers. Most of them are described below:
- max.receivers - Sets the maximum number of receivers to create
- min.receivers - Sets the minimum number of receivers to retain
- idle.limit - Sets the number of times a receiver can be marked idle during its execution window before it is removed from the work scheduler. Note if the maximum number of receivers is set to one, the single receiver will not be removed. Likewise, if a minimum number of receivers is set, idle receivers will not be removed if that threshold is reached.
- receive.timeout - Sets the time in milliseconds that the message receive operation should block for.
- max.messages - Sets the maximum number of messages to process by a receiver during its execution window. The default is unlimited, which results in the receiver processing messages until shutdown. Setting the number lower increases the ability of the work scheduler to adjust scheduling at the expense of thread context switches as receivers may be scheduled on different threads.
- recovery.interval - Sets the time in milliseconds to wait while making repeated recovery attempts.
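The interaction between idle.limit, min.receivers, and max.receivers described in the list can be summarized as a small decision rule. The sketch below is only an illustration of that description and not Fabric3's actual scheduler code. The class and method names are hypothetical, and comparing the idle count with `>=` is an assumption, since the text does not say whether the limit itself is inclusive.

```java
final class IdleRemovalSketch {

    /**
     * Hypothetical rule deciding whether an idle receiver may be removed.
     * Parameter names mirror the binding.jms attributes described above.
     */
    static boolean mayRemove(int idleCount, int idleLimit,
                             int activeReceivers, int minReceivers, int maxReceivers) {
        if (maxReceivers == 1) {
            return false; // the single receiver is never removed
        }
        if (activeReceivers <= minReceivers) {
            return false; // the minimum threshold has been reached
        }
        // Assumption: the receiver becomes removable once it has been marked
        // idle idleLimit times; the real comparison may differ.
        return idleCount >= idleLimit;
    }

    public static void main(String[] args) {
        System.out.println(mayRemove(3, 3, 4, 2, 8)); // idle limit hit, above the minimum
        System.out.println(mayRemove(3, 3, 2, 2, 8)); // already at the minimum
        System.out.println(mayRemove(5, 3, 1, 0, 1)); // maximum of one receiver
    }
}
```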
Pausing on Startup
Starting with Fabric3 1.9.5, JMS listeners can be configured to pause (i.e. not receive messages) when the runtime is booted. The listeners can then be started using an HTTP POST operation via the REST Management API. To configure listeners to pause on startup, use the jms/@pause.on.start attribute in systemConfig.xml:
Code Block |
---|
|
<config>
<jms pause.on.start="true"/>
<!-- ... -->
</config>
|
The Fabric3 Admin CLI can then be used to start the listeners. Alternatively, a script can issue an HTTP POST operation. Note that if the POST is made against the zone cluster address, Fabric3 will replicate the management change to all runtimes in the zone (cluster). The following is an example CLI session that starts all listeners:
Code Block |
---|
// Goes ("follows" the link) to zone2 where the listeners are deployed. Substitute the appropriate zone name.
f zone2
// Puts the value "jms" to the zone/runtime/transports/resume resource.
// Since this is a zone address, the changes will be replicated throughout the cluster.
p zone/runtime/transports/resume jms
|
Alternatively, a script could POST the "jms" value serialized in the JSON format to http://<zone leader>/management/zone/runtime/transports/resume. Listeners on individual runtime instances can be started by using the direct runtime address as opposed to the zone address, e.g.:
Code Block |
---|
p runtime/transports/resume jms
|
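The scripted POST mentioned above could look roughly like the following Java sketch, which uses the JDK's built-in `java.net.http` client (Java 11 or later). The base URL passed on the command line stands in for the zone leader address. The `Content-Type` header and the exact body (`"jms"` as a JSON string) are assumptions based on the description above, not confirmed details of the REST Management API. `ResumeListenersSketch` and `buildResumeRequest` are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class ResumeListenersSketch {

    /** Builds the POST request; baseUrl is e.g. "http://host:port" of the zone leader. */
    static HttpRequest buildResumeRequest(String baseUrl) {
        return HttpRequest.newBuilder(
                        URI.create(baseUrl + "/management/zone/runtime/transports/resume"))
                // Assumption: the management API accepts a JSON content type.
                .header("Content-Type", "application/json")
                // The value "jms" serialized as a JSON string, including the quotes.
                .POST(HttpRequest.BodyPublishers.ofString("\"jms\""))
                .build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("usage: ResumeListenersSketch <base URL>");
            return;
        }
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildResumeRequest(args[0]), HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

To target a single runtime instead of the whole zone, the path would change to the direct runtime address, following the CLI example above.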