Binding Services and References

A JMS provider can be used as the transport for one-way and request-response operations. A minimal one-way configuration is shown below:

@Component
public class Client ... {

    @JMS(value = @JMSConfiguration(destination = "ServiceQueue"))
    protected Service service;

    public void process() {
        // ...
        service.onReceive(message);
        // ...
    }
}
 
@Component
@JMS(value = @JMSConfiguration(destination = "ServiceQueue"))
public class ServiceImpl implements Service {

    public void onReceive(String message) {
       // ...
    }
}
 
public interface Service {
	@OneWay
	void onReceive(String message);
}

In XML, JMS bindings are configured as follows:

<component name="Client">
   <implementation.java class="..."/>
   <reference name="service">
      <binding.jms>
         <destination jndiName="ServiceQueue"/>
      </binding.jms>
   </reference>
</component>

<component name="Service">
   <implementation.java class="..."/>
   <service>
      <binding.jms>
         <destination jndiName="ServiceQueue"/>
      </binding.jms>
   </service>
</component>

The above configuration uses the "ServiceQueue" queue to propagate messages. Depending on the JMS provider, it may also be necessary to specify a connection factory name (see below).

Non-Blocking Operations

When writing asynchronous Java components, remember the @OneWay annotation. A method that is not marked with @OneWay is treated as a request-response operation even if its return type is void, which means the caller blocks until a response message is received.

Configuring request-response operations involves specifying a separate response queue in addition to the forward queue:

@Component
public class Client ... {

    @JMS(value = @JMSConfiguration(destination = "ServiceQueue", responseDestination="ResponseQueue"))
    protected Service service;

    public void process() {
		// ...
        String response = service.onReceive(message);
		// ...
    }
}
 
@Component
@JMS(value = @JMSConfiguration(destination = "ServiceQueue", responseDestination="ResponseQueue"))
public class ServiceImpl implements Service {

    public String onReceive(String message) {
       // ...
	   return response;
    }
}
 
public interface Service {
	
	String onReceive(String message);
}

And in XML:

<component name="Client">
   <implementation.java class="..."/>
   <reference name="service">
      <binding.jms>
         <destination name="ServiceQueue"/>
         <response>
            <destination jndiName="ResponseQueue"/>
         </response>
      </binding.jms>
   </reference>
</component>

<component name="Service">
   <implementation.java class="..."/>
   <service>
      <binding.jms>
         <destination name="ServiceQueue"/>
         <response>
            <destination jndiName="ResponseQueue"/>
         </response>
      </binding.jms>
   </service>
</component>

Using Callbacks

Although JMS is an asynchronous model, the client component blocks on request-response operations until a response is received. In some cases, this is the desired behavior. In others, such as long-running interactions, looser coupling is required so that the client can continue processing without waiting for a response. Callbacks can be used to deliver responses at some later point in time. Configuring callbacks involves specifying a callback queue:

 

@Component
public class CallbackClientImpl implements Client, ConsumerCallback {


    @JMS(value = @JMSConfiguration(destination = "ServiceQueue"), callback = @JMSConfiguration(destination = "CallbackQueue"))
    protected Service service;


    public void invoke(String message) {
        service.onReceive(message);
    }

    public void onResponse(String message) {
		// ...
    }
}


public interface ConsumerCallback {
    @OneWay
    void onResponse(String message);
}


@Component
@JMS(value = @JMSConfiguration(destination = "ServiceQueue"), callback = @JMSConfiguration(destination = "CallbackQueue"))
public class ServiceImpl implements Service {


    @Callback
    protected ConsumerCallback callback;

    public void onReceive(String message) {
        callback.onResponse(message);
    }
}
 
public interface Service {
	@OneWay
	void onReceive(String message);
}

And in XML:

<component name="CallbackClient">
   <implementation.java class="..."/>
   <reference name="service">
      <binding.jms>
         <destination name="ServiceQueue"/>
      </binding.jms>
      <callback>
         <binding.jms>
            <destination name="CallbackQueue"/>
         </binding.jms>
      </callback>
   </reference>
</component>

<component name="CallbackService">
   <implementation.java class="..."/>
   <service>
      <binding.jms>
         <destination name="ServiceQueue"/>
      </binding.jms>
      <callback>
         <binding.jms>
            <destination name="CallbackQueue"/>
         </binding.jms>
      </callback>
   </service>
</component>

When the CallbackClient invokes the CallbackService, the call returns immediately. At some later point in time, a response is delivered asynchronously using the "CallbackQueue" queue.

Specifying Connection Factories

For simplicity, the previous examples assumed that the JMS provider did not require a connection factory to be specified. However, most JMS providers do require one, which is configured using the connectionFactory element:

<component name="CallbackClient">
   <implementation.java class="..."/>
   <reference name="service">
      <binding.jms>
         <connectionFactory jndiName="TheConnectionFactory"/>
         <destination name="ServiceQueue"/>
      </binding.jms>
   </reference>
</component>

Dynamically Creating Queues

The JMS binding can also be configured to create queues dynamically by using the create attribute on the destination element and setting it to "ifnotexist" or "always".
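
As a minimal sketch of the create attribute described above, the following reference binding requests that the queue be created if it does not already exist. The component and queue names are reused from the earlier examples purely for illustration.

```xml
<component name="Client">
   <implementation.java class="..."/>
   <reference name="service">
      <binding.jms>
         <!-- create="ifnotexist" is one of the two values mentioned above;
              "always" is the other. Names are illustrative. -->
         <destination jndiName="ServiceQueue" create="ifnotexist"/>
      </binding.jms>
   </reference>
</component>
```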

Queues vs. Topics

By default, the JMS binding uses queues for reference and service bindings. While it is possible to specify a topic using the type="topic" attribute on the destination element, this should be avoided because it may have unintended effects. Specifically, if a service is clustered and bound to a topic, all service replicas in the zone will receive copies of an invocation message. With queues, only one clustered service replica receives a copy, which is in most cases the correct behavior for service interactions.
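
For reference, a topic destination could be declared roughly as follows. This is a sketch only: the component name "Listener" and the topic name "EventTopic" are hypothetical, and the caveat above about clustered services still applies.

```xml
<component name="Listener">
   <implementation.java class="..."/>
   <service>
      <binding.jms>
         <!-- type="topic" replaces the default queue behavior.
              "EventTopic" is a hypothetical name used for illustration. -->
         <destination jndiName="EventTopic" type="topic"/>
      </binding.jms>
   </service>
</component>
```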

Wire Formats

The JMS binding supports multiple wire formats, including object serialization, JMS message types, and JAXB serialization. If a parameter type is annotated with the JAXB @XmlRootElement annotation, parameters are sent as XML in a JMS text message. Otherwise, the JMS binding introspects the parameter types and selects the most appropriate message type (e.g. object or bytes).

Transacted Messaging

Fabric3 supports XA transacted messaging. This is useful when a message must be reliably sent in conjunction with a database update. To enable transacted messaging, use the transactedOneWay intent on the JMS binding:

<component name="TheComponent">
   <implementation.java class="..."/>
   <reference name="service">
      <binding.jms requires="transactedOneWay">
         <destination jndiName="TheQueue"/>
      </binding.jms>
   </reference>
</component>

The above will enqueue a message transactionally with the message provider. The component implementation is as follows:

public class TheComponent implements ... {

   @Reference
   protected Service service;

   public void operation() {
      Message message = ...
      service.invoke(message);
   }

}

If the above implementation also needed to update a database using Hibernate in the same transaction as the message enqueue, it could be modified as follows to use the @ManagedTransaction annotation:

@ManagedTransaction
public class TheComponent implements ... {

   @PersistenceContext(unitName = "employee")
   Session session;

   @Reference
   protected Service service;

   public void operation() {
      Message message = ...
      long id = message.getId();

      Entity entity = session.find(Entity.class, id);
      entity.update(message.getUpdate());
      service.invoke(message);
   }
}

Transacted Messaging and Request-Response

In a word: don't try it. Transacted messaging will not work with request-response operations. To see why, consider the following:

@ManagedTransaction
public class TheComponent implements ... {

   @Reference
   protected Service service;

   public void operation() {
      Message message = ...
      Response response = service.invoke(message);
   }
}

The forward transaction does not commit until the operation() method has returned, which happens only after the response is received. However, the outgoing message is not enqueued until the transaction commits. This means that the message cannot be sent while the client is waiting, so a response will never be received. The result will be a message timeout.

Receiving Responses with Transacted Messaging 

If you need to receive a response and require transacted messaging, use a callback. The following shows how this is configured: 

<component name="TheComponent">
   <implementation.java class="..."/>
   <reference name="service">
      <binding.jms requires="transactedOneWay">
         <destination jndiName="TheQueue"/>
      </binding.jms>
      <callback>
         <binding.jms requires="transactedOneWay">
            <destination jndiName="TheCallbackQueue"/>
         </binding.jms>
      </callback>
   </reference>
</component>

Service Auto Scaling

Fabric3 supports autoscaling, where the number of message listeners for a service endpoint is dynamically resized based on workload. As load increases, Fabric3 creates more queue listeners. Conversely, as load decreases, listeners are removed, thereby freeing kernel threads to perform other work.

The following autoscaling settings can be configured per endpoint as attributes on binding.jms:

  • max.receivers - Sets the maximum number of receivers to create
  • min.receivers - Sets the minimum number of receivers to retain
  • idle.limit - Sets the number of times a receiver can be marked idle during its execution window before it is removed from the work scheduler. Note that if the maximum number of receivers is set to one, the single receiver will not be removed. Likewise, if a minimum number of receivers is set, idle receivers will not be removed once that threshold is reached.
  • receive.timeout - Sets the time in milliseconds that the message receive operation should block for.
  • max.messages - Sets the maximum number of messages to process by a receiver during its execution window. The default is unlimited, which results in the receiver processing messages until shutdown. Setting the number lower increases the ability of the work scheduler to adjust scheduling at the expense of thread context switches as receivers may be scheduled on different threads.
  • recovery.interval - Sets the time in milliseconds to wait while making repeated recovery attempts.
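
A sketch of how these attributes might be set on a service endpoint is shown below. The attribute names come from the list above. The values are arbitrary illustrations rather than recommended settings, and the component and queue names are reused from the earlier examples.

```xml
<component name="Service">
   <implementation.java class="..."/>
   <service>
      <!-- Illustrative values only (assumptions, not defaults):
           keep between 2 and 10 receivers, allow 3 idle marks per execution window,
           block up to 15000 ms per receive, process at most 100 messages per window,
           and wait 5000 ms between recovery attempts. -->
      <binding.jms min.receivers="2"
                   max.receivers="10"
                   idle.limit="3"
                   receive.timeout="15000"
                   max.messages="100"
                   recovery.interval="5000">
         <destination jndiName="ServiceQueue"/>
      </binding.jms>
   </service>
</component>
```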

Pausing on Startup

Starting with Fabric3 1.9.5, JMS listeners can be configured to pause (i.e. not receive messages) when the runtime is booted. The listeners can then be started using an HTTP POST operation via the REST Management API. To configure listeners to pause on startup, use the jms/@pause.on.start attribute in systemConfig.xml:

<config>  
    <jms pause.on.start="true"/>
    <!-- ... -->
</config>

The Fabric3 Admin CLI can then be used to start the listeners. Alternatively, a script can issue an HTTP POST operation. Note that if the POST is made against the zone cluster address, Fabric3 will replicate the management change to all runtimes in the zone (cluster). The following is an example CLI session that starts all listeners:

// Goes to ("follows" the link to) zone2, where the listeners are deployed. Substitute the appropriate zone name.

f zone2                    

// Puts the value "jms" to the zone/runtime/transports/resume resource. 
// Since this is a zone address, the changes will be replicated throughout the cluster.

p zone/runtime/transports/resume jms   

Alternatively, a script could POST the "jms" value serialized as JSON to http://<zone leader>/management/zone/runtime/transports/resume. Listeners on individual runtime instances can be started by using the direct runtime address instead of the zone address, e.g.:

p runtime/transports/resume jms