Skip to content

Commit

Permalink
Fix S3 not interruptting
Browse files Browse the repository at this point in the history
  • Loading branch information
RossComputerGuy committed Jan 10, 2025
1 parent 2d9b213 commit ec286ca
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/libstore/filetransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ struct curlFileTransfer : public FileTransfer

size_t writeCallback(void * contents, size_t size, size_t nmemb)
{
checkInterrupt();

try {
size_t realSize = size * nmemb;
result.bodySize += realSize;
Expand Down Expand Up @@ -203,6 +205,8 @@ struct curlFileTransfer : public FileTransfer

size_t headerCallback(void * contents, size_t size, size_t nmemb)
{
checkInterrupt();

size_t realSize = size * nmemb;
std::string line((char *) contents, realSize);
printMsg(lvlVomit, "got header for '%s': %s", request.uri, trim(line));
Expand Down Expand Up @@ -263,6 +267,8 @@ struct curlFileTransfer : public FileTransfer

int progressCallback(double dltotal, double dlnow)
{
checkInterrupt();

try {
act.progress(dlnow, dltotal);
} catch (nix::Interrupted &) {
Expand All @@ -286,6 +292,8 @@ struct curlFileTransfer : public FileTransfer
size_t readOffset = 0;
size_t readCallback(char *buffer, size_t size, size_t nitems)
{
checkInterrupt();

if (readOffset == request.data->length())
return 0;
auto count = std::min(size * nitems, request.data->length() - readOffset);
Expand Down Expand Up @@ -710,6 +718,8 @@ struct curlFileTransfer : public FileTransfer

void enqueueItem(std::shared_ptr<TransferItem> item)
{
debug("transfer item for '%s' has been queued", item->request.uri);

if (item->request.data
&& !hasPrefix(item->request.uri, "http://")
&& !hasPrefix(item->request.uri, "https://"))
Expand Down Expand Up @@ -858,6 +868,7 @@ void FileTransfer::download(
});

request.dataCallback = [_state](std::string_view data) {
checkInterrupt();

auto state(_state->lock());

Expand Down
15 changes: 15 additions & 0 deletions src/libstore/s3-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ class RetryStrategy : public Aws::Client::DefaultRetryStrategy
bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override
{
checkInterrupt();
if (getInterrupted())
return false;

auto retry = Aws::Client::DefaultRetryStrategy::ShouldRetry(error, attemptedRetries);
if (retry)
printError("AWS error '%s' (%s; request id: %s), will retry in %d ms",
Expand Down Expand Up @@ -159,6 +162,8 @@ ref<Aws::Client::ClientConfiguration> S3Helper::makeConfig(
S3Helper::FileTransferResult S3Helper::getObject(
const std::string & bucketName, const std::string & key)
{
checkInterrupt();

debug("fetching 's3://%s/%s'...", bucketName, key);

auto request =
Expand All @@ -167,6 +172,7 @@ S3Helper::FileTransferResult S3Helper::getObject(
.WithKey(key);

request.SetResponseStreamFactory([&]() {
checkInterrupt();
return Aws::New<std::stringstream>("STRINGSTREAM");
});

Expand All @@ -179,10 +185,13 @@ S3Helper::FileTransferResult S3Helper::getObject(
auto result = checkAws(fmt("AWS error fetching '%s'", key),
client->GetObject(request));

checkInterrupt();

res.data = decompress(result.GetContentEncoding(),
dynamic_cast<std::stringstream &>(result.GetBody()).str());

} catch (S3Error & e) {
checkInterrupt();
if ((e.err != Aws::S3::S3Errors::NO_SUCH_KEY) &&
(e.err != Aws::S3::S3Errors::ACCESS_DENIED)) throw;
}
Expand Down Expand Up @@ -283,6 +292,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

bool fileExists(const std::string & path) override
{
checkInterrupt();

stats.head++;

auto res = s3Helper.client->HeadObject(
Expand Down Expand Up @@ -422,6 +433,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

void getFile(const std::string & path, Sink & sink) override
{
checkInterrupt();

stats.get++;

// FIXME: stream output to sink.
Expand All @@ -441,6 +454,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

StorePathSet queryAllValidPaths() override
{
checkInterrupt();

StorePathSet paths;
std::string marker;

Expand Down

0 comments on commit ec286ca

Please sign in to comment.