Apache Camel Splitter and Aggregator
Splitter is used to split a message into pieces that are routed separately.
Aggregator is used to combine results of individual but related messages into a single outgoing message.
Aggregator in detail
The Aggregator receives a stream of messages and identifies messages that are related, which are then aggregated into a single combined message. After a completion condition occurs, the aggregated message is sent to the output channel for further processing.
The Aggregator is a stateful EIP because it needs to store the in-progress aggregates until completion conditions occur and the aggregated message can be published. By default, the Aggregator will keep state in memory only. If the application is shut down or the host container crashes, the state will be lost.
Example: In loan brokering application, the broker sends loan requests to multiple banks and then aggregate the replies to prepare the best deal.
When using the Aggregator, you have to pay attention to the following three settings, which must be configured.
- Correlation identifier — An
Expression
that determines which incoming messages belong together - Completion condition — A
Predicate
or time-based condition that determines when the result message should be sent - Aggregation strategy — An
AggregationStrategy
that specifies how to combine the messages into a single message
Figure-2 shows how this works. When the first message with correlation identifier 1 arrives, the aggregator initializes a new aggregate and stores the message inside the aggregate. In this example, the completion condition is the aggregation of three messages, so the aggregate isn’t yet complete. When the second message with correlation identifier 1 arrives, the EIP adds it to the already existing aggregate. The third message specifies a different correlation identifier value of 2, so the aggregator starts a new aggregate for that value. The fourth message relates to the first aggregate (identifier 1), so the aggregate has now aggregated three messages, and the completion condition is fulfilled. As a result, the aggregator marks the aggregate as complete and publishes the resulting message.
As mentioned before, three configurations are in play when using the Aggregator EIP: correlation identifier, completion condition, and aggregation strategy. To understand how these three are specified and how they work, let’s start with the example of a Camel route in the Java DSL (with the configurations in bold):
public void configure() throws Exception {
from("direct:start")
.log("Sending ${body} with correlation key ${header.myId}")
.aggregate(header("myId"), new MyAggregationStrategy())
.completionSize(3)
.log("Sending out ${body}")
.to("mock:result");
AggregationStrategy
for merging messages:
public class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn()
.getBody(String.class);
String body = oldBody + newBody;
oldExchange.getIn().setBody(body); return oldExchange;
}
}
Completion conditions for the Aggregator
Imagine a situation in which a condition never occurs, causing aggregated messages never to be published. To remedy this, you could add a time-out condition that reacts if all messages aren’t received within a certain time period.
Splitter
How the Splitter works
The Splitter works something like a big iterator that iterates through something and processes each entry.