-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathe2sar_seg_live_test.cpp
238 lines (193 loc) · 8.88 KB
/
e2sar_seg_live_test.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#define BOOST_TEST_MODULE DPSegLiveTests
#include <stdlib.h>
#include <iostream>
#include <cmath>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/thread/thread.hpp>
#include <vector>
#include <boost/test/included/unit_test.hpp>
#include <boost/program_options.hpp>
#include "e2sar.hpp"
using namespace e2sar;
namespace po = boost::program_options;
namespace pt = boost::posix_time;
BOOST_AUTO_TEST_SUITE(DPSegLiveTests)
// these tests test the sync thread and the sending of
// the sync messages against live UDPLBd.
BOOST_AUTO_TEST_CASE(DPSegLiveTest1)
{
std::cout << "DPSegLiveTest1: test segmenter (and sync thread) against UDPLBd by sending 5 events via event queue with default MTU so 5 frames are sent" << std::endl;
// parse URI from env variable
// it needs to have the sync address/port
auto uri_r = EjfatURI::getFromEnv();
if (uri_r.has_error())
std::cout << "URI Error: " << uri_r.error().message() << std::endl;
BOOST_CHECK(!uri_r.has_error());
auto uri = uri_r.value();
// create LBManager
auto lbman = LBManager(uri, false);
// reserve an LB to get sync address
auto duration_v = pt::duration_from_string("01");
std::string lbname{"mylb"};
std::vector<std::string> senders{"192.168.100.1"s, "192.168.100.2"s};
// call reserve
auto res = lbman.reserveLB(lbname, duration_v, senders);
BOOST_CHECK(!res.has_error());
BOOST_CHECK(!lbman.get_URI().get_InstanceToken().value().empty());
BOOST_CHECK(lbman.get_URI().has_syncAddr());
BOOST_CHECK(lbman.get_URI().has_dataAddr());
u_int16_t dataId = 0x0505;
u_int32_t eventSrcId = 0x11223344;
Segmenter::SegmenterFlags sflags;
sflags.syncPeriodMs= 1000; // in ms
sflags.syncPeriods = 5; // number of sync periods to use for sync
// create a segmenter and start the threads
// using the updated URI with sync info
std::cout << "Creating segmenter using returned URI: " <<
lbman.get_URI().to_string(EjfatURI::TokenType::instance) << std::endl;
Segmenter seg(lbman.get_URI(), dataId, eventSrcId, sflags);
auto res1 = seg.openAndStart();
if (res1.has_error())
std::cout << "Error encountered opening sockets and starting threads: " << res1.error().message() << std::endl;
BOOST_CHECK(!res1.has_error());
std::cout << "Running data test for 10 seconds against sync " <<
lbman.get_URI().get_syncAddr().value().first << ":" <<
lbman.get_URI().get_syncAddr().value().second << " and data " <<
lbman.get_URI().get_dataAddrv4().value().first << ":" <<
lbman.get_URI().get_dataAddrv4().value().second <<
std::endl;
std::string eventString{"THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 2 SECONDS."s};
std::cout << "The event data is string '" << eventString << "' of length " << eventString.length() << std::endl;
//
// send one event message per 2 seconds that fits into a single frame
//
auto sendStats = seg.getSendStats();
if (sendStats.get<1>() != 0)
{
std::cout << "Error encountered after opening send socket: " << strerror(sendStats.get<2>()) << std::endl;
}
for(auto i=0; i<5;i++) {
auto sendres = seg.addToSendQueue(reinterpret_cast<u_int8_t*>(eventString.data()), eventString.length());
BOOST_CHECK(!sendres.has_error());
sendStats = seg.getSendStats();
if (sendStats.get<1>() != 0)
{
std::cout << "Error encountered sending event frames: " << strerror(sendStats.get<2>()) << std::endl;
}
// sleep for a second
boost::this_thread::sleep_for(boost::chrono::seconds(2));
}
// check the sync stats
auto syncStats = seg.getSyncStats();
sendStats = seg.getSendStats();
if (syncStats.get<1>() != 0)
{
std::cout << "Error encountered sending sync frames: " << strerror(syncStats.get<2>()) << std::endl;
}
// send 10 sync messages and no errors
std::cout << "Sent " << syncStats.get<0>() << " sync frames" << std::endl;
BOOST_CHECK(syncStats.get<0>() >= 10);
BOOST_CHECK(syncStats.get<1>() == 0);
// check the send stats
std::cout << "Sent " << sendStats.get<0>() << " data frames" << std::endl;
// send 5 event messages and no errors
BOOST_CHECK(sendStats.get<0>() == 5);
BOOST_CHECK(sendStats.get<1>() == 0);
// call free - this will correctly use the admin token (even though instance token
// is added by reserve call and updated URI inside with LB ID added to it
auto res2 = lbman.freeLB();
BOOST_CHECK(!res2.has_error());
// stop threads and exit
}
// these tests test the sync thread and the sending of
// the sync messages against live UDPLBd.
BOOST_AUTO_TEST_CASE(DPSegLiveTest2)
{
std::cout << "DPSegLiveTest2: test segmenter (and sync thread) against UDPLBd by sending 5 events via event queue small MTU so 20 frames are sent" << std::endl;
// parse URI from env variable
// it needs to have the sync address/port
auto uri_r = EjfatURI::getFromEnv();
if (uri_r.has_error())
std::cout << "URI Error encountered: " << uri_r.error().message() << std::endl;
BOOST_CHECK(!uri_r.has_error());
auto uri = uri_r.value();
// create LBManager
auto lbman = LBManager(uri, false);
// reserve an LB to get sync address
auto duration_v = pt::duration_from_string("01");
std::string lbname{"mylb"};
std::vector<std::string> senders{"192.168.100.1"s, "192.168.100.2"s};
// call reserve
auto res = lbman.reserveLB(lbname, duration_v, senders);
BOOST_CHECK(!res.has_error());
BOOST_CHECK(!lbman.get_URI().get_InstanceToken().value().empty());
BOOST_CHECK(lbman.get_URI().has_syncAddr());
BOOST_CHECK(lbman.get_URI().has_dataAddr());
u_int16_t dataId = 0x0505;
u_int32_t eventSrcId = 0x11223344;
Segmenter::SegmenterFlags sflags;
sflags.syncPeriodMs = 500; // in ms
sflags.syncPeriods = 5; // number of sync periods to use for sync
sflags.mtu = 64 + 40;
// create a segmenter using URI sync and data info
// and start the threads, send MTU is set to force
// breaking up event payload into multiple frames
// 64 is the length of all headers (IP, UDP, LB, RE)
Segmenter seg(lbman.get_URI(), dataId, eventSrcId, sflags);
std::cout << "Creating segmenter using returned URI: " <<
lbman.get_URI().to_string(EjfatURI::TokenType::instance) << std::endl;
auto res1 = seg.openAndStart();
if (res1.has_error())
std::cout << "Error encountered opening sockets and starting threads: " << res1.error().message() << std::endl;
BOOST_CHECK(!res1.has_error());
std::cout << "Running data test against sync " <<
lbman.get_URI().get_syncAddr().value().first << ":" <<
lbman.get_URI().get_syncAddr().value().second << " and data " <<
lbman.get_URI().get_dataAddrv4().value().first << ":" <<
lbman.get_URI().get_dataAddrv4().value().second <<
std::endl;
std::string eventString{"THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1/2 SECONDS."s};
std::cout << "The event data is string '" << eventString << "' of length " << eventString.length() << std::endl;
//
// send one event message per 2 seconds that fits into a single frame
//
auto sendStats = seg.getSendStats();
if (sendStats.get<1>() != 0)
{
std::cout << "Error encountered after opening send socket: " << strerror(sendStats.get<2>()) << std::endl;
}
for(auto i=0; i<10;i++) {
auto sendres = seg.addToSendQueue(reinterpret_cast<u_int8_t*>(eventString.data()), eventString.length());
BOOST_CHECK(!sendres.has_error());
sendStats = seg.getSendStats();
if (sendStats.get<1>() != 0)
{
std::cout << "Error encountered sending event frames: " << strerror(sendStats.get<2>()) << std::endl;
}
// sleep for a second
boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
}
// check the sync stats
auto syncStats = seg.getSyncStats();
sendStats = seg.getSendStats();
if (syncStats.get<1>() != 0)
{
std::cout << "Error encountered sending sync frames: " << strerror(syncStats.get<2>()) << std::endl;
}
// send 10 sync messages and no errors
std::cout << "Sent " << syncStats.get<0>() << " sync frames" << std::endl;
BOOST_CHECK(syncStats.get<0>() >= 10);
BOOST_CHECK(syncStats.get<1>() == 0);
// check the send stats
std::cout << "Sent " << sendStats.get<0>() << " data frames" << std::endl;
// send 5 event messages and no errors
BOOST_CHECK(sendStats.get<0>() == 20);
BOOST_CHECK(sendStats.get<1>() == 0);
// call free - this will correctly use the admin token (even though instance token
// is added by reserve call and updated URI inside with LB ID added to it
auto res2 = lbman.freeLB();
BOOST_CHECK(!res2.has_error());
// stop threads and exit
}
BOOST_AUTO_TEST_SUITE_END()