Skip to content

Commit

Permalink
Merge pull request #117 from DuncanUszkay1/mutual-connection
Browse files Browse the repository at this point in the history
Allow for mutual connections [WIP]
  • Loading branch information
DuncanUszkay1 authored Jan 6, 2020
2 parents 76be9a9 + 63447c9 commit 275e855
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 72 deletions.
18 changes: 17 additions & 1 deletion src/interfaces/patchwork.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::map::Peer;
use super::map::{Peer, PeerConnection};
use super::packet::Packet;
use std::sync::mpsc::Sender;
use uuid::Uuid;

pub trait PatchworkState {
fn new_map(&self, peer: Peer);
fn route_player_packet(&self, packet: Packet, conn_id: Uuid);
fn connect_map(&self, map_index: usize, conn_id: PeerConnection);
fn report(&self);
}

Expand All @@ -15,6 +16,14 @@ impl PatchworkState for Sender<PatchworkStateOperations> {
.unwrap();
}

fn connect_map(&self, map_index: usize, peer_connection: PeerConnection) {
self.send(PatchworkStateOperations::ConnectMap(ConnectMapMessage {
map_index,
peer_connection,
}))
.unwrap();
}

fn route_player_packet(&self, packet: Packet, conn_id: Uuid) {
self.send(PatchworkStateOperations::RoutePlayerPacket(RouteMessage {
packet,
Expand All @@ -31,6 +40,7 @@ impl PatchworkState for Sender<PatchworkStateOperations> {
pub enum PatchworkStateOperations {
New(NewMapMessage),
RoutePlayerPacket(RouteMessage),
ConnectMap(ConnectMapMessage),
Report,
}

Expand All @@ -39,6 +49,12 @@ pub struct NewMapMessage {
pub peer: Peer,
}

#[derive(Debug)]
pub struct ConnectMapMessage {
pub map_index: usize,
pub peer_connection: PeerConnection,
}

#[derive(Debug, Clone)]
pub struct RouteMessage {
pub packet: Packet,
Expand Down
94 changes: 45 additions & 49 deletions src/models/map.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::interfaces::messenger::Messenger;
use super::interfaces::packet_processor::PacketProcessor;
use super::interfaces::patchwork::PatchworkState;
use super::packet::{Handshake, Packet};
use super::server;
use super::translation::TranslationUpdates;
use std::io;

use std::net::TcpStream;
use std::thread;
use uuid::Uuid;

Expand Down Expand Up @@ -35,6 +37,7 @@ pub struct Position {
impl Map {
pub fn report<M: Messenger>(&self, messenger: M) {
if let Some(peer_connection) = &self.peer_connection {
trace!("Reporting map {:?}", self);
messenger.send_packet(
peer_connection.conn_id,
Packet::Handshake(Handshake {
Expand All @@ -58,63 +61,56 @@ impl Map {
pub fn connect<
M: 'static + Messenger + Clone + Send,
PP: 'static + PacketProcessor + Clone + Send,
PA: 'static + PatchworkState + Clone + Send,
>(
&self,
messenger: M,
inbound_packet_processor: PP,
peer: Peer,
) -> Result<Map, io::Error> {
patchwork_state: PA,
map_index: usize,
) {
let conn_id = Uuid::new_v4();
let stream = server::new_connection(peer.address.clone(), peer.port)?;
messenger.new_connection(conn_id, stream.try_clone().unwrap());
inbound_packet_processor.set_translation_data(
conn_id,
vec![
TranslationUpdates::State(5),
TranslationUpdates::EntityIdBlock(self.entity_id_block),
TranslationUpdates::XOrigin(self.position.x),
],
);
let translation_updates = vec![
TranslationUpdates::State(5),
TranslationUpdates::EntityIdBlock(self.entity_id_block),
TranslationUpdates::XOrigin(self.position.x),
];
let peer_clone = peer.clone();
let on_connection = move |stream: TcpStream| {
messenger.new_connection(conn_id, stream.try_clone().unwrap());
inbound_packet_processor.set_translation_data(conn_id, translation_updates);

let messenger_clone = messenger.clone();
let inbound_packet_processor_clone = inbound_packet_processor.clone();
thread::spawn(move || {
server::handle_connection(
stream.try_clone().unwrap(),
inbound_packet_processor_clone,
messenger_clone,
let messenger_clone = messenger.clone();
let inbound_packet_processor_clone = inbound_packet_processor.clone();
thread::spawn(move || {
server::handle_connection(
stream.try_clone().unwrap(),
inbound_packet_processor_clone,
messenger_clone,
conn_id,
|| {},
);
});
messenger.send_packet(
conn_id,
|| {},
Packet::Handshake(Handshake {
protocol_version: 404,
server_address: String::from(""),
server_port: 0,
next_state: 6,
}),
);
patchwork_state.connect_map(
map_index,
PeerConnection {
peer: peer_clone,
conn_id,
},
);
});
let map = Map {
peer_connection: Some(PeerConnection { peer, conn_id }),
position: self.position,
entity_id_block: self.entity_id_block,
};
messenger.send_packet(
conn_id,
Packet::Handshake(Handshake {
protocol_version: 404,
server_address: String::from(""),
server_port: 0,
next_state: 6,
}),
);

//we send two packets because our protocol requires at least two packets to be sent
//before it can do anything- the first is a handshake, then the second one it can
//actually response to (it ignores the type of packet, so we just send it random data)
//to be changed later with a real request packet
messenger.send_packet(
conn_id,
Packet::Handshake(Handshake {
protocol_version: 404,
server_address: String::from(""),
server_port: 0,
next_state: 6,
}),
);
Ok(map)
thread::spawn(move || {
server::wait_for_connection(peer.address.clone(), peer.port, on_connection);
});
}
}
2 changes: 2 additions & 0 deletions src/packet_handlers/peer_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub fn handle_subscriber_packet<M: Messenger, P: PlayerState, B: BlockState>(
//Everytime a subscriber sends us a packet, we subscribe them to our messages and report our
//state to them

trace!("Reporting state to peer");

messenger.subscribe(conn_id, SubscriberType::LocalOnly);
player_state.report(conn_id);
block_state.report(conn_id);
Expand Down
23 changes: 23 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::io::ErrorKind::UnexpectedEof;
use std::io::{Cursor, Error, Read};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::thread::sleep;
use std::time;

use uuid::Uuid;

Expand Down Expand Up @@ -79,6 +81,27 @@ pub fn handle_connection<M: Messenger, PP: PacketProcessor, F: Fn()>(
}
}

//Just doing a simple linear backoff for now, probably want something a little more sophisticated
//eventually
pub fn wait_for_connection<F: FnOnce(TcpStream)>(
peer_address: String,
peer_port: u16,
on_connection: F,
) {
let backoff = 1;
loop {
if let Ok(connection) = new_connection(peer_address.clone(), peer_port) {
trace!("Connection Established");
on_connection(connection);
break;
} else {
let backoff = if backoff < 10 { backoff + 1 } else { backoff };
trace!("Failed to connect- retrying in {:?}s", backoff);
sleep(time::Duration::from_secs(backoff));
}
}
}

pub fn new_connection(peer_address: String, peer_port: u16) -> Result<TcpStream, Error> {
let peer_info = format!("{}:{}", peer_address, peer_port.to_string());
TcpStream::connect(peer_info)
Expand Down
8 changes: 6 additions & 2 deletions src/services/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::interfaces::messenger::Messenger;
use super::minecraft_types::ChunkSection;
use super::packet::{ChunkData, Packet};

use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender};

// We don't really have any meaningful block state yet- it cannot be changed or be particularly
// complicated. We can build this up later
Expand All @@ -24,7 +24,11 @@ fn fill_dummy_block_ids(ids: &mut Vec<i32>) {
}
}

pub fn start<M: Messenger>(receiver: Receiver<BlockStateOperations>, messenger: M) {
pub fn start<M: Messenger>(
receiver: Receiver<BlockStateOperations>,
_sender: Sender<BlockStateOperations>,
messenger: M,
) {
while let Ok(msg) = receiver.recv() {
match msg {
BlockStateOperations::Report(msg) => {
Expand Down
3 changes: 2 additions & 1 deletion src/services/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::interfaces::packet_processor::PacketProcessor;
use super::interfaces::patchwork::PatchworkState;
use super::interfaces::player::PlayerState;

use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender};

pub fn start<
M: Messenger + Clone,
Expand All @@ -13,6 +13,7 @@ pub fn start<
PP: 'static + PacketProcessor + Clone + Send,
>(
receiver: Receiver<ConnectionOperations>,
_sender: Sender<ConnectionOperations>,
messenger: M,
player_state: P,
_patchwork_state: PA,
Expand Down
3 changes: 2 additions & 1 deletion src/services/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ macro_rules! define_services {
$(
paste::expr! {
$(let [<$dependency _clone>] = $dependency.sender(););*
let sender = $service_instance.sender();
let receiver = $service_instance.receiver();
thread::spawn(move || $service(receiver, $({[<$dependency _clone>]}),*));
thread::spawn(move || $service(receiver, sender, $({[<$dependency _clone>]}),*));
}
)*
)
Expand Down
4 changes: 2 additions & 2 deletions src/services/keep_alive.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use super::interfaces::messenger::Messenger;
use super::packet::{KeepAlive, Packet};
use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender};
use std::thread::sleep;
use std::time;

const KEEP_ALIVE_PERIOD: u64 = 15;
const KEEP_ALIVE_VALUE: i64 = 16;

pub fn start<M: Messenger>(_: Receiver<i32>, messenger: M) {
pub fn start<M: Messenger>(_: Receiver<i32>, _: Sender<i32>, messenger: M) {
loop {
sleep(time::Duration::from_secs(KEEP_ALIVE_PERIOD));
messenger.broadcast_packet(
Expand Down
4 changes: 2 additions & 2 deletions src/services/messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use super::packet::{translate_outgoing, write};
use super::translation::TranslationInfo;
use std::collections::{HashMap, HashSet};
use std::net::TcpStream;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender};
use uuid::Uuid;

pub fn start(receiver: Receiver<MessengerOperations>) {
pub fn start(receiver: Receiver<MessengerOperations>, _sender: Sender<MessengerOperations>) {
let mut connection_map = HashMap::<Uuid, TcpStream>::new();
let mut local_only_broadcast_list = HashSet::<Uuid>::new();
let mut all_broadcast_list = HashSet::<Uuid>::new();
Expand Down
3 changes: 2 additions & 1 deletion src/services/packet_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::packet_handlers::packet_router;
use super::translation::{TranslationInfo, TranslationUpdates};
use std::collections::HashMap;

use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender};
use uuid::Uuid;

pub fn start_inbound<
Expand All @@ -19,6 +19,7 @@ pub fn start_inbound<
B: BlockState + Clone,
>(
receiver: Receiver<PacketProcessorOperations>,
_sender: Sender<PacketProcessorOperations>,
messenger: M,
player_state: P,
block_state: B,
Expand Down
30 changes: 24 additions & 6 deletions src/services/patchwork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use super::interfaces::messenger::Messenger;
use super::interfaces::packet_processor::PacketProcessor;
use super::interfaces::patchwork::PatchworkStateOperations;
use super::interfaces::player::PlayerState;
use super::map::{Map, Peer, Position};
use super::map::{Map, Peer, PeerConnection, Position};
use super::packet;
use super::packet::Packet;
use super::packet_handlers::gameplay_router;
use super::server;

use std::collections::HashMap;
use std::io;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::{Receiver, Sender};

use uuid::Uuid;

Expand All @@ -20,6 +20,7 @@ pub fn start<
PP: 'static + PacketProcessor + Clone + Send,
>(
receiver: Receiver<PatchworkStateOperations>,
sender: Sender<PatchworkStateOperations>,
messenger: M,
inbound_packet_processor: PP,
player_state: P,
Expand All @@ -34,8 +35,12 @@ pub fn start<
msg.peer,
messenger.clone(),
inbound_packet_processor.clone(),
sender.clone(),
)
}
PatchworkStateOperations::ConnectMap(msg) => {
patchwork.connect_map(msg.map_index, msg.peer_connection, messenger.clone());
}
PatchworkStateOperations::RoutePlayerPacket(msg) => {
let patchwork_clone = patchwork.clone();
let anchor = patchwork
Expand Down Expand Up @@ -172,6 +177,16 @@ impl Patchwork {
.expect("Could not find map for position")
}

pub fn connect_map<M: Messenger + Clone>(
&mut self,
map_index: usize,
peer_connection: PeerConnection,
messenger: M,
) {
self.maps[map_index].peer_connection = Some(peer_connection);
self.maps[map_index].report(messenger.clone());
}

pub fn add_peer_map<
M: 'static + Messenger + Send + Clone,
PP: 'static + PacketProcessor + Send + Clone,
Expand All @@ -180,14 +195,17 @@ impl Patchwork {
peer: Peer,
messenger: M,
inbound_packet_processor: PP,
patchwork_state: Sender<PatchworkStateOperations>,
) {
if let Ok(map) = Map::new(self.next_position(), self.next_entity_id_block()).connect(
let map = Map::new(self.next_position(), self.next_entity_id_block());
self.maps.push(map.clone());
map.connect(
messenger,
inbound_packet_processor,
peer,
) {
self.maps.push(map);
}
patchwork_state,
self.maps.len() - 1,
);
}

pub fn report<M: Messenger + Clone>(self, messenger: M) {
Expand Down
Loading

0 comments on commit 275e855

Please sign in to comment.