module Data.Iteratee.STM (
iterChan
,enumChan
,iterCloseChan
,enumCloseChan
,forkIter
,forkEnum
)
where
import Control.Concurrent.STM.TBMChan
import Data.Iteratee as I
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad
import Control.Monad.IO.Class
drainChan :: TBMChan a -> STM [a]
drainChan chan = go []
where
go acc = tryReadTBMChan chan >>= \res -> case res of
Just (Just a) -> go (a:acc)
_ -> return acc
iterCloseChan :: (Nullable s, MonadIO m) => TBMChan s -> Iteratee s m ()
iterCloseChan chan = liftIO . atomically $ closeTBMChan chan
iterChan :: (Nullable s, MonadIO m) => TBMChan s -> Iteratee s m ()
iterChan chan = do
stream_eof <- isFinished
unless stream_eof $ do
chnk <- getChunk
wrote_chnk <- liftIO . atomically $ do
isClosed <- isClosedTBMChan chan
if isClosed then return False else writeTBMChan chan chnk >> return True
if wrote_chnk
then iterChan chan
else idone () (Chunk chnk)
enumChan :: (NullPoint s, MonadIO m) => TBMChan s -> Enumerator s m a
enumChan chan = enumFromCallback cb ()
where
cb () = do
mres <- liftIO . atomically $ readTBMChan chan
case mres of
Nothing -> return $ Right ((False, ()), I.empty)
Just s -> return $ Right ((True, ()), s)
enumCloseChan :: (MonadIO m) => TBMChan s -> Enumerator s m a
enumCloseChan chan iter = do
liftIO . atomically $ closeTBMChan chan
enumEof iter
forkEnum
:: (MonadIO m, Nullable s, NullPoint s)
=> Int
-> Enumerator s IO ()
-> Enumerator s m a
forkEnum sz enum iter = do
chan <- liftIO $ newTBMChanIO sz
mvar <- liftIO $ newEmptyMVar
liftIO . forkIO $ ((enum >>> enumCloseChan chan) (iterChan chan) >>= run)
`finally` putMVar mvar ()
i2 <- enumChan chan iter
liftIO $ readMVar mvar
return i2
forkIter
:: (Nullable s, NullPoint s, MonadIO m)
=> Int
-> Iteratee s IO ()
-> Iteratee s m ()
forkIter sz iter = do
chan <- liftIO $ newTBMChanIO sz
mvar <- liftIO $ newEmptyMVar
liftIO . forkIO $ (enumChan chan iter >>= run) `finally` putMVar mvar ()
iterChan chan
iterCloseChan chan
liftIO $ readMVar mvar