-
Notifications
You must be signed in to change notification settings - Fork 0
/
ResultsRemote.java
85 lines (78 loc) · 2.7 KB
/
ResultsRemote.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.*;
import java.util.*;
import org.apache.commons.codec.binary.Base64;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.DeleteQueueRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
/** Class for the thread that returns result to client if we use remote workers
*
* @author Tiago Pais
*/
public class ResultsRemote implements Runnable{
private int numTasks,completed=0;
private ServerSocket ss;
private AmazonSQS sqs;
private String resultUrl;
/**Constructor
*
* @param ss Server socket to accept requests
* @param numTasks total number of tasks to perform
* @param sqs object for SQS to access queues
* @param resultUrl url of the results queue
*/
public ResultsRemote(ServerSocket ss, int numTasks, AmazonSQS sqs, String resultUrl){
this.numTasks=numTasks;
this.ss=ss;
this.sqs=sqs;
this.resultUrl=resultUrl;
}
/**Method that runs thread
*
*/
public void run(){
//until returning the result for all the tasks
while(completed<numTasks){
try{
//accept connection from client
Socket s =ss.accept();
DataOutputStream o= new DataOutputStream(s.getOutputStream());
//messages come in group of 10 from queue
List <Message> messages;
//if list has less than 10 we have reached the end of the results
do{
//get 10 results from queue
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(resultUrl);
receiveMessageRequest.setMaxNumberOfMessages(10);
messages=sqs.receiveMessage(receiveMessageRequest).getMessages();
//iterate list
for (Message message : messages) {
//delete message from queue
sqs.deleteMessage(new DeleteMessageRequest(resultUrl, message.getReceiptHandle()));
//send result to client
o.writeUTF(message.getBody());
//update completed tasks
completed++;
}
}while(messages.size()>9);
//let client know that there are no more results
o.writeUTF("END");
//close connection
s.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
}