module Happstack.State.Saver.Impl.Queue
( queueReader
, queueWriter
) where
import Happstack.State.Saver.Types
import Happstack.Data.Serialize
import Control.Concurrent.STM
import Control.Concurrent
import Control.Monad
import Data.Typeable
import Data.Binary
data Item = Close (IO ())
| Add Put (IO ())
queueReader :: ReaderStream a -> IO (ReaderStream a)
queueReader stream = return stream
data Encoded = Encoded Put deriving Typeable
instance Version Encoded where mode = Primitive
instance Serialize Encoded where
putCopy (Encoded out) = contain out
getCopy = error "decoding from writer queue."
queueWriter :: Serialize a => WriterStream Encoded -> IO (WriterStream a)
queueWriter writer = do
ch <- newCh
let handler = do
input <- getChs ch
let il (Add p f : rest) a0 a1 = il rest (p:a0) (f:a1)
il something a0 a1 = (reverse a0, sequence_ $ reverse a1, something)
let (ps,io,rest) = il input [] []
when (not (null ps)) $ writerAdd writer (Encoded $ sequence_ ps) io
case rest of
[] -> handler
(Close io':_) -> writerClose writer >> io'
_ -> fail "queueSaver: Invalid saver bunch!"
forkIO handler
return $ WriterStream
{ writerClose = do
mv <- newEmptyMVar
writeCh ch $ Close (writerClose writer >> putMVar mv ())
takeMVar mv
, writerAdd = \ps fin -> writeCh ch $ Add (safePut ps) fin
, writerAtomicReplace = \a -> writerAtomicReplace writer (Encoded $ safePut a)
, writerCut = writerCut writer
}
newtype Ch a = Ch (TVar [a])
newCh :: IO (Ch a)
newCh = fmap Ch $ newTVarIO []
writeCh :: Ch a -> a -> IO ()
writeCh (Ch ch) x = atomically $ do vs <- readTVar ch
writeTVar ch (x:vs)
getChs :: Ch a -> IO [a]
getChs (Ch ch) = atomically $ do vs <- readTVar ch
guard (not (null vs))
writeTVar ch []
return (reverse vs)