Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

modification for M1 chip #37

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,6 @@ _build/

# environment
.env
.idea/
cmake-build-debug/
log_*.txt
16 changes: 14 additions & 2 deletions Networking/CryptoPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,18 @@ CryptoPlayer::CryptoPlayer(const Names& Nms, const string& id_base) :
receivers[i] = 0;
continue;
}
if (N.get_name(i).empty()) continue;

senders[i] = new Sender<ssl_socket*>(i < my_num() ? sockets[i] : other_sockets[i]);
receivers[i] = new Receiver<ssl_socket*>(i < my_num() ? other_sockets[i] : sockets[i]);
// senders[i] = new Sender<ssl_socket*>(i < my_num() ? sockets[i] : other_sockets[i]);
// receivers[i] = new Receiver<ssl_socket*>(i < my_num() ? other_sockets[i] : sockets[i]);
senders[i] = new Sender<ssl_socket*>(i < my_num() ? sockets[i] : other_sockets[i], "P" + to_string(my_num()), "P" + to_string(i));
receivers[i] = new Receiver<ssl_socket*>(i < my_num() ? other_sockets[i] : sockets[i], "P" + to_string(i), "P" + to_string(my_num()));
}
}

void CryptoPlayer::connect(int i, vector<int>* plaintext_sockets)
{
if (N.get_name(i).empty()) return;
sockets[i] = new ssl_socket(io_service, ctx, plaintext_sockets[0][i],
"P" + to_string(i), "P" + to_string(my_num()), i < my_num());
other_sockets[i] = new ssl_socket(io_service, ctx, plaintext_sockets[1][i],
Expand Down Expand Up @@ -187,6 +191,8 @@ void CryptoPlayer::send_receive_all_no_stats(const vector<vector<bool>>& channel
for (int offset = 1; offset < num_players(); offset++)
{
int other = get_player(offset);
if (N.get_name(other).empty()) continue;
// TODO: send to offline nodes
bool receive = channels[other][my_num()];
if (channels[my_num()][other])
this->senders[other]->request(to_send[other]);
Expand All @@ -196,6 +202,8 @@ void CryptoPlayer::send_receive_all_no_stats(const vector<vector<bool>>& channel
for (int offset = 1; offset < num_players(); offset++)
{
int other = get_player(offset);
if (N.get_name(other).empty()) continue;
// TODO: send to offline nodes
bool receive = channels[other][my_num()];
if (channels[my_num()][other])
this->senders[other]->wait(to_send[other]);
Expand All @@ -212,6 +220,7 @@ void CryptoPlayer::partial_broadcast(const vector<bool>& my_senders,
for (int offset = 1; offset < num_players(); offset++)
{
int other = get_player(offset);
if (N.get_name(other).empty()) continue;
bool receive = my_senders[other];
if (my_receivers[other])
{
Expand All @@ -224,6 +233,7 @@ void CryptoPlayer::partial_broadcast(const vector<bool>& my_senders,
for (int offset = 1; offset < num_players(); offset++)
{
int other = get_player(offset);
if (N.get_name(other).empty()) continue;
bool receive = my_senders[other];
if (my_receivers[other])
this->senders[other]->wait(os[my_num()]);
Expand All @@ -237,13 +247,15 @@ void CryptoPlayer::Broadcast_Receive_no_stats(vector<octetStream>& os) const
for (int offset = 1; offset < num_players(); offset++)
{
int other = get_player(offset);
if (N.get_name(other).empty()) continue;
this->senders[other]->request(os[my_num()]);
receivers[other]->request(os[other]);
}

for (int offset = 1; offset < num_players(); offset++)
{
int other = get_player(offset);
if (N.get_name(other).empty()) continue;
this->senders[other]->wait(os[my_num()]);
receivers[other]->wait(os[other]);
}
Expand Down
11 changes: 8 additions & 3 deletions Networking/Player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ void PlainPlayer::setup_sockets(const vector<string>& names,
sockets.resize(nplayers);
// Set up the client side
for (int i=player_no; i<nplayers; i++) {
if (names[i].empty()) continue;
auto pn=id_base+"P"+to_string(player_no);
if (i==player_no) {
const char* localhost = "127.0.0.1";
Expand All @@ -301,6 +302,7 @@ void PlainPlayer::setup_sockets(const vector<string>& names,
send_to_self_socket = sockets[player_no];
// Setting up the server side
for (int i=0; i<=player_no; i++) {
if (names[i].empty()) continue;
auto id=id_base+"P"+to_string(i);
#ifdef DEBUG_NETWORKING
fprintf(stderr,
Expand All @@ -311,6 +313,7 @@ void PlainPlayer::setup_sockets(const vector<string>& names,
}

for (int i = 0; i < nplayers; i++) {
if (names[i].empty()) continue;
// timeout of 5 minutes
struct timeval tv;
tv.tv_sec = 300;
Expand Down Expand Up @@ -359,7 +362,7 @@ void Player::send_all(const octetStream& o) const
{
TimeScope ts(comm_stats["Sending to all"].add(o));
for (int i=0; i<nplayers; i++)
{ if (i!=player_no)
{ if (i!=player_no && !N.names[i].empty())
send_to_no_stats(i, o);
}
sent += o.get_length() * (num_players() - 1);
Expand Down Expand Up @@ -479,7 +482,9 @@ void MultiPlayer<T>::Broadcast_Receive_no_stats(vector<octetStream>& o) const
for (int i=1; i<nplayers; i++)
{
int send_to = (my_num() + i) % num_players();
while (!N.get_name(send_to).empty()) send_to = (send_to + 1) % num_players();
int receive_from = (my_num() + num_players() - i) % num_players();
while (!N.get_name(receive_from).empty()) receive_from = (receive_from - 1) % num_players();
exchangers.push_back({sockets[send_to], o[my_num()], sockets[receive_from], o[receive_from]});
}

Expand All @@ -503,7 +508,7 @@ void Player::Broadcast_Receive(vector<octetStream>& o) const
{
unchecked_broadcast(o);
{ for (int i=0; i<nplayers; i++)
{ hash_update(&ctx,o[i].get_data(),o[i].get_length()); }
if (!N.get_name(i).empty()) { hash_update(&ctx,o[i].get_data(),o[i].get_length()); }
}
}

Expand All @@ -517,7 +522,7 @@ void Player::Check_Broadcast() const

unchecked_broadcast(h);
for (int i=0; i<nplayers; i++)
{ if (i!=player_no)
{ if (i!=player_no && !N.get_name(i).empty())
{ if (!h[i].equals(h[player_no]))
{ throw broadcast_invalid(); }
}
Expand Down
9 changes: 8 additions & 1 deletion Networking/Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ Receiver<T>::Receiver(T socket) : socket(socket), thread(0)
start();
}

template<class T>
Receiver<T>::Receiver(T socket, string sender, string receiver) : socket(socket), sender(sender), receiver(receiver), cnt(0), thread(0)
{
start();
}

template<class T>
Receiver<T>::~Receiver()
{
Expand Down Expand Up @@ -55,7 +61,8 @@ void Receiver<T>::run()
timer.start();
RunningTimer mytimer;
#endif
os->Receive(socket);
if (sender.empty()) os->Receive(socket);
else os->Receive(sender, receiver, ++cnt);
#ifdef VERBOSE_SSL
cout << "receiving " << os->get_length() * 1e-6 << " MB on " << socket
<< " took " << mytimer.elapsed() << ", total "
Expand Down
4 changes: 4 additions & 0 deletions Networking/Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ template<class T>
class Receiver
{
T socket;
string sender;
string receiver;
int cnt;
WaitQueue<octetStream*> in;
WaitQueue<octetStream*> out;
pthread_t thread;
Expand All @@ -33,6 +36,7 @@ class Receiver
Timer timer;

Receiver(T socket);
Receiver(T socket, string sender, string receiver);
~Receiver();

void request(octetStream& os);
Expand Down
9 changes: 8 additions & 1 deletion Networking/Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ Sender<T>::Sender(T socket) : socket(socket), thread(0)
start();
}

template<class T>
Sender<T>::Sender(T socket, string sender, string receiver) : socket(socket), sender(sender), receiver(receiver), cnt(0), thread(0)
{
start();
}

template<class T>
Sender<T>::~Sender()
{
Expand Down Expand Up @@ -51,7 +57,8 @@ void Sender<T>::run()
timer.start();
RunningTimer mytimer;
#endif
os->Send(socket);
if (sender.empty()) os->Send(socket);
else os->Send(sender, receiver, ++cnt);
#ifdef VERBOSE_SSL
cout << "sending " << os->get_length() * 1e-6 << " MB on " << socket
<< " took " << mytimer.elapsed() << ", total "
Expand Down
4 changes: 4 additions & 0 deletions Networking/Sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ template<class T>
class Sender
{
T socket;
string sender;
string receiver;
int cnt;
WaitQueue<const octetStream*> in;
WaitQueue<const octetStream*> out;
pthread_t thread;
Expand All @@ -33,6 +36,7 @@ class Sender
Timer timer;

Sender(T socket);
Sender(T socket, string sender, string receiver);
~Sender();

void request(const octetStream& os);
Expand Down
8 changes: 5 additions & 3 deletions Networking/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ void Server::send_names()
addresses.store(ports);
for (int i=0; i<nmachines; i++)
{
addresses.Send(socket_num[i]);
if (socket_num[i] != 0)
addresses.Send(socket_num[i]);
}
}

Expand Down Expand Up @@ -131,13 +132,14 @@ void Server::start()
}

// get names
for (i=0; i<nmachines; i++)
get_name(i);
for (i=0; i<nmachines; i++) if (socket_num[i] != 0)
get_name(i);

// check setup, party 0 doesn't matter
bool all_on_local = true, none_on_local = true;
for (i = 1; i < nmachines; i++)
{
if (names[i].empty()) continue;
bool on_local = names[i].compare("127.0.0.1");
all_on_local &= on_local;
none_on_local &= not on_local;
Expand Down
7 changes: 6 additions & 1 deletion Networking/ServerSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@ int ServerSocket::get_connection_socket(const string& id)
while (clients.find(id) == clients.end())
{
if (data_signal.wait(60) == ETIMEDOUT)
throw runtime_error("No client after one minute");
// throw runtime_error("No client after one minute");

{
cerr << id << " is not present after one minute" << endl;
break;
}
}

int client_socket = clients[id];
Expand Down
72 changes: 72 additions & 0 deletions Networking/colink-vt-dummy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#ifndef COLINK_VT_DUMMY_H
#define COLINK_VT_DUMMY_H

#include <random>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <string>
#include <vector>
#include <chrono>
#include <thread>
#include "picosha2.h"

using namespace std;

// dummy helper functions
void _save(const string &key, const vector<uint8_t> &payload)
{
// preprocess the key to be fixed length
std::string hash = "";
picosha2::hash256_hex_string(key.begin(), key.end(), hash); // hash now has length=64
// next, store it in a temporary file
std::ofstream ofs;
ofs.open(hash + ".comm.bin", std::ofstream::out | std::ofstream::trunc);
ofs.write((char *)payload.data(), payload.size());
ofs.close();
}
vector<uint8_t> _load(const string &key)
{
// preprocess the key to be fixed length
std::string hash = "";
picosha2::hash256_hex_string(key.begin(), key.end(), hash); // hash now has length=64
// next, check if the file exists and wait until it exists
while (true)
{
std::ifstream ifs;
ifs.open(hash + ".comm.bin", std::ifstream::in);
if (ifs.good())
{
ifs.seekg(0, std::ios::end);
size_t size = ifs.tellg();
vector<uint8_t> payload(size);
ifs.seekg(0, std::ios::beg);
ifs.read((char *)payload.data(), size);
ifs.close();
return payload;
}
ifs.close();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// exposed dummy functions
void set_variable(const string &task_id,
const string &key,
const vector<uint8_t> &payload,
const string &sender,
const vector<string> &receivers)
{
for (auto &receiver : receivers)
{
_save(task_id + "." + sender + "." + receiver + "." + key, payload);
}
}
vector<uint8_t> get_variable(const string &task_id,
const string &key,
const string &sender,
const string &receiver)
{
return _load(task_id + "." + sender + "." + receiver + "." + key);
}

#endif // COLINK_VT_DUMMY_H
30 changes: 30 additions & 0 deletions Networking/colink-vt-dummy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#ifndef COLINK_VT_DUMMY_H
#define COLINK_VT_DUMMY_H

#include <random>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <string>
#include <vector>
#include <chrono>
#include <thread>
#include "picosha2.h"

using namespace std;

// dummy helper functions
void _save(const string &key, const vector<uint8_t> &payload);
vector<uint8_t> _load(const string &key);
// exposed dummy functions
void set_variable(const string &task_id,
const string &key,
const vector<uint8_t> &payload,
const string &sender,
const vector<string> &receivers);
vector<uint8_t> get_variable(const string &task_id,
const string &key,
const string &sender,
const string &receiver);

#endif // COLINK_VT_DUMMY_H
Loading