module Data.Conduit.HDBI
(
selectAll
, insertAll
, insertAllCount
, insertAllTrans
, flushAt
, flushBy
, statementSource
, statementSink
, statementSinkCount
, statementSinkTrans
, allocConnection
, allocStmt
, executeStmt
, asSqlVals
, asThisType
) where
import Control.Exception (SomeException(..), onException, throw, try)
import Control.Monad (when)
import Control.Monad.IO.Class
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource
import Data.Conduit
import qualified Data.Conduit.List as L
import Database.HDBI
-- | Acquire a connection through 'allocate', registering 'disconnect' as the
-- release action.
--
-- Returns the 'ReleaseKey' paired with the acquired connection.
allocConnection :: (Connection con, MonadResource m) => IO con -> m (ReleaseKey, con)
allocConnection connect = connect `allocate` disconnect
-- | Acquire a statement through 'allocate', registering 'finish' as the
-- release action.
--
-- Returns the 'ReleaseKey' paired with the acquired statement.
allocStmt :: (Statement stmt, MonadResource m) => IO stmt -> m (ReleaseKey, stmt)
allocStmt mkStmt = mkStmt `allocate` finish
-- | Prepare a query, register the resulting statement via 'allocStmt', and
-- execute it once with the given row of parameters.
--
-- The statement is registered before 'execute' runs, so the release action
-- installed by 'allocStmt' is already in place when execution happens.
-- Returns the release key together with the executed statement.
executeStmt :: (Connection con, (ConnStatement con) ~ stmt, ToRow row, MonadResource m)
            => con -> Query -> row -> m (ReleaseKey, stmt)
executeStmt con query row = do
  allocated@(_, stmt) <- allocStmt (prepare con query)
  liftIO (execute stmt row)
  return allocated
-- | Run a query with the given parameters and stream the resulting rows.
--
-- The statement is prepared and executed as the acquisition step of
-- 'statementSource'; rows are then pulled with 'fetch' until it yields
-- 'Nothing'.
selectAll :: (Connection con, MonadResource m, FromRow row, ToRow params)
          => con
          -> Query
          -> params
          -> Source m row
selectAll con query params = statementSource (liftIO . fetch) execStmt
  where
    -- Prepare and execute. If 'execute' fails, the statement has not yet been
    -- handed over to 'statementSource', so it is finished here before the
    -- exception propagates (via 'onException', which rethrows it unchanged).
    execStmt = do
      st <- prepare con query
      execute st params `onException` finish st
      return st
-- | Sink that executes the prepared query once per incoming row (via
-- 'rowPutter') and returns how many rows were consumed.
insertAllCount :: (Connection con, MonadResource m, Num count, ToRow a) => con -> Query -> Sink a m count
insertAllCount con query = statementSinkCount rowPutter (prepare con query)
-- | Sink that executes the prepared query once per incoming row (via
-- 'rowPutter'), discarding any result.
insertAll :: (Connection con, MonadResource m, ToRow a)
          => con
          -> Query
          -> Sink a m ()
insertAll con query = statementSink rowPutter (prepare con query)
-- | Sink over a 'Flush'-annotated stream that executes the prepared query
-- for every 'Chunk' (via 'rowPutter'), with transaction handling delegated
-- to 'statementSinkTrans'.
insertAllTrans :: (Connection con, MonadResource m, ToRow a)
               => con
               -> Query
               -> Sink (Flush a) m ()
insertAllTrans con query = statementSinkTrans con rowPutter (prepare con query)
-- | Build a source from a statement-producing action and a row getter.
--
-- The statement is acquired with 'bracketP' and released with 'finish'.
-- The getter is called repeatedly; each 'Just' value is yielded downstream
-- and the first 'Nothing' ends the stream.
statementSource :: (Statement stmt, MonadResource m)
                => (stmt -> m (Maybe a))
                -> IO stmt
                -> Source m a
statementSource getter stmt = bracketP stmt finish pump
  where
    -- Fetch one value; stop on Nothing, otherwise emit it and continue.
    pump st = lift (getter st) >>= maybe (return ()) emit
      where
        emit r = yield r >> pump st
-- | Build a counting sink from a statement-producing action and a putter.
--
-- The statement is acquired with 'bracketP' and released with 'finish'.
-- Every incoming element is passed to the putter; when the input ends, the
-- number of elements consumed is returned.
statementSinkCount :: (Statement stmt, MonadResource m, Num count)
                   => (stmt -> a -> m ())
                   -> IO stmt
                   -> Sink a m count
statementSinkCount putter stmt = bracketP stmt finish (consume 0)
  where
    -- The running total is kept strict so it is evaluated at every step.
    consume !seen st = await >>= maybe (return seen) store
      where
        store x = lift (putter st x) >> consume (seen + 1) st
-- | Build a sink from a statement-producing action and a putter.
--
-- The statement is acquired with 'bracketP' and released with 'finish'.
-- Every incoming element is passed to the putter until the input ends.
statementSink :: (Statement stmt, MonadResource m)
              => (stmt -> a -> m ())
              -> IO stmt
              -> Sink a m ()
statementSink putter stmt = bracketP stmt finish drain
  where
    -- Feed each awaited element to the putter; finish when upstream is done.
    drain st = awaitForever (lift . putter st)
-- | Build a sink over a 'Flush'-annotated stream that groups writes into
-- transactions.
--
-- The initial transaction state is read from 'inTransaction'. For each
-- 'Chunk' a transaction is begun if none is open, then the value is passed
-- to the putter. On 'Flush', and at end of input, an open transaction is
-- committed. The statement is acquired with 'bracketP' and released with
-- 'finish'.
statementSinkTrans :: (Connection con, (ConnStatement con) ~ stmt, MonadResource m, MonadIO m)
                   => con
                   -> (stmt -> a -> m ())
                   -> IO stmt
                   -> Sink (Flush a) m ()
statementSinkTrans con putter stmt = do
  started <- liftIO (inTransaction con)
  bracketP stmt finish (loop started)
  where
    -- Commit only when a transaction is currently open.
    commitIf open = when open (liftIO (commit con))

    -- @open@ tracks whether a transaction is currently in progress.
    loop open st = await >>= \next -> case next of
      Nothing -> commitIf open
      Just Flush -> commitIf open >> loop False st
      Just (Chunk val) -> do
        when (not open) (liftIO (begin con))
        lift (putter st val)
        loop True st
-- | Wrap every element in 'Chunk', inserting a 'Flush' between two
-- consecutive elements whenever the predicate returns 'False' for them.
--
-- The predicate receives the previous element first and the current element
-- second. No 'Flush' is emitted before the first element or at end of input.
flushBy :: (Monad m) => (a -> a -> Bool) -> Conduit a m (Flush a)
flushBy pref = go Nothing
  where
    -- @prev@ is the most recently emitted element, if any.
    go !prev = await >>= maybe (return ()) (emit prev)

    emit prev x = do
      when (boundary prev x) (yield Flush)
      yield (Chunk x)
      go (Just x)

    -- A boundary exists only when there is a previous element and the
    -- predicate rejects the pair.
    boundary Nothing _ = False
    boundary (Just l) x = not (pref l x)
-- | Wrap every element in 'Chunk' and emit a 'Flush' after each batch of
-- @max 1 cnt@ elements.
--
-- Counts below 1 are clamped to 1, so a 'Flush' then follows every element.
-- If the input ends in the middle of a batch, no trailing 'Flush' is emitted.
flushAt :: (Monad m, Integral i) => i -> Conduit a m (Flush a)
flushAt cnt = flushAt' c
  where
    -- Effective batch size; never less than one.
    c = max 1 cnt

    -- The argument is the number of elements still to emit in this batch.
    flushAt' 0 = do
      yield Flush
      flushAt' c
    flushAt' !count = do
      n <- await
      case n of
        Nothing -> return ()
        Just x -> do
          yield $ Chunk x
          -- Decrement the remaining count (the original applied @count@ to 1).
          flushAt' (count - 1)
-- | Execute a statement with one row of parameters, calling 'reset' on the
-- statement first so it can be executed again.
rowPutter :: (MonadIO m, Statement stmt, ToRow row)
          => stmt
          -> row
          -> m ()
rowPutter st row = liftIO (reset st >> execute st row)
-- | Identity conduit whose element type is fixed to @['SqlValue']@.
--
-- It passes every element through unchanged; its only effect is to pin the
-- stream's element type.
asSqlVals :: (Monad m) => Conduit [SqlValue] m [SqlValue]
asSqlVals = L.map (\vals -> vals)
-- | Identity conduit whose element type is fixed to the type of the argument.
--
-- The argument's value is never inspected; it serves only as a type witness.
-- Every element passes through unchanged.
asThisType :: (Monad m) => a -> Conduit a m a
asThisType _witness = L.map (\x -> x)