module Network.JobQueue.Backend.Sqlite3 (openSqlite3Backend, newSqlite3Backend) where
import qualified Data.ByteString.Char8 as BS
import Database.HDBC
import Database.HDBC.Sqlite3
import Network.JobQueue.Backend.Types
import Network.JobQueue.Backend.Class
import Control.Concurrent.MVar
import Control.Exception
data Sqlite3Queue = Sqlite3Queue
{ conn :: Connection
, queueName :: String
, mlock :: MVar ()
}
instance BackendQueue Sqlite3Queue where
readQueue = readDBQueue
peekQueue = peekDBQueue
updateQueue = updateDBQueue
deleteQueue = deleteDBQueue
writeQueue = writeDBQueue
listQueue = listDBQueue
itemsQueue = itemsDBQueue
countQueue = countDBQueue
closeQueue = const $ return ()
openSqlite3Backend :: String -> IO Backend
openSqlite3Backend filePath = do
c <- connectSqlite3 filePath
m <- newMVar ()
return $ Backend {
bOpenQueue = \qn -> do
_ <- withLock m $ withTransaction c $ \c' -> do
run c' ("CREATE TABLE IF NOT EXISTS '" ++ qn ++ "' (key INTEGER PRIMARY KEY AUTOINCREMENT, prio INTEGER, value TEXT, version INTEGER)") []
return (Sqlite3Queue c qn m)
, bClose = disconnect c
}
newSqlite3Backend :: Connection -> IO Backend
newSqlite3Backend c = do
m <- newMVar ()
return $ Backend {
bOpenQueue = \qn -> do
return (Sqlite3Queue c qn m)
, bClose = return ()
}
readDBQueue :: Sqlite3Queue -> IO (Maybe (BS.ByteString, String))
readDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do
sqlvalues <- quickQuery' conn' ("SELECT key, value FROM '" ++ queueName ++ "' ORDER BY prio, key LIMIT 1") []
case sqlvalues of
((key:value:_):_) -> do
_ <- run conn' ("DELETE FROM '" ++ queueName ++ "' WHERE key = ?") [toSql key]
return (Just (fromSql value, fromSql key))
_ -> return (Nothing)
peekDBQueue :: Sqlite3Queue -> IO (Maybe (BS.ByteString, String, String, Int))
peekDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do
sqlvalues <- quickQuery' conn' ("SELECT key, value, version FROM '" ++ queueName ++ "' ORDER BY prio, key LIMIT 1") []
case sqlvalues of
((key:value:version:_):_) -> return (Just (fromSql value, fromSql key, fromSql key, fromSql version))
_ -> return (Nothing)
writeDBQueue :: Sqlite3Queue -> BS.ByteString -> Int -> IO (String)
writeDBQueue Sqlite3Queue {..} value prio = do
withLock mlock $ withTransaction conn $ \conn' -> do
_ <- run conn' ("INSERT INTO '" ++ queueName ++ "'(prio, value, version) VALUES (?,?,0)") [toSql prio, toSql value]
sqlvalues <- quickQuery' conn' ("SELECT seq FROM sqlite_sequence where name = '" ++ queueName ++ "'") []
case sqlvalues of
((key:_):_) -> do
return (fromSql key)
_ -> return ("")
deleteDBQueue :: Sqlite3Queue -> String -> IO (Bool)
deleteDBQueue Sqlite3Queue {..} key = withLock mlock $ withTransaction conn $ \conn' -> do
_ <- run conn' ("DELETE FROM '" ++ queueName ++ "' WHERE key = ?") [toSql key]
return (True)
updateDBQueue :: Sqlite3Queue -> String -> BS.ByteString -> Int -> IO (Bool)
updateDBQueue Sqlite3Queue {..} key value version = do
withLock mlock $ withTransaction conn $ \conn' -> do
nrows <- run conn' ("UPDATE '" ++ queueName ++ "' SET value = ?, version = ? WHERE key = ? AND version = ?") [toSql value, toSql (version+1), toSql key, toSql version]
return $ if nrows > 0 then True else False
countDBQueue :: Sqlite3Queue -> IO (Int)
countDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do
sqlvalues <- quickQuery' conn' ("SELECT COUNT (*) FROM '" ++ queueName ++ "' ORDER BY prio, key LIMIT 1") []
case sqlvalues of
((count:_):_) -> return (fromSql count)
_ -> return (0)
itemsDBQueue :: Sqlite3Queue -> IO ([String])
itemsDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do
sqlvalues <- quickQuery' conn' ("SELECT key FROM '" ++ queueName ++ "' ORDER BY prio, key") []
case sqlvalues of
keys -> return (map (fromSql . head) keys)
listDBQueue :: Sqlite3Queue -> IO ([BS.ByteString])
listDBQueue Sqlite3Queue {..} = withLock mlock $ withTransaction conn $ \conn' -> do
sqlvalues <- quickQuery' conn' ("SELECT value FROM '" ++ queueName ++ "' ORDER BY prio, key") []
case sqlvalues of
keys -> return (map (fromSql . head) keys)
withLock :: MVar () -> IO a -> IO a
withLock m act = handleSql (\err -> throwIO $ SessionError (show err)) $ do
bracket (takeMVar m) (putMVar m) $ const act