-
Notifications
You must be signed in to change notification settings - Fork 31
Creating custom Outbound Channel
shmurthy62 edited this page Feb 11, 2015
·
2 revisions
Next we will show you how to build an OutboundChannel called SampleOutboundChannel
<bean id="SampleOutboundChannelBinder" class="com.ebay.jetstream.event.support.channel.ChannelBinding">
<property name="channel" ref="outboundSampleChannel" />
</bean>
<bean id="outboundSampleChannel" class="com.ebay.jetstream.samples.SampleOutboundChannel">
<property name="address" ref="SampleChannelAddress" />
</bean>
<bean id="SampleChannelAddress" class="com.ebay.jetstream.samples.SampleChannelAddress">
<property name="addresses">
<list>
<value>someaddress</value>
</list>
</property>
</bean>
package com.ebay.jetstream.samples;
import com.ebay.jetstream.event.AbstractOutboundChannel ;
import com.ebay.jetstream.config.ContextBeanChangedEvent;
import com.ebay.jetstream.event.EventException;
import com.ebay.jetstream.event.JetstreamEvent;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import com.ebay.jetstream.management.Management;
// The @ManagedResource annotation is needed to signal to management subsystem that this is a managed
// resource and all stats for this resource is made available under the folder 'Event/Channl'
@ManagedResource(objectName = "Event/Channel", description = "SampleOutboundChannel")
public class SampleOutboundChannel extends AbstractOutboundChannel {
private SampleChannelAddress m_address;
public void setAddress(SampleChannelAddress address) {
m_address = address;
}
@Override
public ChannelAddress getAddress() {
return m_address;
}
@Override
public void open() throws EventException {
// This is invoked by the ChannelBinder. This is a signal to
// open the pipe.
super.open();
// Now add binding to the stats folder
Management.addBean(getBeanName(), this);
}
@Override
public void close() throws EventException {
// This is invoked by the ChannelBinder. This is a signal to
// to close the channel
super.close();
// Now remove binding with the stats folder
Management.removeBeanOrFolder(getBeanName(), this);
}
@Override
public void flush() throws EventException {
// This is invoked by the ChannelBinder. This is a signal to
// flush all queues, will be called before close
}
@Override
public void sendEvent(JetstreamEvent event) throws EventException {
// This method is called when event is delivered to this class
incrementEventRecievedCounter(); // needed for pipeline monitoring
try {
// do processing here and forward to downstream interface
// outbound channels do not have a sink
// we will print it to console
System.out.println(event.toString());
incrementEventSentCounter(); // needed for pipeline monitoring
} catch (Throwable t) {
// handle exception
incrementEventDroppedCounter(); // needed for pipeline monitoring
}
// do processing here and forward to downstream interface
// outbound channels do not have a sink
}
@Override
public int getPendingEvents() {
// This method belongs to ShutDownable interface and is
// invoked by the Jetstream Framework Shutdown Orchestrator
// when the application receives a termination signal.
// This method must return a count of pending events in its
// queues. The orchestrator will wait till this count goes to 0
// before shutting down the next stage
return 0;
}
@Override
public void shutDown() {
// This method belongs to ShutDownable interface and is
// invoked by the Jetstream Framework Shutdown Orchestrator
// when the application receives a termination signal.
// Use this method as a signal to release all open resources
// held by this class.
}
@Override
public void processApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextBeanChangedEvent) {
ContextBeanChangedEvent bcInfo = (ContextBeanChangedEvent) event;
// check changes
if (bcInfo.isChangedBean(getAddress())) {
try {
close();
}
catch (Throwable e) {
// handle error
}
try {
setAddress((ChannelAddress) bcInfo.getChangedBean());
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
open();
}
catch (Throwable e) {
// handle error
}
}
}
}
@Override
public void afterPropertiesSet() throws Exception {
// This method belongs to InitializingBean interface and
// is invoked by SpringFramework after all setters for this
// class have been called by Spring.
}
// Here we show you how to raise an alarm so upstream stages see you in alarm state
// and stop sending events to you. This method is shown for pure illustration purposes.
// You should implement this based on your needs. It is assumed that processState is
// called from somewhere in your processing logic.
public void processState() {
switch(m_state) {
case trouble:
getAlarmListener().alarm(ChannelAlarm.OVERRUN);
break;
case recovered:
getAlarmListener().alarm(ChannelAlarm.CLEAR);
break;
default:
break;
}
}
}
Address implementation for SampleChannel is same as Inbound Channel. The Spring bean definition and wiring for SampleOutboundChannel is shown below