-
Notifications
You must be signed in to change notification settings - Fork 10
/
consumer_demo.cc
83 lines (69 loc) · 2.35 KB
/
consumer_demo.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include <unistd.h>
#include <string>
#include "PolarisManager.h"
#include "workflow/WFFacilities.h"
using namespace polaris;
WFFacilities::WaitGroup wait_group(1);
int main(int argc, char *argv[])
{
if (argc != 5 && argc != 7) {
fprintf(stderr, "USAGE:\n\t%s <polaris_cluster> <namespace> <service_name>"
" [<platform_id> <platform_token>] <query URL>\n\n"
"QUERY URL FORMAT:\n"
"\thttp://callee_service_namespace.callee_service_name:port"
"#k1=v1&caller_service_namespace.caller_service_name\n\n"
"EXAMPLE:\n\t%s http://127.0.0.1:8090 "
"default workflow.polaris.service.b "
"\"http://default.workflow.polaris.service.b:8080"
"#k1_env=v1_base&k2_number=v2_prime&a_namespace.a\"\n\n",
argv[0], argv[0]);
exit(1);
}
std::string polaris_url = argv[1];
std::string service_namespace = argv[2];
std::string service_name = argv[3];
std::string platform_id;
std::string platform_token;
const char *query_url;
if (argc == 5)
query_url = argv[4];
else
{
platform_id = argv[4];
platform_token = argv[5];
query_url = argv[6];
}
if (strncasecmp(argv[1], "http://", 7) != 0 &&
strncasecmp(argv[1], "https://", 8) != 0) {
polaris_url = "http://" + polaris_url;
}
// 1. Construct PolarisManager.
// PolarisManager mgr(polaris_url);
PolarisManager mgr(polaris_url, platform_id, platform_token, ""/* .yaml */);
// 2. Watch a service, will fill Workflow with instances automatically.
int ret = mgr.watch_service(service_namespace, service_name);
fprintf(stderr, "Watch %s %s ret=%d.\n", service_namespace.c_str(),
service_name.c_str(), ret);
if (ret < 0)
{
fprintf(stderr, "Watch failed. error=%d\n", mgr.get_error());
return 0;
}
// 3. Use Workflow as usual. Tasks will be send to the right instances.
fprintf(stderr, "Query URL : %s\n", query_url);
WFHttpTask *task = WFTaskFactory::create_http_task(query_url,
3, /* REDIRECT_MAX */
5, /* RETRY_MAX */
[](WFHttpTask *task) {
fprintf(stderr, "Query callback. state = %d error = %d\n",
task->get_state(), task->get_error());
wait_group.done();
});
task->start();
wait_group.wait();
// 4. Unwatch the service when existing the progress.
ret = mgr.unwatch_service(service_namespace, service_name);
fprintf(stderr, "Unwatch %s %s ret=%d.\n", service_namespace.c_str(),
service_name.c_str(), ret);
return 0;
}