Monday, 30 January 2012

Spring Integration - Splitter-Aggregator

Within Spring Integration, one form of EIP scatter-gather is provided by the splitter and aggregator constructs. The semantics of both are straightforward: the splitter receives an input message and returns a list of objects, each of which is turned into a first-class message. Those messages are routed to message handlers, after which an aggregator gathers the handler outputs into a single list. This pattern can be used successfully with fairly simple configuration.
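
To make the flow concrete, here is a minimal plain-Java sketch of the split, handle and aggregate sequence. It deliberately uses no Spring Integration classes: ScatterGatherSketch and its methods are hypothetical names chosen for illustration, and the comma-separated request string merely stands in for a real message payload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ScatterGatherSketch {

    // Splitter: one input becomes several independent parts.
    static List<String> split(String request) {
        return Arrays.asList(request.split(","));
    }

    // Message handler: processes a single part (here, a fake quote lookup).
    static String handle(String brokerName) {
        return "quote-from-" + brokerName;
    }

    // Scatter the parts to the handler, then gather the replies into one list.
    static List<String> scatterGather(String request) {
        List<String> replies = new ArrayList<>();
        for (String part : split(request)) {
            replies.add(handle(part));
        }
        return replies;
    }

    public static void main(String[] args) {
        // prints [quote-from-yahoo, quote-from-openex]
        System.out.println(scatterGather("yahoo,openex"));
    }
}
```

In the real configuration the framework does the wiring between these three steps; the sketch only shows the shape of the data as it moves through them.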

The following example performs only trivial message processing and uses simple one-fail-all-fail error handling semantics. This means that an error occurring in either service, or in both, will result in an exception being thrown back to the invoker.

Having worked on projects that require a far more robust solution following partial failure, I've written another post that includes details regarding a design strategy to support that - A Robust Splitter Aggregator Strategy <URL>.

Let's explore how this context configuration works and what happens when an exception is thrown by one of the services.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

    <!--##############################################-->
    <!-- Bean specifications.  -->
    <!--##############################################-->
    <import resource="bean-refs.xml"/>

    <!--##############################################-->
    <!-- Gateway specification. -->
    <!--##############################################-->
    <int:gateway service-interface="com.l8mdv.sample.gateway.BrokerRequestGateway"/>
    <int:channel id="broker-request-channel"
                 datatype="com.l8mdv.sa.BrokerRequestMessage"/>

    <!--##############################################-->
    <!-- Request message splitter. -->
    <!--##############################################-->
    <int:splitter input-channel="broker-request-channel"
                  output-channel="broker-router-channel"
                  
                  ref="brokerQuoteRequestSplitter"/>

    <!--##############################################-->
    <!-- Request message routing. -->
    <!--##############################################-->
    <int:channel id="broker-router-channel"
                 datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:recipient-list-router input-channel="broker-router-channel">
        <int:recipient channel="openex-broker-channel"
                       selector-expression="payload.BrokerName.equals('openex')"/>
        <int:recipient channel="yahoo-broker-channel"
                       selector-expression="payload.BrokerName.equals('yahoo')"/>
    </int:recipient-list-router>

    <!--##############################################-->
    <!-- Request message routing to OpenEx. -->
    <!--##############################################-->
    <int:channel id="openex-broker-channel" datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:chain input-channel="openex-broker-channel"
               output-channel="aggregator-channel">
        <int:service-activator>
            <bean id="OpenExServiceFaker" class="com.l8mdv.sample.ServiceFaker">
                <constructor-arg name="response" ref="OpenExFakeResponseData"/>
            </bean>
        </int:service-activator>
    </int:chain>

    <!--##############################################-->
    <!-- Request message routing to Yahoo. -->
    <!--##############################################-->
    <int:channel id="yahoo-broker-channel"
                 datatype="com.l8mdv.sa.BrokerQuoteRequestMessage"/>
    <int:chain input-channel="yahoo-broker-channel"
               output-channel="aggregator-channel">
        <int:service-activator>
            <bean id="YahooServiceFaker" class="com.l8mdv.sample.ServiceFaker">
                <constructor-arg name="response" ref="YahooFakeResponseData"/>
            </bean>
        </int:service-activator>
    </int:chain>

    <!--##############################################-->
    <!-- Response message handling, return the best -->
    <!-- quote to the invoker. -->
    <!--##############################################-->
    <int:channel id="aggregator-channel" datatype="com.l8mdv.sa.BrokerQuoteResponseMessage"/>
    <int:chain input-channel="aggregator-channel">
        <int:aggregator/>
        <int:transformer ref="aggregationToBrokerQuoteResponseTransformer"/>
    </int:chain>

</beans>

Regarding the splitter and aggregator, the interesting configuration starts at line 25 of the listing with the splitter specification. The construct has been customised with input-channel, output-channel and ref attributes. The channel attributes are self-explanatory; the ref attribute associates a bean that performs the split function. No method name is given: provided the bean exposes exactly one method with suitable argument and return types, the framework can resolve and invoke it unambiguously.

The next stop for messages is the recipient list router defined on line 35. This message endpoint forwards messages to the appropriate channels by evaluating an expression against the payload of each message. Here it examines the payload and routes each message to one of two services, located on lines 48 and 62. Which services are invoked depends entirely on what the splitter returns; either or both may be invoked for a given splitter input message.
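
The selector behaviour can be modelled in plain Java as follows. This is a sketch rather than Spring's implementation: RecipientListSketch and its Predicate-based selectors are hypothetical stand-ins for the selector-expression attributes, while the channel names are taken from the configuration above. A message is delivered to every recipient whose selector accepts it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class RecipientListSketch {

    // Channel name -> selector, mirroring the two <int:recipient> elements.
    private static final Map<String, Predicate<String>> RECIPIENTS = new LinkedHashMap<>();
    static {
        RECIPIENTS.put("openex-broker-channel", name -> name.equals("openex"));
        RECIPIENTS.put("yahoo-broker-channel", name -> name.equals("yahoo"));
    }

    // Returns every channel whose selector accepts the given broker name.
    static List<String> route(String brokerName) {
        List<String> channels = new ArrayList<>();
        for (Map.Entry<String, Predicate<String>> recipient : RECIPIENTS.entrySet()) {
            if (recipient.getValue().test(brokerName)) {
                channels.add(recipient.getKey());
            }
        }
        return channels;
    }

    public static void main(String[] args) {
        System.out.println(route("yahoo")); // [yahoo-broker-channel]
        System.out.println(route("Yahoo")); // [] because equals() is case-sensitive
    }
}
```

The second call is worth noting: because the selectors use equals(), the broker names set on the request must match the lower-case literals exactly.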

Finally, the results of the one or two service invocations are routed to the aggregator, whose configuration starts with the chain defined at line 74. The result of the aggregation is then passed to a transformer, where some further processing takes place. Notice that there is no output channel on this chain; the reply is instead sent to the reply channel that the gateway implicitly created for the request, so it goes back through the gateway to the gateway invoker.
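
The bare aggregator element works here because, by default, the splitter stamps each message it emits with correlation and sequence headers, and the default aggregator releases a group once it holds as many messages as the sequence size. The following plain-Java sketch models that release rule only; AggregatorSketch and offer are hypothetical names, and the real bookkeeping is done by the framework.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AggregatorSketch {

    // Partial groups, keyed by correlation id.
    private final Map<String, List<String>> groups = new HashMap<>();

    // Adds one reply to its group. Returns the complete group once
    // sequenceSize replies have arrived, otherwise null (still waiting).
    List<String> offer(String correlationId, int sequenceSize, String payload) {
        List<String> group = groups.computeIfAbsent(correlationId, id -> new ArrayList<>());
        group.add(payload);
        if (group.size() < sequenceSize) {
            return null;
        }
        groups.remove(correlationId);
        return group;
    }

    public static void main(String[] args) {
        AggregatorSketch aggregator = new AggregatorSketch();
        System.out.println(aggregator.offer("request-1", 2, "yahoo quote"));  // null
        System.out.println(aggregator.offer("request-1", 2, "openex quote")); // [yahoo quote, openex quote]
    }
}
```

If one of the expected replies never arrives, offer never returns a group, which mirrors an aggregation that does not complete.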

There are some interesting aspects to this service:

  1. If the splitter at line 25 receives a message but returns an empty list rather than one or more messages, nothing is sent onwards and the router is never called. This can be overridden by using the requires-reply attribute, in which case an empty list will result in a message handling exception being thrown.
  2. Strong typing has been used on the data channels in an attempt to enforce strict processing rules and make the configuration easier to follow and understand.
  3. The chain construct has been used in an attempt to keep configuration compact where useful. Note that chains and strong typing pull in opposite directions: by grouping the aggregator and transformer in a chain, I can no longer control, and hence document, the type of message entering the transformer endpoint.
  4. Spring beans referenced from within this context have been loaded from an external file. Whilst they could have been component-scanned or defined in the same file I have chosen to keep them distinct in order that they are not loaded if not necessary for operation - I'd usually create mock and spy objects around these message endpoints.
  5. Any exception thrown during the Service Activator (SA) invocations at lines 48 and 62 would result in aggregation not completing. In this example, I have not created an error handler on the gateway at line 18, so any exception thrown by an SA is thrown on to the invoker of the gateway. A solution that is more robust in the face of such failures would require a different design approach.
  6. I have documented, albeit briefly, intent for each section of the configuration specifically in order to help readers understand intentions of my design.
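
The one-fail-all-fail behaviour described in point 5 can be illustrated without any framework code. The channels in this configuration are plain direct channels, so the whole flow runs on the invoker's thread and an exception simply unwinds the call stack. In the sketch below, FailFastSketch, invokeAll and the GBP/USD request string are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FailFastSketch {

    // Invokes every service in turn on the caller's thread. The first
    // exception aborts the loop, so no partial result is ever returned.
    static List<String> invokeAll(List<Function<String, String>> services, String request) {
        List<String> replies = new ArrayList<>();
        for (Function<String, String> service : services) {
            replies.add(service.apply(request));
        }
        return replies;
    }

    public static void main(String[] args) {
        Function<String, String> yahoo = request -> "yahoo quote for " + request;
        Function<String, String> openEx = request -> {
            throw new IllegalStateException("OpenEx unavailable");
        };
        try {
            invokeAll(Arrays.asList(yahoo, openEx), "GBP/USD");
        } catch (IllegalStateException e) {
            // The successful Yahoo reply is lost along with the failure.
            System.out.println("Invoker sees: " + e.getMessage());
        }
    }
}
```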

In case they're useful, the Spring bean definitions are as follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="brokerQuoteRequestSplitter"
          class="com.l8mdv.sample.service.BrokerQuoteRequestSplitter"/>

    <bean id="brokerRawResponseTransformer"
          class="com.l8mdv.sample.service.impl.BrokerRawResponseTransformer"/>

    <bean id="aggregationToBrokerQuoteResponseTransformer"
          class="com.l8mdv.sample.service.impl.AggregationToBrokerQuoteResponseTransformer"/>

    <bean id="YahooFakeResponseData" class="com.l8mdv.sa.BrokerResponseMessage">
        <property name="brokerName" value="Yahoo"/>
        <property name="value" value="2"/>
        <property name="brokerRequestType" value="#{T(com.l8mdv.sa.BrokerRequestType).QUOTE}"/>
    </bean>

    <bean id="OpenExFakeResponseData" class="com.l8mdv.sa.BrokerResponseMessage">
        <property name="brokerName" value="OpenEx"/>
        <property name="value" value="5"/>
        <property name="brokerRequestType" value="#{T(com.l8mdv.sa.BrokerRequestType).QUOTE}"/>
    </bean>

</beans>

The splitter code:

package com.l8mdv.sample.service;

import com.l8mdv.sa.BrokerQuoteRequestMessage;
import com.l8mdv.sa.BrokerRequest;
import com.l8mdv.sa.BrokerRequestMessage;
import com.l8mdv.sa.QuoteRequestSortPolicy;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.List;

public class BrokerQuoteRequestSplitter {

    public List<BrokerQuoteRequestMessage> split(BrokerRequestMessage message) {

        Assert.notNull(message, "Mandatory argument missing.");
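        // Typed list avoids the unchecked conversion a raw List would cause.
        List<BrokerQuoteRequestMessage> requests =
                new ArrayList<BrokerQuoteRequestMessage>();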

        for (BrokerRequest brokerRequest:
                message.getBrokerRequest()) {
            BrokerQuoteRequestMessage brokerQuoteRequestMessage
                    = new BrokerQuoteRequestMessage();
            brokerQuoteRequestMessage
                    .setBrokerName(brokerRequest.getBrokerName());
            brokerQuoteRequestMessage
                    .setQuoteRequestSortPolicy(QuoteRequestSortPolicy.BUY_LOWEST);
            requests.add(brokerQuoteRequestMessage);
        }

        return requests;
    }
}

The transformer:

package com.l8mdv.sample.service.impl;

import com.l8mdv.sa.BrokerQuoteResponseMessage;
import org.springframework.util.Assert;

import java.util.List;

public class AggregationToBrokerQuoteResponseTransformer {

    public BrokerQuoteResponseMessage
    transform(List<BrokerQuoteResponseMessage> serviceResponses) {

        Assert.notNull(serviceResponses, "Mandatory argument missing.");

        BrokerQuoteResponseMessage bestQuote = null;
        for (BrokerQuoteResponseMessage 
                brokerQuoteResponseMessage: serviceResponses) {
            if (bestQuote == null)
                bestQuote = brokerQuoteResponseMessage;
            else {
                if (brokerQuoteResponseMessage.getSellPrice()
                        .compareTo(bestQuote.getSellPrice()) > 0)
                    bestQuote = brokerQuoteResponseMessage;
            }
        }

        return bestQuote;
    }
}
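
The selection loop above amounts to taking the maximum by sell price, keeping the first quote on a tie and returning null for an empty list. A more compact way to express that is sketched below. Quote is a hypothetical stand-in for BrokerQuoteResponseMessage, whose definition is not shown in this post, and the use of BigDecimal for the sell price is an assumption, as are the sample prices.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

final class BestQuoteSketch {

    // Hypothetical stand-in for BrokerQuoteResponseMessage.
    static final class Quote {
        final String brokerName;
        final BigDecimal sellPrice;

        Quote(String brokerName, BigDecimal sellPrice) {
            this.brokerName = brokerName;
            this.sellPrice = sellPrice;
        }
    }

    // Returns the quote with the highest sell price, or null for an empty
    // list, matching what the hand-written loop does.
    static Quote best(List<Quote> quotes) {
        if (quotes.isEmpty()) {
            return null;
        }
        return Collections.max(quotes, Comparator.comparing((Quote q) -> q.sellPrice));
    }

    public static void main(String[] args) {
        List<Quote> quotes = Arrays.asList(
                new Quote("Yahoo", new BigDecimal("2")),
                new Quote("OpenEx", new BigDecimal("5")));
        System.out.println(best(quotes).brokerName); // OpenEx
    }
}
```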

and the integration test:

package com.l8mdv.sample;

import com.l8mdv.sa.BrokerQuoteResponseMessage;
import com.l8mdv.sa.BrokerRequest;
import com.l8mdv.sa.BrokerRequestMessage;
import com.l8mdv.sa.BrokerRequestType;
import com.l8mdv.sample.gateway.BrokerRequestGateway;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(
    locations = {"classpath:META-INF/spring/simple-splitter-aggregator.xml"}
)
public class SimpleSplitterAggregatorIntegrationTest {

    @Autowired
    public BrokerRequestGateway brokerRequestGateway;

    @Test
    public void run() throws Exception {
        BrokerRequestMessage requestMessage = new BrokerRequestMessage();

        BrokerRequest yahooRequest = new BrokerRequest();
        yahooRequest.setBrokerName("yahoo");
        yahooRequest.setBrokerRequestType(BrokerRequestType.QUOTE);
        requestMessage.getBrokerRequest().add(yahooRequest);

        BrokerRequest oxRequest = new BrokerRequest();
        oxRequest.setBrokerName("openex");
        oxRequest.setBrokerRequestType(BrokerRequestType.QUOTE);
        requestMessage.getBrokerRequest().add(oxRequest);

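        BrokerQuoteResponseMessage response =
           brokerRequestGateway.send(requestMessage);

        // The flow should hand a single best quote back through the gateway.
        Assert.assertNotNull(response);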
    }

}


Resources:
  1. Spring Integration Reference Manual - Splitter Overview
