Skip to content

Commit

Permalink
Merge pull request #120 from MortezaBashsiz/morteza/issue_119
Browse files Browse the repository at this point in the history
Morteza/issue 119
  • Loading branch information
MortezaBashsiz authored Nov 5, 2024
2 parents a553ef9 + 1cfad8c commit ba58232
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 44 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ temp
build
.idea
/cmake-build-debug/
test
4 changes: 2 additions & 2 deletions core/src/agenthandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void AgentHandler::handle() {
Log::Level::DEBUG);

client_->doWrite(readBuffer_);
client_->doRead();
client_->doHandle();

if (client_->readBuffer().size() > 0) {
if (request_->httpType() != HTTP::HttpType::connect) {
Expand Down Expand Up @@ -146,7 +146,7 @@ void AgentHandler::continueRead() {
request_->genHttpRestPostReqString());
copyStringToStreambuf(newReq, readBuffer_);
client_->doWrite(readBuffer_);
client_->doRead();
client_->doHandle();
if (client_->readBuffer().size() > 0) {
if (request_->httpType() != HTTP::HttpType::connect) {
HTTP::pointer response =
Expand Down
2 changes: 1 addition & 1 deletion core/src/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ HTTP::HTTP(const HTTP &http)
HTTP::~HTTP() {}

bool HTTP::detectType() {
std::string requestStr(hexStreambufToStr(buffer_));
std::string requestStr{hexStreambufToStr(buffer_)};
std::string tmpStr;
unsigned short pos = 0;
tmpStr = requestStr.substr(pos, 2);
Expand Down
2 changes: 1 addition & 1 deletion core/src/runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void Runner::run() {

auto tcpServer = TCPServer::create(io_context_, config_, log_);

for (auto i = 0; i < config_->threads(); ++i) {
for (auto i = 0; i < config_->threads() * 50; ++i) {
threadPool_.emplace_back([this] { workerThread(); });
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/serverhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void ServerHandler::handle() {
client_->doConnect(request_->dstIP(), request_->dstPort());
}
client_->doWrite(readBuffer_);
client_->doRead();
client_->doHandle();
end_ = client_->end_;
if (client_->readBuffer().size() > 0) {
BoolStr encryption{false, std::string("FAILED")};
Expand Down Expand Up @@ -163,7 +163,7 @@ void ServerHandler::handle() {

void ServerHandler::continueRead() {
std::lock_guard<std::mutex> lock(mutex_);
client_->doRead();
client_->doHandle();
end_ = client_->end_;
if (client_->readBuffer().size() > 0) {
BoolStr encryption{false, std::string("FAILED")};
Expand Down
80 changes: 57 additions & 23 deletions core/src/tcpclient.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "tcpclient.hpp"

#include <boost/asio/steady_timer.hpp>

TCPClient::TCPClient(boost::asio::io_context &io_context,
const std::shared_ptr<Config> &config,
Expand Down Expand Up @@ -93,32 +92,57 @@ void TCPClient::doWrite(boost::asio::streambuf &buffer) {
}

void TCPClient::doRead() {
boost::system::error_code error;
end_ = false;
std::lock_guard<std::mutex> lock(mutex_);
boost::asio::streambuf tempBuff;
bool isTlsRecord{false};
try {
if (!socket_.is_open()) {
log_->write("[" + to_string(uuid_) + "] [TCPClient doRead] Socket is not OPEN",
Log::Level::DEBUG);
return;
}
if (buffer_.size() >= config_->general().chunkSize) {
end_ = true;
return;
}

readBuffer_.consume(readBuffer_.size());
boost::system::error_code error;
resetTimeout();
if (config_->runMode() == RunMode::agent) {

resetTimeout();
boost::asio::read(socket_, tempBuff, boost::asio::transfer_at_least(1),
error);
cancelTimeout();
} else {
resetTimeout();
boost::asio::read(socket_, tempBuff, boost::asio::transfer_exactly(2),
error);
std::string bufStr{hexStreambufToStr(tempBuff)};
if (bufStr == "1034" || bufStr == "1503" || bufStr == "1603" || bufStr == "1703")
isTlsRecord = true;
cancelTimeout();
}

boost::asio::read(socket_, readBuffer_, boost::asio::transfer_at_least(1),
error);
cancelTimeout();
if (error == boost::asio::error::eof) {
log_->write("[" + to_string(uuid_) + "] [TCPClient doRead] [EOF] Connection closed by peer.",
Log::Level::TRACE);
socketShutdown();
return;
} else if (error) {
log_->write(
std::string("[" + to_string(uuid_) + "] [TCPClient doRead] [error] ") + error.message(),
Log::Level::ERROR);
socketShutdown();
return;
}

boost::asio::steady_timer timer(io_context_);
for (auto i = 0; i <= config_->general().repeatWait; i++) {
while (true) {
if (config_->runMode() == RunMode::server && readBuffer_.size() >= config_->general().chunkSize) {
break;
}
if (socket_.available() == 0) break;
resetTimeout();
boost::asio::read(socket_, readBuffer_,
boost::asio::read(socket_, tempBuff,
boost::asio::transfer_at_least(1), error);
cancelTimeout();
if (error == boost::asio::error::eof) {
Expand All @@ -136,27 +160,25 @@ void TCPClient::doRead() {
}
timer.expires_after(std::chrono::milliseconds(config_->general().timeWait));
timer.wait();
if (config_->runMode() == RunMode::server && readBuffer_.size() >= config_->general().chunkSize) {
break;
}
}

if (config_->runMode() == RunMode::server && socket_.available() == 0) {
if (config_->runMode() == RunMode::server && isTlsRecord) {
end_ = true;
}

if (readBuffer_.size() > 0) {
if (tempBuff.size() > 0) {
copyStreambuf(tempBuff, buffer_);
try {

log_->write("[" + to_string(uuid_) + "] [TCPClient doRead] [SRC " +
socket_.remote_endpoint().address().to_string() + ":" +
std::to_string(socket_.remote_endpoint().port()) +
"] [Bytes " + std::to_string(readBuffer_.size()) + "] ",
"] [Bytes " + std::to_string(tempBuff.size()) + "] ",
Log::Level::DEBUG);
log_->write("[" + to_string(uuid_) + "] [Read from] [SRC " +
socket_.remote_endpoint().address().to_string() + ":" +
std::to_string(socket_.remote_endpoint().port()) +
"] " + "[Bytes " + std::to_string(readBuffer_.size()) +
"] " + "[Bytes " + std::to_string(tempBuff.size()) +
"] ",
Log::Level::TRACE);
} catch (std::exception &error) {
Expand All @@ -178,12 +200,25 @@ void TCPClient::doRead() {
}
}

void TCPClient::resetTimeout() {
void TCPClient::doHandle() {
buffer_.consume(buffer_.size());
readBuffer_.consume(readBuffer_.size());
doRead();
std::string bufStr{hexStreambufToStr(buffer_)};
std::string tmpStr;
unsigned short pos = 0;
tmpStr = bufStr.substr(pos, 4);
if (tmpStr == "1703" || (tmpStr != "1403" && tmpStr != "1503" && tmpStr != "1603" && tmpStr != "1703")) {
while (socket_.available() > 0 && buffer_.size() < config_->general().chunkSize) {
doRead();
}
}
copyStreambuf(buffer_, readBuffer_);
}

void TCPClient::resetTimeout() {
if (!config_->general().timeout)
return;


timeout_.expires_from_now(boost::posix_time::seconds(config_->general().timeout));
timeout_.async_wait(boost::bind(&TCPClient::onTimeout,
shared_from_this(),
Expand All @@ -197,15 +232,14 @@ void TCPClient::cancelTimeout() {
}

void TCPClient::onTimeout(const boost::system::error_code &error) {

if (error || error == boost::asio::error::operation_aborted) return;


log_->write(
std::string("[" + to_string(uuid_) + "] [TCPClient onTimeout] [expiration] ") +
std::to_string(+config_->general().timeout) +
" seconds has passed, and the timeout has expired",
Log::Level::TRACE);
socketShutdown();
}

void TCPClient::socketShutdown() {
Expand Down
5 changes: 3 additions & 2 deletions core/src/tcpclient.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/bind/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
Expand Down Expand Up @@ -32,6 +33,7 @@ class TCPClient : public boost::enable_shared_from_this<TCPClient> {
bool doConnect(const std::string &dstIP, const unsigned short &dstPort);
void doWrite(boost::asio::streambuf &buffer);
void doRead();
void doHandle();
void socketShutdown();
boost::uuids::uuid uuid_;
bool end_;
Expand All @@ -48,8 +50,7 @@ class TCPClient : public boost::enable_shared_from_this<TCPClient> {
const std::shared_ptr<Log> &log_;
boost::asio::io_context &io_context_;
boost::asio::ip::tcp::socket socket_;
boost::asio::streambuf writeBuffer_;
boost::asio::streambuf readBuffer_;
boost::asio::streambuf buffer_, writeBuffer_, readBuffer_;
boost::asio::ip::tcp::resolver resolver_;
boost::asio::deadline_timer timeout_;
mutable std::mutex mutex_;
Expand Down
22 changes: 14 additions & 8 deletions core/src/tcpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ TCPServer::TCPServer(boost::asio::io_context &io_context,
acceptor_(io_context,
boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(config->listenIp()),
config->listenPort())) {
acceptor_.set_option(boost::asio::socket_base::reuse_address(true));
startAccept();
}

void TCPServer::startAccept() {
auto client = TCPClient::create(io_context_, config_, log_);
auto connection = TCPConnection::create(io_context_, config_, log_, client);

acceptor_.set_option(boost::asio::socket_base::reuse_address(true));
acceptor_.async_accept(
connection->socket(),
[this, connection](const boost::system::error_code &error) {
Expand All @@ -27,13 +26,20 @@ void TCPServer::startAccept() {

void TCPServer::handleAccept(TCPConnection::pointer connection,
const boost::system::error_code &error) {
if (!error) {

connection->start();
} else {

log_->write("[TCPServer handleAccept] " + std::string(error.message()),
try {
if (!error) {
connection->start();
} else {
log_->write("[TCPServer handleAccept] Error: " + error.message(),
Log::Level::ERROR);
}
} catch (const std::exception &ex) {
log_->write("[TCPServer handleAccept] Exception: " + std::string(ex.what()),
Log::Level::ERROR);
} catch (...) {
log_->write("[TCPServer handleAccept] Unknown exception",
Log::Level::ERROR);
}

startAccept();
}
10 changes: 5 additions & 5 deletions nipovpn/etc/nipovpn/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,34 @@ general:
# timeWait: unsigned int(0-4,294,967,295)
# Defines the time wait between each repeat to read from socket.
# This directive will be useful when you expect to read very long stream from socket
timeWait: 1
timeWait: 5
# timeout: unsigned short
# Defines the timeout for I/O Operation in seconds. 0 indicates no timeout.
# Useful to automatically close stalled connections.
timeout: 10
# repeatWait: unsigned short(1-65,635)
# Defines the loop count which will try to repeat read from socket.
# Same as timeWait
repeatWait: 3
repeatWait: 10
# chunkHeader: String
# Defines the chunk header that you want to inform Agent/Server if it is end or not.
chunkHeader: "END"
# chunkSize: unsigned short(512-65,535)
# Defines the chunk size that you want to read from socket.
chunkSize: 4096
chunkSize: 8152

# This block is to define log directives
log:
# logLevel: "INFO|TRACE|DEBUG"
logLevel: "TRACE"
logLevel: "INFO"
# logFile: "/var/log/nipo/nipo.log"
# Path of log file
logFile: "/var/log/nipovpn/nipovpn.log"

# This block is to define server directives
server:
# threads: number of threads in server
threads: 16
threads: 8
# listenIp: which must be IPv4 which is used on server
listenIp: "0.0.0.0"
# listenPort: the port number which will be used on server
Expand Down

0 comments on commit ba58232

Please sign in to comment.