diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index 5c5080f8aa9..6533b2f58c4 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -269,26 +269,21 @@ struct ClientSettings }; static void performOp(TunnelLogger * logger, ref store, - TrustedFlag trusted, RecursiveFlag recursive, WorkerProto::Version clientVersion, - Source & from, BufferedSink & to, WorkerProto::Op op) + TrustedFlag trusted, RecursiveFlag recursive, + WorkerProto::BasicServerConnection & conn, + WorkerProto::Op op) { - WorkerProto::ReadConn rconn { - .from = from, - .version = clientVersion, - }; - WorkerProto::WriteConn wconn { - .to = to, - .version = clientVersion, - }; + WorkerProto::ReadConn rconn(conn); + WorkerProto::WriteConn wconn(conn); switch (op) { case WorkerProto::Op::IsValidPath: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); bool result = store->isValidPath(path); logger->stopWork(); - to << result; + conn.to << result; break; } @@ -296,8 +291,8 @@ static void performOp(TunnelLogger * logger, ref store, auto paths = WorkerProto::Serialise::read(*store, rconn); SubstituteFlag substitute = NoSubstitute; - if (GET_PROTOCOL_MINOR(clientVersion) >= 27) { - substitute = readInt(from) ? Substitute : NoSubstitute; + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 27) { + substitute = readInt(conn.from) ? Substitute : NoSubstitute; } logger->startWork(); @@ -311,13 +306,13 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::HasSubstitutes: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); StorePathSet paths; // FIXME paths.insert(path); auto res = store->querySubstitutablePaths(paths); logger->stopWork(); - to << (res.count(path) != 0); + conn.to << (res.count(path) != 0); break; } @@ -331,11 +326,11 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QueryPathHash: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); auto hash = store->queryPathInfo(path)->narHash; logger->stopWork(); - to << hash.to_string(HashFormat::Base16, false); + conn.to << hash.to_string(HashFormat::Base16, false); break; } @@ -343,7 +338,7 @@ static void performOp(TunnelLogger * logger, ref store, case WorkerProto::Op::QueryReferrers: case WorkerProto::Op::QueryValidDerivers: case WorkerProto::Op::QueryDerivationOutputs: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); StorePathSet paths; if (op == WorkerProto::Op::QueryReferences) @@ -360,16 +355,16 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QueryDerivationOutputNames: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); auto names = store->readDerivation(path).outputNames(); logger->stopWork(); - to << names; + conn.to << names; break; } case WorkerProto::Op::QueryDerivationOutputMap: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); auto outputs = store->queryPartialDerivationOutputMap(path); logger->stopWork(); @@ -378,37 +373,37 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QueryDeriver: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); auto info = store->queryPathInfo(path); logger->stopWork(); - to << (info->deriver ? store->printStorePath(*info->deriver) : ""); + conn.to << (info->deriver ? store->printStorePath(*info->deriver) : ""); break; } case WorkerProto::Op::QueryPathFromHashPart: { - auto hashPart = readString(from); + auto hashPart = readString(conn.from); logger->startWork(); auto path = store->queryPathFromHashPart(hashPart); logger->stopWork(); - to << (path ? store->printStorePath(*path) : ""); + conn.to << (path ? store->printStorePath(*path) : ""); break; } case WorkerProto::Op::AddToStore: { - if (GET_PROTOCOL_MINOR(clientVersion) >= 25) { - auto name = readString(from); - auto camStr = readString(from); + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 25) { + auto name = readString(conn.from); + auto camStr = readString(conn.from); auto refs = WorkerProto::Serialise::read(*store, rconn); bool repairBool; - from >> repairBool; + conn.from >> repairBool; auto repair = RepairFlag{repairBool}; logger->startWork(); auto pathInfo = [&]() { // NB: FramedSource must be out of scope before logger->stopWork(); auto [contentAddressMethod, hashAlgo] = ContentAddressMethod::parseWithAlgo(camStr); - FramedSource source(from); + FramedSource source(conn.from); FileSerialisationMethod dumpMethod; switch (contentAddressMethod.getFileIngestionMethod()) { case FileIngestionMethod::Flat: @@ -439,7 +434,7 @@ static void performOp(TunnelLogger * logger, ref store, bool fixed; uint8_t recursive; std::string hashAlgoRaw; - from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw; + conn.from >> baseName >> fixed /* obsolete */ >> recursive >> hashAlgoRaw; if (recursive > true) throw Error("unsupported FileIngestionMethod with value of %i; you may need to upgrade nix-daemon", recursive); method = recursive @@ -459,11 +454,11 @@ static void performOp(TunnelLogger * logger, ref store, so why all this extra work? We still parse the NAR so that we aren't sending arbitrary data to `saved` unwittingly`, and we know when the NAR ends so we don't - consume the rest of `from` and can't parse another + consume the rest of `conn.from` and can't parse another command. (We don't trust `addToStoreFromDump` to not eagerly consume the entire stream it's given, past the length of the Nar. */ - TeeSource savedNARSource(from, saved); + TeeSource savedNARSource(conn.from, saved); NullFileSystemObjectSink sink; /* just parse the NAR */ parseDump(sink, savedNARSource); }); @@ -472,20 +467,20 @@ static void performOp(TunnelLogger * logger, ref store, *dumpSource, baseName, FileSerialisationMethod::NixArchive, method, hashAlgo); logger->stopWork(); - to << store->printStorePath(path); + conn.to << store->printStorePath(path); } break; } case WorkerProto::Op::AddMultipleToStore: { bool repair, dontCheckSigs; - from >> repair >> dontCheckSigs; + conn.from >> repair >> dontCheckSigs; if (!trusted && dontCheckSigs) dontCheckSigs = false; logger->startWork(); { - FramedSource source(from); + FramedSource source(conn.from); store->addMultipleToStore(source, RepairFlag{repair}, dontCheckSigs ? NoCheckSigs : CheckSigs); @@ -495,8 +490,8 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::AddTextToStore: { - std::string suffix = readString(from); - std::string s = readString(from); + std::string suffix = readString(conn.from); + std::string s = readString(conn.from); auto refs = WorkerProto::Serialise::read(*store, rconn); logger->startWork(); auto path = ({ @@ -504,37 +499,37 @@ static void performOp(TunnelLogger * logger, ref store, store->addToStoreFromDump(source, suffix, FileSerialisationMethod::Flat, ContentAddressMethod::Raw::Text, HashAlgorithm::SHA256, refs, NoRepair); }); logger->stopWork(); - to << store->printStorePath(path); + conn.to << store->printStorePath(path); break; } case WorkerProto::Op::ExportPath: { - auto path = store->parseStorePath(readString(from)); - readInt(from); // obsolete + auto path = store->parseStorePath(readString(conn.from)); + readInt(conn.from); // obsolete logger->startWork(); - TunnelSink sink(to); + TunnelSink sink(conn.to); store->exportPath(path, sink); logger->stopWork(); - to << 1; + conn.to << 1; break; } case WorkerProto::Op::ImportPaths: { logger->startWork(); - TunnelSource source(from, to); + TunnelSource source(conn.from, conn.to); auto paths = store->importPaths(source, trusted ? NoCheckSigs : CheckSigs); logger->stopWork(); Strings paths2; for (auto & i : paths) paths2.push_back(store->printStorePath(i)); - to << paths2; + conn.to << paths2; break; } case WorkerProto::Op::BuildPaths: { auto drvs = WorkerProto::Serialise::read(*store, rconn); BuildMode mode = bmNormal; - if (GET_PROTOCOL_MINOR(clientVersion) >= 15) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 15) { mode = WorkerProto::Serialise::read(*store, rconn); /* Repairing is not atomic, so disallowed for "untrusted" @@ -552,7 +547,7 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); store->buildPaths(drvs, mode); logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -578,7 +573,7 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::BuildDerivation: { - auto drvPath = store->parseStorePath(readString(from)); + auto drvPath = store->parseStorePath(readString(conn.from)); BasicDerivation drv; /* * Note: unlike wopEnsurePath, this operation reads a @@ -589,7 +584,7 @@ static void performOp(TunnelLogger * logger, ref store, * it cannot be trusted that its outPath was calculated * correctly. */ - readDerivation(from, *store, drv, Derivation::nameFromPath(drvPath)); + readDerivation(conn.from, *store, drv, Derivation::nameFromPath(drvPath)); auto buildMode = WorkerProto::Serialise::read(*store, rconn); logger->startWork(); @@ -655,20 +650,20 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::EnsurePath: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); store->ensurePath(path); logger->stopWork(); - to << 1; + conn.to << 1; break; } case WorkerProto::Op::AddTempRoot: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); store->addTempRoot(path); logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -678,24 +673,24 @@ static void performOp(TunnelLogger * logger, ref store, "you are not privileged to create perm roots\n\n" "hint: you can just do this client-side without special privileges, and probably want to do that instead."); auto storePath = WorkerProto::Serialise::read(*store, rconn); - Path gcRoot = absPath(readString(from)); + Path gcRoot = absPath(readString(conn.from)); logger->startWork(); auto & localFSStore = require(*store); localFSStore.addPermRoot(storePath, gcRoot); logger->stopWork(); - to << gcRoot; + conn.to << gcRoot; break; } case WorkerProto::Op::AddIndirectRoot: { - Path path = absPath(readString(from)); + Path path = absPath(readString(conn.from)); logger->startWork(); auto & indirectRootStore = require(*store); indirectRootStore.addIndirectRoot(path); logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -703,7 +698,7 @@ static void performOp(TunnelLogger * logger, ref store, case WorkerProto::Op::SyncWithGC: { logger->startWork(); logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -717,24 +712,24 @@ static void performOp(TunnelLogger * logger, ref store, for (auto & i : roots) size += i.second.size(); - to << size; + conn.to << size; for (auto & [target, links] : roots) for (auto & link : links) - to << link << store->printStorePath(target); + conn.to << link << store->printStorePath(target); break; } case WorkerProto::Op::CollectGarbage: { GCOptions options; - options.action = (GCOptions::GCAction) readInt(from); + options.action = (GCOptions::GCAction) readInt(conn.from); options.pathsToDelete = WorkerProto::Serialise::read(*store, rconn); - from >> options.ignoreLiveness >> options.maxFreed; + conn.from >> options.ignoreLiveness >> options.maxFreed; // obsolete fields - readInt(from); - readInt(from); - readInt(from); + readInt(conn.from); + readInt(conn.from); + readInt(conn.from); GCResults results; @@ -745,7 +740,7 @@ static void performOp(TunnelLogger * logger, ref store, gcStore.collectGarbage(options, results); logger->stopWork(); - to << results.paths << results.bytesFreed << 0 /* obsolete */; + conn.to << results.paths << results.bytesFreed << 0 /* obsolete */; break; } @@ -754,24 +749,24 @@ static void performOp(TunnelLogger * logger, ref store, ClientSettings clientSettings; - clientSettings.keepFailed = readInt(from); - clientSettings.keepGoing = readInt(from); - clientSettings.tryFallback = readInt(from); - clientSettings.verbosity = (Verbosity) readInt(from); - clientSettings.maxBuildJobs = readInt(from); - clientSettings.maxSilentTime = readInt(from); - readInt(from); // obsolete useBuildHook - clientSettings.verboseBuild = lvlError == (Verbosity) readInt(from); - readInt(from); // obsolete logType - readInt(from); // obsolete printBuildTrace - clientSettings.buildCores = readInt(from); - clientSettings.useSubstitutes = readInt(from); - - if (GET_PROTOCOL_MINOR(clientVersion) >= 12) { - unsigned int n = readInt(from); + clientSettings.keepFailed = readInt(conn.from); + clientSettings.keepGoing = readInt(conn.from); + clientSettings.tryFallback = readInt(conn.from); + clientSettings.verbosity = (Verbosity) readInt(conn.from); + clientSettings.maxBuildJobs = readInt(conn.from); + clientSettings.maxSilentTime = readInt(conn.from); + readInt(conn.from); // obsolete useBuildHook + clientSettings.verboseBuild = lvlError == (Verbosity) readInt(conn.from); + readInt(conn.from); // obsolete logType + readInt(conn.from); // obsolete printBuildTrace + clientSettings.buildCores = readInt(conn.from); + clientSettings.useSubstitutes = readInt(conn.from); + + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 12) { + unsigned int n = readInt(conn.from); for (unsigned int i = 0; i < n; i++) { - auto name = readString(from); - auto value = readString(from); + auto name = readString(conn.from); + auto value = readString(conn.from); clientSettings.overrides.emplace(name, value); } } @@ -788,20 +783,20 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QuerySubstitutablePathInfo: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); SubstitutablePathInfos infos; store->querySubstitutablePathInfos({{path, std::nullopt}}, infos); logger->stopWork(); auto i = infos.find(path); if (i == infos.end()) - to << 0; + conn.to << 0; else { - to << 1 + conn.to << 1 << (i->second.deriver ? store->printStorePath(*i->second.deriver) : ""); WorkerProto::write(*store, wconn, i->second.references); - to << i->second.downloadSize - << i->second.narSize; + conn.to << i->second.downloadSize + << i->second.narSize; } break; } @@ -809,7 +804,7 @@ static void performOp(TunnelLogger * logger, ref store, case WorkerProto::Op::QuerySubstitutablePathInfos: { SubstitutablePathInfos infos; StorePathCAMap pathsMap = {}; - if (GET_PROTOCOL_MINOR(clientVersion) < 22) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) < 22) { auto paths = WorkerProto::Serialise::read(*store, rconn); for (auto & path : paths) pathsMap.emplace(path, std::nullopt); @@ -818,12 +813,12 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); store->querySubstitutablePathInfos(pathsMap, infos); logger->stopWork(); - to << infos.size(); + conn.to << infos.size(); for (auto & i : infos) { - to << store->printStorePath(i.first) - << (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); + conn.to << store->printStorePath(i.first) + << (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); WorkerProto::write(*store, wconn, i.second.references); - to << i.second.downloadSize << i.second.narSize; + conn.to << i.second.downloadSize << i.second.narSize; } break; } @@ -837,22 +832,22 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::QueryPathInfo: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); std::shared_ptr info; logger->startWork(); try { info = store->queryPathInfo(path); } catch (InvalidPath &) { - if (GET_PROTOCOL_MINOR(clientVersion) < 17) throw; + if (GET_PROTOCOL_MINOR(conn.protoVersion) < 17) throw; } logger->stopWork(); if (info) { - if (GET_PROTOCOL_MINOR(clientVersion) >= 17) - to << 1; + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 17) + conn.to << 1; WorkerProto::write(*store, wconn, static_cast(*info)); } else { - assert(GET_PROTOCOL_MINOR(clientVersion) >= 17); - to << 0; + assert(GET_PROTOCOL_MINOR(conn.protoVersion) >= 17); + conn.to << 0; } break; } @@ -861,61 +856,61 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); store->optimiseStore(); logger->stopWork(); - to << 1; + conn.to << 1; break; case WorkerProto::Op::VerifyStore: { bool checkContents, repair; - from >> checkContents >> repair; + conn.from >> checkContents >> repair; logger->startWork(); if (repair && !trusted) throw Error("you are not privileged to repair paths"); bool errors = store->verifyStore(checkContents, (RepairFlag) repair); logger->stopWork(); - to << errors; + conn.to << errors; break; } case WorkerProto::Op::AddSignatures: { - auto path = store->parseStorePath(readString(from)); - StringSet sigs = readStrings(from); + auto path = store->parseStorePath(readString(conn.from)); + StringSet sigs = readStrings(conn.from); logger->startWork(); store->addSignatures(path, sigs); logger->stopWork(); - to << 1; + conn.to << 1; break; } case WorkerProto::Op::NarFromPath: { - auto path = store->parseStorePath(readString(from)); + auto path = store->parseStorePath(readString(conn.from)); logger->startWork(); logger->stopWork(); - dumpPath(store->toRealPath(path), to); + dumpPath(store->toRealPath(path), conn.to); break; } case WorkerProto::Op::AddToStoreNar: { bool repair, dontCheckSigs; - auto path = store->parseStorePath(readString(from)); - auto deriver = readString(from); - auto narHash = Hash::parseAny(readString(from), HashAlgorithm::SHA256); + auto path = store->parseStorePath(readString(conn.from)); + auto deriver = readString(conn.from); + auto narHash = Hash::parseAny(readString(conn.from), HashAlgorithm::SHA256); ValidPathInfo info { path, narHash }; if (deriver != "") info.deriver = store->parseStorePath(deriver); info.references = WorkerProto::Serialise::read(*store, rconn); - from >> info.registrationTime >> info.narSize >> info.ultimate; - info.sigs = readStrings(from); - info.ca = ContentAddress::parseOpt(readString(from)); - from >> repair >> dontCheckSigs; + conn.from >> info.registrationTime >> info.narSize >> info.ultimate; + info.sigs = readStrings(conn.from); + info.ca = ContentAddress::parseOpt(readString(conn.from)); + conn.from >> repair >> dontCheckSigs; if (!trusted && dontCheckSigs) dontCheckSigs = false; if (!trusted) info.ultimate = false; - if (GET_PROTOCOL_MINOR(clientVersion) >= 23) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 23) { logger->startWork(); { - FramedSource source(from); + FramedSource source(conn.from); store->addToStore(info, source, (RepairFlag) repair, dontCheckSigs ? NoCheckSigs : CheckSigs); } @@ -925,10 +920,10 @@ static void performOp(TunnelLogger * logger, ref store, else { std::unique_ptr source; StringSink saved; - if (GET_PROTOCOL_MINOR(clientVersion) >= 21) - source = std::make_unique(from, to); + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 21) + source = std::make_unique(conn.from, conn.to); else { - TeeSource tee { from, saved }; + TeeSource tee { conn.from, saved }; NullFileSystemObjectSink ether; parseDump(ether, tee); source = std::make_unique(saved.s); @@ -956,15 +951,15 @@ static void performOp(TunnelLogger * logger, ref store, WorkerProto::write(*store, wconn, willBuild); WorkerProto::write(*store, wconn, willSubstitute); WorkerProto::write(*store, wconn, unknown); - to << downloadSize << narSize; + conn.to << downloadSize << narSize; break; } case WorkerProto::Op::RegisterDrvOutput: { logger->startWork(); - if (GET_PROTOCOL_MINOR(clientVersion) < 31) { - auto outputId = DrvOutput::parse(readString(from)); - auto outputPath = StorePath(readString(from)); + if (GET_PROTOCOL_MINOR(conn.protoVersion) < 31) { + auto outputId = DrvOutput::parse(readString(conn.from)); + auto outputPath = StorePath(readString(conn.from)); store->registerDrvOutput(Realisation{ .id = outputId, .outPath = outputPath}); } else { @@ -977,10 +972,10 @@ static void performOp(TunnelLogger * logger, ref store, case WorkerProto::Op::QueryRealisation: { logger->startWork(); - auto outputId = DrvOutput::parse(readString(from)); + auto outputId = DrvOutput::parse(readString(conn.from)); auto info = store->queryRealisation(outputId); logger->stopWork(); - if (GET_PROTOCOL_MINOR(clientVersion) < 31) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) < 31) { std::set outPaths; if (info) outPaths.insert(info->outPath); WorkerProto::write(*store, wconn, outPaths); @@ -993,19 +988,19 @@ static void performOp(TunnelLogger * logger, ref store, } case WorkerProto::Op::AddBuildLog: { - StorePath path{readString(from)}; + StorePath path{readString(conn.from)}; logger->startWork(); if (!trusted) throw Error("you are not privileged to add logs"); auto & logStore = require(*store); { - FramedSource source(from); + FramedSource source(conn.from); StringSink sink; source.drainInto(sink); logStore.addBuildLog(path, sink.s); } logger->stopWork(); - to << 1; + conn.to << 1; break; } @@ -1020,8 +1015,8 @@ static void performOp(TunnelLogger * logger, ref store, void processConnection( ref store, - FdSource & from, - FdSink & to, + FdSource && from, + FdSink && to, TrustedFlag trusted, RecursiveFlag recursive) { @@ -1037,7 +1032,12 @@ void processConnection( if (clientVersion < 0x10a) throw Error("the Nix client version is too old"); - auto tunnelLogger = new TunnelLogger(to, clientVersion); + WorkerProto::BasicServerConnection conn; + conn.to = std::move(to); + conn.from = std::move(from); + conn.protoVersion = clientVersion; + + auto tunnelLogger = new TunnelLogger(conn.to, clientVersion); auto prevLogger = nix::logger; // FIXME if (!recursive) @@ -1050,12 +1050,6 @@ void processConnection( printMsgUsing(prevLogger, lvlDebug, "%d operations", opCount); }); - WorkerProto::BasicServerConnection conn { - .to = to, - .from = from, - .clientVersion = clientVersion, - }; - conn.postHandshake(*store, { .daemonNixVersion = nixVersion, // We and the underlying store both need to trust the client for @@ -1071,13 +1065,13 @@ void processConnection( try { tunnelLogger->stopWork(); - to.flush(); + conn.to.flush(); /* Process client requests. */ while (true) { WorkerProto::Op op; try { - op = (enum WorkerProto::Op) readInt(from); + op = (enum WorkerProto::Op) readInt(conn.from); } catch (Interrupted & e) { break; } catch (EndOfFile & e) { @@ -1091,7 +1085,7 @@ void processConnection( debug("performing daemon worker op: %d", op); try { - performOp(tunnelLogger, store, trusted, recursive, clientVersion, from, to, op); + performOp(tunnelLogger, store, trusted, recursive, conn, op); } catch (Error & e) { /* If we're not in a state where we can send replies, then something went wrong processing the input of the @@ -1107,19 +1101,19 @@ void processConnection( throw; } - to.flush(); + conn.to.flush(); assert(!tunnelLogger->state_.lock()->canSendStderr); }; } catch (Error & e) { tunnelLogger->stopWork(&e); - to.flush(); + conn.to.flush(); return; } catch (std::exception & e) { auto ex = Error(e.what()); tunnelLogger->stopWork(&ex); - to.flush(); + conn.to.flush(); return; } } diff --git a/src/libstore/daemon.hh b/src/libstore/daemon.hh index 1964c0d997c..a8ce32d8deb 100644 --- a/src/libstore/daemon.hh +++ b/src/libstore/daemon.hh @@ -10,8 +10,8 @@ enum RecursiveFlag : bool { NotRecursive = false, Recursive = true }; void processConnection( ref store, - FdSource & from, - FdSink & to, + FdSource && from, + FdSink && to, TrustedFlag trusted, RecursiveFlag recursive); diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 6e8931ca2e4..ebb0864c555 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -73,7 +73,7 @@ void RemoteStore::initConnection(Connection & conn) StringSink saved; TeeSource tee(conn.from, saved); try { - conn.daemonVersion = WorkerProto::BasicClientConnection::handshake( + conn.protoVersion = WorkerProto::BasicClientConnection::handshake( conn.to, tee, PROTOCOL_VERSION); } catch (SerialisationError & e) { /* In case the other side is waiting for our input, close @@ -115,7 +115,7 @@ void RemoteStore::setOptions(Connection & conn) << settings.buildCores << settings.useSubstitutes; - if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) { + if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 12) { std::map overrides; settings.getSettings(overrides, true); // libstore settings fileTransferSettings.getSettings(overrides, true); @@ -175,7 +175,7 @@ bool RemoteStore::isValidPathUncached(const StorePath & path) StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, SubstituteFlag maybeSubstitute) { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) { StorePathSet res; for (auto & i : paths) if (isValidPath(i)) res.insert(i); @@ -198,7 +198,7 @@ StorePathSet RemoteStore::queryAllValidPaths() StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths) { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) { StorePathSet res; for (auto & i : paths) { conn->to << WorkerProto::Op::HasSubstitutes << printStorePath(i); @@ -221,7 +221,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) { for (auto & i : pathsMap) { SubstitutablePathInfo info; @@ -241,7 +241,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S } else { conn->to << WorkerProto::Op::QuerySubstitutablePathInfos; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 22) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 22) { StorePathSet paths; for (auto & path : pathsMap) paths.insert(path.first); @@ -368,7 +368,7 @@ ref RemoteStore::addCAToStore( std::optional conn_(getConnection()); auto & conn = *conn_; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 25) { conn->to << WorkerProto::Op::AddToStore @@ -485,7 +485,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 18) { auto source2 = sinkToSource([&](Sink & sink) { sink << 1 // == path follows ; @@ -513,11 +513,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << info.ultimate << info.sigs << renderContentAddress(info.ca) << repair << !checkSigs; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 23) { conn.withFramedSink([&](Sink & sink) { copyNAR(source, sink); }); - } else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) { + } else if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 21) { conn.processStderr(0, &source); } else { copyNAR(source, conn->to); @@ -554,7 +554,7 @@ void RemoteStore::addMultipleToStore( RepairFlag repair, CheckSigsFlag checkSigs) { - if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) { + if (GET_PROTOCOL_MINOR(getConnection()->protoVersion) >= 32) { auto conn(getConnection()); conn->to << WorkerProto::Op::AddMultipleToStore @@ -572,7 +572,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info) { auto conn(getConnection()); conn->to << WorkerProto::Op::RegisterDrvOutput; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 31) { conn->to << info.id.to_string(); conn->to << std::string(info.outPath.to_string()); } else { @@ -587,7 +587,7 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, try { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 27) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 27) { warn("the daemon is too old to support content-addressed derivations, please upgrade it to 2.4"); return callback(nullptr); } @@ -597,7 +597,7 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, conn.processStderr(); auto real = [&]() -> std::shared_ptr { - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 31) { auto outPaths = WorkerProto::Serialise>::read( *this, *conn); if (outPaths.empty()) @@ -644,9 +644,9 @@ void RemoteStore::buildPaths(const std::vector & drvPaths, BuildMod auto conn(getConnection()); conn->to << WorkerProto::Op::BuildPaths; - assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13); + assert(GET_PROTOCOL_MINOR(conn->protoVersion) >= 13); WorkerProto::write(*this, *conn, drvPaths); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) + if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 15) conn->to << buildMode; else /* Old daemons did not take a 'buildMode' parameter, so we @@ -667,7 +667,7 @@ std::vector RemoteStore::buildPathsWithResults( std::optional conn_(getConnection()); auto & conn = *conn_; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { + if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 34) { conn->to << WorkerProto::Op::BuildPathsWithResults; WorkerProto::write(*this, *conn, paths); conn->to << buildMode; @@ -841,7 +841,7 @@ void RemoteStore::queryMissing(const std::vector & targets, { { auto conn(getConnection()); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19) + if (GET_PROTOCOL_MINOR(conn->protoVersion) < 19) // Don't hold the connection handle in the fallback case // to prevent a deadlock. goto fallback; @@ -889,7 +889,7 @@ void RemoteStore::connect() unsigned int RemoteStore::getProtocol() { auto conn(connections->get()); - return conn->daemonVersion; + return conn->protoVersion; } std::optional RemoteStore::isTrustedClient() diff --git a/src/libstore/unix/build/local-derivation-goal.cc b/src/libstore/unix/build/local-derivation-goal.cc index f968bbc5b7f..0dd102200a5 100644 --- a/src/libstore/unix/build/local-derivation-goal.cc +++ b/src/libstore/unix/build/local-derivation-goal.cc @@ -1526,10 +1526,11 @@ void LocalDerivationGoal::startDaemon() debug("received daemon connection"); auto workerThread = std::thread([store, remote{std::move(remote)}]() { - FdSource from(remote.get()); - FdSink to(remote.get()); try { - daemon::processConnection(store, from, to, + daemon::processConnection( + store, + FdSource(remote.get()), + FdSink(remote.get()), NotTrusted, daemon::Recursive); debug("terminated daemon connection"); } catch (SystemError &) { diff --git a/src/libstore/worker-protocol-connection.cc b/src/libstore/worker-protocol-connection.cc index 3a640051ebc..93d13d48e55 100644 --- a/src/libstore/worker-protocol-connection.cc +++ b/src/libstore/worker-protocol-connection.cc @@ -58,7 +58,7 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink } else if (msg == STDERR_ERROR) { - if (GET_PROTOCOL_MINOR(daemonVersion) >= 26) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 26) { ex = std::make_exception_ptr(readError(from)); } else { auto error = readString(from); @@ -114,7 +114,7 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink // explain to users what's going on when their daemon is // older than #4628 (2023). if (experimentalFeatureSettings.isEnabled(Xp::DynamicDerivations) - && GET_PROTOCOL_MINOR(daemonVersion) <= 35) { + && GET_PROTOCOL_MINOR(protoVersion) <= 35) { auto m = e.msg(); if (m.find("parsing derivation") != std::string::npos && m.find("expected string") != std::string::npos && m.find("Derive([") != std::string::npos) @@ -172,15 +172,15 @@ WorkerProto::ClientHandshakeInfo WorkerProto::BasicClientConnection::postHandsha { WorkerProto::ClientHandshakeInfo res; - if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 14) { // Obsolete CPU affinity. to << 0; } - if (GET_PROTOCOL_MINOR(daemonVersion) >= 11) + if (GET_PROTOCOL_MINOR(protoVersion) >= 11) to << false; // obsolete reserveSpace - if (GET_PROTOCOL_MINOR(daemonVersion) >= 33) + if (GET_PROTOCOL_MINOR(protoVersion) >= 33) to.flush(); return WorkerProto::Serialise::read(store, *this); @@ -188,12 +188,12 @@ WorkerProto::ClientHandshakeInfo WorkerProto::BasicClientConnection::postHandsha void WorkerProto::BasicServerConnection::postHandshake(const StoreDirConfig & store, const ClientHandshakeInfo & info) { - if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && readInt(from)) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 14 && readInt(from)) { // Obsolete CPU affinity. readInt(from); } - if (GET_PROTOCOL_MINOR(clientVersion) >= 11) + if (GET_PROTOCOL_MINOR(protoVersion) >= 11) readInt(from); // obsolete reserveSpace WorkerProto::write(store, *this, info); @@ -211,7 +211,7 @@ UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo( throw InvalidPath(std::move(e.info())); throw; } - if (GET_PROTOCOL_MINOR(daemonVersion) >= 17) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 17) { bool valid; from >> valid; if (!valid) @@ -223,10 +223,10 @@ UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo( StorePathSet WorkerProto::BasicClientConnection::queryValidPaths( const StoreDirConfig & store, bool * daemonException, const StorePathSet & paths, SubstituteFlag maybeSubstitute) { - assert(GET_PROTOCOL_MINOR(daemonVersion) >= 12); + assert(GET_PROTOCOL_MINOR(protoVersion) >= 12); to << WorkerProto::Op::QueryValidPaths; WorkerProto::write(store, *this, paths); - if (GET_PROTOCOL_MINOR(daemonVersion) >= 27) { + if (GET_PROTOCOL_MINOR(protoVersion) >= 27) { to << maybeSubstitute; } processStderr(daemonException); diff --git a/src/libstore/worker-protocol-connection.hh b/src/libstore/worker-protocol-connection.hh index 9dd723fd089..38287d08e31 100644 --- a/src/libstore/worker-protocol-connection.hh +++ b/src/libstore/worker-protocol-connection.hh @@ -6,7 +6,7 @@ namespace nix { -struct WorkerProto::BasicClientConnection +struct WorkerProto::BasicConnection { /** * Send with this. @@ -19,14 +19,45 @@ struct WorkerProto::BasicClientConnection FdSource from; /** - * Worker protocol version used for the connection. + * The protocol version agreed by both sides. + */ + WorkerProto::Version protoVersion; + + /** + * Coercion to `WorkerProto::ReadConn`. This makes it easy to use the + * factored out serve protocol serializers with a + * `LegacySSHStore::Connection`. + * + * The serve protocol connection types are unidirectional, unlike + * this type. + */ + operator WorkerProto::ReadConn() + { + return WorkerProto::ReadConn{ + .from = from, + .version = protoVersion, + }; + } + + /** + * Coercion to `WorkerProto::WriteConn`. This makes it easy to use the + * factored out serve protocol serializers with a + * `LegacySSHStore::Connection`. * - * Despite its name, it is actually the maximum version both - * sides support. (If the maximum doesn't exist, we would fail to - * establish a connection and produce a value of this type.) + * The serve protocol connection types are unidirectional, unlike + * this type. */ - WorkerProto::Version daemonVersion; + operator WorkerProto::WriteConn() + { + return WorkerProto::WriteConn{ + .to = to, + .version = protoVersion, + }; + } +}; +struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection +{ /** * Flush to direction */ @@ -60,38 +91,6 @@ struct WorkerProto::BasicClientConnection */ ClientHandshakeInfo postHandshake(const StoreDirConfig & store); - /** - * Coercion to `WorkerProto::ReadConn`. This makes it easy to use the - * factored out serve protocol serializers with a - * `LegacySSHStore::Connection`. - * - * The serve protocol connection types are unidirectional, unlike - * this type. - */ - operator WorkerProto::ReadConn() - { - return WorkerProto::ReadConn{ - .from = from, - .version = daemonVersion, - }; - } - - /** - * Coercion to `WorkerProto::WriteConn`. This makes it easy to use the - * factored out serve protocol serializers with a - * `LegacySSHStore::Connection`. - * - * The serve protocol connection types are unidirectional, unlike - * this type. - */ - operator WorkerProto::WriteConn() - { - return WorkerProto::WriteConn{ - .to = to, - .version = daemonVersion, - }; - } - void addTempRoot(const StoreDirConfig & remoteStore, bool * daemonException, const StorePath & path); StorePathSet queryValidPaths( @@ -124,43 +123,8 @@ struct WorkerProto::BasicClientConnection void importPaths(const StoreDirConfig & store, bool * daemonException, Source & source); }; -struct WorkerProto::BasicServerConnection +struct WorkerProto::BasicServerConnection : WorkerProto::BasicConnection { - /** - * Send with this. - */ - FdSink & to; - - /** - * Receive with this. - */ - FdSource & from; - - /** - * Worker protocol version used for the connection. - * - * Despite its name, it is actually the maximum version both - * sides support. (If the maximum doesn't exist, we would fail to - * establish a connection and produce a value of this type.) - */ - WorkerProto::Version clientVersion; - - operator WorkerProto::ReadConn() - { - return WorkerProto::ReadConn{ - .from = from, - .version = clientVersion, - }; - } - - operator WorkerProto::WriteConn() - { - return WorkerProto::WriteConn{ - .to = to, - .version = clientVersion, - }; - } - /** * Establishes connection, negotiating version. * diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index 62a12d18201..9fc16d0153e 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -82,6 +82,7 @@ struct WorkerProto * * @todo remove once Hydra uses Store abstraction consistently. */ + struct BasicConnection; struct BasicClientConnection; struct BasicServerConnection; @@ -183,8 +184,7 @@ enum struct WorkerProto::Op : uint64_t struct WorkerProto::ClientHandshakeInfo { /** - * The version of the Nix daemon that is processing our requests -. + * The version of the Nix daemon that is processing our requests. * * Do note, it may or may not communicating with another daemon, * rather than being an "end" `LocalStore` or similar. diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 18f4a79c398..8254a5f899d 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -159,13 +159,7 @@ struct FdSource : BufferedSource FdSource(Descriptor fd) : fd(fd) { } FdSource(FdSource &&) = default; - FdSource & operator=(FdSource && s) - { - fd = s.fd; - s.fd = INVALID_DESCRIPTOR; - read = s.read; - return *this; - } + FdSource & operator=(FdSource && s) = default; bool good() override; protected: diff --git a/src/nix/unix/daemon.cc b/src/nix/unix/daemon.cc index 4a7997b1fb0..66d8dbcf092 100644 --- a/src/nix/unix/daemon.cc +++ b/src/nix/unix/daemon.cc @@ -370,9 +370,12 @@ static void daemonLoop(std::optional forceTrustClientOpt) } // Handle the connection. - FdSource from(remote.get()); - FdSink to(remote.get()); - processConnection(openUncachedStore(), from, to, trusted, NotRecursive); + processConnection( + openUncachedStore(), + FdSource(remote.get()), + FdSink(remote.get()), + trusted, + NotRecursive); exit(0); }, options); @@ -437,9 +440,11 @@ static void forwardStdioConnection(RemoteStore & store) { */ static void processStdioConnection(ref store, TrustedFlag trustClient) { - FdSource from(STDIN_FILENO); - FdSink to(STDOUT_FILENO); - processConnection(store, from, to, trustClient, NotRecursive); + processConnection( + store, + FdSource(STDIN_FILENO), + FdSink(STDOUT_FILENO), + trustClient, NotRecursive); } /**