{- Persistent sqlite database handles. - - Copyright 2015-2018 Joey Hess - - Licensed under the GNU GPL version 3 or higher. -} module Database.Handle ( DbHandle, DbConcurrency(..), openDb, TableName, queryDb, closeDb, commitDb, commitDb', ) where import Utility.Exception import Utility.FileSystemEncoding import Database.Persist.Sqlite import qualified Database.Sqlite as Sqlite import Control.Monad import Control.Monad.IO.Class (liftIO) import Control.Concurrent import Control.Concurrent.Async import Control.Exception (throwIO, BlockedIndefinitelyOnMVar(..)) import qualified Data.Text as T import Control.Monad.Trans.Resource (runResourceT) import Control.Monad.Logger (runNoLoggingT) import System.IO {- A DbHandle is a reference to a worker thread that communicates with - the database. It has a MVar which Jobs are submitted to. -} data DbHandle = DbHandle DbConcurrency (Async ()) (MVar Job) {- Name of a table that should exist once the database is initialized. -} type TableName = String {- Sqlite only allows a single write to a database at a time; a concurrent - write will crash. - - MultiWrter works around this limitation. - The downside of using MultiWriter is that after writing a change to the - database, the a query using the same DbHandle will not immediately see - the change! This is because the change is actually written using a - separate database connection, and caching can prevent seeing the change. - Also, consider that if multiple processes are writing to a database, - you can't rely on seeing values you've just written anyway, as another - process may change them. - - When a database can only be written to by a single process (enforced by - a lock file), use SingleWriter. Changes written to the database will - always be immediately visible then. Multiple threads can write; their - writes will be serialized. -} data DbConcurrency = SingleWriter | MultiWriter {- Opens the database, but does not perform any migrations. Only use - once the database is known to exist and have the right tables. -} openDb :: DbConcurrency -> FilePath -> TableName -> IO DbHandle openDb dbconcurrency db tablename = do jobs <- newEmptyMVar worker <- async (workerThread (T.pack db) tablename jobs) -- work around https://github.com/yesodweb/persistent/issues/474 liftIO $ fileEncoding stderr return $ DbHandle dbconcurrency worker jobs {- This is optional; when the DbHandle gets garbage collected it will - auto-close. -} closeDb :: DbHandle -> IO () closeDb (DbHandle _ worker jobs) = do putMVar jobs CloseJob wait worker {- Makes a query using the DbHandle. This should not be used to make - changes to the database! - - Note that the action is not run by the calling thread, but by a - worker thread. Exceptions are propigated to the calling thread. - - Only one action can be run at a time against a given DbHandle. - If called concurrently in the same process, this will block until - it is able to run. - - Note that when the DbHandle was opened in MultiWriter mode, recent - writes may not be seen by queryDb. -} queryDb :: DbHandle -> SqlPersistM a -> IO a queryDb (DbHandle _ _ jobs) a = do res <- newEmptyMVar putMVar jobs $ QueryJob $ liftIO . putMVar res =<< tryNonAsync a (either throwIO return =<< takeMVar res) `catchNonAsync` (const $ error "sqlite query crashed") {- Writes a change to the database. - - In MultiWriter mode, writes can fail if another write is happening - concurrently. So write failures are caught and retried repeatedly - for up to 10 seconds, which should avoid all but the most exceptional - problems. -} commitDb :: DbHandle -> SqlPersistM () -> IO () commitDb h wa = robustly Nothing 100 (commitDb' h wa) where robustly :: Maybe SomeException -> Int -> IO (Either SomeException ()) -> IO () robustly e 0 _ = error $ "failed to commit changes to sqlite database: " ++ show e robustly _ n a = do r <- a case r of Right _ -> return () Left e -> do threadDelay 100000 -- 1/10th second robustly (Just e) (n-1) a commitDb' :: DbHandle -> SqlPersistM () -> IO (Either SomeException ()) commitDb' (DbHandle MultiWriter _ jobs) a = do res <- newEmptyMVar putMVar jobs $ RobustChangeJob $ \runner -> liftIO $ putMVar res =<< tryNonAsync (runner a) takeMVar res commitDb' (DbHandle SingleWriter _ jobs) a = do res <- newEmptyMVar putMVar jobs $ ChangeJob $ liftIO . putMVar res =<< tryNonAsync a takeMVar res `catchNonAsync` (const $ error "sqlite commit crashed") data Job = QueryJob (SqlPersistM ()) | ChangeJob (SqlPersistM ()) | RobustChangeJob ((SqlPersistM () -> IO ()) -> IO ()) | CloseJob workerThread :: T.Text -> TableName -> MVar Job -> IO () workerThread db tablename jobs = go where go = do v <- tryNonAsync (runSqliteRobustly tablename db loop) case v of Left e -> hPutStrLn stderr $ "sqlite worker thread crashed: " ++ show e Right True -> go Right False -> return () getjob :: IO (Either BlockedIndefinitelyOnMVar Job) getjob = try $ takeMVar jobs loop = do job <- liftIO getjob case job of -- Exception is thrown when the MVar is garbage -- collected, which means the whole DbHandle -- is not used any longer. Shutdown cleanly. Left BlockedIndefinitelyOnMVar -> return False Right CloseJob -> return False Right (QueryJob a) -> a >> loop Right (ChangeJob a) -> do a -- Exit this sqlite transaction so the -- database gets updated on disk. return True -- Change is run in a separate database connection -- since sqlite only supports a single writer at a -- time, and it may crash the database connection -- that the write is made to. Right (RobustChangeJob a) -> do liftIO (a (runSqliteRobustly tablename db)) loop -- Like runSqlite, but more robust. -- -- New database connections can sometimes take a while to become usable. -- This may be due to WAL mode recovering after a crash, or perhaps a bug -- like described in blob 500f777a6ab6c45ca5f9790e0a63575f8e3cb88f. -- So, loop until a select succeeds; once one succeeds the connection will -- stay usable. -- -- And sqlite sometimes throws ErrorIO when there's not really an IO problem, -- but perhaps just a short read(). That's caught and retried several times. runSqliteRobustly :: TableName -> T.Text -> (SqlPersistM a) -> IO a runSqliteRobustly tablename db a = do conn <- opensettle maxretries go conn maxretries where maxretries = 100 :: Int rethrow msg e = throwIO $ userError $ show e ++ "(" ++ msg ++ ")" go conn retries = do r <- try $ runResourceT $ runNoLoggingT $ withSqlConn (wrapConnection conn) $ runSqlConn a case r of Right v -> return v Left ex@(Sqlite.SqliteException { Sqlite.seError = e }) | e == Sqlite.ErrorIO -> let retries' = retries - 1 in if retries' < 1 then rethrow "after successful open" ex else go conn retries' | otherwise -> rethrow "after successful open" ex opensettle retries = do conn <- Sqlite.open db settle conn retries settle conn retries = do r <- try $ do stmt <- Sqlite.prepare conn nullselect void $ Sqlite.step stmt void $ Sqlite.finalize stmt case r of Right _ -> return conn Left ex@(Sqlite.SqliteException { Sqlite.seError = e }) | e == Sqlite.ErrorBusy -> do -- Wait and retry any number of times; it -- will stop being busy eventually. briefdelay settle conn retries | e == Sqlite.ErrorIO -> do -- Could be a real IO error, -- so don't retry indefinitely. Sqlite.close conn briefdelay let retries' = retries - 1 if retries' < 1 then rethrow "while opening database connection" ex else opensettle retries' | otherwise -> rethrow "while opening database connection" ex -- This should succeed for any table. nullselect = T.pack $ "SELECT null from " ++ tablename ++ " limit 1" briefdelay = threadDelay 1000 -- 1/1000th second