-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathBroker.h
78 lines (65 loc) · 1.89 KB
/
Broker.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#pragma once
// https://rfc.zeromq.org/spec:7/MDP/
#include <zmqpp/zmqpp.hpp>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
#include <vector>
#include "BrokerTasks.h"
#include "MDP.h"
#include "WorkerPool.h"
#include "ZMQBrokerContext.h"
#include "ZMQIdentity.h"
struct Broker
{
static
constexpr std::chrono::milliseconds timeout = MutualHeartbeatMonitor::period;
using MessageHandle = MDP::MessageHandle;
using ZMQContext = ZMQBrokerContext;
using ZMQContextHandle = std::unique_ptr<ZMQContext>;
enum class Tag
{
ClientRequest,
ClientReply,
WorkerReady,
WorkerRequest,
WorkerReply,
WorkerHeartbeat,
WorkerDisconnect,
Unsupported
};
template <Tag value>
struct Tagged
{
static constexpr Tag tag = value;
MessageHandle handle;
Tagged() {}
Tagged(MessageHandle h): handle{std::move(h)} {}
Tagged(zmqpp::message m): handle{MDP::makeMessageHandle(std::move(m))} {}
};
void exec(const std::string &address);
private:
ZMQContextHandle zmqContextHandle_;
WorkerPool workerPool_;
BrokerTasks brokerTasks_;
void onMessage(MessageHandle);
void onClientMessage(MessageHandle);
void onWorkerMessage(MessageHandle);
/* Client */
void dispatch(Tagged<Tag::ClientRequest>);
void dispatch(Tagged<Tag::ClientReply>);
/* Worker */
void dispatch(Tagged<Tag::WorkerReady>);
void dispatch(Tagged<Tag::WorkerRequest>, WorkerPool::Worker &, ZMQIdentity clientIdentity);
void dispatch(Tagged<Tag::WorkerReply>);
void dispatch(Tagged<Tag::WorkerHeartbeat>);
void dispatch(Tagged<Tag::WorkerDisconnect>);
/* Misc */
void checkExpired();
void purge(ZMQIdentity);
void sendHeartbeatIfNeeded();
void dispatch(Tagged<Tag::Unsupported>);
};