Skip to content

Commit

Permalink
Change timeout mechanism to try to avoid treadmilling
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbobbio committed Jan 1, 2025
1 parent 7806572 commit e6d6a96
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions crates/maelstrom-util/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use maelstrom_github::{GitHubReadQueue, GitHubWriteQueue};
use maelstrom_linux::{self as linux, Fd};
use serde::{de::DeserializeOwned, Serialize};
use slog::{debug, Logger};
use std::time::Duration;
use std::{
fmt::Debug,
io::{Read, Write},
Expand All @@ -14,6 +15,7 @@ use std::{
use tokio::{
io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _},
sync::mpsc::{UnboundedReceiver, UnboundedSender},
time::Instant,
};

fn write_message_to_vec(msg: impl Serialize) -> Result<Vec<u8>> {
Expand Down Expand Up @@ -226,21 +228,20 @@ where
MessageT: Debug + Serialize,
{
let mut to_send = vec![];
let mut next = Instant::now() + Duration::from_millis(10);
loop {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {
match tokio::time::timeout_at(next, channel.recv()).await {
Err(_) => {
if !to_send.is_empty() {
write_many_messages_to_github_queue(&mut queue, &to_send, log).await?;
to_send.clear();
}
},
msg = channel.recv() => {
if let Some(msg) = msg {
to_send.push(msg);
} else {
break;
}
next = Instant::now() + Duration::from_millis(10);
}
Ok(Some(msg)) => {
to_send.push(msg);
}
Ok(None) => break,
}
}
queue.shut_down().await?;
Expand Down

0 comments on commit e6d6a96

Please sign in to comment.