-
Notifications
You must be signed in to change notification settings - Fork 0
/
JMSHelper.java
132 lines (121 loc) · 4.3 KB
/
JMSHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package Main;
import java.io.Serializable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class JMSHelper {
private static final String DEFAULT_HOST = "localhost";
private static final int DEFAULT_PORT = 3700;
private static final String JMS_CONNECTION_FACTORY = "jms/TestConnectionFactory";
private static final String MASTER_QUEUE = "jms/MasterQueue";
private static final String WORKER_QUEUE = "jms/WorkerQueue";
private static final String RESULT_QUEUE = "jms/ResultQueue";
private Context jndiContext;
private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Queue masterQueue;
private Queue workerQueue;
private Queue resultQueue;
public JMSHelper() throws NamingException, JMSException {
this(DEFAULT_HOST);
}
public JMSHelper(String host) throws NamingException, JMSException {
int port = DEFAULT_PORT;
System.setProperty("org.omg.CORBA.ORBInitialHost", host);
System.setProperty("org.omg.CORBA.ORBInitialPort", ""+port);
try {
jndiContext = new InitialContext();
connectionFactory = (ConnectionFactory)jndiContext.lookup(JMS_CONNECTION_FACTORY);
masterQueue = (Queue)jndiContext.lookup(MASTER_QUEUE);
workerQueue = (Queue)jndiContext.lookup(WORKER_QUEUE);
resultQueue = (Queue)jndiContext.lookup(RESULT_QUEUE);
} catch (NamingException e) {
System.err.println("JNDI failed: " + e);
throw e;
}
try {
connection = connectionFactory.createConnection();
connection.start();
} catch (JMSException e) {
System.err.println("Failed to create connection to JMS provider: " + e);
throw e;
}
}
public Session createSession() throws JMSException {
if(session != null) {
return session;
} else {
try {
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
System.err.println("Failed creating session: " + e);
throw e;
}
}
}
public ObjectMessage createMessage(Serializable obj) throws JMSException {
try {
return createSession().createObjectMessage(obj);
} catch (JMSException e) {
System.err.println("Error preparing message: " + e);
throw e;
}
}
public MessageProducer createMasterQueueSender() throws JMSException {
try {
return createSession().createProducer(masterQueue);
} catch (JMSException e) {
System.err.println("Failed sending to queue: " + e);
throw e;
}
}
public MessageConsumer createMasterQueueReader() throws JMSException {
try {
return createSession().createConsumer(masterQueue);
} catch (JMSException e) {
System.err.println("Failed reading from queue: " + e);
throw e;
}
}
public MessageProducer createWorkerQueueSender() throws JMSException {
try {
return createSession().createProducer(workerQueue);
} catch (JMSException e) {
System.err.println("Failed sending to queue: " + e);
throw e;
}
}
public MessageConsumer createWorkerQueueReader() throws JMSException {
try {
return createSession().createConsumer(workerQueue);
} catch (JMSException e) {
System.err.println("Failed reading from queue: " + e);
throw e;
}
}
public MessageProducer createResultQueueSender() throws JMSException {
try {
return createSession().createProducer(resultQueue);
} catch (JMSException e) {
System.err.println("Failed reading from queue: " + e);
throw e;
}
}
public MessageConsumer createResultQueueReader() throws JMSException {
try {
return createSession().createConsumer(resultQueue);
} catch (JMSException e) {
System.err.println("Failed reading from queue: " + e);
throw e;
}
}
}