-- Copyright (c) Gree, Inc. 2013 -- License: MIT-style {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} module Network.JobQueue.Backend.Sqlite3 (openSqlite3Backend, newSqlite3Backend) where import qualified Data.ByteString.Char8 as BS import Database.HDBC import Database.HDBC.Sqlite3 import Network.JobQueue.Backend.Types import Network.JobQueue.Backend.Class import Control.Concurrent.MVar import Control.Exception data Sqlite3Queue = Sqlite3Queue { conn :: Connection , queueName :: String , mlock :: MVar () } instance BackendQueue Sqlite3Queue where readQueue = readDBQueue peekQueue = peekDBQueue updateQueue = updateDBQueue deleteQueue = deleteDBQueue writeQueue = writeDBQueue listQueue = listDBQueue itemsQueue = itemsDBQueue countQueue = countDBQueue closeQueue = const $ return () openSqlite3Backend :: String -> IO Backend openSqlite3Backend filePath = do c <- connectSqlite3 filePath m <- newMVar () return $ Backend { bOpenQueue = \qn -> do _ <- withLock m $ withTransaction c $ \c' -> do run c' ("CREATE TABLE IF NOT EXISTS '" ++ qn ++ "' (key INTEGER PRIMARY KEY AUTOINCREMENT, prio INTEGER, value TEXT, version INTEGER)") [] return (Sqlite3Queue c qn m) , bClose = disconnect c } newSqlite3Backend :: Connection -> IO Backend newSqlite3Backend c = do m <- newMVar () return $ Backend { bOpenQueue = \qn -> do return (Sqlite3Queue c qn m) , bClose = return () } readDBQueue :: Sqlite3Queue -> IO (Maybe (BS.ByteString, String)) readDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do sqlvalues <- quickQuery' conn' ("SELECT key, value FROM '" ++ queueName ++ "' ORDER BY prio, key LIMIT 1") [] case sqlvalues of ((key:value:_):_) -> do _ <- run conn' ("DELETE FROM '" ++ queueName ++ "' WHERE key = ?") [toSql key] return (Just (fromSql value, fromSql key)) _ -> return (Nothing) peekDBQueue :: Sqlite3Queue -> IO (Maybe (BS.ByteString, String, String, Int)) peekDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do sqlvalues <- quickQuery' conn' ("SELECT key, value, version FROM '" ++ queueName ++ "' ORDER BY prio, key LIMIT 1") [] case sqlvalues of ((key:value:version:_):_) -> return (Just (fromSql value, fromSql key, fromSql key, fromSql version)) _ -> return (Nothing) writeDBQueue :: Sqlite3Queue -> BS.ByteString -> Int -> IO (String) writeDBQueue Sqlite3Queue {..} value prio = do withLock mlock $ withTransaction conn $ \conn' -> do _ <- run conn' ("INSERT INTO '" ++ queueName ++ "'(prio, value, version) VALUES (?,?,0)") [toSql prio, toSql value] sqlvalues <- quickQuery' conn' ("SELECT seq FROM sqlite_sequence where name = '" ++ queueName ++ "'") [] case sqlvalues of ((key:_):_) -> do return (fromSql key) _ -> return ("") deleteDBQueue :: Sqlite3Queue -> String -> IO (Bool) deleteDBQueue Sqlite3Queue {..} key = withLock mlock $ withTransaction conn $ \conn' -> do _ <- run conn' ("DELETE FROM '" ++ queueName ++ "' WHERE key = ?") [toSql key] return (True) updateDBQueue :: Sqlite3Queue -> String -> BS.ByteString -> Int -> IO (Bool) updateDBQueue Sqlite3Queue {..} key value version = do withLock mlock $ withTransaction conn $ \conn' -> do nrows <- run conn' ("UPDATE '" ++ queueName ++ "' SET value = ?, version = ? WHERE key = ? AND version = ?") [toSql value, toSql (version+1), toSql key, toSql version] return $ if nrows > 0 then True else False countDBQueue :: Sqlite3Queue -> IO (Int) countDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do sqlvalues <- quickQuery' conn' ("SELECT COUNT (*) FROM '" ++ queueName ++ "' ORDER BY prio, key LIMIT 1") [] case sqlvalues of ((count:_):_) -> return (fromSql count) _ -> return (0) itemsDBQueue :: Sqlite3Queue -> IO ([String]) itemsDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do sqlvalues <- quickQuery' conn' ("SELECT key FROM '" ++ queueName ++ "' ORDER BY prio, key") [] case sqlvalues of keys -> return (map (fromSql . head) keys) listDBQueue :: Sqlite3Queue -> IO ([BS.ByteString]) listDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do sqlvalues <- quickQuery' conn' ("SELECT value FROM '" ++ queueName ++ "' ORDER BY prio, key") [] case sqlvalues of keys -> return (map (fromSql . head) keys) withLock :: MVar () -> IO a -> IO a withLock m act = handleSql (\err -> throwIO $ SessionError (show err)) $ do bracket (takeMVar m) (putMVar m) $ const act