{-# LANGUAGE Rank2Types #-}
module Sarsi.Consumer where

import Codec.Sarsi (Event, getEvent)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar)
import Control.Exception (IOException, bracket, try)
import Data.Binary.Machine (processGet)
import Data.Machine ((<~), auto, asParts)
import Network.Socket (connect, socketToHandle)
import Sarsi (Broker(..), Topic(..), createSocket, getSockAddr)
import System.FSNotify (eventPath, watchDir, withManager)
import System.IO (IOMode(ReadMode), hClose, hWaitForInput)
import System.IO.Machine (IOSource, byChunkOf, sourceHandle)

consumeOrWait :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO a
consumeOrWait topic@(Topic (Broker bp) tp) f = do
  res <- consume topic f
  either (const $ withManager waitAndRetry) return res
    where
      waitAndRetry mng = do
        lck <- newEmptyMVar
        stop <- watchDir mng bp pred' $ const $ putMVar lck ()
        takeMVar lck
        stop
        consumeOrWait topic f
      pred' e = eventPath e == tp

consume :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO (Either IOException a)
consume topic f = try $ consume' topic f

consume' :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO a
consume' topic f = bracket createHandle hClose (process Nothing)
  where
    createHandle = do
      sock  <- createSocket
      connect sock $ getSockAddr topic
      socketToHandle sock ReadMode
    process s h = do
      sa  <- f s $ asParts <~ auto unpack <~ processGet getEvent <~ sourceHandle (byChunkOf 1) h
      _   <- hWaitForInput h (-1)
      either (continue h) return sa
        where continue h' s' = process (Just s') h'
    unpack (Right e) = [e]
    unpack (Left _) = []