Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pipes to stream logs #52

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions hnix-store-core/src/System/Nix/Util.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module System.Nix.Util where
import Control.Monad
import Data.Binary.Get
import Data.Binary.Put
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS

putInt :: Integral a => a -> Put
Expand All @@ -16,18 +17,17 @@ getInt :: Integral a => Get a
getInt = fromIntegral <$> getWord64le

-- length prefixed string packing with padding to 8 bytes
putByteStringLen :: LBS.ByteString -> Put
putByteStringLen :: BS.ByteString -> Put
putByteStringLen x = do
putInt $ fromIntegral $ len
putLazyByteString x
when (len `mod` 8 /= 0) $
pad $ fromIntegral $ 8 - (len `mod` 8)
where len = LBS.length x
pad x = forM_ (take x $ cycle [0]) putWord8
putInt $ len
putByteString x
pad $ 8 - (len `mod` 8)
where len = BS.length x
pad x = replicateM_ x (putWord8 0)

putByteStrings :: Foldable t => t LBS.ByteString -> Put
putByteStrings :: Foldable t => t BS.ByteString -> Put
putByteStrings xs = do
putInt $ fromIntegral $ length xs
putInt $ length xs
mapM_ putByteStringLen xs

getByteStringLen :: Get LBS.ByteString
Expand All @@ -38,11 +38,10 @@ getByteStringLen = do
pads <- unpad $ fromIntegral $ 8 - (len `mod` 8)
unless (all (==0) pads) $ fail $ "No zeroes" ++ show (st, len, pads)
return st
where unpad x = sequence $ replicate x getWord8
where unpad x = replicateM x getWord8

getByteStrings :: Get [LBS.ByteString]
getByteStrings = do
count <- getInt
res <- sequence $ replicate count getByteStringLen
return res

13 changes: 3 additions & 10 deletions hnix-store-remote/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,10 @@ via `nix-daemon`.
## Example

```haskell

import Control.Monad.IO.Class (liftIO)
import Data.HashSet as HS
import System.Nix.Store.Remote

main = do
runStore $ do
main =
runStore_ $ do
syncWithGC
roots <- findRoots
liftIO $ print roots

res <- addTextToStore "hnix-store" "test" (HS.fromList []) False
liftIO $ print res
optimiseStore
```
1 change: 1 addition & 0 deletions hnix-store-remote/hnix-store-remote.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ library
, unix
, network
, mtl
, pipes
, unordered-containers
-- , pretty-simple
-- , base16-bytestring
Expand Down
39 changes: 31 additions & 8 deletions hnix-store-remote/src/System/Nix/Store/Remote.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module System.Nix.Store.Remote (
runStore
BuildMode(..)
, runStore
, syncWithGC
, optimiseStore
, verifyStore
, buildPaths
) where

import Control.Monad

import Data.Binary.Put (Put, putInthost)
import Data.ByteString (ByteString)
import System.Nix.Util
import System.Nix.Store.Remote.Types
import System.Nix.Store.Remote.Protocol
import System.Nix.Store.Remote.Util
Expand All @@ -22,13 +25,33 @@ type RepairFlag = Bool
type CheckFlag = Bool

syncWithGC :: MonadStore ()
syncWithGC = void $ simpleOp SyncWithGC
syncWithGC = runOp_ SyncWithGC

optimiseStore :: MonadStore ()
optimiseStore = void $ simpleOp OptimiseStore
optimiseStore = runOp_ OptimiseStore

-- returns True on errors
verifyStore :: CheckFlag -> RepairFlag -> MonadStore Bool
verifyStore check repair = simpleOpArgs VerifyStore $ do
verifyStore :: CheckFlag -> RepairFlag -> MonadStore ()
verifyStore check repair = runOpArgs_ VerifyStore $ do
putBool check
putBool repair

data BuildMode = Normal | Repair | Check
deriving (Eq, Show)

putBuildMode :: BuildMode -> Put
putBuildMode mode = putInthost $
case mode of
Normal -> 0
Repair -> 1
Check -> 2

buildPaths ::
-- forall storeDir . (KnownStoreDir storeDir) =>
-- [StorePath storeDir]
[ByteString] -> BuildMode -> MonadStore ()
buildPaths drvs mode =
runOpArgs_ BuildPaths args
where
args = do
putByteStrings drvs
putBuildMode mode
57 changes: 33 additions & 24 deletions hnix-store-remote/src/System/Nix/Store/Remote/Logger.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
module System.Nix.Store.Remote.Logger (
Logger(..)
, Field(..)
, processOutput)
where
, streamLogs
) where

import Control.Monad.Except (throwError)
import Control.Monad (replicateM)
import Control.Monad.Reader (ask, liftIO)
import Data.Binary.Get

import Network.Socket.ByteString (recv)

import Pipes (lift, yield)
import System.Nix.Store.Remote.Types
import System.Nix.Util

Expand All @@ -26,30 +27,38 @@ controlParser = do
0x52534c54 -> Result <$> getInt <*> getInt <*> getFields
x -> fail $ "Invalid control message received:" ++ show x

processOutput :: MonadStore [Logger]
processOutput = go decoder
where decoder = runGetIncremental controlParser
go :: Decoder Logger -> MonadStore [Logger]
go (Done _leftover _consumed ctrl) = do
case ctrl of
e@(Error _ _) -> return [e]
Last -> return [Last]
-- we should probably handle Read here as well
x -> do
next <- go decoder
return $ x:next
go (Partial k) = do
soc <- ask
chunk <- liftIO (Just <$> recv soc 8)
go (k chunk)
logger :: Logger -> MonadStore ()
logger = lift . yield

streamLogs :: MonadStore ()
streamLogs = go decoder
where
go :: Decoder Logger -> MonadStore ()
go (Done _leftover _consumed ctrl) = do
case ctrl of
e@(Error status err) -> do
logger e
throwError (LogError status err)
Last ->
logger Last
-- we should probably handle Read here as well
x -> do
logger x
go decoder
go (Partial cont) = do
soc <- ask
chunk <- liftIO (recv soc 8)
go (cont (Just chunk))
go (Fail _leftover _consumed msg) =
throwError (ParseError msg)

go (Fail _leftover _consumed msg) = do
error msg
decoder :: Decoder Logger
decoder = runGetIncremental controlParser

getFields :: Get [Field]
getFields = do
cnt <- getInt
sequence $ replicate cnt getField
count <- getInt
replicateM count getField

getField :: Get Field
getField = do
Expand Down
106 changes: 50 additions & 56 deletions hnix-store-remote/src/System/Nix/Store/Remote/Protocol.hs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
{-# LANGUAGE TypeApplications #-}
module System.Nix.Store.Remote.Protocol (
WorkerOp(..)
, simpleOp
, simpleOpArgs
, runOp
, runOp_
, runOpArgs
, runStore) where
, runOpArgs_
, runStore
, runStore_
) where

import Control.Exception (bracket)
import Control.Monad.Except
import Control.Exception (SomeException, bracket, catch, displayException)
import Control.Monad.Except (throwError, runExceptT)
import Control.Monad.Reader
import Control.Monad.State

import Data.Binary.Get
import Data.Binary.Put
import qualified Data.ByteString.Char8 as BSC
import qualified Data.ByteString.Lazy as LBS
import Data.Binary.Get

import Network.Socket

import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString (recv)
import Pipes
import qualified Pipes.Prelude as Pipes

import System.Nix.Store.Remote.Logger
import System.Nix.Store.Remote.Types
Expand Down Expand Up @@ -106,65 +108,57 @@ opNum NarFromPath = 38
opNum AddToStoreNar = 39
opNum QueryMissing = 40

runOp :: WorkerOp -> Get a -> MonadStore a
runOp op result = runOpArgs op mempty result

simpleOp :: WorkerOp -> MonadStore Bool
simpleOp op = do
simpleOpArgs op $ return ()

simpleOpArgs :: WorkerOp -> Put -> MonadStore Bool
simpleOpArgs op args = do
runOpArgs op args
err <- gotError
case err of
True -> do
Error _num msg <- head <$> getError
throwError $ BSC.unpack $ LBS.toStrict msg
False -> do
sockGetBool

runOp :: WorkerOp -> MonadStore ()
runOp op = runOpArgs op $ return ()

runOpArgs :: WorkerOp -> Put -> MonadStore ()
runOpArgs op args = do

-- Temporary hack for printing the messages destined for nix-daemon socket
when False $
liftIO $ LBS.writeFile "mytestfile2" $ runPut $ do
putInt $ opNum op
args
runOp_ :: WorkerOp -> MonadStore ()
runOp_ op = runOp op (skip 8)

runOpArgs :: WorkerOp -> Put -> Get a -> MonadStore a
runOpArgs op args result = do
sockPut $ do
putInt $ opNum op
putInt (opNum op)
args
streamLogs
sockGet result

out <- processOutput
modify (++out)
err <- gotError
when err $ do
Error _num msg <- head <$> getError
throwError $ BSC.unpack $ LBS.toStrict msg
runOpArgs_ :: WorkerOp -> Put -> MonadStore ()
runOpArgs_ op args = runOpArgs op args (skip 8)

runStore :: MonadStore a -> IO (Either String a, [Logger])
runStore code = do
bracket (open sockPath) close run
runStore :: Consumer Logger IO (Either Error a) -> MonadStore a -> IO (Either Error a)
runStore sink code =
bracket (open sockPath) close run `catch` onException
where
open path = do
soc <- socket AF_UNIX Stream 0
connect soc (SockAddrUnix path)
return soc
sock <- socket AF_UNIX Stream 0
connect sock (SockAddrUnix path)
return sock

greet = do
sockPut $ putInt workerMagic1
soc <- ask
vermagic <- liftIO $ recv soc 16
let (magic2, daemonProtoVersion) = flip runGet (LBS.fromStrict vermagic) $ (,) <$> getInt <*> getInt
unless (magic2 == workerMagic2) $ error "Worker magic 2 mismatch"

magic2 <- sockGetInt
_ <- sockGetInt -- daemonVersion

unless (magic2 == workerMagic2) $
throwError (ConnError "Worker magic 2 mismatch")

sockPut $ putInt protoVersion -- clientVersion
sockPut $ putInt (0 :: Int) -- affinity
sockPut $ putInt (0 :: Int) -- obsolete reserveSpace

processOutput
streamLogs -- receive startup error messages, if any

run sock =
flip runReaderT sock $ flip runStateT [] $ runExceptT (greet >> code)
let producer =
runExceptT $ do
greet
code
effect = producer >-> hoist liftIO sink
in runReaderT (runEffect effect) sock

onException :: SomeException -> IO (Either Error a)
onException = return . Left . ConnError . displayException

runStore_ :: MonadStore a -> IO (Either Error a)
runStore_ = runStore Pipes.drain
Loading