-- | Fetching and caching support for the Haxl core.
--
-- This module exports:
--
--  * 'performFetches', which runs one round of batched data fetches for
--    everything held in a 'RequestStore';
--
--  * 'cached' and 'memoized', which consult the two request caches kept
--    in the environment; and
--
--  * 'CacheResult', the outcome of such a cache lookup.
module Haxl.Core.Fetch
  ( CacheResult(..)
  , cached
  , memoized
  , performFetches
  ) where
import Haxl.Core.DataCache as DataCache
import Haxl.Core.Env
import Haxl.Core.Exception
import Haxl.Core.RequestStore
import Haxl.Core.Show1
import Haxl.Core.StateStore
import Haxl.Core.Types
import Haxl.Core.Util
import Control.Exception
import Control.Monad
import Data.IORef
import Data.List
import Data.Time
import Text.Printf
import Data.Monoid
import qualified Data.HashMap.Strict as HashMap
import qualified Data.Text as Text
-- | Perform one round of data fetching for everything queued in a
-- 'RequestStore'.
--
-- The steps are, in order:
--
--  1. Push a 'RoundStats' entry (number of requests per data source
--     name) onto the front of the list held in the environment's
--     'statsRef'.
--
--  2. Under 'ifTrace' level 1, print a one-line summary of the round;
--     under 'ifTrace' level 3, print every request using 'show1'.
--
--  3. Turn each batch of blocked requests into a 'PerformFetch' and pass
--     the whole list to 'scheduleFetches'.
--
--  4. Under 'ifTrace' level 1, print how long the round took.
performFetches :: forall u. Env u -> RequestStore u -> IO ()
performFetches env store = do
  let traceFlags = flags env
      batches = contents store

  started <- getCurrentTime

  let
    -- One (data source name, number of blocked requests) pair per batch.
    perSource =
      [ (dataSourceName (requestProxy blocked), length blocked)
      | BlockedFetches blocked <- batches ]
      where
      -- Deliberately 'undefined': it only carries the batch's request
      -- type to 'dataSourceName', which therefore must not evaluate its
      -- argument.
      requestProxy :: [BlockedFetch r] -> r a
      requestProxy = undefined

  modifyIORef' (statsRef env) $ \(Stats rounds) ->
    Stats (RoundStats (HashMap.fromList perSource) : rounds)

  ifTrace traceFlags 1 $
    printf "Batch data fetch (%s)\n" $
      intercalate (", "::String)
        [ printf "%d %s" count (Text.unpack name)
        | (name, count) <- perSource ]

  ifTrace traceFlags 3 $
    forM_ batches $ \(BlockedFetches blocked) ->
      mapM_ (\(BlockedFetch request _) -> putStrLn (show1 request)) blocked

  let
    -- Build the fetch for a single batch.  The pattern signature brings
    -- the batch's request type @r@ into scope so that the state lookup
    -- and the proxy below agree on it.
    startBatch (BlockedFetches (blocked :: [BlockedFetch r])) =
      case stateGet (states env) of
        -- No state registered for this data source: rather than calling
        -- 'fetch', produce a synchronous action that sets an error on
        -- every request in the batch.
        Nothing ->
          return (SyncFetch (mapM_ (setError (const notInitialized)) blocked))
          where
            -- Type-only proxy, as for 'requestProxy' above.
            proxy :: r a
            proxy = undefined
            notInitialized = DataSourceError $
              "data source not initialized: " <> dataSourceName proxy
        -- State is available: call the data source's 'fetch', wrapped by
        -- 'wrapFetch' so that exceptions are recorded on the requests.
        Just sourceState ->
          return $ wrapFetch blocked $
            fetch sourceState traceFlags (userEnv env) blocked

  mapM startBatch batches >>= scheduleFetches

  ifTrace traceFlags 1 $ do
    finished <- getCurrentTime
    let elapsed = realToFrac (diffUTCTime finished started) :: Double
    printf "Batch data fetch done (%.2fs)\n" elapsed
-- | Wrap a 'PerformFetch' so that an exception thrown while running it
-- is recorded as the result of every request in the given batch instead
-- of propagating.
--
-- For a 'SyncFetch' the action itself is guarded.  For an 'AsyncFetch'
-- the guard surrounds the call of the wrapper on its inner action.
--
-- The handler is typed at 'SomeException', so every exception raised by
-- the guarded action is caught here.
wrapFetch :: [BlockedFetch req] -> PerformFetch -> PerformFetch
wrapFetch blocked performer =
  case performer of
    SyncFetch action -> SyncFetch (guarded action)
    AsyncFetch withInner -> AsyncFetch (guarded . withInner)
  where
    -- Run an action, diverting any exception to 'failAll'.
    guarded :: IO () -> IO ()
    guarded action = action `catch` failAll

    -- Store the exception as the result of every request in the batch.
    failAll :: SomeException -> IO ()
    failAll err = mapM_ (overwrite err) blocked

    -- 'tryTakeResult' runs first and its outcome is discarded
    -- (presumably it clears a result that was already stored -- confirm
    -- against 'ResultVar'); the exception is then put into the variable.
    overwrite err (BlockedFetch _ rvar) =
      tryTakeResult rvar >> putResult rvar (except err)
-- | Run a list of fetches.
--
-- All 'SyncFetch' actions are sequenced in list order into one action.
-- That action is then passed as the inner argument to the combination
-- (built with 'compose') of all 'AsyncFetch' wrappers.
scheduleFetches :: [PerformFetch] -> IO ()
scheduleFetches fetches = wrapAsync runSync
  where
    -- Wrappers taken from the asynchronous fetches, in list order.
    asyncWrappers :: [IO () -> IO ()]
    asyncWrappers = [ wrapper | AsyncFetch wrapper <- fetches ]

    -- Actions taken from the synchronous fetches, in list order.
    syncActions :: [IO ()]
    syncActions = [ action | SyncFetch action <- fetches ]

    wrapAsync :: IO () -> IO ()
    wrapAsync = compose asyncWrappers

    runSync :: IO ()
    runSync = sequence_ syncActions
-- | The outcome of looking a request up in a cache, as produced by
-- 'checkCache' (and hence by 'cached' and 'memoized').
data CacheResult a
  -- | The request was not in the cache.  A fresh, empty 'ResultVar' has
  -- been inserted into the cache for it and is returned here.
  = Uncached (ResultVar a)

  -- | The request is in the cache, but its 'ResultVar' does not hold a
  -- result yet.
  | CachedNotFetched (ResultVar a)

  -- | The request is in the cache and its result is available: either
  -- the exception it ended with or the fetched value.
  | Cached (Either SomeException a)
-- | Look a request up in the cache referenced by the environment's
-- 'cacheRef', using the environment's 'flags' for tracing.  See
-- 'checkCache' for the meaning of the result.
cached :: (Request r a) => Env u -> r a -> IO (CacheResult a)
cached env req = checkCache (flags env) (cacheRef env) req
-- | Look a request up in the cache referenced by the environment's
-- 'memoRef', using the environment's 'flags' for tracing.  See
-- 'checkCache' for the meaning of the result.
memoized :: (Request r a) => Env u -> r a -> IO (CacheResult a)
memoized env req = checkCache (flags env) (memoRef env) req
-- | Look a request up in the 'DataCache' held in the given 'IORef'.
--
--  * Not present: a fresh empty 'ResultVar' is created, inserted into the
--    cache under this request, and returned as 'Uncached'.
--
--  * Present, but 'tryReadResult' yields nothing: 'CachedNotFetched'.
--
--  * Present with a result: 'Cached'.  A stored exception is returned
--    just like a stored value.  Under 'ifTrace' level 3 the hit is also
--    printed.
--
-- On a miss the cache read at the start is written back with
-- 'writeIORef'; this is a plain read-then-write, not an atomic modify.
checkCache
  :: (Request r a)
  => Flags
  -> IORef DataCache
  -> r a
  -> IO (CacheResult a)
checkCache traceFlags ref req = do
  cache <- readIORef ref
  case DataCache.lookup req cache of
    Nothing -> do
      rvar <- newEmptyResult
      writeIORef ref (DataCache.insert req rvar cache)
      return (Uncached rvar)
    Just rvar ->
      tryReadResult rvar >>= maybe (return (CachedNotFetched rvar)) reportHit
  where
    -- Trace the cache hit (level 3) and hand the stored result back.
    reportHit result = do
      ifTrace traceFlags 3 $ putStrLn (hitLabel result ++ show req)
      return (Cached result)

    -- Prefix of the trace line, depending on whether the stored result
    -- is an exception or a value.
    hitLabel :: Either e b -> String
    hitLabel (Left _) = "Cached error: "
    hitLabel (Right _) = "Cached request: "