mirror of
https://codeberg.org/demostf/sync.git
synced 2026-06-03 16:44:07 +02:00
rewrite with tokio-tungstenite
This commit is contained in:
parent
b0ae2c7927
commit
608a4a1bde
6 changed files with 541 additions and 1125 deletions
120
src/client.rs
120
src/client.rs
|
|
@ -1,120 +0,0 @@
|
|||
use enum_dispatch::enum_dispatch;
|
||||
use parity_ws::{util::Token, Result, Sender};
|
||||
|
||||
#[enum_dispatch(Client)]
|
||||
pub(crate) trait ClientTrait {
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn send(&self, msg: &str) -> Result<()>;
|
||||
|
||||
fn token(&self) -> Token;
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone, Debug)]
|
||||
pub(crate) struct SenderClient(Sender);
|
||||
|
||||
impl ClientTrait for SenderClient {
|
||||
fn send(&self, msg: &str) -> Result<()> {
|
||||
self.0.send(msg)
|
||||
}
|
||||
|
||||
fn token(&self) -> Token {
|
||||
self.0.token()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Sender> for SenderClient {
|
||||
fn from(sender: Sender) -> Self {
|
||||
SenderClient(sender)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod mock {
|
||||
use crate::client::ClientTrait;
|
||||
use crate::SyncCommand;
|
||||
use parity_ws::{util::Token, Result};
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct MockClient {
|
||||
received: Rc<RefCell<Vec<String>>>,
|
||||
token: Token,
|
||||
}
|
||||
|
||||
impl PartialEq for MockClient {
|
||||
fn eq(&self, other: &MockClient) -> bool {
|
||||
self.token == other.token
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientTrait for MockClient {
|
||||
fn send(&self, msg: &str) -> Result<()> {
|
||||
self.received.borrow_mut().push(msg.into());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn token(&self) -> Token {
|
||||
self.token
|
||||
}
|
||||
}
|
||||
|
||||
impl MockClient {
|
||||
pub fn new(token: usize) -> Self {
|
||||
MockClient {
|
||||
received: Rc::new(RefCell::new(Vec::new())),
|
||||
token: Token(token),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn received_count(&self) -> usize {
|
||||
RefCell::borrow(&self.received).len()
|
||||
}
|
||||
|
||||
pub fn assert_received(&self, expected: Vec<SyncCommand>) {
|
||||
let map = RefCell::borrow(&self.received);
|
||||
|
||||
let received: Vec<_> = map
|
||||
.iter()
|
||||
.map(|msg| serde_json::from_str::<SyncCommand>(msg).expect("invalid message"))
|
||||
.collect();
|
||||
|
||||
assert_eq!(expected, received);
|
||||
}
|
||||
|
||||
pub fn clear(&self) {
|
||||
self.received.borrow_mut().clear()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) use mock::MockClient;
|
||||
|
||||
#[cfg(not(test))]
|
||||
#[enum_dispatch]
|
||||
#[derive(PartialEq, Clone, Debug)]
|
||||
pub(crate) enum Client {
|
||||
Sender(SenderClient),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[enum_dispatch]
|
||||
#[derive(PartialEq, Clone, Debug)]
|
||||
pub(crate) enum Client {
|
||||
Sender(SenderClient),
|
||||
Mock(MockClient),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl Client {
|
||||
pub fn mock(token: usize) -> Self {
|
||||
Client::Mock(MockClient::new(token))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Sender> for Client {
|
||||
fn from(sender: Sender) -> Self {
|
||||
Client::Sender(sender.into())
|
||||
}
|
||||
}
|
||||
|
|
@ -1,175 +0,0 @@
|
|||
use crate::{spawn_local_server, SyncCommand};
|
||||
use parity_ws::Sender;
|
||||
use portpicker::pick_unused_port;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use websocket_lite::{Client, ClientBuilder, Message, NetworkStream};
|
||||
|
||||
const DELAY: Duration = Duration::from_millis(50);
|
||||
|
||||
struct TestHandle {
|
||||
server_sender: Sender,
|
||||
connect: String,
|
||||
}
|
||||
|
||||
impl TestHandle {
|
||||
pub fn new() -> Self {
|
||||
better_panic::install();
|
||||
|
||||
let port = pick_unused_port().expect("No ports free");
|
||||
|
||||
let server_sender = spawn_local_server(port);
|
||||
|
||||
// give the server some time to start
|
||||
sleep(DELAY);
|
||||
|
||||
TestHandle {
|
||||
server_sender,
|
||||
connect: format!("ws://localhost:{}", port),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_client(&self) -> Client<Box<dyn NetworkStream + Sync + Send + 'static>> {
|
||||
ClientBuilder::new(&self.connect)
|
||||
.unwrap()
|
||||
.connect()
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TestHandle {
|
||||
fn drop(&mut self) {
|
||||
self.server_sender.shutdown().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn integration_tests() {
|
||||
let test = TestHandle::new();
|
||||
let mut owner = test.get_client();
|
||||
let mut client = test.get_client();
|
||||
|
||||
send(
|
||||
&mut owner,
|
||||
SyncCommand::Create {
|
||||
session: "foo",
|
||||
token: "bar",
|
||||
},
|
||||
);
|
||||
send(
|
||||
&mut owner,
|
||||
SyncCommand::Tick {
|
||||
session: "foo",
|
||||
tick: 99,
|
||||
},
|
||||
);
|
||||
|
||||
send(&mut client, SyncCommand::Join { session: "foo" });
|
||||
|
||||
assert_receive(
|
||||
&mut client,
|
||||
SyncCommand::Tick {
|
||||
session: "foo",
|
||||
tick: 99,
|
||||
},
|
||||
);
|
||||
assert_receive(
|
||||
&mut client,
|
||||
SyncCommand::Play {
|
||||
session: "foo",
|
||||
play: false,
|
||||
},
|
||||
);
|
||||
|
||||
send(
|
||||
&mut owner,
|
||||
SyncCommand::Play {
|
||||
session: "foo",
|
||||
play: true,
|
||||
},
|
||||
);
|
||||
assert_receive(
|
||||
&mut client,
|
||||
SyncCommand::Play {
|
||||
session: "foo",
|
||||
play: true,
|
||||
},
|
||||
);
|
||||
|
||||
// should be ignored
|
||||
send(
|
||||
&mut client,
|
||||
SyncCommand::Tick {
|
||||
session: "foo",
|
||||
tick: 5,
|
||||
},
|
||||
);
|
||||
|
||||
let mut client2 = test.get_client();
|
||||
|
||||
send(&mut client2, SyncCommand::Join { session: "foo" });
|
||||
|
||||
assert_receive(
|
||||
&mut client2,
|
||||
SyncCommand::Tick {
|
||||
session: "foo",
|
||||
tick: 99,
|
||||
},
|
||||
);
|
||||
assert_receive(
|
||||
&mut client2,
|
||||
SyncCommand::Play {
|
||||
session: "foo",
|
||||
play: true,
|
||||
},
|
||||
);
|
||||
|
||||
// owner reconnecting
|
||||
std::mem::drop(owner);
|
||||
|
||||
let mut owner2 = test.get_client();
|
||||
|
||||
send(
|
||||
&mut owner2,
|
||||
SyncCommand::Create {
|
||||
session: "foo",
|
||||
token: "bar",
|
||||
},
|
||||
);
|
||||
|
||||
send(
|
||||
&mut owner2,
|
||||
SyncCommand::Play {
|
||||
session: "foo",
|
||||
play: false,
|
||||
},
|
||||
);
|
||||
|
||||
assert_receive(
|
||||
&mut client,
|
||||
SyncCommand::Play {
|
||||
session: "foo",
|
||||
play: false,
|
||||
},
|
||||
);
|
||||
assert_receive(
|
||||
&mut client2,
|
||||
SyncCommand::Play {
|
||||
session: "foo",
|
||||
play: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
fn send<T: std::io::Write>(client: &mut Client<T>, command: SyncCommand) {
|
||||
client
|
||||
.send(Message::text(serde_json::to_string(&command).unwrap()))
|
||||
.unwrap();
|
||||
sleep(DELAY);
|
||||
}
|
||||
|
||||
fn assert_receive<T: std::io::Read>(client: &mut Client<T>, expected: SyncCommand) {
|
||||
let message = client.receive().unwrap().unwrap();
|
||||
let text = message.as_text().unwrap();
|
||||
assert_eq!(expected, serde_json::from_str(text).unwrap());
|
||||
}
|
||||
723
src/main.rs
723
src/main.rs
|
|
@ -1,14 +1,25 @@
|
|||
mod session;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use parity_ws::{listen, util::Token, CloseCode, Error, Handler, Message, Result};
|
||||
use std::collections::HashMap;
|
||||
use std::rc::Rc;
|
||||
|
||||
mod client;
|
||||
|
||||
use client::{Client, ClientTrait};
|
||||
use std::cell::RefCell;
|
||||
use crate::session::Session;
|
||||
use dashmap::DashMap;
|
||||
use futures_channel::mpsc::{channel, Sender};
|
||||
use futures_util::future::select;
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::TryStreamExt;
|
||||
use main_error::MainResult;
|
||||
use std::net::{Ipv4Addr, SocketAddr};
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
type Tx = Sender<Message>;
|
||||
type PeerMap = DashMap<SocketAddr, Tx>;
|
||||
type Sessions = DashMap<String, Session>;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
|
||||
#[serde(tag = "type")]
|
||||
|
|
@ -20,573 +31,153 @@ pub enum SyncCommand<'a> {
|
|||
Play { session: &'a str, play: bool },
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug)]
|
||||
struct Session {
|
||||
owner: Token,
|
||||
owner_token: String,
|
||||
clients: HashMap<Token, Client>,
|
||||
tick: u64,
|
||||
playing: bool,
|
||||
owner_left: Option<Instant>,
|
||||
pub struct Server {
|
||||
peers: PeerMap,
|
||||
sessions: Sessions,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub fn new(owner: Token, owner_token: String) -> Self {
|
||||
Session {
|
||||
owner,
|
||||
owner_token,
|
||||
clients: HashMap::new(),
|
||||
playing: false,
|
||||
tick: 0,
|
||||
owner_left: None,
|
||||
impl Server {
|
||||
fn new() -> Self {
|
||||
Server {
|
||||
peers: PeerMap::with_capacity(128),
|
||||
sessions: Sessions::with_capacity(64),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn join(&mut self, client: &Client) {
|
||||
self.clients.insert(client.token(), client.clone());
|
||||
}
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub fn send_command(&self, command: &SyncCommand) {
|
||||
let command_text = serde_json::to_string(command).unwrap();
|
||||
for client in self.clients.values() {
|
||||
client.send(&command_text).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Server {
|
||||
out: Client,
|
||||
sessions: Rc<RefCell<HashMap<String, Session>>>,
|
||||
}
|
||||
|
||||
fn handle_command(
|
||||
command: SyncCommand,
|
||||
sender: &Client,
|
||||
sessions: &RefCell<HashMap<String, Session>>,
|
||||
) {
|
||||
match &command {
|
||||
SyncCommand::Create { session, token } => {
|
||||
sessions
|
||||
.borrow_mut()
|
||||
.entry(session.to_string())
|
||||
.and_modify(|session| {
|
||||
if token == &session.owner_token {
|
||||
session.owner = sender.token();
|
||||
session.owner_left = None;
|
||||
}
|
||||
})
|
||||
.or_insert_with(|| Session::new(sender.token(), token.to_string()));
|
||||
gc_sessions(sessions);
|
||||
}
|
||||
SyncCommand::Join {
|
||||
session: session_name,
|
||||
} => match sessions.borrow_mut().get_mut(*session_name) {
|
||||
Some(session) => {
|
||||
let _ = sender.send(
|
||||
&serde_json::to_string(&SyncCommand::Tick {
|
||||
tick: session.tick,
|
||||
session: session_name,
|
||||
})
|
||||
.unwrap(),
|
||||
);
|
||||
let _ = sender.send(
|
||||
&serde_json::to_string(&SyncCommand::Play {
|
||||
play: session.playing,
|
||||
session: session_name,
|
||||
})
|
||||
.unwrap(),
|
||||
);
|
||||
session.join(sender);
|
||||
fn send_text<S: Into<String>>(&self, peer: &SocketAddr, text: S) {
|
||||
if let Some(mut tx) = self.peers.get_mut(peer) {
|
||||
if let Err(e) = tx.try_send(Message::Text(text.into())) {
|
||||
error!(%peer, ?e, "failed to send message to client")
|
||||
}
|
||||
None => println!("session {} not found", session_name),
|
||||
},
|
||||
SyncCommand::Tick {
|
||||
tick,
|
||||
session: session_name,
|
||||
} => update_session_and_forward(
|
||||
sender,
|
||||
sessions,
|
||||
session_name,
|
||||
|session| session.tick = *tick,
|
||||
&command,
|
||||
),
|
||||
SyncCommand::Play {
|
||||
play,
|
||||
session: session_name,
|
||||
} => update_session_and_forward(
|
||||
sender,
|
||||
sessions,
|
||||
session_name,
|
||||
|session| session.playing = *play,
|
||||
&command,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_command(&self, peer: &SocketAddr, command: &SyncCommand) {
|
||||
self.send_text(peer, serde_json::to_string(command).unwrap())
|
||||
}
|
||||
|
||||
pub fn send_to_clients(&self, session: &Session, command: &SyncCommand) {
|
||||
let command_text = serde_json::to_string(command).unwrap();
|
||||
for peer in session.clients() {
|
||||
self.send_text(peer, &command_text);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_command<'a>(&self, command: SyncCommand<'_>, sender: SocketAddr) {
|
||||
match &command {
|
||||
SyncCommand::Create { session, token } => {
|
||||
self.sessions
|
||||
.entry(session.to_string())
|
||||
.and_modify(|session| {
|
||||
if !session.set_owner(sender, token) {
|
||||
warn!(%sender, token, "invalid owner token");
|
||||
}
|
||||
})
|
||||
.or_insert_with(|| Session::new(sender, (*session).into(), token.to_string()));
|
||||
self.gc_sessions();
|
||||
}
|
||||
SyncCommand::Join {
|
||||
session: session_name,
|
||||
} => match self.sessions.get_mut(*session_name) {
|
||||
Some(mut session) => {
|
||||
for initial_command in session.initial_state() {
|
||||
self.send_command(&sender, &initial_command);
|
||||
}
|
||||
session.join(sender);
|
||||
}
|
||||
None => error!(session = session_name, "session not found for command"),
|
||||
},
|
||||
session_command @ (SyncCommand::Play { session, .. }
|
||||
| SyncCommand::Tick { session, .. }) => match self.sessions.get_mut(*session) {
|
||||
Some(mut session) => {
|
||||
if session.owner == sender {
|
||||
session.handle_command(session_command);
|
||||
self.send_to_clients(&session, &command);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
error!(session, "session not found for command");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// cleanup sessions where the owner hasn't reconnected in 15 minutes
|
||||
fn gc_sessions(&self) {
|
||||
let now = Instant::now();
|
||||
self.sessions
|
||||
.retain(|_, session| match session.inactive_time(now) {
|
||||
Some(inactive) => inactive > TIMEOUT,
|
||||
None => true,
|
||||
});
|
||||
}
|
||||
|
||||
#[instrument(skip(self, raw_stream))]
|
||||
async fn handle_connection(&self, raw_stream: TcpStream, addr: SocketAddr) {
|
||||
debug!("incoming connection");
|
||||
|
||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
|
||||
.await
|
||||
.expect("Error during the websocket handshake occurred");
|
||||
info!("connection established");
|
||||
|
||||
// Insert the write part of this peer to the peer map.
|
||||
let (tx, rx) = channel(16);
|
||||
self.peers.insert(addr, tx);
|
||||
|
||||
let (outgoing, incoming) = ws_stream.split();
|
||||
|
||||
let handle_messages = incoming.try_for_each(|msg| async move {
|
||||
if let Ok(message) = msg.to_text() {
|
||||
match serde_json::from_str(message) {
|
||||
Ok(command) => {
|
||||
debug!(sender = %addr, message = ?command, "Received a message");
|
||||
self.handle_command(command, addr).await;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(sender = %addr, message, error = %e, "Error while decoding message");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("ignoring non-text message");
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let receive_from_others = rx.map(Ok).forward(outgoing);
|
||||
|
||||
let handle_messages = pin!(handle_messages);
|
||||
let receive_from_others = pin!(receive_from_others);
|
||||
select(handle_messages, receive_from_others).await;
|
||||
|
||||
info!(%addr, "disconnected");
|
||||
self.peers.remove(&addr);
|
||||
}
|
||||
}
|
||||
|
||||
const TIMEOUT: Duration = Duration::from_secs(15 * 60);
|
||||
|
||||
/// cleanup sessions where the owner hasn't reconnected in 15 minutes
|
||||
fn gc_sessions(sessions: &RefCell<HashMap<String, Session>>) {
|
||||
let now = Instant::now();
|
||||
sessions
|
||||
.borrow_mut()
|
||||
.retain(|_, session| match session.owner_left {
|
||||
Some(left) => now.duration_since(left) > TIMEOUT,
|
||||
None => true,
|
||||
});
|
||||
#[tokio::main]
|
||||
async fn main() -> MainResult {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let port: u16 = std::env::var("PORT")
|
||||
.unwrap_or_else(|_| "80".to_string())
|
||||
.parse()?;
|
||||
let listen_address = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));
|
||||
|
||||
let state = Arc::new(Server::new());
|
||||
|
||||
// Create the event loop and TCP listener we'll accept connections on.
|
||||
let listener = TcpListener::bind(&listen_address).await.expect("Failed to bind");
|
||||
|
||||
info!("listening on: {:?}", listen_address);
|
||||
|
||||
// Let's spawn the handling of each connection in a separate task.
|
||||
while let Ok((stream, addr)) = listener.accept().await {
|
||||
let state = state.clone();
|
||||
tokio::spawn(async move { state.handle_connection(stream, addr).await });
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_session_and_forward<F>(
|
||||
sender: &Client,
|
||||
sessions: &RefCell<HashMap<String, Session>>,
|
||||
session_name: &str,
|
||||
mut update_fn: F,
|
||||
command: &SyncCommand,
|
||||
) where
|
||||
F: FnMut(&mut Session),
|
||||
{
|
||||
match sessions.borrow_mut().get_mut(session_name) {
|
||||
Some(session) => {
|
||||
if session.owner == sender.token() {
|
||||
update_fn(session);
|
||||
session.send_command(command);
|
||||
}
|
||||
}
|
||||
None => println!("session {} not found", session_name),
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler for Server {
|
||||
fn on_message(&mut self, msg: Message) -> Result<()> {
|
||||
match serde_json::from_str::<SyncCommand>(msg.as_text().unwrap_or_default()) {
|
||||
Ok(command) => {
|
||||
handle_command(command, &self.out, &self.sessions);
|
||||
Ok(())
|
||||
}
|
||||
Err(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
fn on_close(&mut self, _: CloseCode, _: &str) {
|
||||
let mut sessions = self.sessions.borrow_mut();
|
||||
let token = self.out.token();
|
||||
|
||||
for session in sessions.values_mut() {
|
||||
if session.owner == token {
|
||||
session.owner_left = Some(Instant::now())
|
||||
}
|
||||
|
||||
session.clients.remove(&token);
|
||||
}
|
||||
}
|
||||
|
||||
fn on_error(&mut self, err: Error) {
|
||||
println!("The server encountered an error: {:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
/// Used to spawn a server in integration tests
|
||||
#[cfg(test)]
|
||||
pub fn spawn_local_server(port: u16) -> parity_ws::Sender {
|
||||
use parity_ws::WebSocket;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::thread::spawn;
|
||||
|
||||
let listen_address = format!("localhost:{}", port);
|
||||
|
||||
let (tx, rx) = channel();
|
||||
|
||||
spawn(move || {
|
||||
let sessions: Rc<RefCell<HashMap<String, Session>>> = Rc::default();
|
||||
|
||||
let ws = WebSocket::new(|out: parity_ws::Sender| Server {
|
||||
out: out.into(),
|
||||
sessions: sessions.clone(),
|
||||
})
|
||||
.unwrap();
|
||||
let ws = ws.bind(listen_address).unwrap();
|
||||
|
||||
tx.send(ws.broadcaster()).unwrap();
|
||||
|
||||
ws.run().unwrap();
|
||||
});
|
||||
|
||||
rx.recv().unwrap()
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let port = std::env::var("PORT").unwrap_or_else(|_| "80".to_string());
|
||||
let listen_address = format!("0.0.0.0:{}", port);
|
||||
|
||||
println!("listening on: {:?}", listen_address);
|
||||
|
||||
let sessions: Rc<RefCell<HashMap<String, Session>>> = Rc::default();
|
||||
|
||||
listen(listen_address, |out| Server {
|
||||
out: out.into(),
|
||||
sessions: sessions.clone(),
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use maplit::hashmap;
|
||||
|
||||
#[test]
|
||||
fn test_deserialize() {
|
||||
let input = "{\"type\": \"create\", \"session\": \"foo\", \"token\": \"bar\"}";
|
||||
assert_eq!(
|
||||
SyncCommand::Create {
|
||||
session: "foo",
|
||||
token: "bar",
|
||||
},
|
||||
serde_json::from_str(input).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(HashMap::new());
|
||||
let sender = Client::mock(1);
|
||||
let command = SyncCommand::Create {
|
||||
session: "test",
|
||||
token: "bar",
|
||||
};
|
||||
|
||||
handle_command(command, &sender, &sessions);
|
||||
|
||||
assert_eq!(
|
||||
hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
},
|
||||
sessions.into_inner()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_play_owner() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
});
|
||||
let sender = Client::mock(1);
|
||||
let command = SyncCommand::Play {
|
||||
session: "test",
|
||||
play: true,
|
||||
};
|
||||
|
||||
handle_command(command, &sender, &sessions);
|
||||
|
||||
assert_eq!(
|
||||
hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: true,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
},
|
||||
sessions.into_inner()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_play_not_owner() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
});
|
||||
let sender = Client::mock(2);
|
||||
let command = SyncCommand::Play {
|
||||
session: "test",
|
||||
play: true,
|
||||
};
|
||||
|
||||
handle_command(command, &sender, &sessions);
|
||||
|
||||
assert_eq!(
|
||||
hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
},
|
||||
sessions.into_inner()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tick_owner() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
});
|
||||
let sender = Client::mock(1);
|
||||
let command = SyncCommand::Tick {
|
||||
session: "test",
|
||||
tick: 99,
|
||||
};
|
||||
|
||||
handle_command(command, &sender, &sessions);
|
||||
|
||||
assert_eq!(
|
||||
hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 99,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
},
|
||||
sessions.into_inner()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tick_not_owner() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
});
|
||||
let sender = Client::mock(2);
|
||||
let command = SyncCommand::Tick {
|
||||
session: "test",
|
||||
tick: 99,
|
||||
};
|
||||
|
||||
handle_command(command, &sender, &sessions);
|
||||
|
||||
assert_eq!(
|
||||
hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
},
|
||||
sessions.into_inner()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_join() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 99,
|
||||
playing: true,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
});
|
||||
let sender = Client::mock(2);
|
||||
let command = SyncCommand::Join { session: "test" };
|
||||
|
||||
handle_command(command, &sender, &sessions);
|
||||
|
||||
assert_eq!(
|
||||
hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![Token(2) => sender.clone()],
|
||||
tick: 99,
|
||||
playing: true,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
},
|
||||
sessions.into_inner()
|
||||
);
|
||||
|
||||
if let Client::Mock(mock) = sender {
|
||||
mock.assert_received(vec![
|
||||
SyncCommand::Tick {
|
||||
session: "test",
|
||||
tick: 99,
|
||||
},
|
||||
SyncCommand::Play {
|
||||
session: "test",
|
||||
play: true,
|
||||
},
|
||||
]);
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_join_non_existing() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
});
|
||||
let sender = Client::mock(2);
|
||||
let command = SyncCommand::Join { session: "test2" };
|
||||
|
||||
handle_command(command, &sender, &sessions);
|
||||
|
||||
assert_eq!(
|
||||
hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 0,
|
||||
playing: false,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
},
|
||||
sessions.into_inner()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_forward() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 99,
|
||||
playing: true,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
},
|
||||
"test2".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 99,
|
||||
playing: true,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
});
|
||||
let owner = Client::mock(1);
|
||||
let sender1 = Client::mock(2);
|
||||
let sender2 = Client::mock(3);
|
||||
let command = SyncCommand::Join { session: "test" };
|
||||
|
||||
handle_command(command, &sender1, &sessions);
|
||||
|
||||
let command = SyncCommand::Join { session: "test2" };
|
||||
handle_command(command, &sender2, &sessions);
|
||||
|
||||
if let Client::Mock(mock) = &sender1 {
|
||||
mock.clear();
|
||||
}
|
||||
if let Client::Mock(mock) = &sender2 {
|
||||
mock.clear();
|
||||
}
|
||||
|
||||
let command = SyncCommand::Tick {
|
||||
session: "test",
|
||||
tick: 999,
|
||||
};
|
||||
|
||||
handle_command(command, &owner, &sessions);
|
||||
|
||||
if let Client::Mock(mock) = sender1 {
|
||||
mock.assert_received(vec![SyncCommand::Tick {
|
||||
session: "test",
|
||||
tick: 999,
|
||||
}]);
|
||||
};
|
||||
if let Client::Mock(mock) = sender2 {
|
||||
assert_eq!(0, mock.received_count());
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_forward_non_owner() {
|
||||
let sessions: RefCell<HashMap<String, Session>> = RefCell::new(hashmap! {
|
||||
"test".into() => Session {
|
||||
owner: Token(1),
|
||||
clients: hashmap![],
|
||||
tick: 99,
|
||||
playing: true,
|
||||
owner_token: "bar".to_string(),
|
||||
owner_left: None
|
||||
}
|
||||
});
|
||||
let sender1 = Client::mock(2);
|
||||
let sender2 = Client::mock(3);
|
||||
let command = SyncCommand::Join { session: "test" };
|
||||
|
||||
handle_command(command.clone(), &sender1, &sessions);
|
||||
handle_command(command.clone(), &sender2, &sessions);
|
||||
|
||||
if let Client::Mock(mock) = &sender1 {
|
||||
mock.clear();
|
||||
}
|
||||
if let Client::Mock(mock) = &sender2 {
|
||||
mock.clear();
|
||||
}
|
||||
|
||||
let command = SyncCommand::Tick {
|
||||
session: "test",
|
||||
tick: 999,
|
||||
};
|
||||
|
||||
handle_command(command, &sender1, &sessions);
|
||||
|
||||
if let Client::Mock(mock) = sender1 {
|
||||
assert_eq!(0, mock.received_count());
|
||||
};
|
||||
if let Client::Mock(mock) = sender2 {
|
||||
assert_eq!(0, mock.received_count());
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod integration_tests;
|
||||
|
|
|
|||
78
src/session.rs
Normal file
78
src/session.rs
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
use crate::SyncCommand;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Session {
|
||||
pub owner: SocketAddr,
|
||||
owner_token: String,
|
||||
clients: Vec<SocketAddr>,
|
||||
tick: u64,
|
||||
playing: bool,
|
||||
owner_left: Option<Instant>,
|
||||
token: String,
|
||||
}
|
||||
|
||||
impl PartialEq for Session {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.token.eq(&other.token)
|
||||
}
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub fn new(owner: SocketAddr, token: String, owner_token: String) -> Self {
|
||||
Session {
|
||||
owner,
|
||||
owner_token,
|
||||
clients: Vec::new(),
|
||||
playing: false,
|
||||
tick: 0,
|
||||
owner_left: None,
|
||||
token,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn join(&mut self, client: SocketAddr) {
|
||||
self.clients.push(client);
|
||||
}
|
||||
|
||||
pub fn set_owner(&mut self, owner: SocketAddr, owner_token: &str) -> bool {
|
||||
if owner_token == self.owner_token {
|
||||
self.owner = owner;
|
||||
self.owner_left = None;
|
||||
}
|
||||
owner_token == self.owner_token
|
||||
}
|
||||
|
||||
pub fn inactive_time(&self, now: Instant) -> Option<Duration> {
|
||||
self.owner_left.map(|left| left.duration_since(now))
|
||||
}
|
||||
|
||||
pub fn initial_state(&self) -> impl Iterator<Item = SyncCommand> {
|
||||
[
|
||||
SyncCommand::Tick {
|
||||
session: &self.token,
|
||||
tick: self.tick,
|
||||
},
|
||||
SyncCommand::Play {
|
||||
session: &self.token,
|
||||
play: self.playing,
|
||||
},
|
||||
]
|
||||
.into_iter()
|
||||
}
|
||||
|
||||
pub fn clients(&self) -> impl Iterator<Item = &SocketAddr> {
|
||||
self.clients.iter()
|
||||
}
|
||||
|
||||
pub fn handle_command(&mut self, command: &SyncCommand) {
|
||||
match command {
|
||||
SyncCommand::Tick { tick, .. } => {
|
||||
self.tick = *tick;
|
||||
}
|
||||
SyncCommand::Play { play, .. } => self.playing = *play,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue