module Haxl.Core.Monad (
GenHaxl (..), runHaxl,
env, withEnv,
Env(..), Caches, caches, initEnvWithData, initEnv, emptyEnv,
throw, catch, catchIf, try, tryToHaxlException,
dataFetch, uncachedRequest,
cacheRequest, cacheResult, cachedComputation,
dumpCacheAsHaskell,
unsafeLiftIO, unsafeToHaxlException,
) where
import Haxl.Core.Types
import Haxl.Core.Show1
import Haxl.Core.StateStore
import Haxl.Core.Exception
import Haxl.Core.RequestStore
import Haxl.Core.Util
import Haxl.Core.DataCache as DataCache
import qualified Data.Text as Text
import qualified Control.Monad.Catch as Catch
import Control.Exception (Exception(..), SomeException)
#if __GLASGOW_HASKELL__ >= 708
import Control.Exception (SomeAsyncException(..))
#endif
#if __GLASGOW_HASKELL__ >= 710
import Control.Exception (AllocationLimitExceeded(..))
#endif
import Control.Monad
import qualified Control.Exception as Exception
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative hiding (Const)
#endif
import Control.DeepSeq
import GHC.Exts (IsString(..))
#if __GLASGOW_HASKELL__ < 706
import Prelude hiding (catch)
#endif
import Data.Hashable
import Data.IORef
import Data.List
import Data.Monoid
import Data.Time
import Data.Typeable
import qualified Data.HashMap.Strict as HashMap
import Text.Printf
import Text.PrettyPrint hiding ((<>))
import Control.Arrow (left)
#ifdef EVENTLOG
import Control.Exception (bracket_)
import Debug.Trace (traceEventIO)
#endif
data Env u = Env
{ cacheRef :: IORef (DataCache ResultVar)
, memoRef :: IORef (DataCache (MemoVar u))
, flags :: Flags
, userEnv :: u
, statsRef :: IORef Stats
, states :: StateStore
}
type Caches u = (IORef (DataCache ResultVar), IORef (DataCache (MemoVar u)))
caches :: Env u -> Caches u
caches env = (cacheRef env, memoRef env)
initEnvWithData :: StateStore -> u -> Caches u -> IO (Env u)
initEnvWithData states e (cref, mref) = do
sref <- newIORef emptyStats
return Env
{ cacheRef = cref
, memoRef = mref
, flags = defaultFlags
, userEnv = e
, states = states
, statsRef = sref
}
initEnv :: StateStore -> u -> IO (Env u)
initEnv states e = do
cref <- newIORef DataCache.empty
mref <- newIORef DataCache.empty
initEnvWithData states e (cref,mref)
emptyEnv :: u -> IO (Env u)
emptyEnv = initEnv stateEmpty
newtype GenHaxl u a = GenHaxl
{ unHaxl :: Env u -> IORef (RequestStore u) -> IO (Result u a) }
data Result u a
= Done a
| Throw SomeException
| Blocked (Cont u a)
data Cont u a
= Cont (GenHaxl u a)
| forall b. Cont u b :>>= (b -> GenHaxl u a)
| forall b. (Cont u (b -> a)) :<*> (Cont u b)
| forall b. (b -> a) :<$> (Cont u b)
toHaxl :: Cont u a -> GenHaxl u a
toHaxl (Cont haxl) = haxl
toHaxl ((m :>>= k1) :>>= k2) = toHaxl (m :>>= (k1 >=> k2))
toHaxl (c :>>= k) = toHaxl c >>= k
toHaxl ((f :<$> i) :<*> (g :<$> j)) =
toHaxl (((\x y -> f x (g y)) :<$> i) :<*> j)
toHaxl (f :<*> x) = toHaxl f <*> toHaxl x
toHaxl (f :<$> (g :<$> x)) = toHaxl ((f . g) :<$> x)
toHaxl (f :<$> x) = fmap f (toHaxl x)
instance (Show a) => Show (Result u a) where
show (Done a) = printf "Done(%s)" $ show a
show (Throw e) = printf "Throw(%s)" $ show e
show Blocked{} = "Blocked"
instance Monad (GenHaxl u) where
return a = GenHaxl $ \_env _ref -> return (Done a)
GenHaxl m >>= k = GenHaxl $ \env ref -> do
e <- m env ref
case e of
Done a -> unHaxl (k a) env ref
Throw e -> return (Throw e)
Blocked cont -> return (Blocked (cont :>>= k))
(>>) = (*>)
instance Functor (GenHaxl u) where
fmap f (GenHaxl m) = GenHaxl $ \env ref -> do
r <- m env ref
case r of
Done a -> return (Done (f a))
Throw e -> return (Throw e)
Blocked a' -> return (Blocked (f :<$> a'))
instance Applicative (GenHaxl u) where
pure = return
GenHaxl f <*> GenHaxl a = GenHaxl $ \env ref -> do
r <- f env ref
case r of
Throw e -> return (Throw e)
Done f' -> do
ra <- a env ref
case ra of
Done a' -> return (Done (f' a'))
Throw e -> return (Throw e)
Blocked a' -> return (Blocked (f' :<$> a'))
Blocked f' -> do
ra <- a env ref
case ra of
Done a' -> return (Blocked (($ a') :<$> f'))
Throw e -> return (Blocked (f' :<*> Cont (throw e)))
Blocked a' -> return (Blocked (f' :<*> a'))
runHaxl :: Env u -> GenHaxl u a -> IO a
#ifdef EVENTLOG
runHaxl env h = do
let go !n env c = do
traceEventIO "START computation"
ref <- newIORef noRequests
e <- toHaxl c env ref
traceEventIO "STOP computation"
case e of
Done a -> return a
Throw e -> Exception.throw e
Blocked cont -> do
bs <- readIORef ref
writeIORef ref noRequests
traceEventIO "START performFetches"
n' <- performFetches n env bs
traceEventIO "STOP performFetches"
go n' env cont
traceEventIO "START runHaxl"
r <- go 0 env (Cont h)
traceEventIO "STOP runHaxl"
return r
#else
runHaxl env (GenHaxl haxl) = do
ref <- newIORef noRequests
e <- haxl env ref
case e of
Done a -> return a
Throw e -> Exception.throw e
Blocked cont -> do
bs <- readIORef ref
writeIORef ref noRequests
void (performFetches 0 env bs)
runHaxl env (toHaxl cont)
#endif
env :: (Env u -> a) -> GenHaxl u a
env f = GenHaxl $ \env _ref -> return (Done (f env))
withEnv :: Env u -> GenHaxl u a -> GenHaxl u a
withEnv newEnv (GenHaxl m) = GenHaxl $ \_env ref -> do
r <- m newEnv ref
case r of
Done a -> return (Done a)
Throw e -> return (Throw e)
Blocked k -> return (Blocked (Cont (withEnv newEnv (toHaxl k))))
throw :: (Exception e) => e -> GenHaxl u a
throw e = GenHaxl $ \_env _ref -> raise e
raise :: (Exception e) => e -> IO (Result u a)
raise = return . Throw . toException
catch :: Exception e => GenHaxl u a -> (e -> GenHaxl u a) -> GenHaxl u a
catch (GenHaxl m) h = GenHaxl $ \env ref -> do
r <- m env ref
case r of
Done a -> return (Done a)
Throw e | Just e' <- fromException e -> unHaxl (h e') env ref
| otherwise -> return (Throw e)
Blocked k -> return (Blocked (Cont (catch (toHaxl k) h)))
catchIf
:: Exception e => (e -> Bool) -> GenHaxl u a -> (e -> GenHaxl u a)
-> GenHaxl u a
catchIf cond haxl handler =
catch haxl $ \e -> if cond e then handler e else throw e
try :: Exception e => GenHaxl u a -> GenHaxl u (Either e a)
try haxl = (Right <$> haxl) `catch` (return . Left)
instance Catch.MonadThrow (GenHaxl u) where throwM = Haxl.Core.Monad.throw
instance Catch.MonadCatch (GenHaxl u) where catch = Haxl.Core.Monad.catch
unsafeLiftIO :: IO a -> GenHaxl u a
unsafeLiftIO m = GenHaxl $ \_env _ref -> Done <$> m
unsafeToHaxlException :: GenHaxl u a -> GenHaxl u a
unsafeToHaxlException (GenHaxl m) = GenHaxl $ \env ref -> do
r <- m env ref `Exception.catch` \e -> return (Throw e)
case r of
Blocked c -> return (Blocked (Cont (unsafeToHaxlException (toHaxl c))))
other -> return other
tryToHaxlException :: GenHaxl u a -> GenHaxl u (Either HaxlException a)
tryToHaxlException h = left asHaxlException <$> try (unsafeToHaxlException h)
data CacheResult a
= Uncached (ResultVar a)
| CachedNotFetched (ResultVar a)
| Cached (Either SomeException a)
cached :: (Request r a) => Env u -> r a -> IO (CacheResult a)
cached env req = do
cache <- readIORef (cacheRef env)
let
do_fetch = do
rvar <- newEmptyResult
writeIORef (cacheRef env) $! DataCache.insert req rvar cache
return (Uncached rvar)
case DataCache.lookup req cache of
Nothing -> do_fetch
Just rvar -> do
mb <- tryReadResult rvar
case mb of
Nothing -> return (CachedNotFetched rvar)
Just r -> do
ifTrace (flags env) 3 $ putStrLn $ case r of
Left _ -> "Cached error: " ++ show req
Right _ -> "Cached request: " ++ show req
return (Cached r)
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u a
dataFetch req = GenHaxl $ \env ref -> do
res <- cached env req
case res of
Uncached rvar -> do
modifyIORef' ref $ \bs -> addRequest (BlockedFetch req rvar) bs
return $ Blocked (Cont (continueFetch req rvar))
CachedNotFetched rvar -> return
$ Blocked (Cont (continueFetch req rvar))
Cached (Left ex) -> return (Throw ex)
Cached (Right a) -> return (Done a)
uncachedRequest :: (DataSource u r, Request r a) => r a -> GenHaxl u a
uncachedRequest req = GenHaxl $ \_env ref -> do
rvar <- newEmptyResult
modifyIORef' ref $ \bs -> addRequest (BlockedFetch req rvar) bs
return $ Blocked (Cont (continueFetch req rvar))
continueFetch
:: (DataSource u r, Request r a, Show a)
=> r a -> ResultVar a -> GenHaxl u a
continueFetch req rvar = GenHaxl $ \_env _ref -> do
m <- tryReadResult rvar
case m of
Nothing -> raise . DataSourceError $
textShow req <> " did not set contents of result var"
Just r -> done r
cacheResult :: (Request r a) => r a -> IO a -> GenHaxl u a
cacheResult req val = GenHaxl $ \env _ref -> do
cachedResult <- cached env req
case cachedResult of
Uncached rvar -> do
result <- Exception.try val
putResult rvar result
case result of
Left e -> do rethrowAsyncExceptions e; done result
_other -> done result
Cached result -> done result
CachedNotFetched _ -> corruptCache
where
corruptCache = raise . DataSourceError $ Text.concat
[ textShow req
, " has a corrupted cache value: these requests are meant to"
, " return immediately without an intermediate value. Either"
, " the cache was updated incorrectly, or you're calling"
, " cacheResult on a query that involves a blocking fetch."
]
rethrowAsyncExceptions :: SomeException -> IO ()
rethrowAsyncExceptions e
#if __GLASGOW_HASKELL__ >= 708
| Just SomeAsyncException{} <- fromException e = Exception.throw e
#endif
#if __GLASGOW_HASKELL__ >= 710
| Just AllocationLimitExceeded{} <- fromException e = Exception.throw e
#endif
| otherwise = return ()
cacheRequest
:: (Request req a) => req a -> Either SomeException a -> GenHaxl u ()
cacheRequest request result = GenHaxl $ \env _ref -> do
res <- cached env request
case res of
Uncached rvar -> do
putResult rvar result
return $ Done ()
_other -> raise $
DataSourceError "cacheRequest: request is already in the cache"
instance IsString a => IsString (GenHaxl u a) where
fromString s = return (fromString s)
performFetches :: forall u. Int -> Env u -> RequestStore u -> IO Int
performFetches n env reqs = do
let f = flags env
sref = statsRef env
jobs = contents reqs
!n' = n + length jobs
t0 <- getCurrentTime
let
roundstats =
[ (dataSourceName (getReq reqs), length reqs)
| BlockedFetches reqs <- jobs ]
where
getReq :: [BlockedFetch r] -> r a
getReq = undefined
ifTrace f 1 $
printf "Batch data fetch (%s)\n" $
intercalate (", "::String) $
map (\(name,num) -> printf "%d %s" num (Text.unpack name)) roundstats
ifTrace f 3 $
forM_ jobs $ \(BlockedFetches reqs) ->
forM_ reqs $ \(BlockedFetch r _) -> putStrLn (show1 r)
let
applyFetch (i, BlockedFetches (reqs :: [BlockedFetch r])) =
case stateGet (states env) of
Nothing ->
return (SyncFetch (mapM_ (setError (const e)) reqs))
where req :: r a; req = undefined
e = DataSourceError $
"data source not initialized: " <> dataSourceName req
Just state ->
return $ wrapFetchInTrace i (length reqs)
(dataSourceName (undefined :: r a))
$ wrapFetchInCatch reqs
$ fetch state f (userEnv env) reqs
fetches <- mapM applyFetch $ zip [n..] jobs
times <-
if report f >= 2
then do
(refs, timedfetches) <- mapAndUnzipM wrapFetchInTimer fetches
scheduleFetches timedfetches
mapM (fmap Just . readIORef) refs
else do
scheduleFetches fetches
return $ repeat Nothing
let dsroundstats = HashMap.fromList
[ (name, DataSourceRoundStats { dataSourceFetches = fetches
, dataSourceTime = time
})
| ((name, fetches), time) <- zip roundstats times]
t1 <- getCurrentTime
let roundtime = realToFrac (diffUTCTime t1 t0) :: Double
ifReport f 1 $
modifyIORef' sref $ \(Stats rounds) -> roundstats `deepseq`
Stats (RoundStats (microsecs roundtime) dsroundstats: rounds)
ifTrace f 1 $
printf "Batch data fetch done (%.2fs)\n" (realToFrac roundtime :: Double)
return n'
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch -> PerformFetch
wrapFetchInCatch reqs fetch =
case fetch of
SyncFetch io ->
SyncFetch (io `Exception.catch` handler)
AsyncFetch fio ->
AsyncFetch (\io -> fio io `Exception.catch` handler)
where
handler :: SomeException -> IO ()
handler e = do
rethrowAsyncExceptions e
mapM_ (forceError e) reqs
forceError e (BlockedFetch _ rvar) = do
void $ tryTakeResult rvar
putResult rvar (except e)
wrapFetchInTimer :: PerformFetch -> IO (IORef Microseconds, PerformFetch)
wrapFetchInTimer f = do
r <- newIORef 0
case f of
SyncFetch io -> return (r, SyncFetch (time io >>= writeIORef r))
AsyncFetch f -> do
inner_r <- newIORef 0
return (r, AsyncFetch $ \inner -> do
total <- time (f (time inner >>= writeIORef inner_r))
inner_t <- readIORef inner_r
writeIORef r (total inner_t))
wrapFetchInTrace :: Int -> Int -> Text.Text -> PerformFetch -> PerformFetch
#ifdef EVENTLOG
wrapFetchInTrace i n dsName f =
case f of
SyncFetch io -> SyncFetch (wrapF "Sync" io)
AsyncFetch fio -> AsyncFetch (wrapF "Async" . fio . unwrapF "Async")
where
d = Text.unpack dsName
wrapF :: String -> IO a -> IO a
wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
unwrapF :: String -> IO a -> IO a
unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
#else
wrapFetchInTrace _ _ _ f = f
#endif
time :: IO () -> IO Microseconds
time io = do
t0 <- getCurrentTime
io
t1 <- getCurrentTime
return . microsecs . realToFrac $ t1 `diffUTCTime` t0
microsecs :: Double -> Microseconds
microsecs t = round (t * 10^(6::Int))
scheduleFetches :: [PerformFetch] -> IO()
scheduleFetches fetches = async_fetches sync_fetches
where
async_fetches :: IO () -> IO ()
async_fetches = compose [f | AsyncFetch f <- fetches]
sync_fetches :: IO ()
sync_fetches = sequence_ [io | SyncFetch io <- fetches]
newtype MemoVar u a = MemoVar (IORef (MemoStatus u a))
data MemoStatus u a
= MemoInProgress (RoundId u) (GenHaxl u a)
| MemoDone (Either SomeException a)
type RoundId u = IORef (RequestStore u)
cachedComputation
:: forall req u a.
(Eq (req a)
, Hashable (req a)
, Typeable (req a))
=> req a -> GenHaxl u a -> GenHaxl u a
cachedComputation req haxl = GenHaxl $ \env ref -> do
cache <- readIORef (memoRef env)
case DataCache.lookup req cache of
Nothing -> do
memovar <- newIORef (MemoInProgress ref haxl)
writeIORef (memoRef env) $!
DataCache.insertNotShowable req (MemoVar memovar) cache
run memovar haxl env ref
Just (MemoVar memovar) -> do
status <- readIORef memovar
case status of
MemoDone r -> done r
MemoInProgress round cont
| round == ref -> return (Blocked (Cont (retryMemo req)))
| otherwise -> run memovar cont env ref
where
retryMemo req =
cachedComputation req (throw (CriticalError "retryMemo"))
run memovar cont env ref = do
e <- unHaxl cont env ref
case e of
Done a -> complete memovar (Right a)
Throw e -> complete memovar (Left e)
Blocked cont -> do
writeIORef memovar (MemoInProgress ref (toHaxl cont))
return (Blocked (Cont (retryMemo req)))
complete memovar r = do
writeIORef memovar (MemoDone r)
done r
done :: Either SomeException a -> IO (Result u a)
done = return . either Throw Done
dumpCacheAsHaskell :: GenHaxl u String
dumpCacheAsHaskell = do
ref <- env cacheRef
entries <- unsafeLiftIO $ readIORef ref >>= showCache
let
mk_cr (req, res) =
text "cacheRequest" <+> parens (text req) <+> parens (result res)
result (Left e) = text "except" <+> parens (text (show e))
result (Right s) = text "Right" <+> parens (text s)
return $ show $
text "loadCache :: GenHaxl u ()" $$
text "loadCache = do" $$
nest 2 (vcat (map mk_cr (concatMap snd entries))) $$
text ""