Spring Integration Samples

Mark Fisher

In my recent post, I mentioned that the Subversion repository for Spring Integration would be publicly accessible soon, and I'm pleased to provide that link now. You can check out the project with the following command:

svn co https://anonsvn.springframework.org/svn/spring-integration/base/trunk spring-integration

If the checkout is successful, you should see the following directory structure:

spring-integration/
+--build-spring-integration/
+--spring-build/
+--spring-integration-core/
+--spring-integration-samples/

I would like to take this opportunity to walk through a couple of the samples that are in 'spring-integration-samples'. Keep in mind this project is definitely a work-in-progress (currently a 0.5 SNAPSHOT), but the samples should give you an idea of how the programming model is taking shape, and I'm very much looking forward to getting some feedback.

Hello World

The first sample is the obligatory "Hello World" demo. This one demonstrates the main components: Message Channel and Message Endpoint. This demo also reveals how the Spring Integration approach is non-invasive - providing a complete separation of concerns between business logic and messaging. In this case, the "business logic" is a simple HelloService:

public class HelloService {

    public String sayHello(String name) {
        return "Hello " + name;
    }
}

This example uses an XML-based configuration for the Message Endpoint (we'll see the annotation approach next):

<endpoint input-channel="inputChannel"
               default-output-channel="outputChannel"
               handler-ref="helloService"
               handler-method="sayHello"/>

There you see that 'handler-ref' simply points to a Spring-managed bean. If you have used Spring's MessageListenerAdapter for asynchronous JMS reception, then this should look familiar - especially if you are using Spring 2.5's new jms namespace and the "jms:listener" element. Finally, the HelloWorldDemo starts the application context and then interacts with the channels:

ChannelRegistry channelRegistry = (ChannelRegistry) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
MessageChannel inputChannel = channelRegistry.lookupChannel("inputChannel");
MessageChannel outputChannel = channelRegistry.lookupChannel("outputChannel");
inputChannel.send(new StringMessage(1, "World"));
System.out.println(outputChannel.receive().getPayload());

That example involves lookup of the MessageBus bean - which implements the ChannelRegistry interface. However, in a non-demo "real world" scenario, any component that would access channels can have the registry provided via dependency injection. All it needs to do is implement ChannelRegistryAware (or use @Autowired). This is the same approach used elsewhere in Spring - such as ApplicationEventPublisherAware.
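
To make the injection idea concrete, here is a minimal, self-contained sketch. The types SketchChannelRegistry, SketchChannelRegistryAware, SketchRegistry and GreetingSender are hypothetical stand-ins invented for this illustration (a java.util.concurrent queue plays the role of a channel); the real ChannelRegistry and ChannelRegistryAware in Spring Integration may well look different.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a channel registry: maps names to queues.
interface SketchChannelRegistry {
    BlockingQueue<Object> lookupChannel(String name);
}

// Hypothetical stand-in for the "aware" callback interface.
interface SketchChannelRegistryAware {
    void setChannelRegistry(SketchChannelRegistry registry);
}

class SketchRegistry implements SketchChannelRegistry {
    private final Map<String, BlockingQueue<Object>> channels = new ConcurrentHashMap<>();

    void registerChannel(String name) {
        channels.putIfAbsent(name, new LinkedBlockingQueue<>());
    }

    public BlockingQueue<Object> lookupChannel(String name) {
        return channels.get(name); // null when no such channel is registered
    }
}

// The component never looks up the bus itself; whoever wires it hands over the registry.
class GreetingSender implements SketchChannelRegistryAware {
    private SketchChannelRegistry registry;

    public void setChannelRegistry(SketchChannelRegistry registry) {
        this.registry = registry;
    }

    // Returns false when the channel is unknown or does not accept the payload.
    boolean send(String channelName, Object payload) {
        BlockingQueue<Object> channel = registry.lookupChannel(channelName);
        return channel != null && channel.offer(payload);
    }
}
```

In this sketch the caller invokes setChannelRegistry by hand; in a Spring application the container would be expected to make that call for any bean implementing the callback interface.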

Annotation-driven Endpoint and Subscriber

The next example shows how to configure a Message Endpoint with annotations. In fact, this particular endpoint is itself the source of the data: the return value of the method annotated with @Polled is translated (behind the scenes) into Message payload content. It could alternatively provide an input channel for receiving messages asynchronously.

@MessageEndpoint(defaultOutput="quotes")
public class QuotePublisher {

    @Polled(period=300)
    public Quote getQuote() {
        BigDecimal price = new BigDecimal(new Random().nextDouble() * 100);
        return new Quote(generateTicker(), price.setScale(2, RoundingMode.HALF_EVEN));
    }

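    private String generateTicker() {
        // randomly generates 3-letter tickers (one possible implementation)
        Random random = new Random();
        char[] letters = new char[3];
        for (int i = 0; i < letters.length; i++) {
            letters[i] = (char) ('A' + random.nextInt(26));
        }
        return new String(letters);
    }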
}

On the receiving side, there is a @Subscriber annotation:

public class QuoteSubscriber {

    @Subscriber(channel="quotes")
    public void log(Object o) {
        System.out.println(o);
    }
}
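
As a rough mental model of the publisher and subscriber above, the following plain-Java sketch polls a source method and hands each return value to registered subscribers. PolledChannelSketch and its methods are hypothetical names made up for this illustration; it is not how Spring Integration implements @Polled or @Subscriber, only a picture of the data flow.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration: a polled source feeding a list of subscribers.
class PolledChannelSketch<T> {
    private final Supplier<T> source;
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    PolledChannelSketch(Supplier<T> source) {
        this.source = source;
    }

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // One polling cycle: invoke the source method and deliver its return value.
    void pollOnce() {
        T payload = source.get();
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(payload);
        }
    }

    // Repeats pollOnce() at a fixed period; the caller must shut the scheduler down.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Calling start(300) would correspond loosely to @Polled(period=300), and subscribe(System.out::println) to the log method of QuoteSubscriber.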

Here is the XML which registers the annotation post-processor and the 2 Spring-managed beans (notice that this example is using the 'spring-integration' schema as the primary namespace):

<beans:beans xmlns="http://www.springframework.org/schema/integration"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:beans="http://www.springframework.org/schema/beans"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/integration
                http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"
>

    <message-bus/>

    <annotation-driven/>

    <channel id="quotes"/>

    <beans:bean id="publisher" class="org.springframework.integration.samples.quote.QuotePublisher"/>

    <beans:bean id="subscriber" class="org.springframework.integration.samples.quote.QuoteSubscriber"/>

</beans:beans>

By the way, the 'annotation-driven' element also enables the @Publisher annotation which triggers the creation of AOP advice for asynchronously sending the return value of any annotated method to a channel.
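
The idea of advice that forwards a method's return value to a channel can be approximated with a JDK dynamic proxy. This is only a sketch under stated assumptions: QuoteSource and PublishingProxySketch are hypothetical names, a BlockingQueue stands in for a channel, and the proxy forwards the result of every interface method rather than only annotated ones. The actual @Publisher support is built on Spring AOP and is not shown here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;

// Hypothetical business interface whose return values should be published.
interface QuoteSource {
    String nextQuote();
}

class PublishingProxySketch {

    // Wraps 'target' so that each non-null return value is also offered to 'channel'.
    // The send is handed to 'executor', so it can run asynchronously.
    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> type, T target, BlockingQueue<Object> channel, Executor executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = method.invoke(target, args);
            if (result != null) {
                executor.execute(() -> channel.offer(result));
            }
            return result; // the caller still sees the normal return value
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }
}
```

Passing a thread-pool Executor makes the send asynchronous; passing Runnable::run keeps it on the calling thread, which is convenient for experimenting.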

Simple Routing

The routing sample features a Message Endpoint that produces an incrementing integer every 3 seconds and a router that resolves the target channel name (note that a router method can also return more than one result and can return actual MessageChannel instances rather than names).

@MessageEndpoint
public class Counter {

    private AtomicInteger count = new AtomicInteger();

    @Polled(period=3000)
    public int getNumber() {
        return count.incrementAndGet();
    }

    @Router
    public String resolveChannel(int i) {
        if (i % 2 == 0) {
            return "even";
        }
        return "odd";
    }
}

On the receiving end of these channels, we have 2 different methods that simply log the message payload:

@Component
public class NumberLogger {

    @Subscriber(channel="even")
    public void even(int i) {
        System.out.println("even: " + i);
    }

    @Subscriber(channel="odd")
    public void odd(int i) {
        System.out.println("odd:  " + i);
    }
}

By the way, notice that the NumberLogger is annotated with Spring's @Component. The @MessageEndpoint annotation also contains @Component as a meta-annotation. Both are therefore "stereotypes" and eligible for autodetection with Spring 2.5's classpath-scanning. The XML for this example is extremely simple:

<context:component-scan base-package="org.springframework.integration.samples.oddeven"/>

<message-bus auto-create-channels="true"/>

<annotation-driven/>
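
The routing flow in this sample can be pictured with a small plain-Java sketch: a resolver function returns a channel name for each payload, and the channel is created on first use, loosely mirroring auto-create-channels="true". RouterSketch is a hypothetical class written for this illustration, with simple lists standing in for channels; it does not use or reproduce the framework's own router.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of name-based routing with channels created on demand.
class RouterSketch {
    private final Map<String, List<Integer>> channels = new HashMap<>();
    private final Function<Integer, String> resolver;

    RouterSketch(Function<Integer, String> resolver) {
        this.resolver = resolver;
    }

    // Resolves the target channel name and appends the payload to that channel.
    void route(int payload) {
        String name = resolver.apply(payload);
        channels.computeIfAbsent(name, key -> new ArrayList<>()).add(payload);
    }

    // Returns what has been routed to the named channel so far (empty if none).
    List<Integer> received(String channelName) {
        return channels.getOrDefault(channelName, new ArrayList<>());
    }
}
```

With the resolver i -> i % 2 == 0 ? "even" : "odd", routing the numbers 1 through 5 leaves 2 and 4 on "even" and 1, 3 and 5 on "odd", the same split that NumberLogger prints.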



Hopefully this provides a decent introduction to the Spring Integration programming model. Feel free to check out the code and try out these samples yourself. I am currently working on a "Getting Started" guide that I will make available after the holidays. If you do check out the code, please be sure to update frequently. The code is constantly evolving, and in particular I am refactoring much of the core consumer/dispatcher code with the goal of providing the simplest possible extension points for adding either polling or event-driven message sources. In the next blog post, I plan to show some new additions to the 'spring-integration-samples' featuring those extension points.

 

28 responses


  1. After playing with the sample applications, I tried to setup a common file batching processing
    scenario using InboundFileAdapter:

    then I found a bug in AbstractChannelAdapter:

    public Message receive(long timeout) {
        if (!this.initialized) {
            throw new MessageHandlingException("adapter not initialized");
        }

        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {
            Future<Message> result = executor.submit(new Callable<Message>() {
                public Message call() throws Exception {
                    return receive();
                }
            });
            try {
                result.get(timeout, TimeUnit.MILLISECONDS);
                if (result.isDone()) {
                    return result.get();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            } catch (TimeoutException e) {
                return null;
            } catch (ExecutionException e) {
                throw new MessageHandlingException("Exception occurred in message source", e);
            }
            result.cancel(true);

        } finally {
            // always release the single-use executor, whatever the outcome
            executor.shutdown();
        }
        return null;
    }

    the executor is not shut down properly (I have added the finally clause).

    I can not find the support forum for Spring Integration yet, so I raise the bug here.


  2. Thank you Steven. I committed the change, and I also adjusted EndpointExecutor so that it implements Lifecycle and is properly shut down as well. That AbstractChannelAdapter is one of the components that will be replaced or significantly refactored shortly. Individual components will ultimately be exposing tasks to be scheduled on an executor rather than creating a new one within the class. Thanks again, -Mark


  3. Hi Mark,

    Your blog entry was very helpful for me in checking out the Spring Integration source and running the samples w/o any problems. This is a very good introduction to a new framework like Spring Integration. I would love to see more advanced examples of how to use SI, and maybe implementations of some of the enterprise integration patterns discussed in Gregor Hohpe's book.

    Great job.

    Thanks
    Srini


  4. Thank you Srini. I am definitely planning to include more complex (and more complete) samples, such as the Loan Broker example from the Enterprise Integration Patterns book. Many other patterns will also be included in the framework, and as those are added I will continue to post samples on this blog. By the time of the first non-snapshot release (target is January 11th), we will be providing reference documentation and a "Getting Started" guide as well. I do hope to have an early version of the "Getting Started" guide available by the end of next week.

    Cheers,
    -Mark


  5. Hi, Mark,

    I am very interested in the bright future of Spring Integration and we will base our solution on this new project. (We have already integrated Spring Batch and are not afraid of being cutting edge.)

    When I dug deeper into the current release, I found another issue in class MessageBus:

    public void registerEndpoint(String name, MessageEndpoint endpoint) {
        Assert.notNull(name, "'name' must not be null");
        Assert.notNull(endpoint, "'endpoint' must not be null");
        this.endpoints.put(name, endpoint);
        if (logger.isInfoEnabled()) {
            logger.info("registered endpoint '" + name + "'");
        }
        endpoint.setChannelRegistry(this);
        if (endpoint.getInputChannelName() != null && endpoint.getConsumerPolicy() != null) {
            Subscription subscription = new Subscription();
            subscription.setChannel(endpoint.getInputChannelName());
            subscription.setEndpoint(name);
            subscription.setPolicy(endpoint.getConsumerPolicy());
            this.activateSubscription(subscription);
        }
        if (this.autoCreateChannels) {
            String defaultOutputChannelName = endpoint.getDefaultOutputChannelName();
            if (StringUtils.hasText(defaultOutputChannelName)) {
                this.registerChannel(defaultOutputChannelName, new PointToPointChannel());
            }
        }
    }

    When autoCreateChannels is set and defaultOutputChannelName has text, the channel named defaultOutputChannelName should first be looked up in the channel registry; only if that lookup fails should a new one be created.

    -Steven


  6. Steven, I just added the registry lookup attempt for the output channel name there. Thanks for catching that. By the way, Spring Integration's Jira should be publicly available by the end of the week, and I will post a comment here with that link. -Mark


  7. Hi Mark,

    First of all, thanks for this nice intro into your framework.
    After running your examples w/o any problems and trying to understand the code so far, I'm
    wondering if it could make sense to extend the XML-element like this:

    Defines a consumer policy.

    where bean-ref points to a policy-bean, something like:

    I gave it a try and changed the EndpointParser in parseConsumer() method:

    String beanName = element.getAttribute(POLICY_NAME_ATTRIBUTE);
    if (StringUtils.hasText(beanName)) {
        parserContext.registerBeanComponent(
                new BeanComponentDefinition(consumerDef, beanName));
    } else {
        beanName = parserContext.getReaderContext().generateBeanName(consumerDef);
        parserContext.registerBeanComponent(
                new BeanComponentDefinition(consumerDef, beanName));
    }
    return beanName;

    and it seems to work quite well.
    Any comments or restrictions on doing it this way?
    Would you recommend another solution for this kind of problem?

    kind regards
    -peter


  8. oops, sorry,

    the XML-code isn't shown in my previous post.

    So what I've done is this:

    I extended the "consumer" xml-schema-element definition by a new attribute "bean-ref" of type xsd:string,
    which is the bean-id of a bean of type org.springframework.integration.bus.ConsumerPolicy.
    This way I can reuse a Policy for different endpoints.
    I would like to use this especially for different consumer threadpools.

    again kind regards
    -peter


  9. Peter,

    Thank you for the excellent idea. I will add a 'ref' for consumer - perhaps as just a 'consumer' attribute on the main endpoint element. By the way, I have also been considering a simplification for using "common" policies, such as a basic "PollingPolicy" that simply accepts a period (poll interval) and uses typical settings for the other parameters (1 concurrent poller, 0 timeout, etc).

    Mark


  10. Hi Mark,

    thanks for your answer.
    Concerning common policies, I think one major part is missing.
    In my opinion the policy needs a boolean flag (or something different, but with the same semantics) such as
    "isIdempotent", …. maybe as an implementation of a marker interface.
    A value of "true" would guarantee (with the necessary logic implemented, of course) that a record is retrieved only once.

    peter


  11. Hi Mark,

    I have been playing around with the samples and I tried to create some nice examples myself.
    Today I kicked off a SVN update and when I came back I was surprised to see that SVN deleted the project content. :(
    When I try to access the repository I get an error saying the repository is no longer available.

    Is the latest version available in a different location? Or can you provide me the sources in some other way?
    It would be much appreciated.

    Kind regards,
    Roel


  12. Oops, never mind, I figured it out.
    The URL for a checkout should now be:
    https://anonsvn.springframework.org/svn/spring-integration/trunk/
    instead of
    https://anonsvn.springframework.org/svn/spring-integration/base/trunk

    Cheers,
    Roel


  13. Roel,

    I apologize for not leaving a comment here earlier. As you have now discovered (rather abruptly), the SVN repository was restructured yesterday. It is now flattened such that 'trunk' is directly under the root. You may also notice that a "spring-integration-reference" project has been added, and there you will see gradual creation of the reference guide (although it's empty as of right now). All of these changes are occurring as we approach the proposed 1.0 M1 release on January 18th. I will be posting another blog entry next week that includes more information about the reference guide as well as the Jira instance which will soon be publicly accessible. That would provide a good arena for sharing your samples if you are interested. In my blog post next week, I will also highlight a couple more samples. One (FileCopyDemo) has already been added within the "spring-integration-samples" project, and I intend to include a JMS sample as well. I am currently wrapping up the namespace support for the JMS adapters.

    Mark


  14. Hi Mark,

    Thanks for giving us an update on what you are working on.
    I will check up on your progress regularly and checkout the new stuff.
    I've been playing around a bit, making some demos with a shared XML and 2.5 annotated configuration. When the Jira is in place I'll see if they add value.

    Cheers,
    Roel


  15. Mark,

    I'm very interested in the project but stuck with JDK 1.4.2. I downloaded the latest tree and tried to compile for 1.4.2 by adjusting the compliance levels in the spring-build/standard/common.xml script file but without luck.

    Is there any plan to support 1.4.2 or do I have to use some retroweaving library?

    Many thanks,


  16. Vladimir,

    Spring Integration relies heavily on Java 5 - including core language features (e.g. annotations and generics) as well as the Classes within java.util.concurrent. Based on that and the Spring Integration timeline, providing support for 1.4.2 just doesn't make sense. For example, according to the official 1.4.2 homepage (http://java.sun.com/j2se/1.4.2/), "J2SE 1.4.2 has begun the Sun End of Life (EOL) process. The EOL transition period is from Dec, 11 2006, until the General Availability (GA) of the next Java version, Java SE 7, currently planned for the summer of 2008. With this notice, customers are strongly encouraged to migrate to the current release, Java SE 6."

    I apologize for the inconvenience, but hopefully you will be able to move to Java 5 (or 6!) by mid-2008.

    Mark


  17. I guess the problem is not so much Sun but rather IBM here, Vladimir? As far as I hear, IBM's JDK 1.4.2 won't disappear until everyone has upgraded to WebSphere 6.1 - which is not likely to happen before 2009, partly due to parts of the WebSphere stack (e.g. WebSphere Portal) still not being available in a WebSphere 6.1 compatible version!

    We do generally follow Sun's End-of-Life recommendations while also taking the IBM part of the world into account. This is why Spring 2.5 still supports JDK 1.4.2. We also decided to release Spring Batch 1.0 in a JDK 1.4.2 compatible fashion - but we eventually went with Java 5 for Spring Integration 1.0, for the design reasons that Mark outlined.

    Fortunately, there are a lot of core integration facilities available in Spring Framework 2.5 itself (e.g. the TaskExecutor support, JMS message listener containers, JCA bootstrapping), so hopefully you'll be able to build your solutions based on that part of the Spring stack for the time being. Once you're going Java 5, you should be able to move to the Spring Integration product quite seamlessly…

    Juergen


  18. Mark, Juergen,

    Thank you for the answer. I was not aware of the JDK 1.4.2 EOL (me think "hurray!").

    We are currently deploying on WebLogic 8.1.4 on Sun JDK 1.4.2. and already using facilities provided by Spring 2.0 (JMS listener). I was just hoping to have a ready-to-use full-blown EIS system with Spring Integration.

    Looks like I'll just have to wait until we (finally) move to JDK 5.

    Best regards,

    -Vladimir


  19. Hi Mark,

    I want to write a MessageHandler that makes use of the CorrelationId header to support request/reply message processing:

    public class MessageHandler1 {
        public Message processMessageSimple(Message requestMessage) {
            Message responseMessage = new StringMessage("return from message: " + requestMessage.getPayload());

            responseMessage.getHeader().setCorrelationId(requestMessage.getHeader().getCorrelationId());

            return responseMessage;
        }
    }

    but the DefaultMessageHandlerAdapter uses SimplePayloadMessageMapper to extract the payload of the message as the argument to the handler method,

    public class DefaultMessageHandlerAdapter extends AbstractMessageHandlerAdapter
            implements Ordered, InitializingBean {

        public Object doHandle(Message message, SimpleMethodInvoker invoker) {
            return invoker.invokeMethod(this.getMapper().fromMessage(message));
        }

    }

    So it is not possible to pass the raw Message as the parameter to the handler method. Is that by design, or is there another way to manipulate the correlationId
    header? And how can I customize the MessageMapper?

    Thanks.


  20. Steven,

    It definitely should be possible for handler methods to accept the Message (or any subclass) as the parameter. While Message return values were already supported (not using the mapper in that case), the same was not yet supported for parameters - so thank you for pointing this out! I have just added a property 'shouldUseMapperOnInvocation' to the DefaultMessageHandlerAdapter. The default value is 'true', but now the DefaultMessageHandlerCreator will set the value to 'false' whenever the target method does expect an actual Message object. If you are interested in the details, then do an svn update, then have a look at the following tests:
    DefaultMessageHandlerAdapterTests, DefaultMessageHandlerCreatorTests, and MessageEndpointAnnotationPostProcessorTests (specifically with MessageParameterAnnotatedEndpoint).

    Thanks again,
    Mark


  21. Hi,

    why do you use Ivy for dependency-management? Do you plan to migrate to maven for spring-integration in near future or will Ivy be the build-tool for new subprojects?

    Thanks

    Jörg


  22. [quote comment="88389"]Hi,

    why do you use Ivy for dependency-management? Do you plan to migrate to maven for spring-integration in near future or will Ivy be the build-tool for new subprojects?

    Thanks

    Jörg[/quote]

    We use Ivy because after evaluating the options, Ivy's dependency management is, for our needs, more powerful and better suited to our projects. We have no plans to migrate to Maven at any point for our projects that are not already on it. We do however plan to produce POMs and other artifacts for Maven users in both release and snapshot form.

    -Ben


  23. While I can run the examples from the Subversion repository fine, when I attempt to create my own usages of message endpoints I have noticed that the PollingSourceAdapter.start method never gets called (though its constructor and setPeriod methods do). I am loading my application context using the ContextLoaderListener via the web.xml, and am wondering if the Lifecycle.start method even gets called on my ApplicationContext.

    My code looks almost identical to the (annotated) Quote example.

    I have no idea what I'm doing wrong here, but has anybody successfully made Spring Integration work from an application inside a servlet container?


  24. [quote comment="97472"]wondering if the Lifecycle.start method even gets called on my ApplicationContext.[/quote]

    Indeed, the start() method of an Application Context is not invoked automatically. Now, as far as Spring Integration is concerned, you only need to start the MessageBus - so if you get the MessageBus bean, you can invoke start() directly (you can look it up with the constant: MessageBusParser.MESSAGE_BUS_BEAN_NAME). Perhaps we should include an auto-start option on the bus itself (I'll add that in Jira now). Another option is to expose the bus' lifecycle methods as JMX managed operations.

    -Mark


  25. Sorry to drag this on, and this may not be the right forum for this, but I've noticed that other Lifecycle beans do not get start() events either, presumably because the application context has not been started. Would it be safe to write my own context loading listener, and have it invoke the start() method on the application context, and would that in turn start the MessageBus?

    Autostart on the bus sounds great, but I'm really wondering in general about Spring's application context start() and the general lack of documentation (that I can see) on it…


  26. We have also designed an EAI solution, similar to Spring Integration, for industrial use.
    It supports communication with, for example, PLCs (http://en.wikipedia.org/wiki/Programmable_logic_controller) and special industrial displays on production lines on one side, and SAP for sales information on the other.
    For more details see http://www.sybit.de/en/solutions/industrial-solutions/industrial-solutions-solutions.html or contact us: info@sybit.de.


  27. Mark,
    How would I get a Cobol application connected to Spring Integration?


  28. Great stuff. Would you be able to also post the SI slides you presented at the NEJUG?
