Skip to content

Commit

Permalink
When exiting cleanly shut_down any github connections in the broker
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbobbio committed Jan 1, 2025
1 parent 9bb580f commit 986c03e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
18 changes: 16 additions & 2 deletions crates/maelstrom-broker/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
IdVendor,
};
use anyhow::Result;
use futures::FutureExt as _;
use maelstrom_base::{
proto::{ClientToBroker, Hello},
ClientId, MonitorId, WorkerId,
Expand All @@ -13,7 +14,11 @@ use maelstrom_github::{GitHubClient, GitHubQueue, GitHubQueueAcceptor};
use maelstrom_util::net::{self, AsRawFdExt};
use serde::Serialize;
use slog::{debug, error, info, o, warn, Logger};
use std::{future::Future, sync::Arc, thread};
use std::{
future::Future,
sync::{Arc, Mutex},
thread,
};
use tokio::{
io::BufReader,
net::{TcpListener, TcpStream},
Expand Down Expand Up @@ -335,6 +340,7 @@ pub async fn github_acceptor_main<TempFileT>(
scheduler_sender: SchedulerSender<TempFileT>,
id_vendor: Arc<IdVendor>,
log: Logger,
tasks: Arc<Mutex<JoinSet<()>>>,
) where
TempFileT: Send + Sync + 'static,
{
Expand All @@ -352,7 +358,15 @@ pub async fn github_acceptor_main<TempFileT>(
Ok(queue) => {
let log = log.new(o!("peer_addr" => "github peer"));
debug!(log, "new connection");
task::spawn(unassigned_github_connection_main(

let mut tasks = tasks.lock().unwrap();

// Remove any completed tasks from the set
while let Some(Some(_)) =
tokio::task::unconstrained(tasks.join_next()).now_or_never()
{}

tasks.spawn(unassigned_github_connection_main(
queue,
scheduler_sender.clone(),
id_vendor.clone(),
Expand Down
11 changes: 10 additions & 1 deletion crates/maelstrom-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
process,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
Arc, Mutex,
},
};
use tokio::{
Expand Down Expand Up @@ -90,6 +90,7 @@ where
id: AtomicU32::new(0),
});

let github_connection_tasks = Arc::new(Mutex::new(JoinSet::new()));
let mut join_set = JoinSet::new();

join_set.spawn(http::listener_main(
Expand All @@ -111,6 +112,7 @@ where
scheduler_task.scheduler_sender().clone(),
id_vendor,
log.clone(),
github_connection_tasks.clone(),
));
} else {
info!(log, "not listening for GitHub connections");
Expand All @@ -131,6 +133,13 @@ where

join_set.join_next().await;

drop(join_set);
let github_connection_tasks = Arc::into_inner(github_connection_tasks)
.unwrap()
.into_inner()
.unwrap();
github_connection_tasks.join_all().await;

Ok(())
}

Expand Down

0 comments on commit 986c03e

Please sign in to comment.