module Sarsi.Producer where
import Codec.Sarsi (Event(Start), putEvent)
import Control.Concurrent.Async (async, cancel, wait, withAsync)
import Control.Concurrent.Chan (dupChan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar, withMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (newTQueue, tryReadTQueue, writeTQueue)
import Control.Exception (IOException, bracket, finally, tryJust)
import Data.Binary.Machine (processPut)
import Data.Machine (ProcessT, (<~), runT_, sinkPart_, prepended)
import Network.Socket (Socket, accept, bind, close, listen, socketToHandle)
import Sarsi (Topic, createSocket, createSockAddr, removeTopic)
import System.IO (IOMode(WriteMode), Handle, hClose)
import System.IO.Machine (byChunk, sinkIO, sinkHandle, sourceIO)
-- | Run a producer and publish the events it emits on the socket of a topic.
--
-- A background server thread binds a socket to the address obtained from
-- @createSockAddr t@ and hands every accepted connection to 'process', which
-- streams to it the backlog recorded since the most recent 'Start' event
-- followed by every event fed afterwards.
--
-- The callback @f@ receives a process built with 'sinkPart_' that passes each
-- event on while also handing it to 'feed'. Its result is the result of
-- 'produce'.
--
-- Cleanup (cancelling the server thread, then @removeTopic t@) runs on every
-- exit path, including when @f@ throws or 'produce' itself is interrupted.
produce :: Topic -> (ProcessT IO Event Event -> IO a) -> IO a
produce t f = do
  conns <- atomically newTQueue
  -- NOTE(review): nothing reads from this original channel (only its
  -- 'dupChan' copies are read), so written events stay reachable from it;
  -- a broadcast channel may be preferable -- confirm before changing.
  chan <- newChan
  state <- newMVar []
  server <- async $ bracket bindSock close (serve (process conns chan state))
  -- 'finally' guarantees the server is stopped and the topic removed even
  -- when the feeder fails; previously both were skipped on any exception.
  -- NOTE(review): assumes 'removeTopic' is safe to call when the server
  -- failed before binding -- confirm against its definition.
  runFeeder conns chan state `finally` (cancel server >> removeTopic t)
  where
    -- Run the user callback in its own thread; 'withAsync' cancels that
    -- thread if we leave early (e.g. on an asynchronous exception).
    runFeeder conns chan state =
      withAsync (f $ sinkPart_ (\x -> (x, x)) (sinkIO $ feed chan state)) $ \feeder -> do
        a <- wait feeder
        waitFinish conns
        return a

    -- Create the topic socket, bind it and start listening.
    bindSock = do
      sock <- createSocket
      addr <- createSockAddr t
      bind sock addr
      listen sock 1
      return sock

    -- Start streaming to a freshly accepted handle. Always returns 'Nothing'
    -- so that 'serve' keeps accepting further connections.
    process conns chan' state h = do
      -- Snapshot the backlog and subscribe to the channel while holding the
      -- state lock, so that no event can land in both (or neither) of them.
      (chan, backlog) <- withMVar state $ \es -> do
        dup <- dupChan chan'
        return (dup, es)
      conn <- async $ do
        -- The backlog is stored newest-first, hence the 'reverse'.
        runT_ $ sinkHandle byChunk h <~ processPut putEvent <~ (prepended $ reverse backlog) <~ (sourceIO $ readChan chan)
        hClose h
      atomically $ writeTQueue conns conn
      return Nothing

    -- Record an event in the backlog and broadcast it. A 'Start' event resets
    -- the backlog to just itself. Both steps happen under the state lock so
    -- they are atomic with respect to 'process'.
    feed chan state e = modifyMVar_ state $ \es -> do
      writeChan chan e
      return $ case e of
        Start _ -> [e]
        _ -> e : es

    -- Wait for the connection at the head of the queue, if any, ignoring an
    -- 'IOException' raised by it.
    -- NOTE(review): only one queued connection is awaited, not all of them;
    -- left unchanged because the intended semantics are unclear.
    waitFinish conns = do
      conn <- atomically $ tryReadTQueue conns
      _ <- tryJust io $ maybe (return ()) wait conn
      return ()
      where
        io :: IOException -> Maybe ()
        io _ = Just ()
-- | Accept connections on a listening socket and pass each one, as a
-- write-only 'Handle', to the callback @f@.
--
-- When @f@ yields 'Nothing' the next connection is accepted; when it yields
-- @'Just' a@ the loop stops and @a@ is returned. The next iteration runs
-- inside the 'bracket' of the current one, so a handle is only closed once
-- every later iteration has finished (or an exception unwinds the loop).
serve :: (Handle -> IO (Maybe a)) -> Socket -> IO a
serve f sock = loop
  where
    -- One accept/handle cycle; recursion happens from within 'handler'.
    loop = bracket acquire hClose handler

    -- Block for the next peer and wrap its socket in a write-only handle.
    acquire = accept sock >>= \(peer, _) -> socketToHandle peer WriteMode

    -- Run the callback, then either continue accepting or return its result.
    handler hdl = f hdl >>= maybe loop return