-- Copyright (c) 2014-present, Facebook, Inc.
-- All rights reserved.
--
-- This source code is distributed under the terms of a BSD license,
-- found in the LICENSE file.

{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveDataTypeable #-}

-- | Implementation of data-fetching operations.  Most users should
-- import "Haxl.Core" instead.
--
module Haxl.Core.Fetch
  ( dataFetch
  , dataFetchWithShow
  , dataFetchWithInsert
  , uncachedRequest
  , cacheResult
  , dupableCacheRequest
  , cacheResultWithShow
  , cacheRequest
  , performFetches
  , performRequestStore
  , ShowReq
  ) where

import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.Either
import Data.Hashable
import Data.IORef
import Data.Int
import Data.List
#if __GLASGOW_HASKELL__ < 804
import Data.Monoid
#endif
import Data.Proxy
import Data.Typeable
import Data.Text (Text)
import Data.Kind (Type)
import qualified Data.Text as Text
import Text.Printf
#ifdef PROFILING
import GHC.Stack
#endif

import Haxl.Core.DataSource
import Haxl.Core.DataCache as DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Profile
import Haxl.Core.RequestStore
import Haxl.Core.ShowP
import Haxl.Core.Stats
import Haxl.Core.StateStore
import Haxl.Core.Util

-- -----------------------------------------------------------------------------
-- Data fetching and caching

-- | Possible responses when checking the cache.
data CacheResult u w a
  -- | The request hadn't been seen until now.
  = Uncached
       (ResultVar a)
       {-# UNPACK #-} !(IVar u w a)
       {-# UNPACK #-} !CallId

  -- | The request has been seen before, but its result has not yet been
  -- fetched.
  | CachedNotFetched
      {-# UNPACK #-} !(IVar u w a)
       {-# UNPACK #-} !CallId

  -- | The request has been seen before, and its result has already been
  -- fetched.
  | Cached (ResultVal a w)
           {-# UNPACK #-} !CallId


-- | Show functions for request and its result.
type ShowReq r a = (r a -> String, a -> String)

-- Note [showFn]
--
-- Occasionally, for tracing purposes or generating exceptions, we need to
-- call 'show' on the request in a place where we *cannot* have a Show
-- dictionary. (Because the function is a worker which is called by one of
-- the *WithShow variants that take explicit show functions via a ShowReq
-- argument.) None of the functions that does this is exported, so this is
-- hidden from the Haxl user.

cachedWithInsert
  :: forall r a u w.
     (DataSource u r, Typeable (r a))
  => (r a -> String)    -- See Note [showFn]
  -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
  -> Env u w -> r a -> IO (CacheResult u w a)
cachedWithInsert :: (r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> Env u w
-> r a
-> IO (CacheResult u w a)
cachedWithInsert r a -> String
showFn r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
DataCache (DataCacheItem u w)
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: DataCache (DataCacheItem u w)
dataCache :: DataCache (DataCacheItem u w)
..} r a
req = do
  let
    doFetch :: IO (CacheResult u w a)
doFetch = do
      IVar u w a
ivar <- IO (IVar u w a)
forall u w a. IO (IVar u w a)
newIVar
      CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
env
      let !rvar :: ResultVar a
rvar = Env u w -> IVar u w a -> Proxy r -> ResultVar a
forall (r :: * -> *) a u w.
(DataSourceName r, Typeable r) =>
Env u w -> IVar u w a -> Proxy r -> ResultVar a
stdResultVar Env u w
env IVar u w a
ivar (Proxy r
forall k (t :: k). Proxy t
Proxy :: Proxy r)
      r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn r a
req (IVar u w a -> CallId -> DataCacheItem u w a
forall u w a. IVar u w a -> CallId -> DataCacheItem u w a
DataCacheItem IVar u w a
ivar CallId
k) DataCache (DataCacheItem u w)
dataCache
      CacheResult u w a -> IO (CacheResult u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (ResultVar a -> IVar u w a -> CallId -> CacheResult u w a
forall u w a.
ResultVar a -> IVar u w a -> CallId -> CacheResult u w a
Uncached ResultVar a
rvar IVar u w a
ivar CallId
k)
  Maybe (DataCacheItem u w a)
mbRes <- r a
-> DataCache (DataCacheItem u w)
-> IO (Maybe (DataCacheItem u w a))
forall (req :: * -> *) a (res :: * -> *).
Typeable (req a) =>
req a -> DataCache res -> IO (Maybe (res a))
DataCache.lookup r a
req DataCache (DataCacheItem u w)
dataCache
  case Maybe (DataCacheItem u w a)
mbRes of
    Maybe (DataCacheItem u w a)
Nothing -> IO (CacheResult u w a)
doFetch
    Just (DataCacheItem i :: IVar u w a
i@IVar{ivarRef :: forall u w a. IVar u w a -> IORef (IVarContents u w a)
ivarRef = IORef (IVarContents u w a)
cr} CallId
k) -> do
      IVarContents u w a
e <- IORef (IVarContents u w a) -> IO (IVarContents u w a)
forall a. IORef a -> IO a
readIORef IORef (IVarContents u w a)
cr
      case IVarContents u w a
e of
        IVarEmpty JobList u w
_ -> do
          IVar u w a
ivar <- IVar u w a -> IO (IVar u w a)
forall u w a. IVar u w a -> IO (IVar u w a)
withCurrentCCS IVar u w a
i
          CacheResult u w a -> IO (CacheResult u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (IVar u w a -> CallId -> CacheResult u w a
forall u w a. IVar u w a -> CallId -> CacheResult u w a
CachedNotFetched IVar u w a
ivar CallId
k)
        IVarFull ResultVal a w
r -> do
          Flags -> CallId -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> CallId -> m a -> m ()
ifTrace Flags
flags CallId
3 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ case ResultVal a w
r of
            ThrowIO{} -> String
"Cached error: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ r a -> String
showFn r a
req
            ThrowHaxl{} -> String
"Cached error: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ r a -> String
showFn r a
req
            Ok{} -> String
"Cached request: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ r a -> String
showFn r a
req
          CacheResult u w a -> IO (CacheResult u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (ResultVal a w -> CallId -> CacheResult u w a
forall u w a. ResultVal a w -> CallId -> CacheResult u w a
Cached ResultVal a w
r CallId
k)


-- | Make a ResultVar with the standard function for sending a CompletionReq
-- to the scheduler. This is the function will be executed when the fetch
-- completes.
stdResultVar
  :: forall r a u w. (DataSourceName r, Typeable r)
  => Env u w
  -> IVar u w a
  -> Proxy r
  -> ResultVar a
stdResultVar :: Env u w -> IVar u w a -> Proxy r -> ResultVar a
stdResultVar Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} IVar u w a
ivar Proxy r
p =
  (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a.
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
mkResultVar ((Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
 -> ResultVar a)
-> (Either SomeException a
    -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a b. (a -> b) -> a -> b
$ \Either SomeException a
r Bool
isChildThread Maybe DataSourceStats
_ -> do
    Int64
allocs <- if Bool
isChildThread
      then
        -- In a child thread, return the current allocation counter too,
        -- for correct tracking of allocation.
        IO Int64
getAllocationCounter
      else
        Int64 -> IO Int64
forall (m :: * -> *) a. Monad m => a -> m a
return Int64
0
    LogicBug -> STM () -> IO ()
forall e a. Exception e => e -> STM a -> IO a
atomicallyOnBlocking
      (ReadingCompletionsFailedFetch -> LogicBug
forall e. Exception e => e -> LogicBug
LogicBug (Text -> ReadingCompletionsFailedFetch
ReadingCompletionsFailedFetch (Proxy r -> Text
forall (req :: * -> *). DataSourceName req => Proxy req -> Text
dataSourceName Proxy r
p))) (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      [CompleteReq u w]
cs <- TVar [CompleteReq u w] -> STM [CompleteReq u w]
forall a. TVar a -> STM a
readTVar TVar [CompleteReq u w]
completions
      TVar [CompleteReq u w] -> [CompleteReq u w] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [CompleteReq u w]
completions (ResultVal a w -> IVar u w a -> Int64 -> CompleteReq u w
forall u w a.
ResultVal a w -> IVar u w a -> Int64 -> CompleteReq u w
CompleteReq (Either SomeException a -> ResultVal a w
forall a w. Either SomeException a -> ResultVal a w
eitherToResult Either SomeException a
r) IVar u w a
ivar Int64
allocs CompleteReq u w -> [CompleteReq u w] -> [CompleteReq u w]
forall a. a -> [a] -> [a]
: [CompleteReq u w]
cs)
    -- Decrement the counter as request has finished. Do this after updating the
    -- completions TVar so that if the scheduler is tracking what was being
    -- waited on it gets a consistent view.
    Flags -> ReportFlag -> IO () -> IO ()
forall (m :: * -> *) a.
Monad m =>
Flags -> ReportFlag -> m a -> m ()
ifReport Flags
flags ReportFlag
ReportOutgoneFetches (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
      IORef ReqCountMap -> (ReqCountMap -> (ReqCountMap, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef ReqCountMap
submittedReqsRef (\ReqCountMap
m -> (Proxy r -> CallId -> ReqCountMap -> ReqCountMap
forall (r :: * -> *).
(DataSourceName r, Typeable r) =>
Proxy r -> CallId -> ReqCountMap -> ReqCountMap
subFromCountMap Proxy r
p CallId
1 ReqCountMap
m, ()))
{-# INLINE stdResultVar #-}


-- | Record the call stack for a data fetch in the Stats.  Only useful
-- when profiling.
logFetch :: Env u w -> (r a -> String) -> r a -> CallId -> IO ()
#ifdef PROFILING
logFetch env showFn req fid = do
  ifReport (flags env) ReportFetchStack $ do
    stack <- currentCallStack
    modifyIORef' (statsRef env) $ \(Stats s) ->
      Stats (FetchCall (showFn req) stack fid : s)
#else
logFetch :: Env u w -> (r a -> String) -> r a -> CallId -> IO ()
logFetch Env u w
_ r a -> String
_ r a
_ CallId
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
#endif

calcFailure
  :: forall u req a . DataSource u req
  => u
  -> req a
  -> Either SomeException a
  -> FailureCount
calcFailure :: u -> req a -> Either SomeException a -> FailureCount
calcFailure u
_u req a
_r Right{} = FailureCount
forall a. Monoid a => a
mempty
calcFailure u
u req a
r (Left SomeException
e) = case u -> req a -> SomeException -> FailureClassification
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> SomeException -> FailureClassification
classifyFailure u
u req a
r SomeException
e of
  FailureClassification
StandardFailure -> FailureCount
forall a. Monoid a => a
mempty { failureCountStandard :: CallId
failureCountStandard = CallId
1 }
  FailureClassification
IgnoredForStatsFailure -> FailureCount
forall a. Monoid a => a
mempty { failureCountIgnored :: CallId
failureCountIgnored = CallId
1 }

addFallbackFetchStats
  :: forall u w req a . DataSource u req
  => Env u w
  -> CallId
  -> req a
  -> ResultVal a w
  -> IO ()
addFallbackFetchStats :: Env u w -> CallId -> req a -> ResultVal a w -> IO ()
addFallbackFetchStats Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} CallId
fid req a
req ResultVal a w
res = do
  CallId
bid <- IORef CallId -> (CallId -> (CallId, CallId)) -> IO CallId
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef CallId
statsBatchIdRef ((CallId -> (CallId, CallId)) -> IO CallId)
-> (CallId -> (CallId, CallId)) -> IO CallId
forall a b. (a -> b) -> a -> b
$ \CallId
x -> (CallId
xCallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
1,CallId
xCallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
1)
  Int64
start <- IO Int64
getTimestamp
  let
    dsName :: Text
dsName = Proxy req -> Text
forall (req :: * -> *). DataSourceName req => Proxy req -> Text
dataSourceName (Proxy req
forall k (t :: k). Proxy t
Proxy :: Proxy req)
    FailureCount{CallId
failureCountIgnored :: CallId
failureCountStandard :: CallId
failureCountIgnored :: FailureCount -> CallId
failureCountStandard :: FailureCount -> CallId
..} = case ResultVal a w
res of
      Ok{} -> FailureCount
forall a. Monoid a => a
mempty
      (ThrowHaxl SomeException
e WriteTree w
_) -> u -> req a -> Either SomeException a -> FailureCount
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> Either SomeException a -> FailureCount
calcFailure u
userEnv req a
req (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left SomeException
e)
      (ThrowIO SomeException
e) -> u -> req a -> Either SomeException a -> FailureCount
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> Either SomeException a -> FailureCount
calcFailure u
userEnv req a
req (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left SomeException
e)
    this :: FetchStats
this = FetchStats :: Text
-> CallId
-> Int64
-> Int64
-> Int64
-> CallId
-> CallId
-> CallId
-> [CallId]
-> FetchStats
FetchStats { fetchDataSource :: Text
fetchDataSource = Text
dsName
                      , fetchBatchSize :: CallId
fetchBatchSize = CallId
1
                      , fetchStart :: Int64
fetchStart = Int64
start
                      , fetchDuration :: Int64
fetchDuration = Int64
0
                      , fetchSpace :: Int64
fetchSpace = Int64
0
                      , fetchFailures :: CallId
fetchFailures = CallId
failureCountStandard
                      , fetchIgnoredFailures :: CallId
fetchIgnoredFailures = CallId
failureCountIgnored
                      , fetchBatchId :: CallId
fetchBatchId = CallId
bid
                      , fetchIds :: [CallId]
fetchIds = [CallId
fid] }
  IORef Stats -> (Stats -> (Stats, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Stats
statsRef ((Stats -> (Stats, ())) -> IO ())
-> (Stats -> (Stats, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Stats [FetchStats]
fs) -> ([FetchStats] -> Stats
Stats (FetchStats
this FetchStats -> [FetchStats] -> [FetchStats]
forall a. a -> [a] -> [a]
: [FetchStats]
fs), ())

addFallbackResult
  :: Env u w
  -> ResultVal a w
  -> IVar u w a
  -> IO ()
addFallbackResult :: Env u w -> ResultVal a w -> IVar u w a -> IO ()
addFallbackResult Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} ResultVal a w
res IVar u w a
ivar = do
  LogicBug -> STM () -> IO ()
forall e a. Exception e => e -> STM a -> IO a
atomicallyOnBlocking
    (ReadingCompletionsFailedFetch -> LogicBug
forall e. Exception e => e -> LogicBug
LogicBug (Text -> ReadingCompletionsFailedFetch
ReadingCompletionsFailedFetch Text
"addFallbackResult")) (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    [CompleteReq u w]
cs <- TVar [CompleteReq u w] -> STM [CompleteReq u w]
forall a. TVar a -> STM a
readTVar TVar [CompleteReq u w]
completions
    TVar [CompleteReq u w] -> [CompleteReq u w] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [CompleteReq u w]
completions (ResultVal a w -> IVar u w a -> Int64 -> CompleteReq u w
forall u w a.
ResultVal a w -> IVar u w a -> Int64 -> CompleteReq u w
CompleteReq ResultVal a w
res IVar u w a
ivar Int64
0 CompleteReq u w -> [CompleteReq u w] -> [CompleteReq u w]
forall a. a -> [a] -> [a]
: [CompleteReq u w]
cs)

-- | Performs actual fetching of data for a 'Request' from a 'DataSource'.
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u w a
dataFetch :: r a -> GenHaxl u w a
dataFetch = (r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
forall u w (r :: * -> *) a.
(DataSource u r, Eq (r a), Hashable (r a), Typeable (r a)) =>
(r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
dataFetchWithInsert r a -> String
forall a. Show a => a -> String
show r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a), Show (req a),
 Show a) =>
req a -> res a -> DataCache res -> IO ()
DataCache.insert

-- | Performs actual fetching of data for a 'Request' from a 'DataSource', using
-- the given show functions for requests and their results.
dataFetchWithShow
  :: (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
  => ShowReq r a
  -> r a -> GenHaxl u w a
dataFetchWithShow :: ShowReq r a -> r a -> GenHaxl u w a
dataFetchWithShow (r a -> String
showReq, a -> String
showRes) = (r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
forall u w (r :: * -> *) a.
(DataSource u r, Eq (r a), Hashable (r a), Typeable (r a)) =>
(r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
dataFetchWithInsert r a -> String
showReq
  ((r a -> String)
-> (a -> String)
-> r a
-> DataCacheItem u w a
-> DataCache (DataCacheItem u w)
-> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a)) =>
(req a -> String)
-> (a -> String) -> req a -> res a -> DataCache res -> IO ()
DataCache.insertWithShow r a -> String
showReq a -> String
showRes)

-- | Performs actual fetching of data for a 'Request' from a 'DataSource', using
-- the given function to insert requests in the cache.
dataFetchWithInsert
  :: forall u w r a
   . (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
  => (r a -> String)    -- See Note [showFn]
  -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
  -> r a
  -> GenHaxl u w a
dataFetchWithInsert :: (r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
dataFetchWithInsert r a -> String
showFn r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn r a
req =
  (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w a)) -> GenHaxl u w a)
-> (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall a b. (a -> b) -> a -> b
$ \env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
DataCache (DataCacheItem u w)
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: DataCache (DataCacheItem u w)
dataCache :: DataCache (DataCacheItem u w)
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
  -- First, check the cache
  CacheResult u w a
res <- (r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> Env u w
-> r a
-> IO (CacheResult u w a)
forall (r :: * -> *) a u w.
(DataSource u r, Typeable (r a)) =>
(r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> Env u w
-> r a
-> IO (CacheResult u w a)
cachedWithInsert r a -> String
showFn r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn Env u w
env r a
req
  case CacheResult u w a
res of
    -- This request has not been seen before
    Uncached ResultVar a
rvar IVar u w a
ivar CallId
fid -> do
      Env u w -> (r a -> String) -> r a -> CallId -> IO ()
forall u w (r :: * -> *) a.
Env u w -> (r a -> String) -> r a -> CallId -> IO ()
logFetch Env u w
env r a -> String
showFn r a
req CallId
fid
      Flags -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> m a -> m ()
ifProfiling Flags
flags (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Env u w -> r a -> CallId -> Bool -> IO ()
forall (r :: * -> *) u w a.
(DataSourceName r, Eq (r a), Hashable (r a), Typeable (r a)) =>
Env u w -> r a -> CallId -> Bool -> IO ()
addProfileFetch Env u w
env r a
req CallId
fid Bool
False
      --
      -- Check whether the data source wants to submit requests
      -- eagerly, or batch them up.
      --
      let
        blockedFetch :: BlockedFetch r
blockedFetch = r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req ResultVar a
rvar
        blockedFetchI :: BlockedFetchInternal
blockedFetchI = CallId -> BlockedFetchInternal
BlockedFetchInternal CallId
fid
        submitFetch :: IO (Result u w a)
submitFetch = do
          case u -> SchedulerHint r
forall u (req :: * -> *).
DataSource u req =>
u -> SchedulerHint req
schedulerHint u
userEnv :: SchedulerHint r of
            SchedulerHint r
SubmitImmediately ->
              Env u w -> [BlockedFetches u] -> IO ()
forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches Env u w
env [[BlockedFetch r] -> [BlockedFetchInternal] -> BlockedFetches u
forall u (r :: * -> *).
DataSource u r =>
[BlockedFetch r] -> [BlockedFetchInternal] -> BlockedFetches u
BlockedFetches [BlockedFetch r
blockedFetch] [BlockedFetchInternal
blockedFetchI]]
            SchedulerHint r
TryToBatch ->
              -- add the request to the RequestStore and continue
              IORef (RequestStore u)
-> (RequestStore u -> RequestStore u) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (RequestStore u)
reqStoreRef ((RequestStore u -> RequestStore u) -> IO ())
-> (RequestStore u -> RequestStore u) -> IO ()
forall a b. (a -> b) -> a -> b
$ \RequestStore u
bs ->
                BlockedFetch r
-> BlockedFetchInternal -> RequestStore u -> RequestStore u
forall u (r :: * -> *).
DataSource u r =>
BlockedFetch r
-> BlockedFetchInternal -> RequestStore u -> RequestStore u
addRequest BlockedFetch r
blockedFetch BlockedFetchInternal
blockedFetchI RequestStore u
bs
          Result u w a -> IO (Result u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Result u w a -> IO (Result u w a))
-> Result u w a -> IO (Result u w a)
forall a b. (a -> b) -> a -> b
$ IVar u w a -> Cont u w a -> Result u w a
forall u w a b. IVar u w b -> Cont u w a -> Result u w a
Blocked IVar u w a
ivar (IVar u w a -> Cont u w a
forall u w a. IVar u w a -> Cont u w a
Return IVar u w a
ivar)

      -- if there is a fallback configured try that,
      -- else dispatch the fetch
      case Maybe (DataCacheLookup w)
dataCacheFetchFallback of
        Maybe (DataCacheLookup w)
Nothing -> IO (Result u w a)
submitFetch
        Just (DataCacheLookup forall (req :: * -> *) a.
Typeable (req a) =>
req a -> IO (Maybe (ResultVal a w))
dcl) -> do
          Maybe (ResultVal a w)
mbFallbackRes <- r a -> IO (Maybe (ResultVal a w))
forall (req :: * -> *) a.
Typeable (req a) =>
req a -> IO (Maybe (ResultVal a w))
dcl r a
req
          case Maybe (ResultVal a w)
mbFallbackRes of
            Maybe (ResultVal a w)
Nothing -> IO (Result u w a)
submitFetch
            Just ResultVal a w
fallbackRes -> do
              Env u w -> ResultVal a w -> IVar u w a -> IO ()
forall u w a. Env u w -> ResultVal a w -> IVar u w a -> IO ()
addFallbackResult Env u w
env ResultVal a w
fallbackRes IVar u w a
ivar
              Flags -> ReportFlag -> IO () -> IO ()
forall (m :: * -> *) a.
Monad m =>
Flags -> ReportFlag -> m a -> m ()
ifReport Flags
flags ReportFlag
ReportFetchStats (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Env u w -> CallId -> r a -> ResultVal a w -> IO ()
forall u w (req :: * -> *) a.
DataSource u req =>
Env u w -> CallId -> req a -> ResultVal a w -> IO ()
addFallbackFetchStats
                Env u w
env
                CallId
fid
                r a
req
                ResultVal a w
fallbackRes
              Result u w a -> IO (Result u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Result u w a -> IO (Result u w a))
-> Result u w a -> IO (Result u w a)
forall a b. (a -> b) -> a -> b
$ IVar u w a -> Cont u w a -> Result u w a
forall u w a b. IVar u w b -> Cont u w a -> Result u w a
Blocked IVar u w a
ivar (IVar u w a -> Cont u w a
forall u w a. IVar u w a -> Cont u w a
Return IVar u w a
ivar)

    -- Seen before but not fetched yet.  We're blocked, but we don't have
    -- to add the request to the RequestStore.
    CachedNotFetched IVar u w a
ivar CallId
fid -> do
      Flags -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> m a -> m ()
ifProfiling Flags
flags (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Env u w -> r a -> CallId -> Bool -> IO ()
forall (r :: * -> *) u w a.
(DataSourceName r, Eq (r a), Hashable (r a), Typeable (r a)) =>
Env u w -> r a -> CallId -> Bool -> IO ()
addProfileFetch Env u w
env r a
req CallId
fid Bool
True
      Result u w a -> IO (Result u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Result u w a -> IO (Result u w a))
-> Result u w a -> IO (Result u w a)
forall a b. (a -> b) -> a -> b
$ IVar u w a -> Cont u w a -> Result u w a
forall u w a b. IVar u w b -> Cont u w a -> Result u w a
Blocked IVar u w a
ivar (IVar u w a -> Cont u w a
forall u w a. IVar u w a -> Cont u w a
Return IVar u w a
ivar)

    -- Cached: either a result, or an exception
    Cached ResultVal a w
r CallId
fid -> do
      Flags -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> m a -> m ()
ifProfiling Flags
flags (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Env u w -> r a -> CallId -> Bool -> IO ()
forall (r :: * -> *) u w a.
(DataSourceName r, Eq (r a), Hashable (r a), Typeable (r a)) =>
Env u w -> r a -> CallId -> Bool -> IO ()
addProfileFetch Env u w
env r a
req CallId
fid Bool
True
      Env u w -> ResultVal a w -> IO (Result u w a)
forall u w a. Env u w -> ResultVal a w -> IO (Result u w a)
done Env u w
env ResultVal a w
r

-- | A data request that is not cached.  This is not what you want for
-- normal read requests, because then multiple identical requests may
-- return different results, and this invalidates some of the
-- properties that we expect Haxl computations to respect: that data
-- fetches can be arbitrarily reordered, and identical requests can be
-- commoned up, for example.
--
-- 'uncachedRequest' is useful for performing writes, provided those
-- are done in a safe way - that is, not mixed with reads that might
-- conflict in the same Haxl computation.
--
-- if we are recording or running a test, we fallback to using dataFetch
-- This allows us to store the request in the cache when recording, which
-- allows a transparent run afterwards. Without this, the test would try to
-- call the datasource during testing and that would be an exception.
uncachedRequest
 :: forall a u w (r :: Type -> Type). (DataSource u r, Request r a)
 => r a -> GenHaxl u w a
uncachedRequest :: r a -> GenHaxl u w a
uncachedRequest r a
req = do
  Flags
flg <- (Env u w -> Flags) -> GenHaxl u w Flags
forall u w a. (Env u w -> a) -> GenHaxl u w a
env Env u w -> Flags
forall u w. Env u w -> Flags
flags
  if Flags -> CallId
recording Flags
flg CallId -> CallId -> Bool
forall a. Eq a => a -> a -> Bool
/= CallId
0
    then r a -> GenHaxl u w a
forall u (r :: * -> *) a w.
(DataSource u r, Request r a) =>
r a -> GenHaxl u w a
dataFetch r a
req
    else (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w a)) -> GenHaxl u w a)
-> (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall a b. (a -> b) -> a -> b
$ \e :: Env u w
e@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
      IVar u w a
ivar <- IO (IVar u w a)
forall u w a. IO (IVar u w a)
newIVar
      CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
e
      let !rvar :: ResultVar a
rvar = Env u w -> IVar u w a -> Proxy r -> ResultVar a
forall (r :: * -> *) a u w.
(DataSourceName r, Typeable r) =>
Env u w -> IVar u w a -> Proxy r -> ResultVar a
stdResultVar Env u w
e IVar u w a
ivar (Proxy r
forall k (t :: k). Proxy t
Proxy :: Proxy r)
      IORef (RequestStore u)
-> (RequestStore u -> RequestStore u) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (RequestStore u)
reqStoreRef ((RequestStore u -> RequestStore u) -> IO ())
-> (RequestStore u -> RequestStore u) -> IO ()
forall a b. (a -> b) -> a -> b
$ \RequestStore u
bs ->
        BlockedFetch r
-> BlockedFetchInternal -> RequestStore u -> RequestStore u
forall u (r :: * -> *).
DataSource u r =>
BlockedFetch r
-> BlockedFetchInternal -> RequestStore u -> RequestStore u
addRequest (r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req ResultVar a
rvar) (CallId -> BlockedFetchInternal
BlockedFetchInternal CallId
k) RequestStore u
bs
      Result u w a -> IO (Result u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Result u w a -> IO (Result u w a))
-> Result u w a -> IO (Result u w a)
forall a b. (a -> b) -> a -> b
$ IVar u w a -> Cont u w a -> Result u w a
forall u w a b. IVar u w b -> Cont u w a -> Result u w a
Blocked IVar u w a
ivar (IVar u w a -> Cont u w a
forall u w a. IVar u w a -> Cont u w a
Return IVar u w a
ivar)


-- | Transparently provides caching. Useful for datasources that can
-- return immediately, but also caches values.  Exceptions thrown by
-- the IO operation (except for asynchronous exceptions) are
-- propagated into the Haxl monad and can be caught by 'catch' and
-- 'try'.
cacheResult :: Request r a => r a -> IO a -> GenHaxl u w a
cacheResult :: r a -> IO a -> GenHaxl u w a
cacheResult = (r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
forall (r :: * -> *) a u w.
Typeable (r a) =>
(r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
cacheResultWithInsert r a -> String
forall a. Show a => a -> String
show r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a), Show (req a),
 Show a) =>
req a -> res a -> DataCache res -> IO ()
DataCache.insert

-- | Transparently provides caching in the same way as 'cacheResult', but uses
-- the given functions to show requests and their results.
cacheResultWithShow
  :: (Eq (r a), Hashable (r a), Typeable (r a))
  => ShowReq r a -> r a -> IO a -> GenHaxl u w a
cacheResultWithShow :: ShowReq r a -> r a -> IO a -> GenHaxl u w a
cacheResultWithShow (r a -> String
showReq, a -> String
showRes) = (r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
forall (r :: * -> *) a u w.
Typeable (r a) =>
(r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
cacheResultWithInsert r a -> String
showReq
  ((r a -> String)
-> (a -> String)
-> r a
-> DataCacheItem u w a
-> DataCache (DataCacheItem u w)
-> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a)) =>
(req a -> String)
-> (a -> String) -> req a -> res a -> DataCache res -> IO ()
DataCache.insertWithShow r a -> String
showReq a -> String
showRes)

-- Transparently provides caching, using the given function to insert requests
-- into the cache.
cacheResultWithInsert
  :: Typeable (r a)
  => (r a -> String)    -- See Note [showFn]
  -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
  -> r a
  -> IO a
  -> GenHaxl u w a
cacheResultWithInsert :: (r a -> String)
-> (r a
    -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
cacheResultWithInsert r a -> String
showFn r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn r a
req IO a
val = (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w a)) -> GenHaxl u w a)
-> (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall a b. (a -> b) -> a -> b
$ \env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
DataCache (DataCacheItem u w)
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: DataCache (DataCacheItem u w)
dataCache :: DataCache (DataCacheItem u w)
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
  Maybe (DataCacheItem u w a)
mbRes <- r a
-> DataCache (DataCacheItem u w)
-> IO (Maybe (DataCacheItem u w a))
forall (req :: * -> *) a (res :: * -> *).
Typeable (req a) =>
req a -> DataCache res -> IO (Maybe (res a))
DataCache.lookup r a
req DataCache (DataCacheItem u w)
dataCache
  case Maybe (DataCacheItem u w a)
mbRes of
    Maybe (DataCacheItem u w a)
Nothing -> do
      let
        getResult :: IO (ResultVal a w)
getResult = do
          Either SomeException a
eitherResult <- IO a -> IO (Either SomeException a)
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try IO a
val
          case Either SomeException a
eitherResult of
            Left SomeException
e -> SomeException -> IO ()
rethrowAsyncExceptions SomeException
e
            Either SomeException a
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          ResultVal a w -> IO (ResultVal a w)
forall (m :: * -> *) a. Monad m => a -> m a
return (ResultVal a w -> IO (ResultVal a w))
-> ResultVal a w -> IO (ResultVal a w)
forall a b. (a -> b) -> a -> b
$ Either SomeException a -> ResultVal a w
forall a w. Either SomeException a -> ResultVal a w
eitherToResultThrowIO Either SomeException a
eitherResult
      -- if there is a fallback configured try that
      ResultVal a w
result <- case Maybe (DataCacheLookup w)
dataCacheFetchFallback of
        Maybe (DataCacheLookup w)
Nothing -> IO (ResultVal a w)
getResult
        Just (DataCacheLookup forall (req :: * -> *) a.
Typeable (req a) =>
req a -> IO (Maybe (ResultVal a w))
dcl) -> do
          Maybe (ResultVal a w)
mbFallbackRes <- r a -> IO (Maybe (ResultVal a w))
forall (req :: * -> *) a.
Typeable (req a) =>
req a -> IO (Maybe (ResultVal a w))
dcl r a
req
          IO (ResultVal a w)
-> (ResultVal a w -> IO (ResultVal a w))
-> Maybe (ResultVal a w)
-> IO (ResultVal a w)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO (ResultVal a w)
getResult ResultVal a w -> IO (ResultVal a w)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (ResultVal a w)
mbFallbackRes
      IVar u w a
ivar <- ResultVal a w -> IO (IVar u w a)
forall a w u. ResultVal a w -> IO (IVar u w a)
newFullIVar ResultVal a w
result
      CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
env
      r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn r a
req (IVar u w a -> CallId -> DataCacheItem u w a
forall u w a. IVar u w a -> CallId -> DataCacheItem u w a
DataCacheItem IVar u w a
ivar CallId
k) DataCache (DataCacheItem u w)
dataCache
      Env u w -> ResultVal a w -> IO (Result u w a)
forall u w a. Env u w -> ResultVal a w -> IO (Result u w a)
done Env u w
env ResultVal a w
result
    Just (DataCacheItem IVar{ivarRef :: forall u w a. IVar u w a -> IORef (IVarContents u w a)
ivarRef = IORef (IVarContents u w a)
cr} CallId
_) -> do
      IVarContents u w a
e <- IORef (IVarContents u w a) -> IO (IVarContents u w a)
forall a. IORef a -> IO a
readIORef IORef (IVarContents u w a)
cr
      case IVarContents u w a
e of
        IVarEmpty JobList u w
_ -> Env u w -> DataSourceError -> IO (Result u w a)
forall e u w a. Exception e => Env u w -> e -> IO (Result u w a)
raise Env u w
env DataSourceError
corruptCache
        IVarFull ResultVal a w
r -> Env u w -> ResultVal a w -> IO (Result u w a)
forall u w a. Env u w -> ResultVal a w -> IO (Result u w a)
done Env u w
env ResultVal a w
r
  where
    corruptCache :: DataSourceError
corruptCache = Text -> DataSourceError
DataSourceError (Text -> DataSourceError) -> Text -> DataSourceError
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
Text.concat
      [ String -> Text
Text.pack (r a -> String
showFn r a
req)
      , Text
" has a corrupted cache value: these requests are meant to"
      , Text
" return immediately without an intermediate value. Either"
      , Text
" the cache was updated incorrectly, or you're calling"
      , Text
" cacheResult on a query that involves a blocking fetch."
      ]

-- | Inserts a request/result pair into the cache. Throws an exception
-- if the request has already been issued, either via 'dataFetch' or
-- 'cacheRequest'.
--
-- This can be used to pre-populate the cache when running tests, to
-- avoid going to the actual data source and ensure that results are
-- deterministic.
--
cacheRequest
  :: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
cacheRequest :: req a -> Either SomeException a -> GenHaxl u w ()
cacheRequest req a
request Either SomeException a
result = (Env u w -> IO (Result u w ())) -> GenHaxl u w ()
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w ())) -> GenHaxl u w ())
-> (Env u w -> IO (Result u w ())) -> GenHaxl u w ()
forall a b. (a -> b) -> a -> b
$ \e :: Env u w
e@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
  Maybe (DataCacheItem u w a)
mbRes <- req a -> HaxlDataCache u w -> IO (Maybe (DataCacheItem u w a))
forall (req :: * -> *) a (res :: * -> *).
Typeable (req a) =>
req a -> DataCache res -> IO (Maybe (res a))
DataCache.lookup req a
request HaxlDataCache u w
dataCache
  case Maybe (DataCacheItem u w a)
mbRes of
    Maybe (DataCacheItem u w a)
Nothing -> do
      IVar u w a
cr <- ResultVal a w -> IO (IVar u w a)
forall a w u. ResultVal a w -> IO (IVar u w a)
newFullIVar (Either SomeException a -> ResultVal a w
forall a w. Either SomeException a -> ResultVal a w
eitherToResult Either SomeException a
result)
      CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
e
      req a -> DataCacheItem u w a -> HaxlDataCache u w -> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a), Show (req a),
 Show a) =>
req a -> res a -> DataCache res -> IO ()
DataCache.insert req a
request (IVar u w a -> CallId -> DataCacheItem u w a
forall u w a. IVar u w a -> CallId -> DataCacheItem u w a
DataCacheItem IVar u w a
cr CallId
k) HaxlDataCache u w
dataCache
      Result u w () -> IO (Result u w ())
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> Result u w ()
forall u w a. a -> Result u w a
Done ())

    -- It is an error if the request is already in the cache.
    -- We can't test whether the cached result is the same without adding an
    -- Eq constraint, and we don't necessarily have Eq for all results.
    Maybe (DataCacheItem u w a)
_other -> Env u w -> DataSourceError -> IO (Result u w ())
forall e u w a. Exception e => Env u w -> e -> IO (Result u w a)
raise Env u w
e (DataSourceError -> IO (Result u w ()))
-> DataSourceError -> IO (Result u w ())
forall a b. (a -> b) -> a -> b
$
      Text -> DataSourceError
DataSourceError Text
"cacheRequest: request is already in the cache"

-- | Similar to @cacheRequest@ but doesn't throw an exception if the key
-- already exists in the cache.
-- If this function is called twice to cache the same Haxl request, the first
-- value will be discarded and overwritten with the second value.
-- Useful e.g. for unit tests
dupableCacheRequest
  :: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
dupableCacheRequest :: req a -> Either SomeException a -> GenHaxl u w ()
dupableCacheRequest req a
request Either SomeException a
result = (Env u w -> IO (Result u w ())) -> GenHaxl u w ()
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w ())) -> GenHaxl u w ())
-> (Env u w -> IO (Result u w ())) -> GenHaxl u w ()
forall a b. (a -> b) -> a -> b
$ \e :: Env u w
e@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
  IVar u w a
cr <- ResultVal a w -> IO (IVar u w a)
forall a w u. ResultVal a w -> IO (IVar u w a)
newFullIVar (Either SomeException a -> ResultVal a w
forall a w. Either SomeException a -> ResultVal a w
eitherToResult Either SomeException a
result)
  CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
e
  req a -> DataCacheItem u w a -> HaxlDataCache u w -> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a), Show (req a),
 Show a) =>
req a -> res a -> DataCache res -> IO ()
DataCache.insert req a
request (IVar u w a -> CallId -> DataCacheItem u w a
forall u w a. IVar u w a -> CallId -> DataCacheItem u w a
DataCacheItem IVar u w a
cr CallId
k) HaxlDataCache u w
dataCache
  Result u w () -> IO (Result u w ())
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> Result u w ()
forall u w a. a -> Result u w a
Done ())

performRequestStore
   :: forall u w. Env u w -> RequestStore u -> IO ()
performRequestStore :: Env u w -> RequestStore u -> IO ()
performRequestStore Env u w
env RequestStore u
reqStore =
  Env u w -> [BlockedFetches u] -> IO ()
forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches Env u w
env (RequestStore u -> [BlockedFetches u]
forall u. RequestStore u -> [BlockedFetches u]
contents RequestStore u
reqStore)

-- | Issues a batch of fetches in a 'RequestStore'. After
-- 'performFetches', all the requests in the 'RequestStore' are
-- complete, and all of the 'ResultVar's are full.
performFetches
  :: forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches :: Env u w -> [BlockedFetches u] -> IO ()
performFetches env :: Env u w
env@Env{flags :: forall u w. Env u w -> Flags
flags=Flags
f, statsRef :: forall u w. Env u w -> IORef Stats
statsRef=IORef Stats
sref, statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef=IORef CallId
sbref} [BlockedFetches u]
jobs = do
  Int64
t0 <- IO Int64
getTimestamp

  Flags -> CallId -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> CallId -> m a -> m ()
ifTrace Flags
f CallId
3 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
    [BlockedFetches u] -> (BlockedFetches u -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [BlockedFetches u]
jobs ((BlockedFetches u -> IO ()) -> IO ())
-> (BlockedFetches u -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(BlockedFetches [BlockedFetch r]
reqs [BlockedFetchInternal]
_) ->
      [BlockedFetch r] -> (BlockedFetch r -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [BlockedFetch r]
reqs ((BlockedFetch r -> IO ()) -> IO ())
-> (BlockedFetch r -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(BlockedFetch r a
r ResultVar a
_) -> String -> IO ()
putStrLn (r a -> String
forall (f :: * -> *) a. ShowP f => f a -> String
showp r a
r)

  let
    applyFetch :: CallId -> BlockedFetches u -> IO FetchToDo
applyFetch CallId
i bfs :: BlockedFetches u
bfs@(BlockedFetches ([BlockedFetch r]
reqs :: [BlockedFetch r]) [BlockedFetchInternal]
_) =
      case StateStore -> Maybe (State r)
forall (r :: * -> *). StateKey r => StateStore -> Maybe (State r)
stateGet (Env u w -> StateStore
forall u w. Env u w -> StateStore
states Env u w
env) of
        Maybe (State r)
Nothing ->
          FetchToDo -> IO FetchToDo
forall (m :: * -> *) a. Monad m => a -> m a
return ([BlockedFetch r] -> PerformFetch r -> FetchToDo
forall (req :: * -> *).
(DataSourceName req, Typeable req) =>
[BlockedFetch req] -> PerformFetch req -> FetchToDo
FetchToDo [BlockedFetch r]
reqs (([BlockedFetch r] -> IO ()) -> PerformFetch r
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
SyncFetch ((BlockedFetch r -> IO ()) -> [BlockedFetch r] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((forall a. r a -> DataSourceError) -> BlockedFetch r -> IO ()
forall e (r :: * -> *).
Exception e =>
(forall a. r a -> e) -> BlockedFetch r -> IO ()
setError forall a. r a -> DataSourceError
forall (req :: * -> *) a. ShowP req => req a -> DataSourceError
e))))
         where
           e :: ShowP req => req a -> DataSourceError
           e :: req a -> DataSourceError
e req a
req = Text -> DataSourceError
DataSourceError (Text -> DataSourceError) -> Text -> DataSourceError
forall a b. (a -> b) -> a -> b
$ Text
"data source not initialized: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dsName
                  Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
": "
                  Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
Text.pack (req a -> String
forall (f :: * -> *) a. ShowP f => f a -> String
showp req a
req)
        Just State r
state ->
          FetchToDo -> IO FetchToDo
forall (m :: * -> *) a. Monad m => a -> m a
return (FetchToDo -> IO FetchToDo) -> FetchToDo -> IO FetchToDo
forall a b. (a -> b) -> a -> b
$ [BlockedFetch r] -> PerformFetch r -> FetchToDo
forall (req :: * -> *).
(DataSourceName req, Typeable req) =>
[BlockedFetch req] -> PerformFetch req -> FetchToDo
FetchToDo [BlockedFetch r]
reqs
            (PerformFetch r -> FetchToDo) -> PerformFetch r -> FetchToDo
forall a b. (a -> b) -> a -> b
$ (if ReportFlag -> ReportFlags -> Bool
testReportFlag ReportFlag
ReportFetchStats (ReportFlags -> Bool) -> ReportFlags -> Bool
forall a b. (a -> b) -> a -> b
$ Flags -> ReportFlags
report Flags
f
                then u
-> IORef Stats
-> IORef CallId
-> Text
-> CallId
-> BlockedFetches u
-> PerformFetch r
-> PerformFetch r
forall u (req :: * -> *).
DataSource u req =>
u
-> IORef Stats
-> IORef CallId
-> Text
-> CallId
-> BlockedFetches u
-> PerformFetch req
-> PerformFetch req
wrapFetchInStats
                        (Env u w -> u
forall u w. Env u w -> u
userEnv Env u w
env)
                        IORef Stats
sref
                        IORef CallId
sbref
                        Text
dsName
                        ([BlockedFetch r] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [BlockedFetch r]
reqs)
                        BlockedFetches u
bfs
                else PerformFetch r -> PerformFetch r
forall a. a -> a
id)
            (PerformFetch r -> PerformFetch r)
-> PerformFetch r -> PerformFetch r
forall a b. (a -> b) -> a -> b
$ CallId -> CallId -> Text -> PerformFetch r -> PerformFetch r
forall (req :: * -> *).
CallId -> CallId -> Text -> PerformFetch req -> PerformFetch req
wrapFetchInTrace CallId
i ([BlockedFetch r] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [BlockedFetch r]
reqs) Text
dsName
            (PerformFetch r -> PerformFetch r)
-> PerformFetch r -> PerformFetch r
forall a b. (a -> b) -> a -> b
$ [BlockedFetch r] -> PerformFetch r -> PerformFetch r
forall (req :: * -> *).
[BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch [BlockedFetch r]
reqs
            (PerformFetch r -> PerformFetch r)
-> PerformFetch r -> PerformFetch r
forall a b. (a -> b) -> a -> b
$ State r -> Flags -> u -> PerformFetch r
forall u (req :: * -> *).
DataSource u req =>
State req -> Flags -> u -> PerformFetch req
fetch State r
state Flags
f (Env u w -> u
forall u w. Env u w -> u
userEnv Env u w
env)
      where
        dsName :: Text
dsName = Proxy r -> Text
forall (req :: * -> *). DataSourceName req => Proxy req -> Text
dataSourceName (Proxy r
forall k (t :: k). Proxy t
Proxy :: Proxy r)

  [FetchToDo]
fetches <- (CallId -> BlockedFetches u -> IO FetchToDo)
-> [CallId] -> [BlockedFetches u] -> IO [FetchToDo]
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM CallId -> BlockedFetches u -> IO FetchToDo
applyFetch [CallId
0..] [BlockedFetches u]
jobs

  [FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
scheduleFetches [FetchToDo]
fetches (Env u w -> IORef ReqCountMap
forall u w. Env u w -> IORef ReqCountMap
submittedReqsRef Env u w
env) (Env u w -> Flags
forall u w. Env u w -> Flags
flags Env u w
env)

  Int64
t1 <- IO Int64
getTimestamp
  let roundtime :: Double
roundtime = Int64 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
t1 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
t0) Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
1000000 :: Double

  Flags -> CallId -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> CallId -> m a -> m ()
ifTrace Flags
f CallId
1 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
    String -> Double -> IO ()
forall r. PrintfType r => String -> r
printf String
"Batch data fetch done (%.4fs)\n" (Double -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac Double
roundtime :: Double)

data FetchToDo where
  FetchToDo
    :: forall (req :: Type -> Type). (DataSourceName req, Typeable req)
    => [BlockedFetch req] -> PerformFetch req -> FetchToDo

-- Catch exceptions arising from the data source and stuff them into
-- the appropriate requests.  We don't want any exceptions propagating
-- directly from the data sources, because we want the exception to be
-- thrown by dataFetch instead.
--
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch [BlockedFetch req]
reqs PerformFetch req
fetch =
  case PerformFetch req
fetch of
    SyncFetch [BlockedFetch req] -> IO ()
f ->
      ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
SyncFetch (([BlockedFetch req] -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs -> [BlockedFetch req] -> IO ()
f [BlockedFetch req]
reqs IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` SomeException -> IO ()
handler
    AsyncFetch [BlockedFetch req] -> IO () -> IO ()
f ->
      ([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
AsyncFetch (([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs IO ()
io -> [BlockedFetch req] -> IO () -> IO ()
f [BlockedFetch req]
reqs IO ()
io IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` SomeException -> IO ()
handler
      -- this might be wrong: if the outer 'fio' throws an exception,
      -- then we don't know whether we have executed the inner 'io' or
      -- not.  If not, then we'll likely get some errors about "did
      -- not set result var" later, because we haven't executed some
      -- data fetches.  But we can't execute 'io' in the handler,
      -- because we might have already done it.  It isn't possible to
      -- do it completely right here, so we have to rely on data
      -- sources themselves to catch (synchronous) exceptions.  Async
      -- exceptions aren't a problem because we're going to rethrow
      -- them all the way to runHaxl anyway.
    BackgroundFetch [BlockedFetch req] -> IO ()
f ->
      ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
BackgroundFetch (([BlockedFetch req] -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs -> [BlockedFetch req] -> IO ()
f [BlockedFetch req]
reqs IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` SomeException -> IO ()
handler
  where
    handler :: SomeException -> IO ()
    handler :: SomeException -> IO ()
handler SomeException
e = do
      SomeException -> IO ()
rethrowAsyncExceptions SomeException
e
      (BlockedFetch req -> IO ()) -> [BlockedFetch req] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SomeException -> BlockedFetch req -> IO ()
forall p (r :: * -> *). Exception p => p -> BlockedFetch r -> IO ()
forceError SomeException
e) [BlockedFetch req]
reqs

    -- Set the exception even if the request already had a result.
    -- Otherwise we could be discarding an exception.
    forceError :: p -> BlockedFetch r -> IO ()
forceError p
e (BlockedFetch r a
_ ResultVar a
rvar) =
      ResultVar a -> Either SomeException a -> IO ()
forall a. ResultVar a -> Either SomeException a -> IO ()
putResult ResultVar a
rvar (p -> Either SomeException a
forall e a. Exception e => e -> Either SomeException a
except p
e)


data FailureCount = FailureCount
  { FailureCount -> CallId
failureCountStandard :: {-# UNPACK #-} !Int
  , FailureCount -> CallId
failureCountIgnored :: {-# UNPACK #-} !Int
  }

#if __GLASGOW_HASKELL__ >= 804
instance Semigroup FailureCount where
  <> :: FailureCount -> FailureCount -> FailureCount
(<>) = FailureCount -> FailureCount -> FailureCount
forall a. Monoid a => a -> a -> a
mappend
#endif

instance Monoid FailureCount where
  mempty :: FailureCount
mempty = CallId -> CallId -> FailureCount
FailureCount CallId
0 CallId
0
  mappend :: FailureCount -> FailureCount -> FailureCount
mappend (FailureCount CallId
s1 CallId
i1) (FailureCount CallId
s2 CallId
i2)
    = CallId -> CallId -> FailureCount
FailureCount (CallId
s1CallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
s2) (CallId
i1CallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
i2)

wrapFetchInStats
  :: DataSource u req
  => u
  -> IORef Stats
  -> IORef Int
  -> Text
  -> Int
  -> BlockedFetches u
  -> PerformFetch req
  -> PerformFetch req
wrapFetchInStats :: u
-> IORef Stats
-> IORef CallId
-> Text
-> CallId
-> BlockedFetches u
-> PerformFetch req
-> PerformFetch req
wrapFetchInStats
  u
u
  !IORef Stats
statsRef
  !IORef CallId
batchIdRef
  Text
dataSource
  CallId
batchSize
  (BlockedFetches [BlockedFetch r]
_reqs [BlockedFetchInternal]
reqsI)
  PerformFetch req
perform = do
  case PerformFetch req
perform of
    SyncFetch [BlockedFetch req] -> IO ()
f ->
      ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
SyncFetch (([BlockedFetch req] -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs -> do
        CallId
bid <- IO CallId
newBatchId
        IORef FailureCount
fail_ref <- FailureCount -> IO (IORef FailureCount)
forall a. a -> IO (IORef a)
newIORef FailureCount
forall a. Monoid a => a
mempty
        (Int64
t0,Int64
t,Int64
alloc,()
_) <- IO () -> IO (Int64, Int64, Int64, ())
forall c d. Num c => IO d -> IO (Int64, Int64, c, d)
statsForIO ([BlockedFetch req] -> IO ()
f ((BlockedFetch req -> BlockedFetch req)
-> [BlockedFetch req] -> [BlockedFetch req]
forall a b. (a -> b) -> [a] -> [b]
map (u -> IORef FailureCount -> BlockedFetch req -> BlockedFetch req
forall u (r :: * -> *).
DataSource u r =>
u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
addFailureCount u
u IORef FailureCount
fail_ref)
          (CallId -> [BlockedFetch req] -> [BlockedFetch req]
reqsWithFetchDsStats CallId
bid [BlockedFetch req]
reqs)))
        FailureCount
failures <- IORef FailureCount -> IO FailureCount
forall a. IORef a -> IO a
readIORef IORef FailureCount
fail_ref
        CallId
-> [CallId]
-> Int64
-> Int64
-> Int64
-> CallId
-> FailureCount
-> IO ()
updateFetchStats CallId
bid [CallId]
allFids Int64
t0 Int64
t Int64
alloc CallId
batchSize FailureCount
failures
    AsyncFetch [BlockedFetch req] -> IO () -> IO ()
f -> do
      ([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
AsyncFetch (([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs IO ()
inner -> do
        CallId
bid <- IO CallId
newBatchId
        IORef (Int64, Int64)
inner_r <- (Int64, Int64) -> IO (IORef (Int64, Int64))
forall a. a -> IO (IORef a)
newIORef (Int64
0, Int64
0)
        IORef FailureCount
fail_ref <- FailureCount -> IO (IORef FailureCount)
forall a. a -> IO (IORef a)
newIORef FailureCount
forall a. Monoid a => a
mempty
        let inner' :: IO ()
inner' = do
              (Int64
_,Int64
t,Int64
alloc,()
_) <- IO () -> IO (Int64, Int64, Int64, ())
forall c d. Num c => IO d -> IO (Int64, Int64, c, d)
statsForIO IO ()
inner
              IORef (Int64, Int64) -> (Int64, Int64) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Int64, Int64)
inner_r (Int64
t,Int64
alloc)
            reqs' :: [BlockedFetch req]
reqs' = (BlockedFetch req -> BlockedFetch req)
-> [BlockedFetch req] -> [BlockedFetch req]
forall a b. (a -> b) -> [a] -> [b]
map (u -> IORef FailureCount -> BlockedFetch req -> BlockedFetch req
forall u (r :: * -> *).
DataSource u r =>
u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
addFailureCount u
u IORef FailureCount
fail_ref) [BlockedFetch req]
reqs
            reqs'' :: [BlockedFetch req]
reqs'' = CallId -> [BlockedFetch req] -> [BlockedFetch req]
reqsWithFetchDsStats CallId
bid [BlockedFetch req]
reqs'
        (Int64
t0, Int64
totalTime, Int64
totalAlloc, ()
_) <- IO () -> IO (Int64, Int64, Int64, ())
forall c d. Num c => IO d -> IO (Int64, Int64, c, d)
statsForIO ([BlockedFetch req] -> IO () -> IO ()
f [BlockedFetch req]
reqs'' IO ()
inner')
        (Int64
innerTime, Int64
innerAlloc) <- IORef (Int64, Int64) -> IO (Int64, Int64)
forall a. IORef a -> IO a
readIORef IORef (Int64, Int64)
inner_r
        FailureCount
failures <- IORef FailureCount -> IO FailureCount
forall a. IORef a -> IO a
readIORef IORef FailureCount
fail_ref
        CallId
-> [CallId]
-> Int64
-> Int64
-> Int64
-> CallId
-> FailureCount
-> IO ()
updateFetchStats CallId
bid [CallId]
allFids Int64
t0 (Int64
totalTime Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
innerTime)
          (Int64
totalAlloc Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
innerAlloc) CallId
batchSize FailureCount
failures
    BackgroundFetch [BlockedFetch req] -> IO ()
io -> do
      ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
BackgroundFetch (([BlockedFetch req] -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs -> do
        CallId
bid <- IO CallId
newBatchId
        Int64
startTime <- IO Int64
getTimestamp
        [BlockedFetch req] -> IO ()
io (CallId -> [BlockedFetch req] -> [BlockedFetch req]
reqsWithFetchDsStats CallId
bid
          ((BlockedFetch req -> BlockedFetchInternal -> BlockedFetch req)
-> [BlockedFetch req]
-> [BlockedFetchInternal]
-> [BlockedFetch req]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith (u
-> CallId
-> Int64
-> BlockedFetch req
-> BlockedFetchInternal
-> BlockedFetch req
forall p (r :: * -> *).
DataSource p r =>
p
-> CallId
-> Int64
-> BlockedFetch r
-> BlockedFetchInternal
-> BlockedFetch r
addTimer u
u CallId
bid Int64
startTime) [BlockedFetch req]
reqs [BlockedFetchInternal]
reqsI))
  where
    allFids :: [CallId]
allFids = (BlockedFetchInternal -> CallId)
-> [BlockedFetchInternal] -> [CallId]
forall a b. (a -> b) -> [a] -> [b]
map (\(BlockedFetchInternal CallId
k) -> CallId
k) [BlockedFetchInternal]
reqsI
    newBatchId :: IO CallId
newBatchId = IORef CallId -> (CallId -> (CallId, CallId)) -> IO CallId
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef CallId
batchIdRef ((CallId -> (CallId, CallId)) -> IO CallId)
-> (CallId -> (CallId, CallId)) -> IO CallId
forall a b. (a -> b) -> a -> b
$ \CallId
x -> (CallId
xCallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
1,CallId
xCallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
1)
    statsForIO :: IO d -> IO (Int64, Int64, c, d)
statsForIO IO d
io = do
      Int64
prevAlloc <- IO Int64
getAllocationCounter
      (Int64
t0,Int64
t,d
a) <- IO d -> IO (Int64, Int64, d)
forall a. IO a -> IO (Int64, Int64, a)
time IO d
io
      Int64
postAlloc <- IO Int64
getAllocationCounter
      (Int64, Int64, c, d) -> IO (Int64, Int64, c, d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int64
t0,Int64
t, Int64 -> c
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> c) -> Int64 -> c
forall a b. (a -> b) -> a -> b
$ Int64
prevAlloc Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
postAlloc, d
a)
    reqsWithFetchDsStats :: CallId -> [BlockedFetch req] -> [BlockedFetch req]
reqsWithFetchDsStats = \CallId
bid [BlockedFetch req]
reqs
      -> (BlockedFetch req -> BlockedFetchInternal -> BlockedFetch req)
-> [BlockedFetch req]
-> [BlockedFetchInternal]
-> [BlockedFetch req]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith (CallId
-> BlockedFetch req -> BlockedFetchInternal -> BlockedFetch req
forall (r :: * -> *).
CallId -> BlockedFetch r -> BlockedFetchInternal -> BlockedFetch r
addFetchDatasourceStats CallId
bid) [BlockedFetch req]
reqs [BlockedFetchInternal]
reqsI
    addTimer :: p
-> CallId
-> Int64
-> BlockedFetch r
-> BlockedFetchInternal
-> BlockedFetch r
addTimer
      p
u
      CallId
bid
      Int64
t0
      (BlockedFetch r a
req (ResultVar Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn))
      (BlockedFetchInternal fid) =
        r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req (ResultVar a -> BlockedFetch r) -> ResultVar a -> BlockedFetch r
forall a b. (a -> b) -> a -> b
$ (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a.
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
ResultVar ((Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
 -> ResultVar a)
-> (Either SomeException a
    -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a b. (a -> b) -> a -> b
$ \Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats -> do
          Int64
t1 <- IO Int64
getTimestamp
          -- We cannot measure allocation easily for BackgroundFetch. Here we
          -- just attribute all allocation to the last
          -- `putResultFromChildThread` and use 0 for the others.
          -- While the individual allocations may not be correct,
          -- the total sum and amortized allocation are still meaningful.
          -- see Note [tracking allocation in child threads]
          Int64
allocs <- if Bool
isChildThread then IO Int64
getAllocationCounter else Int64 -> IO Int64
forall (m :: * -> *) a. Monad m => a -> m a
return Int64
0
          CallId
-> [CallId]
-> Int64
-> Int64
-> Int64
-> CallId
-> FailureCount
-> IO ()
updateFetchStats CallId
bid [CallId
fid] Int64
t0 (Int64
t1 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
t0)
            (Int64 -> Int64
forall a. Num a => a -> a
negate Int64
allocs)
            CallId
1 -- batch size: we don't know if this is a batch or not
            (p -> r a -> Either SomeException a -> FailureCount
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> Either SomeException a -> FailureCount
calcFailure p
u r a
req Either SomeException a
result) -- failures
          Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats

    addFetchDatasourceStats
      :: Int
      -> BlockedFetch r
      -> BlockedFetchInternal
      -> BlockedFetch r
    addFetchDatasourceStats :: CallId -> BlockedFetch r -> BlockedFetchInternal -> BlockedFetch r
addFetchDatasourceStats CallId
bid
      (BlockedFetch r a
req (ResultVar Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn))
      (BlockedFetchInternal CallId
fid) = r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req (ResultVar a -> BlockedFetch r) -> ResultVar a -> BlockedFetch r
forall a b. (a -> b) -> a -> b
$ (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a.
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
ResultVar
        ((Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
 -> ResultVar a)
-> (Either SomeException a
    -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a b. (a -> b) -> a -> b
$ \Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats -> do
          let mkStats :: DataSourceStats -> FetchStats
mkStats DataSourceStats
dss = FetchDataSourceStats :: CallId -> Text -> DataSourceStats -> CallId -> FetchStats
FetchDataSourceStats
                { fetchDsStatsCallId :: CallId
fetchDsStatsCallId = CallId
fid
                , fetchDsStatsDataSource :: Text
fetchDsStatsDataSource = Text
dataSource
                , fetchDsStatsStats :: DataSourceStats
fetchDsStatsStats = DataSourceStats
dss
                , fetchBatchId :: CallId
fetchBatchId = CallId
bid
                }
          case Maybe DataSourceStats
stats of
            Just DataSourceStats
dss -> IORef Stats -> (Stats -> (Stats, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Stats
statsRef
              ((Stats -> (Stats, ())) -> IO ())
-> (Stats -> (Stats, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Stats [FetchStats]
fs) -> ([FetchStats] -> Stats
Stats (DataSourceStats -> FetchStats
mkStats DataSourceStats
dss FetchStats -> [FetchStats] -> [FetchStats]
forall a. a -> [a] -> [a]
: [FetchStats]
fs), ())
            Maybe DataSourceStats
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats


    updateFetchStats
      :: Int
      -> [CallId]
      -> Timestamp
      -> Microseconds
      -> Int64
      -> Int
      -> FailureCount
      -> IO ()
    updateFetchStats :: CallId
-> [CallId]
-> Int64
-> Int64
-> Int64
-> CallId
-> FailureCount
-> IO ()
updateFetchStats CallId
bid [CallId]
fids Int64
start Int64
time Int64
space CallId
batch FailureCount{CallId
failureCountIgnored :: CallId
failureCountStandard :: CallId
failureCountIgnored :: FailureCount -> CallId
failureCountStandard :: FailureCount -> CallId
..} = do
      let this :: FetchStats
this = FetchStats :: Text
-> CallId
-> Int64
-> Int64
-> Int64
-> CallId
-> CallId
-> CallId
-> [CallId]
-> FetchStats
FetchStats { fetchDataSource :: Text
fetchDataSource = Text
dataSource
                            , fetchBatchSize :: CallId
fetchBatchSize = CallId
batch
                            , fetchStart :: Int64
fetchStart = Int64
start
                            , fetchDuration :: Int64
fetchDuration = Int64
time
                            , fetchSpace :: Int64
fetchSpace = Int64
space
                            , fetchFailures :: CallId
fetchFailures = CallId
failureCountStandard
                            , fetchIgnoredFailures :: CallId
fetchIgnoredFailures = CallId
failureCountIgnored
                            , fetchBatchId :: CallId
fetchBatchId = CallId
bid
                            , fetchIds :: [CallId]
fetchIds = [CallId]
fids }
      IORef Stats -> (Stats -> (Stats, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Stats
statsRef ((Stats -> (Stats, ())) -> IO ())
-> (Stats -> (Stats, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Stats [FetchStats]
fs) -> ([FetchStats] -> Stats
Stats (FetchStats
this FetchStats -> [FetchStats] -> [FetchStats]
forall a. a -> [a] -> [a]
: [FetchStats]
fs), ())

    addFailureCount :: DataSource u r
      => u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
    addFailureCount :: u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
addFailureCount u
u IORef FailureCount
ref (BlockedFetch r a
req (ResultVar Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn)) =
      r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req (ResultVar a -> BlockedFetch r) -> ResultVar a -> BlockedFetch r
forall a b. (a -> b) -> a -> b
$ (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a.
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
ResultVar ((Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
 -> ResultVar a)
-> (Either SomeException a
    -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a b. (a -> b) -> a -> b
$ \Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats -> do
        let addFailures :: FailureCount -> (FailureCount, ())
addFailures FailureCount
r = (FailureCount
r FailureCount -> FailureCount -> FailureCount
forall a. Semigroup a => a -> a -> a
<> u -> r a -> Either SomeException a -> FailureCount
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> Either SomeException a -> FailureCount
calcFailure u
u r a
req Either SomeException a
result, ())
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Either SomeException a -> Bool
forall a b. Either a b -> Bool
isLeft Either SomeException a
result) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef FailureCount -> (FailureCount -> (FailureCount, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef FailureCount
ref FailureCount -> (FailureCount, ())
addFailures
        Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats

wrapFetchInTrace
  :: Int
  -> Int
  -> Text
  -> PerformFetch req
  -> PerformFetch req
#ifdef EVENTLOG
wrapFetchInTrace i n dsName f =
  case f of
    SyncFetch io -> SyncFetch (wrapF "Sync" io)
    AsyncFetch fio -> AsyncFetch (wrapF "Async" . fio . unwrapF "Async")
  where
    d = Text.unpack dsName
    wrapF :: String -> IO a -> IO a
    wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
                        (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
    unwrapF :: String -> IO a -> IO a
    unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
                          (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
#else
wrapFetchInTrace :: CallId -> CallId -> Text -> PerformFetch req -> PerformFetch req
wrapFetchInTrace CallId
_ CallId
_ Text
_ PerformFetch req
f = PerformFetch req
f
#endif

time :: IO a -> IO (Timestamp,Microseconds,a)
time :: IO a -> IO (Int64, Int64, a)
time IO a
io = do
  Int64
t0 <- IO Int64
getTimestamp
  a
a <- IO a
io
  Int64
t1 <- IO Int64
getTimestamp
  (Int64, Int64, a) -> IO (Int64, Int64, a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int64
t0, Int64
t1 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
t0, a
a)

-- | Start all the async fetches first, then perform the sync fetches before
-- getting the results of the async fetches.
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
scheduleFetches [FetchToDo]
fetches IORef ReqCountMap
ref Flags
flags = do
  -- update ReqCountmap for these fetches
  Flags -> ReportFlag -> IO () -> IO ()
forall (m :: * -> *) a.
Monad m =>
Flags -> ReportFlag -> m a -> m ()
ifReport Flags
flags ReportFlag
ReportOutgoneFetches (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
    [ IORef ReqCountMap -> (ReqCountMap -> (ReqCountMap, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef ReqCountMap
ref ((ReqCountMap -> (ReqCountMap, ())) -> IO ())
-> (ReqCountMap -> (ReqCountMap, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$
        \ReqCountMap
m -> (Proxy req -> CallId -> ReqCountMap -> ReqCountMap
forall (r :: * -> *).
(DataSourceName r, Typeable r) =>
Proxy r -> CallId -> ReqCountMap -> ReqCountMap
addToCountMap (Proxy req
forall k (t :: k). Proxy t
Proxy :: Proxy r) ([BlockedFetch req] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [BlockedFetch req]
reqs) ReqCountMap
m, ())
    | FetchToDo ([BlockedFetch req]
reqs :: [BlockedFetch r]) PerformFetch req
_f <- [FetchToDo]
fetches
    ]

  Flags -> CallId -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> CallId -> m a -> m ()
ifTrace Flags
flags CallId
1 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String -> IO ()
forall r. PrintfType r => String -> r
printf String
"Batch data fetch round: %s\n" (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
     String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate (String
", "::String) ([String] -> String) -> [String] -> String
forall a b. (a -> b) -> a -> b
$
        ((CallId, String, String) -> String)
-> [(CallId, String, String)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (\(CallId
c, String
n, String
ds) -> String -> String -> String -> CallId -> String
forall r. PrintfType r => String -> r
printf String
"%s %s %d" String
n String
ds CallId
c) [(CallId, String, String)]
stats

  IO ()
fully_async_fetches
  IO () -> IO ()
async_fetches IO ()
sync_fetches
 where

  fetchName :: forall req . PerformFetch req -> String
  fetchName :: PerformFetch req -> String
fetchName (BackgroundFetch [BlockedFetch req] -> IO ()
_) = String
"background"
  fetchName (AsyncFetch [BlockedFetch req] -> IO () -> IO ()
_) = String
"async"
  fetchName (SyncFetch [BlockedFetch req] -> IO ()
_) = String
"sync"

  srcName :: forall req . (DataSourceName req) => [BlockedFetch req] -> String
  srcName :: [BlockedFetch req] -> String
srcName [BlockedFetch req]
_ = Text -> String
Text.unpack (Text -> String) -> Text -> String
forall a b. (a -> b) -> a -> b
$ Proxy req -> Text
forall (req :: * -> *). DataSourceName req => Proxy req -> Text
dataSourceName (Proxy req
forall k (t :: k). Proxy t
Proxy :: Proxy req)

  stats :: [(CallId, String, String)]
stats = [([BlockedFetch req] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [BlockedFetch req]
reqs, PerformFetch req -> String
forall (req :: * -> *). PerformFetch req -> String
fetchName PerformFetch req
f, [BlockedFetch req] -> String
forall (req :: * -> *).
DataSourceName req =>
[BlockedFetch req] -> String
srcName [BlockedFetch req]
reqs)
          | FetchToDo [BlockedFetch req]
reqs PerformFetch req
f <- [FetchToDo]
fetches]

  fully_async_fetches :: IO ()
  fully_async_fetches :: IO ()
fully_async_fetches = [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
    [[BlockedFetch req] -> IO ()
f [BlockedFetch req]
reqs | FetchToDo [BlockedFetch req]
reqs (BackgroundFetch [BlockedFetch req] -> IO ()
f) <- [FetchToDo]
fetches]

  async_fetches :: IO () -> IO ()
  async_fetches :: IO () -> IO ()
async_fetches = [IO () -> IO ()] -> IO () -> IO ()
forall a. [a -> a] -> a -> a
compose
    [[BlockedFetch req] -> IO () -> IO ()
f [BlockedFetch req]
reqs | FetchToDo [BlockedFetch req]
reqs (AsyncFetch [BlockedFetch req] -> IO () -> IO ()
f) <- [FetchToDo]
fetches]

  sync_fetches :: IO ()
  sync_fetches :: IO ()
sync_fetches = [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
    [[BlockedFetch req] -> IO ()
f [BlockedFetch req]
reqs | FetchToDo [BlockedFetch req]
reqs (SyncFetch [BlockedFetch req] -> IO ()
f) <- [FetchToDo]
fetches]


-- | An exception thrown when reading from datasources fails
data ReadingCompletionsFailedFetch = ReadingCompletionsFailedFetch Text
  deriving CallId -> ReadingCompletionsFailedFetch -> String -> String
[ReadingCompletionsFailedFetch] -> String -> String
ReadingCompletionsFailedFetch -> String
(CallId -> ReadingCompletionsFailedFetch -> String -> String)
-> (ReadingCompletionsFailedFetch -> String)
-> ([ReadingCompletionsFailedFetch] -> String -> String)
-> Show ReadingCompletionsFailedFetch
forall a.
(CallId -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
showList :: [ReadingCompletionsFailedFetch] -> String -> String
$cshowList :: [ReadingCompletionsFailedFetch] -> String -> String
show :: ReadingCompletionsFailedFetch -> String
$cshow :: ReadingCompletionsFailedFetch -> String
showsPrec :: CallId -> ReadingCompletionsFailedFetch -> String -> String
$cshowsPrec :: CallId -> ReadingCompletionsFailedFetch -> String -> String
Show

instance Exception ReadingCompletionsFailedFetch