{-# LANGUAGE Rank2Types #-}
module Sarsi.Producer where

import Codec.Sarsi (Event(Start), putEvent)
import Control.Exception (IOException, bracket, tryJust)
import Control.Concurrent.Async (async, cancel, wait)
import Control.Concurrent.Chan (dupChan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (newTQueue, tryReadTQueue, writeTQueue)
import Data.Binary.Machine (processPut)
import Data.Machine (ProcessT, (<~), runT_, sinkPart_, prepended)
import Network.Socket (Socket, accept, bind, close, listen, socketToHandle)
import Sarsi (Topic, createSocket, createSockAddr, removeTopic)
import System.IO (IOMode(WriteMode), Handle, hClose)
import System.IO.Machine (byChunk, sinkIO, sinkHandle, sourceIO)

produce :: Topic -> (ProcessT IO Event Event -> IO a) -> IO a
produce t f = do
  conns   <- atomically $ newTQueue
  chan    <- newChan
  state   <- newMVar []
  server  <- async $ bracket bindSock close (serve (process conns chan state))
  feeder  <- async $ f $ sinkPart_ (\x -> (x, x)) (sinkIO $ feed chan state)
  a       <- wait feeder
  waitFinish conns
  cancel server
  removeTopic t
  return a
    where
      bindSock = do
        sock  <- createSocket
        addr  <- createSockAddr t
        bind sock addr
        listen sock 1
        return sock
      process conns chan' state h = do
        chan <- dupChan chan'
        es   <- readMVar state
        conn <- async $ do
          runT_ $ sinkHandle byChunk h <~ processPut putEvent <~ (prepended $ reverse es) <~ (sourceIO $ readChan chan)
          hClose h
        atomically $ writeTQueue conns conn
        return Nothing
      feed chan state e = do
        modifyMVar_ state $ case e of
          (Start _) -> const $ return [e]
          _         -> return . (:) e
        writeChan chan e
      waitFinish conns = do
        conn <- atomically $ tryReadTQueue conns
        _    <- tryJust io $ maybe (return ()) wait conn
        return ()
          where
            io :: IOException -> Maybe ()
            io _ = Just ()

serve :: (Handle -> IO (Maybe a)) -> Socket -> IO a
serve f sock = bracket acceptHandle hClose process
  where
    acceptHandle = do
      (conn, _) <- accept sock
      h         <- socketToHandle conn WriteMode
      return h
    process h = do
      a         <- f h
      maybe (serve f sock) return a