1
0
Fork 0
mirror of https://codeberg.org/demostf/sync.git synced 2026-06-03 16:44:07 +02:00

better peer id

This commit is contained in:
Robin Appelman 2024-11-29 20:11:28 +01:00
commit 659cc0a1f1
4 changed files with 133 additions and 32 deletions

59
Cargo.lock generated
View file

@ -96,6 +96,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "comma-separated"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac26892eaac40bd0a28b3a1ea93da165ef30f8ffbc3ac6fea430daf7091de58"
[[package]] [[package]]
name = "console" name = "console"
version = "0.15.8" version = "0.15.8"
@ -190,6 +196,12 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "either"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
[[package]] [[package]]
name = "encode_unicode" name = "encode_unicode"
version = "0.3.6" version = "0.3.6"
@ -526,6 +538,21 @@ dependencies = [
"icu_properties", "icu_properties",
] ]
[[package]]
name = "ipnet"
version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
[[package]]
name = "itertools"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
dependencies = [
"either",
]
[[package]] [[package]]
name = "itoa" name = "itoa"
version = "1.0.13" version = "1.0.13"
@ -806,6 +833,19 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "real-ip"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffb010e63dcf1e1360c7b99830d96e56aa37abe14b98c56b0eee492f1ffe091d"
dependencies = [
"comma-separated",
"http",
"ipnet",
"itertools",
"rfc7239",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.7" version = "0.5.7"
@ -815,6 +855,15 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "rfc7239"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c681391a1de47aad1d2bbd57074cbb36b2f230192e96504480493fbfc728779"
dependencies = [
"uncased",
]
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.24" version = "0.1.24"
@ -1005,6 +1054,7 @@ dependencies = [
"main_error", "main_error",
"maplit", "maplit",
"portpicker", "portpicker",
"real-ip",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
@ -1221,6 +1271,15 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "uncased"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697"
dependencies = [
"version_check",
]
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.14" version = "1.0.14"

View file

@ -16,6 +16,7 @@ main_error = "0.1.2"
futures-channel = "0.3.31" futures-channel = "0.3.31"
log = "0.4.22" log = "0.4.22"
futures-util = "0.3.31" futures-util = "0.3.31"
real-ip = "0.1.0"
[dev-dependencies] [dev-dependencies]
maplit = "1" maplit = "1"

View file

@ -1,6 +1,7 @@
mod session; mod session;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use crate::session::Session; use crate::session::Session;
use dashmap::DashMap; use dashmap::DashMap;
@ -9,16 +10,19 @@ use futures_util::future::select;
use futures_util::StreamExt; use futures_util::StreamExt;
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use main_error::MainResult; use main_error::MainResult;
use std::net::{Ipv4Addr, SocketAddr}; use real_ip::{real_ip, IpNet};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::pin::pin; use std::pin::pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use tracing::{debug, error, info, instrument, warn}; use tracing::{debug, error, info, warn};
type Tx = Sender<Message>; type Tx = Sender<Message>;
type PeerMap = DashMap<SocketAddr, Tx>; type PeerMap = DashMap<PeerId, Tx>;
type Sessions = DashMap<String, Session>; type Sessions = DashMap<String, Session>;
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
@ -32,7 +36,17 @@ pub enum SyncCommand<'a> {
Clients { session: &'a str, count: usize }, Clients { session: &'a str, count: usize },
} }
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct PeerId(IpAddr, u64);
impl Display for PeerId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}#{}", self.0, self.1)
}
}
pub struct Server { pub struct Server {
id_counter: AtomicU64,
peers: PeerMap, peers: PeerMap,
sessions: Sessions, sessions: Sessions,
} }
@ -40,12 +54,17 @@ pub struct Server {
impl Server { impl Server {
fn new() -> Self { fn new() -> Self {
Server { Server {
id_counter: AtomicU64::default(),
peers: PeerMap::with_capacity(128), peers: PeerMap::with_capacity(128),
sessions: Sessions::with_capacity(64), sessions: Sessions::with_capacity(64),
} }
} }
fn send_text<S: Into<String>>(&self, peer: &SocketAddr, text: S) { fn next_peer_id(&self) -> u64 {
self.id_counter.fetch_add(1, Ordering::Relaxed)
}
fn send_text<S: Into<String>>(&self, peer: &PeerId, text: S) {
if let Some(mut tx) = self.peers.get_mut(peer) { if let Some(mut tx) = self.peers.get_mut(peer) {
if let Err(e) = tx.try_send(Message::Text(text.into())) { if let Err(e) = tx.try_send(Message::Text(text.into())) {
error!(%peer, ?e, "failed to send message to client") error!(%peer, ?e, "failed to send message to client")
@ -53,7 +72,7 @@ impl Server {
} }
} }
pub fn send_command(&self, peer: &SocketAddr, command: &SyncCommand) { pub fn send_command(&self, peer: &PeerId, command: &SyncCommand) {
self.send_text(peer, serde_json::to_string(command).unwrap()) self.send_text(peer, serde_json::to_string(command).unwrap())
} }
@ -64,7 +83,7 @@ impl Server {
} }
} }
fn handle_command(&self, command: SyncCommand, sender: SocketAddr) { fn handle_command(&self, command: SyncCommand, sender: PeerId) {
match &command { match &command {
SyncCommand::Create { session, token } => { SyncCommand::Create { session, token } => {
self.sessions self.sessions
@ -111,13 +130,17 @@ impl Server {
} }
} }
fn handle_disconnect(&self, peer: &SocketAddr) { fn handle_disconnect(&self, peer: &PeerId) {
self.peers.remove(peer);
for mut session in self.sessions.iter_mut() { for mut session in self.sessions.iter_mut() {
session.remove_client(peer); session.remove_client(peer);
self.send_command(&session.owner, &SyncCommand::Clients { self.send_command(
&session.owner,
&SyncCommand::Clients {
session: &session.token, session: &session.token,
count: session.clients().count(), count: session.clients().count(),
}) },
)
} }
} }
@ -131,18 +154,33 @@ impl Server {
}); });
} }
#[instrument(skip(self, raw_stream))]
async fn handle_connection(&self, raw_stream: TcpStream, addr: SocketAddr) { async fn handle_connection(&self, raw_stream: TcpStream, addr: SocketAddr) {
debug!("incoming connection"); debug!("incoming connection");
let ws_stream = tokio_tungstenite::accept_async(raw_stream) let mut remote_ip = addr.ip();
.await
.expect("Error during the websocket handshake occurred"); let ws_stream_res =
info!("connection established"); tokio_tungstenite::accept_hdr_async(raw_stream, |req: &Request, response: Response| {
if let Some(ip) = real_ip(req.headers(), addr.ip(), TRUSTED_PROXIES) {
remote_ip = ip;
}
Ok::<_, ErrorResponse>(response)
})
.await;
let peer_id = PeerId(remote_ip, self.next_peer_id());
let ws_stream = match ws_stream_res {
Ok(ws_stream) => ws_stream,
Err(error) => {
error!(?error, %peer_id, "error while performing websocket handshake");
return;
}
};
info!(peer = %peer_id, "connection established");
// Insert the write part of this peer to the peer map. // Insert the write part of this peer to the peer map.
let (tx, rx) = channel(16); let (tx, rx) = channel(16);
self.peers.insert(addr, tx); self.peers.insert(peer_id, tx);
let (outgoing, incoming) = ws_stream.split(); let (outgoing, incoming) = ws_stream.split();
@ -150,11 +188,11 @@ impl Server {
if let Ok(message) = msg.to_text() { if let Ok(message) = msg.to_text() {
match serde_json::from_str(message) { match serde_json::from_str(message) {
Ok(command) => { Ok(command) => {
debug!(sender = %addr, message = ?command, "Received a message"); debug!(sender = %peer_id, message = ?command, "Received a message");
self.handle_command(command, addr); self.handle_command(command, peer_id);
} }
Err(e) => { Err(e) => {
warn!(sender = %addr, message, error = %e, "Error while decoding message"); warn!(sender = %peer_id, message, error = %e, "Error while decoding message");
} }
} }
} else { } else {
@ -169,9 +207,8 @@ impl Server {
let receive_from_others = pin!(receive_from_others); let receive_from_others = pin!(receive_from_others);
select(handle_messages, receive_from_others).await; select(handle_messages, receive_from_others).await;
info!(%addr, "disconnected"); info!(%peer_id, "disconnected");
self.peers.remove(&addr); self.handle_disconnect(&peer_id);
self.handle_disconnect(&addr);
} }
} }
@ -203,3 +240,8 @@ async fn main() -> MainResult {
Ok(()) Ok(())
} }
const TRUSTED_PROXIES: &[IpNet] = &[IpNet::new_assert(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 0)),
8,
)];

View file

@ -1,12 +1,11 @@
use crate::SyncCommand; use crate::{PeerId, SyncCommand};
use std::net::SocketAddr;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[derive(Debug)] #[derive(Debug)]
pub struct Session { pub struct Session {
pub owner: SocketAddr, pub owner: PeerId,
owner_token: String, owner_token: String,
clients: Vec<SocketAddr>, clients: Vec<PeerId>,
tick: u64, tick: u64,
playing: bool, playing: bool,
owner_left: Option<Instant>, owner_left: Option<Instant>,
@ -20,7 +19,7 @@ impl PartialEq for Session {
} }
impl Session { impl Session {
pub fn new(owner: SocketAddr, token: String, owner_token: String) -> Self { pub fn new(owner: PeerId, token: String, owner_token: String) -> Self {
Session { Session {
owner, owner,
owner_token, owner_token,
@ -32,11 +31,11 @@ impl Session {
} }
} }
pub fn join(&mut self, client: SocketAddr) { pub fn join(&mut self, client: PeerId) {
self.clients.push(client); self.clients.push(client);
} }
pub fn set_owner(&mut self, owner: SocketAddr, owner_token: &str) -> bool { pub fn set_owner(&mut self, owner: PeerId, owner_token: &str) -> bool {
if owner_token == self.owner_token { if owner_token == self.owner_token {
self.owner = owner; self.owner = owner;
self.owner_left = None; self.owner_left = None;
@ -62,11 +61,11 @@ impl Session {
.into_iter() .into_iter()
} }
pub fn clients(&self) -> impl Iterator<Item = &SocketAddr> { pub fn clients(&self) -> impl Iterator<Item = &PeerId> {
self.clients.iter() self.clients.iter()
} }
pub fn remove_client(&mut self, peer: &SocketAddr) { pub fn remove_client(&mut self, peer: &PeerId) {
self.clients.retain(|client| client != peer) self.clients.retain(|client| client != peer)
} }