Skip to content

Commit

Permalink
backend-db: Actually catch serialization errors to retry them
Browse files Browse the repository at this point in the history
  • Loading branch information
3noch committed May 7, 2020
1 parent b4ddbf7 commit b322cfa
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 10 deletions.
6 changes: 3 additions & 3 deletions backend-db/Rhyolite/Backend/DB/PsqlSimple.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ module Rhyolite.Backend.DB.PsqlSimple
, Binary (..), (:.)(..), PGArray (..)
, ToRow (..), FromRow (..)
, ToField (..), FromField (..)
, Query (..), sql, traceQuery, traceExecute, traceExecute_
, Query (..)
, WrappedSqlError (..)
, sql, traceQuery, traceExecute, traceExecute_
, liftWithConn
, queryQ, executeQ, executeQ_, sqlQ, traceQueryQ, traceExecuteQ, traceExecuteQ_
, fromIdRow
Expand Down Expand Up @@ -63,7 +65,6 @@ data WrappedSqlError = WrappedSqlError
, _wrappedSqlError_error :: SqlError
}
deriving Show

instance Exception WrappedSqlError

rethrowWithQuery :: ToRow q => Connection -> Query -> q -> SqlError -> IO a
Expand Down Expand Up @@ -128,7 +129,6 @@ class PostgresRaw m where
default returning :: (m ~ t n, ToRow q, FromRow r, PostgresRaw n, Monad n, MonadTrans t) => Query -> [q] -> m [r]
returning psql qs = lift $ returning psql qs


traceQuery :: (PostgresRaw m, MonadIO m, ToRow q, FromRow r) => Query -> q -> m [r]
traceQuery p q = do
s <- formatQuery p q
Expand Down
64 changes: 57 additions & 7 deletions backend-db/Rhyolite/Backend/DB/Serializable.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,33 @@

module Rhyolite.Backend.DB.Serializable
( Serializable
, SqlSerializationError (..)
, runSerializable
, toDbPersist
, unsafeLiftDbPersist
, unsafeMkSerializable
, unSerializable
, withSqlSerializationErrorWrapping
) where

import qualified Control.Exception as E
import Control.Monad.Base (MonadBase (liftBase))
import Control.Monad.Catch (MonadThrow)
import qualified Control.Monad.Catch as MonadCatch
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT, runReaderT, withReaderT)
import Control.Monad.Logger (MonadLogger, LoggingT)
import qualified Control.Monad.State as S
import Data.Coerce (coerce)
import qualified Database.Groundhog.Generic.Migration as Mig
import Database.Groundhog.Postgresql (Postgresql (..))
import qualified Database.PostgreSQL.Simple as Pg
import qualified Database.PostgreSQL.Simple.Transaction as Pg
import Data.Pool (Pool, withResource)
import qualified Database.Groundhog.Core as Hog

import qualified Rhyolite.Backend.DB.PsqlSimple as PsqlSimple
import Rhyolite.Logging (LoggingEnv, runLoggingEnv)

import qualified Control.Monad.State as S

-- | A monad for database transactions with serializable isolation level.
--
-- Because this monad may retry execution of code automatically, it does not lawfully lift any effects other
Expand All @@ -42,7 +45,7 @@ import qualified Control.Monad.State as S
-- It "disallows" (makes harder) arbitrary IO.
-- It "disallows" (makes harder) catching IO exceptions *inside* the transaction.
newtype Serializable a = Serializable (ReaderT Pg.Connection (LoggingT IO) a)
deriving (Functor, Applicative, Monad, MonadThrow, MonadLogger)
deriving (Functor, Applicative, Monad, MonadCatch.MonadThrow, MonadLogger)
-- NOTE: We *intentionally* leave out
-- - 'MonadCatch' so you can't accidentally mask a serialization error from the outer retry logic.
-- - 'MonadBaseControl' (et al) for the same reason.
Expand Down Expand Up @@ -101,6 +104,20 @@ instance Mig.SchemaAnalyzer Serializable where
getMigrationPack i = coerce <$> unsafeLiftDbPersist (Mig.getMigrationPack i)


data SqlSerializationError = SqlSerializationError deriving (Eq, Ord, Show)
instance E.Exception SqlSerializationError

withSqlSerializationErrorWrapping :: forall m a. (MonadCatch.MonadCatch m, MonadCatch.MonadThrow m) => m a -> m a
withSqlSerializationErrorWrapping = flip MonadCatch.catches
[ MonadCatch.Handler $ \(e :: Pg.SqlError) -> convert id e
, MonadCatch.Handler $ \(e :: PsqlSimple.WrappedSqlError) -> convert PsqlSimple._wrappedSqlError_error e
]
where
convert :: E.Exception e => (e -> Pg.SqlError) -> e -> m a
convert toSqlError e = if Pg.isSerializationError (toSqlError e)
then MonadCatch.throwM SqlSerializationError
else MonadCatch.throwM e

unsafeMkSerializable :: ReaderT Pg.Connection (LoggingT IO) a -> Serializable a
unsafeMkSerializable = Serializable

Expand All @@ -111,9 +128,42 @@ toDbPersist :: forall a. Serializable a -> Hog.DbPersist Postgresql (LoggingT IO
toDbPersist (Serializable act) = Hog.DbPersist $ withReaderT coerce act

unsafeLiftDbPersist :: forall a. Hog.DbPersist Postgresql (LoggingT IO) a -> Serializable a
unsafeLiftDbPersist (Hog.DbPersist act) = Serializable $ withReaderT coerce act
unsafeLiftDbPersist (Hog.DbPersist act) = Serializable $ withSqlSerializationErrorWrapping $ withReaderT coerce act

runSerializable :: forall a m. (MonadIO m) => Pool Pg.Connection -> LoggingEnv -> Serializable a -> m a
runSerializable pool logger (Serializable act) = liftIO $ withResource pool $ \c ->
Pg.withTransactionSerializable c $
runLoggingEnv logger $ runReaderT act c
withTransactionModeRetry'
(Pg.TransactionMode{ Pg.isolationLevel = Pg.Serializable, Pg.readWriteMode = Pg.ReadWrite})
(\(_ :: SqlSerializationError) -> True)
c
(runLoggingEnv logger $ runReaderT act c)


-- | Like 'withTransactionMode', but also takes a custom callback to
-- determine if a transaction should be retried if an 'SqlError' occurs.
-- If the callback returns True, then the transaction will be retried.
-- If the callback returns False, or an exception other than an 'SqlError'
-- occurs then the transaction will be rolled back and the exception rethrown.
--
-- This is used to implement 'withTransactionSerializable'.
withTransactionModeRetry' :: forall e a. E.Exception e => Pg.TransactionMode -> (e -> Bool) -> Pg.Connection -> IO a -> IO a
withTransactionModeRetry' mode shouldRetry conn act =
E.mask $ \restore ->
retryLoop $ E.try $ do
a <- restore act `E.onException` rollback_ conn
Pg.commit conn
return a
where
retryLoop :: IO (Either e a) -> IO a
retryLoop act' = do
Pg.beginMode mode conn
r <- act'
case r of
Left e -> case shouldRetry e of
True -> retryLoop act'
False -> E.throwIO e
Right a -> return a

-- | Rollback a transaction, ignoring any @IOErrors@
rollback_ :: Pg.Connection -> IO ()
rollback_ c = Pg.rollback c `E.catch` \(_ :: IOError) -> return ()

0 comments on commit b322cfa

Please sign in to comment.