{-# LANGUAGE FlexibleContexts, PackageImports #-} module Network.XmlPush.HttpPull.Client.Common ( HttpPullClArgs(..), talkC, ) where import Control.Monad import "monads-tf" Control.Monad.Trans import Control.Monad.Base import Control.Monad.Trans.Control import Control.Concurrent hiding (yield) import Control.Concurrent.STM import Data.Maybe import Data.HandleLike import Data.Pipe -- import Data.Pipe.IO import Data.Pipe.TChan import System.IO import Text.XML.Pipe import Network.TigHTTP.Client import Network.TigHTTP.Types import qualified Data.ByteString.Lazy as LBS data HttpPullClArgs h = HttpPullClArgs { hostName :: String, portNumber :: Int, basePath :: FilePath, getPath :: XmlNode -> FilePath, poll :: HandleMonad h XmlNode, isPending :: XmlNode -> Bool, duration :: Maybe Int, getDuration :: XmlNode -> Maybe Int } talkC :: (HandleLike h, MonadBaseControl IO (HandleMonad h)) => h -> String -> Int -> FilePath -> (XmlNode -> FilePath) -> HandleMonad h XmlNode -> (XmlNode -> Bool) -> TVar (Maybe Int) -> (XmlNode -> Maybe Int) -> HandleMonad h (TChan XmlNode, TChan XmlNode) talkC h addr pn pth gp pl ip dr gdr = do -- liftBase $ putStrLn "talkC begin" lock <- liftBase $ atomically newTChan liftBase . atomically $ writeTChan lock () inc <- liftBase $ atomically newTChan inc' <- liftBase $ atomically newTChan otc <- liftBase $ atomically newTChan otc' <- liftBase $ atomically newTChan void . liftBaseDiscard forkIO . runPipe_ $ fromTChan otc -- =$= debug =$= conversation lock h addr pn pth gp dr gdr -- =$= debug =$= toTChan inc void . liftBaseDiscard forkIO . runPipe_ $ fromTChan otc' =$= conversation lock h addr pn pth gp dr gdr =$= toTChan inc' void . liftBaseDiscard forkIO . forever $ do d <- liftBase . atomically $ do md <- readTVar dr case md of Just d -> return d _ -> retry liftBase $ threadDelay d p <- pl liftBase $ polling p ip inc' inc otc' -- liftBase $ putStrLn "talkC end" return (inc, otc) conversation :: (HandleLike h, MonadBase IO (HandleMonad h)) => TChan () -> h -> String -> Int -> FilePath -> (XmlNode -> FilePath) -> TVar (Maybe a) -> (XmlNode -> Maybe a) -> Pipe XmlNode XmlNode (HandleMonad h) () conversation lock h addr pn pth gp dr gdr = talk lock h addr pn pth gp =$= setDuration dr gdr setDuration :: MonadBase IO m => TVar (Maybe a) -> (o -> Maybe a) -> Pipe o o m () setDuration dr gdr = (await >>=) . maybe (return ()) $ \n -> case gdr n of Just d -> do lift . liftBase . atomically $ writeTVar dr (Just d) yield n >> setDuration dr gdr _ -> yield n >> setDuration dr gdr polling :: XmlNode -> (XmlNode -> Bool) -> TChan XmlNode -> TChan XmlNode -> TChan XmlNode -> IO () polling pl ip i i' o = do atomically $ writeTChan o pl r <- atomically $ readTChan i hFlush stdout when (ip r) $ atomically (writeTChan i' r) >> polling pl ip i i' o talk :: (MonadBase IO (HandleMonad h), HandleLike h) => TChan () -> h -> String -> Int -> FilePath -> (XmlNode -> FilePath) -> Pipe XmlNode XmlNode (HandleMonad h) () talk lock h addr pn pth gp = (await >>=) . maybe (return ()) $ \n -> do let m = LBS.fromChunks [xmlString [n]] -- lift . liftBase $ putStrLn "before lock" lift . liftBase . atomically $ readTChan lock -- lift . liftBase $ putStrLn "locking" r <- lift . request h $ post addr pn (pth ++ "/" ++ gp n) (Nothing, m) -- lift . liftBase $ putStrLn "request done" void $ return () =$= responseBody r =$= xmlEvent =$= convert fromJust =$= xmlNode [] lift . liftBase . atomically $ writeTChan lock () -- lift . liftBase $ putStrLn "after lock" talk lock h addr pn pth gp