Spring Integration Samples

In my recent post, I mentioned that the Subversion repository for Spring Integration would be publicly accessible soon, and I'm pleased to provide that link now. You can check out the project with the following command:
If the checkout is successful, you should see the following directory structure:
+--build-spring-integration/
+--spring-build/
+--spring-integration-core/
+--spring-integration-samples/
I would like to take this opportunity to walk through a couple of the samples that are in 'spring-integration-samples'. Keep in mind this project is definitely a work-in-progress (currently a 0.5 SNAPSHOT), but the samples should give you an idea of how the programming model is taking shape, and I'm very much looking forward to getting some feedback.
Hello World
The first sample is the obligatory "Hello World" demo. This one demonstrates the main components: Message Channel and Message Endpoint. This demo also reveals how the Spring Integration approach is non-invasive - providing a complete separation of concerns between business logic and messaging. In this case, the "business logic" is a simple HelloService:
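A minimal sketch of such a service might look like the following. Only the method name sayHello comes from the configuration in this post; the class body, the parameter type, and the greeting format are assumptions made for illustration. The point is that the class has no dependency on any messaging API.

```java
// Hypothetical sketch: a plain Java class with no messaging dependencies.
class HelloService {
    // The method name matches the handler-method attribute used in this post;
    // the String parameter and the greeting format are assumptions.
    public String sayHello(String name) {
        return "Hello " + name;
    }
}

class HelloServiceDemo {
    public static void main(String[] args) {
        // Calls the service directly, without any channel or endpoint involved.
        System.out.println(new HelloService().sayHello("World"));
    }
}
```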
This example uses an XML-based configuration for the Message Endpoint (we'll see the annotation approach next):
default-output-channel="outputChannel"
handler-ref="helloService"
handler-method="sayHello"/>
There you see that 'handler-ref' simply points to a Spring-managed bean. If you have used Spring's MessageListenerAdapter for asynchronous JMS reception, then this should look familiar - especially if you are using Spring 2.5's new jms namespace and the "jms:listener" element. Finally, the HelloWorldDemo starts the application context and then interacts with the channels:
MessageChannel inputChannel = channelRegistry.lookupChannel("inputChannel");
MessageChannel outputChannel = channelRegistry.lookupChannel("outputChannel");
inputChannel.send(new StringMessage(1, "World"));
System.out.println(outputChannel.receive().getPayload());
That example involves lookup of the MessageBus bean - which implements the ChannelRegistry interface. However, in a non-demo "real world" scenario, any component that would access channels can have the registry provided via dependency injection. All it needs to do is implement ChannelRegistryAware (or use @Autowired). This is the same approach used elsewhere in Spring - such as ApplicationEventPublisherAware.
Annotation-driven Endpoint and Subscriber
The next example shows how to configure a Message Endpoint with annotations. In fact, this particular endpoint even provides the data that is translated (behind the scenes) into Message payload content with the @Polled method annotation. It could alternatively provide an input channel for receiving messages asynchronously.
public class QuotePublisher {

    @Polled(period=300)
    public Quote getQuote() {
        BigDecimal price = new BigDecimal(new Random().nextDouble() * 100);
        return new Quote(generateTicker(), price.setScale(2, RoundingMode.HALF_EVEN));
    }

    private String generateTicker() {
        // randomly generates 3-letter tickers
    }
}
On the receiving side, there is a @Subscriber annotation:
Here is the XML that registers the annotation post-processor and the 2 Spring-managed beans (notice that this example is using the 'spring-integration' schema as the primary namespace):
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
<message-bus/>
<annotation-driven/>
<channel id="quotes"/>
<beans:bean id="publisher" class="org.springframework.integration.samples.quote.QuotePublisher"/>
<beans:bean id="subscriber" class="org.springframework.integration.samples.quote.QuoteSubscriber"/>
</beans:beans>
By the way, the 'annotation-driven' element also enables the @Publisher annotation which triggers the creation of AOP advice for asynchronously sending the return value of any annotated method to a channel.
Simple Routing
The routing sample features a Message Endpoint that produces an incrementing integer every 3 seconds and a router that resolves the target channel name (note that a router method can also return more than one result and can return actual MessageChannel instances rather than names).
public class Counter {

    private AtomicInteger count = new AtomicInteger();

    @Polled(period=3000)
    public int getNumber() {
        return count.incrementAndGet();
    }

    @Router
    public String resolveChannel(int i) {
        if (i % 2 == 0) {
            return "even";
        }
        return "odd";
    }
}
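To make the routing step concrete, here is a self-contained sketch in plain Java that mimics what happens to each number: a resolver maps the payload to a channel name, and that name is then looked up in a registry. The Map-based registry, the BlockingQueue "channels", and the class name RoutingSketch are stand-ins chosen for illustration; they are not Spring Integration classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RoutingSketch {
    // Stand-in for a channel registry: channel name -> queue (an assumption, not the framework's type).
    private final Map<String, BlockingQueue<Integer>> channels = new HashMap<>();

    RoutingSketch() {
        channels.put("even", new LinkedBlockingQueue<>());
        channels.put("odd", new LinkedBlockingQueue<>());
    }

    // Same decision as the router method in the Counter class.
    static String resolveChannel(int i) {
        return (i % 2 == 0) ? "even" : "odd";
    }

    // Resolve the channel name, look up the queue, and deliver the payload.
    void route(int number) {
        channels.get(resolveChannel(number)).add(number);
    }

    BlockingQueue<Integer> channel(String name) {
        return channels.get(name);
    }

    public static void main(String[] args) {
        RoutingSketch sketch = new RoutingSketch();
        for (int i = 1; i <= 5; i++) {
            sketch.route(i);
        }
        System.out.println("even: " + sketch.channel("even"));
        System.out.println("odd: " + sketch.channel("odd"));
    }
}
```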
On the receiving end of these channels, we have 2 different methods that simply log the message payload:
By the way, notice that the NumberLogger is annotated with Spring's @Component. The @MessageEndpoint annotation also contains @Component as a meta-annotation. Both are therefore "stereotypes" and eligible for autodetection with Spring 2.5's classpath-scanning. The XML for this example is extremely simple:
<message-bus auto-create-channels="true"/>
<annotation-driven/>
Hopefully this provides a decent introduction to the Spring Integration programming model. Feel free to check out the code and try out these samples yourself. I am currently working on a "Getting Started" guide that I will make available after the holidays. If you do check out the code, please be sure to update frequently. The code is constantly evolving, and in particular I am refactoring much of the core consumer/dispatcher code with the goal of providing the simplest possible extension points for adding either polling or event-driven message sources. In the next blog post, I plan to show some new additions to the 'spring-integration-samples' featuring those extension points.
Steven says:
Added on December 22nd, 2007 at 6:56 am
After playing with the sample applications, I tried to set up a common batch file-processing
scenario using InboundFileAdapter:
Then I found a bug in AbstractChannelAdapter:
public Message receive(long timeout) {
    if (!this.initialized) {
        throw new MessageHandlingException("adapter not initialized");
    }
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        Future<Message> result = executor.submit(new Callable<Message>() {
            public Message call() throws Exception {
                return receive();
            }
        });
        try {
            result.get(timeout, TimeUnit.MILLISECONDS);
            if (result.isDone()) {
                return result.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (TimeoutException e) {
            return null;
        } catch (ExecutionException e) {
            throw new MessageHandlingException("Exception occurred in message source", e);
        }
        result.cancel(true);
    } finally {
        executor.shutdown();
    }
    return null;
}
The executor is not shut down properly (I have added the finally clause).
I cannot find the support forum for Spring Integration yet, so I am raising the bug here.
Mark Fisher (blog author) says:
Added on December 22nd, 2007 at 11:30 am
Thank you Steven. I committed the change, and I also adjusted EndpointExecutor so that it implements Lifecycle and is properly shut down as well. That AbstractChannelAdapter is one of the components that will be replaced or significantly refactored shortly. Individual components will ultimately expose tasks to be scheduled on an executor rather than creating a new one within the class.
Thanks again,
-Mark
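For readers following this exchange, the idea of submitting work to an executor that is owned and shut down elsewhere can be sketched in plain Java. This is only an illustration under stated assumptions: the class name TimedReceiver and its receive method are hypothetical and are not the framework's AbstractChannelAdapter. The sketch also cancels the task when the timeout expires, which goes beyond the snippet quoted in the comment above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedReceiver {
    // Shared executor supplied by the caller, who also owns its shutdown.
    private final ExecutorService executor;

    TimedReceiver(ExecutorService executor) {
        this.executor = executor;
    }

    // Returns the source's value, or null if it does not arrive within timeoutMillis.
    <T> T receive(Callable<T> source, long timeoutMillis) {
        Future<T> result = executor.submit(source);
        try {
            return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            result.cancel(true); // interrupt the task that is still running
            return null;
        } catch (InterruptedException e) {
            result.cancel(true);
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Exception occurred in message source", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            TimedReceiver receiver = new TimedReceiver(executor);
            String fast = receiver.receive(() -> "fast", 1000);
            String slow = receiver.receive(() -> { Thread.sleep(5000); return "slow"; }, 100);
            System.out.println(fast);
            System.out.println(slow);
        } finally {
            executor.shutdown(); // shut down once, by the owner of the executor
        }
    }
}
```

Because the executor is passed in, its lifecycle is managed in one place instead of inside every receive call.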
Srini Penchikala says:
Added on December 22nd, 2007 at 10:59 pm
Hi Mark,
Your blog entry was very helpful for me in checking out the SpringIntegration source and running the samples w/o any problems. This is a very good introduction to a new framework like SpringIntegration. I would love to see more advanced examples of how to use SI, and maybe an implementation of some of the EIS patterns discussed in Gregor Hohpe's book.
Great job.
Thanks
Srini
Mark Fisher (blog author) says:
Added on December 23rd, 2007 at 11:43 am
Thank you Srini. I am definitely planning to include more complex (and more complete) samples, such as the Loan Broker example from the Enterprise Integration Patterns book. Many other patterns will also be included in the framework, and as those are added I will continue to post samples on this blog. By the time of the first non-snapshot release (target is January 11th), we will be providing reference documentation and a "Getting Started" guide as well. I do hope to have an early version of the "Getting Started" guide available by the end of next week.
Cheers,
-Mark
Steven says:
Added on December 24th, 2007 at 6:16 am
Hi, Mark,
I am very interested in the bright future of Spring Integration, and we will base our solution on this new project. (We have already integrated Spring Batch and are not afraid of being cutting edge.)
When I dug deeper into the current release, I found another issue in the MessageBus class:
public void registerEndpoint(String name, MessageEndpoint endpoint) {
    Assert.notNull(name, "'name' must not be null");
    Assert.notNull(endpoint, "'endpoint' must not be null");
    this.endpoints.put(name, endpoint);
    if (logger.isInfoEnabled()) {
        logger.info("registered endpoint '" + name + "'");
    }
    endpoint.setChannelRegistry(this);
    if (endpoint.getInputChannelName() != null && endpoint.getConsumerPolicy() != null) {
        Subscription subscription = new Subscription();
        subscription.setChannel(endpoint.getInputChannelName());
        subscription.setEndpoint(name);
        subscription.setPolicy(endpoint.getConsumerPolicy());
        this.activateSubscription(subscription);
    }
    if (this.autoCreateChannels) {
        String defaultOutputChannelName = endpoint.getDefaultOutputChannelName();
        if (StringUtils.hasText(defaultOutputChannelName)) {
            this.registerChannel(defaultOutputChannelName, new PointToPointChannel());
        }
    }
}
When autoCreateChannels is set and defaultOutputChannelName has text, the channel with that name should first be looked up in the channel registry; a new one should be created only if that lookup fails.
-Steven
Mark Fisher (blog author) says:
Added on December 24th, 2007 at 8:35 am
Steven, I just added the registry lookup attempt for the output channel name there. Thanks for catching that. By the way, Spring Integration's Jira should be publicly available by the end of the week, and I will post a comment here with that link.
-Mark
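The "look up first, create only if absent" behavior discussed here can be sketched in a few lines of plain Java. The class ChannelRegistrySketch, its nested Channel type, and the lookupOrCreate method are hypothetical names used for illustration and do not reflect the actual MessageBus code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ChannelRegistrySketch {
    // Placeholder channel type, for illustration only.
    static class Channel {
        final String name;

        Channel(String name) {
            this.name = name;
        }
    }

    private final Map<String, Channel> channels = new ConcurrentHashMap<>();

    void registerChannel(String name, Channel channel) {
        channels.put(name, channel);
    }

    // Returns the registered channel if one exists; otherwise creates and registers a new one.
    Channel lookupOrCreate(String name) {
        return channels.computeIfAbsent(name, Channel::new);
    }

    public static void main(String[] args) {
        ChannelRegistrySketch registry = new ChannelRegistrySketch();
        Channel existing = new Channel("outputChannel");
        registry.registerChannel("outputChannel", existing);

        // An already registered channel is reused rather than replaced.
        System.out.println(registry.lookupOrCreate("outputChannel") == existing);
        // An unknown name falls back to creating a new channel.
        System.out.println(registry.lookupOrCreate("other").name);
    }
}
```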
Peter says:
Added on December 30th, 2007 at 8:35 am
Hi Mark,
First of all, thanks for this nice intro into your framework.
After running your examples w/o any problems and trying to understand the code so far, I'm
wondering if it could make sense to extend the XML-element like this:
Defines a consumer policy.
where bean-ref points to a policy-bean, something like:
I gave it a try and changed the parseConsumer() method in EndpointParser:
String beanName = element.getAttribute(POLICY_NAME_ATTRIBUTE);
if (StringUtils.hasText(beanName)) {
    parserContext.registerBeanComponent(
            new BeanComponentDefinition(consumerDef, beanName));
} else {
    beanName = parserContext.getReaderContext().generateBeanName(consumerDef);
    parserContext.registerBeanComponent(
            new BeanComponentDefinition(consumerDef, beanName));
}
return beanName;
and it seems to work quite well.
Any comments or restrictions on doing it this way?
Would you recommend another solution for this kind of problem?
kind regards
-peter
Peter says:
Added on December 30th, 2007 at 8:43 am
Oops, sorry,
the XML code isn't shown in my previous post.
So what I've done is this:
I extended the "consumer" XML schema element definition with a new attribute "bean-ref" of type xsd:string,
which is the bean id of a bean of type org.springframework.integration.bus.ConsumerPolicy.
This way I can reuse a Policy for different endpoints.
I would like to use this especially for different consumer threadpools.
again kind regards
-peter
Mark Fisher (blog author) says:
Added on December 30th, 2007 at 4:32 pm
Peter,
Thank you for the excellent idea. I will add a 'ref' for consumer - perhaps as just a 'consumer' attribute on the main endpoint element. By the way, I have also been considering a simplification for using "common" policies, such as a basic "PollingPolicy" that simply accepts a period (poll interval) and uses typical settings for the other parameters (1 concurrent poller, 0 timeout, etc).
Mark
Peter says:
Added on December 30th, 2007 at 6:49 pm
Hi Mark,
thanks for your answer.
Concerning common policies, I think one major part is missing.
In my opinion, the policy needs a boolean flag (or something different with the same semantics) such as "isIdempotent", maybe implemented as a marker interface.
A value of "true" would guarantee (with the necessary logic implemented, of course) that a record is retrieved only once.
peter
Roel says:
Added on January 5th, 2008 at 11:30 am
Hi Mark,
I have been playing around with the samples and I tried to create some nice examples myself.
When I try to access the repository I get an error saying the repository is no longer available.
Today I kicked off a SVN update and when I came back I was surprised to see that SVN deleted the project content.
Is the latest version available in a different location? Or can you provide me the sources in some other way?
It would be much appreciated.
Kind regards,
Roel
Roel says:
Added on January 5th, 2008 at 11:37 am
Oops, never mind, I figured it out.
The URL for a checkout should now be:
https://anonsvn.springframework.org/svn/spring-integration/trunk/
instead of
https://anonsvn.springframework.org/svn/spring-integration/base/trunk
Cheers,
Roel
Mark Fisher (blog author) says:
Added on January 5th, 2008 at 11:50 am
Roel,
I apologize for not leaving a comment here earlier. As you have now discovered (rather abruptly), the SVN repository was restructured yesterday. It is now flattened such that 'trunk' is directly under the root. You may also notice that a "spring-integration-reference" project has been added, and there you will see gradual creation of the reference guide (although it's empty as of right now). All of these changes are occurring as we approach the proposed 1.0 M1 release on January 18th. I will be posting another blog entry next week that includes more information about the reference guide as well as the Jira instance which will soon be publicly accessible. That would provide a good arena for sharing your samples if you are interested. In my blog post next week, I will also highlight a couple more samples. One (FileCopyDemo) has already been added within the "spring-integration-samples" project, and I intend to include a JMS sample as well. I am currently wrapping up the namespace support for the JMS adapters.
Mark
Roel says:
Added on January 6th, 2008 at 4:57 pm
Hi Mark,
Thanks for giving us an update on what you are working on.
I will check up on your progress regularly and check out the new stuff.
I've been playing around a bit, making some demos with a shared XML and 2.5 annotated configuration. When the Jira is in place, I'll see if they add value.
Cheers,
Roel
Vladimir says:
Added on January 7th, 2008 at 1:54 pm
Mark,
I'm very interested in the project but stuck with JDK 1.4.2. I downloaded the latest tree and tried to compile for 1.4.2 by adjusting the compliance levels in the spring-build/standard/common.xml script file but without luck.
Is there any plan to support 1.4.2 or do I have to use some retroweaving library?
Many thanks,
Mark Fisher (blog author) says:
Added on January 7th, 2008 at 4:48 pm
Vladimir,
Spring Integration relies heavily on Java 5 - including core language features (e.g. annotations and generics) as well as the Classes within java.util.concurrent. Based on that and the Spring Integration timeline, providing support for 1.4.2 just doesn't make sense. For example, according to the official 1.4.2 homepage (http://java.sun.com/j2se/1.4.2/), "J2SE 1.4.2 has begun the Sun End of Life (EOL) process. The EOL transition period is from Dec, 11 2006, until the General Availability (GA) of the next Java version, Java SE 7, currently planned for the summer of 2008. With this notice, customers are strongly encouraged to migrate to the current release, Java SE 6."
I apologize for the inconvenience, but hopefully you will be able to move to Java 5 (or 6!) by mid-2008.
Mark
Juergen Hoeller (blog author) says:
Added on January 7th, 2008 at 5:38 pm
I guess the problem is not so much Sun but rather IBM here, Vladimir? As far as I hear, IBM's JDK 1.4.2 won't disappear until everyone has upgraded to WebSphere 6.1 - which is not likely to happen before 2009, partly due to parts of the WebSphere stack (e.g. WebSphere Portal) still not being available in a WebSphere 6.1 compatible version!
We do generally follow Sun's End-of-Life recommendations while also taking the IBM part of the world into account. This is why Spring 2.5 still supports JDK 1.4.2. We also decided to release Spring Batch 1.0 in a JDK 1.4.2 compatible fashion - but we eventually went with Java 5 for Spring Integration 1.0, for the design reasons that Mark outlined.
Fortunately, there are a lot of core integration facilities available in Spring Framework 2.5 itself (e.g. the TaskExecutor support, JMS message listener containers, JCA bootstrapping), so hopefully you'll be able to build your solutions based on that part of the Spring stack for the time being. Once you move to Java 5, you should be able to move to the Spring Integration product quite seamlessly…
Juergen
Vladimir says:
Added on January 8th, 2008 at 11:08 am
Mark, Juergen,
Thank you for the answer. I was not aware of the JDK 1.4.2 EOL (me think "hurray!").
We are currently deploying on WebLogic 8.1.4 on Sun JDK 1.4.2 and are already using facilities provided by Spring 2.0 (JMS listener). I was just hoping to have a ready-to-use full-blown EIS system with Spring Integration.
Looks like I'll just have to wait until we (finally) move to JDK 5.
Best regards,
-Vladimir
Steven says:
Added on January 9th, 2008 at 9:06 am
Hi Mark,
I want to write a MessageHandler that makes use of the CorrelationId header to support request/reply message processing:
public class MessageHandler1 {

    public Message processMessageSimple(Message requestMessage) {
        Message responseMessage = new StringMessage("return from message: " + requestMessage.getPayload());
        responseMessage.getHeader().setCorrelationId(requestMessage.getHeader().getCorrelationId());
        return responseMessage;
    }
}
but the DefaultMessageHandlerAdapter uses SimplePayloadMessageMapper to extract the payload of the message as the argument to the handler method,
public class DefaultMessageHandlerAdapter extends AbstractMessageHandlerAdapter
implements Ordered, InitializingBean {
public Object doHandle(Message message, SimpleMethodInvoker invoker) {
return invoker.invokeMethod(this.getMapper().fromMessage(message));
}
}
So it is not possible to pass the raw Message as the parameter to the handler method. Is that by design, or is there another way to manipulate the correlationId header? And how can I customize the MessageMapper?
Thanks.
Mark Fisher (blog author) says:
Added on January 9th, 2008 at 11:23 am
Steven,
It definitely should be possible for handler methods to accept the Message (or any subclass) as the parameter. While Message return values were already supported (not using the mapper in that case), the same was not yet supported for parameters - so thank you for pointing this out! I have just added a property 'shouldUseMapperOnInvocation' to the DefaultMessageHandlerAdapter. The default value is 'true', but now the DefaultMessageHandlerCreator will set the value to 'false' whenever the target method does expect an actual Message object. If you are interested in the details, then do an svn update, then have a look at the following tests:
DefaultMessageHandlerAdapterTests, DefaultMessageHandlerCreatorTests, and MessageEndpointAnnotationPostProcessorTests (specifically with MessageParameterAnnotatedEndpoint).
Thanks again,
Mark
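The decision described in this reply, passing the whole message when the handler method expects one and the extracted payload otherwise, can be illustrated with a small reflection sketch. Everything here is hypothetical: InvocationSketch, its nested Message and Handlers types, and the dispatch method are stand-ins and are not the DefaultMessageHandlerAdapter implementation.

```java
import java.lang.reflect.Method;

class InvocationSketch {
    // Hypothetical minimal message type, not the framework's Message.
    static class Message {
        final Object payload;

        Message(Object payload) {
            this.payload = payload;
        }
    }

    // Example handlers: one wants only the payload, the other wants the whole message.
    static class Handlers {
        public String onPayload(String s) {
            return "payload:" + s;
        }

        public String onMessage(Message m) {
            return "message:" + m.payload;
        }
    }

    // Invokes the named single-argument method, passing the Message itself when the
    // parameter type is Message (or a subclass), and the payload otherwise.
    static Object dispatch(Object target, String methodName, Message message) {
        for (Method method : target.getClass().getMethods()) {
            if (method.getName().equals(methodName) && method.getParameterCount() == 1) {
                Class<?> paramType = method.getParameterTypes()[0];
                boolean expectsMessage = Message.class.isAssignableFrom(paramType);
                try {
                    return method.invoke(target, expectsMessage ? message : message.payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("handler invocation failed", e);
                }
            }
        }
        throw new IllegalArgumentException("no single-argument method named " + methodName);
    }

    public static void main(String[] args) {
        Handlers handlers = new Handlers();
        Message message = new Message("World");
        System.out.println(dispatch(handlers, "onPayload", message));
        System.out.println(dispatch(handlers, "onMessage", message));
    }
}
```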
Jörg says:
Added on January 15th, 2008 at 2:28 pm
Hi,
why do you use Ivy for dependency-management? Do you plan to migrate to maven for spring-integration in near future or will Ivy be the build-tool for new subprojects?
Thanks
Jörg
Ben Hale (blog author) says:
Added on January 17th, 2008 at 11:20 am
[quote comment="88389"]Hi,
why do you use Ivy for dependency-management? Do you plan to migrate to maven for spring-integration in near future or will Ivy be the build-tool for new subprojects?
Thanks
Jörg[/quote]
We use Ivy because after evaluating the options, Ivy's dependency management is, for our needs, more powerful and better suited to our projects. We have no plans to migrate to Maven at any point for our projects that are not already on it. We do however plan to produce POMs and other artifacts for Maven users in both release and snapshot form.
-Ben
Ryan Daum says:
Added on February 7th, 2008 at 6:46 pm
While I can run the examples from the Subversion repository fine, when I attempt to create my own usages of message endpoints I have noticed that the PollingSourceAdapter.start method never gets called (though its constructor and setPeriod methods do). I am loading my application context using the ContextLoaderListener via the web.xml, and am wondering if the Lifecycle.start method even gets called on my ApplicationContext.
My code looks almost identical to the (annotated) Quote example.
I have no idea what I'm doing wrong here, but has anybody successfully made Spring Integration work from an application inside a servlet container?
Mark Fisher (blog author) says:
Added on February 7th, 2008 at 10:06 pm
[quote comment="97472"]wondering if the Lifecycle.start method even gets called on my ApplicationContext.[/quote]
Indeed, the start() method of an Application Context is not invoked automatically. Now, as far as Spring Integration is concerned, you only need to start the MessageBus - so if you get the MessageBus bean, you can invoke start() directly (you can look it up with the constant: MessageBusParser.MESSAGE_BUS_BEAN_NAME). Perhaps we should include an auto-start option on the bus itself (I'll add that in Jira now). Another option is to expose the bus' lifecycle methods as JMX managed operations.
-Mark
Ryan Daum says:
Added on February 8th, 2008 at 9:56 am
Sorry to drag this on, and this may not be the right forum for this, but I've noticed that other Lifecycle beans do not get start() events either, presumably because the application context has not been started. Would it be safe to write my own context loading listener, and have it invoke the start() method on the application context, and would that in turn start the MessageBus?
Autostart on the bus sounds great, but I'm really wondering in general about Spring's application context start() and the general lack of documentation (that I can see) on it…
Stephan Strittmatter says:
Added on March 4th, 2008 at 7:42 am
For industrial solutions, we have also designed an EAI solution like spring-integration.
There we support communication with, for example, PLCs (http://en.wikipedia.org/wiki/Programmable_logic_controller) and special industrial displays on production lines on one side, and SAP for sales information on the other.
For more details see http://www.sybit.de/en/solutions/industrial-solutions/industrial-solutions-solutions.html or contact us: info@sybit.de.
AndrewB says:
Added on March 20th, 2008 at 8:00 am
Mark,
How would I get a Cobol application connected to Spring Integration?
Danny Robinson says:
Added on April 14th, 2008 at 3:16 pm
Great stuff. Would you be able to also post the SI slides you presented at the NEJUG?