-- Copyright (c) 2014-present, Facebook, Inc.
-- All rights reserved.
--
-- This source code is distributed under the terms of a BSD license,
-- found in the LICENSE file.

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

-- |
-- The 'DataSource' class and related types and functions.  This
-- module is provided for access to Haxl internals only; most users
-- should import "Haxl.Core" instead.
--
module Haxl.Core.DataSource
  (
  -- * Data fetching
    DataSource(..)
  , DataSourceName(..)
  , Request
  , BlockedFetch(..)
  , PerformFetch(..)
  , SchedulerHint(..)
  , FailureClassification(..)

  -- * Result variables
  , ResultVar(..)
  , mkResultVar
  , putFailure
  , putResult
  , putResultFromChildThread
  , putSuccess
  , putResultWithStats
  , putResultWithStatsFromChildThread

  -- * Default fetch implementations
  , asyncFetch, asyncFetchWithDispatch
  , asyncFetchAcquireRelease
  , backgroundFetchSeq, backgroundFetchPar
  , backgroundFetchAcquireRelease
  , backgroundFetchAcquireReleaseMVar
  , stubFetch
  , syncFetch

  -- * Utilities
  , except
  , setError
  ) where

import Control.Exception
import Control.Monad
import Data.Hashable
import Data.Text (Text)
import Data.Kind (Type)
import Data.Typeable

import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.ShowP
import Haxl.Core.StateStore
import Haxl.Core.Stats


import GHC.Conc ( newStablePtrPrimMVar
                , PrimMVar)
import Control.Concurrent ( threadCapability
                          , forkOn
                          , myThreadId )
import Control.Concurrent.MVar
import Foreign.StablePtr

-- ---------------------------------------------------------------------------
-- DataSource class

-- | The class of data sources, parameterised over the request type for
-- that data source. Every data source must implement this class.
--
-- A data source keeps track of its state by creating an instance of
-- 'StateKey' to map the request type to its state. In this case, the
-- type of the state should probably be a reference type of some kind,
-- such as 'IORef'.
--
-- For a complete example data source, see
-- <https://github.com/facebook/Haxl/tree/master/example Examples>.
--
class (DataSourceName req, StateKey req, ShowP req) => DataSource u req where

  -- | Issues a list of fetches to this 'DataSource'. The 'BlockedFetch'
  -- objects contain both the request and the 'ResultVar's into which to put
  -- the results.
  fetch
    :: State req
      -- ^ Current state.
    -> Flags
      -- ^ Tracing flags.
    -> u
      -- ^ User environment.
    -> PerformFetch req
      -- ^ Fetch the data; see 'PerformFetch'.

  -- | A hint to the scheduler about how requests to this data source
  -- should be submitted; see 'SchedulerHint'.  The default is 'TryToBatch'.
  schedulerHint :: u -> SchedulerHint req
  schedulerHint _ = TryToBatch

  -- | Classifies the exception that a request failed with; see
  -- 'FailureClassification'.  The default is 'StandardFailure' for every
  -- request and every exception.
  classifyFailure :: u -> req a -> SomeException -> FailureClassification
  classifyFailure _ _ _ = StandardFailure

-- | Gives a request type a human-readable data source name.
class DataSourceName (req :: Type -> Type) where
  -- | The name of this 'DataSource', used in tracing and stats. Takes a
  -- 'Proxy' for the request type, so no actual request value is needed.
  dataSourceName :: Proxy req -> Text

-- The 'ShowP' class is a workaround for the fact that we can't write
-- @'Show' (req a)@ as a superclass of 'DataSource', without also
-- parameterizing 'DataSource' over @a@, which is a pain (I tried
-- it). 'ShowP' seems fairly benign, though.

-- | A convenience only: packages up 'Eq', 'Hashable', 'Typeable', and 'Show'
-- for a request @req a@, together with 'Show' for its result type @a@,
-- into a single constraint.
type Request req a =
  ( Eq (req a)
  , Hashable (req a)
  , Typeable (req a)
  , Show (req a)
  , Show a
  )

-- | Hints to the scheduler about this data source.  The @req@ parameter is
-- not used by either constructor; it only ties the hint to a request type.
data SchedulerHint (req :: Type -> Type)
  = TryToBatch
    -- ^ Hold data-source requests while we execute as much as we can, so
    -- that we can hopefully collect more requests to batch.
  | SubmitImmediately
    -- ^ Submit a request via fetch as soon as we have one; don't try to
    -- batch multiple requests.  This is really only useful if the data source
    -- returns 'BackgroundFetch', otherwise requests to this data source will
    -- be performed synchronously, one at a time.

-- | Hints to the stats module about how to deal with these failures.
data FailureClassification
  = StandardFailure
    -- ^ The classification used by the default 'classifyFailure'.
  | IgnoredForStatsFailure
    -- ^ NOTE(review): presumably the failure is left out of failure
    -- statistics -- confirm against the stats module.

-- | A data source can fetch data in one of three ways, one per constructor.
--
data PerformFetch req
  = SyncFetch  ([BlockedFetch req] -> IO ())
    -- ^ Fully synchronous, returns only when all the data is fetched.
    -- See 'syncFetch' for an example.
  | AsyncFetch ([BlockedFetch req] -> IO () -> IO ())
    -- ^ Asynchronous; performs an arbitrary IO action (the second
    -- argument) while the data is being fetched, but only returns when
    -- all the data is fetched.  See 'asyncFetch' for an example.
  | BackgroundFetch ([BlockedFetch req] -> IO ())
    -- ^ Fetches the data in the background, calling 'putResult' at
    -- any time in the future.  This is the best kind of fetch,
    -- because it provides the most concurrency.


-- | A 'BlockedFetch' is a pair of
--
--   * The request to fetch (with result type @a@)
--
--   * A 'ResultVar' to store either the result or an error
--
-- We often want to collect together multiple requests, but they return
-- different types, and the type system wouldn't let us put them
-- together in a list because all the elements of the list must have the
-- same type. So we wrap up these types inside the 'BlockedFetch' type
-- (the result type @a@ is existentially quantified), so that they all
-- look the same and we can put them in a list.
--
-- When we unpack the 'BlockedFetch' and get the request and the 'ResultVar'
-- out, the type system knows that the result type of the request
-- matches the type parameter of the 'ResultVar', so it will let us take the
-- result of the request and store it in the 'ResultVar'.
--
data BlockedFetch r = forall a. BlockedFetch (r a) (ResultVar a)


-- -----------------------------------------------------------------------------
-- ResultVar

-- | A sink for the result of a data fetch in 'BlockedFetch'.  It wraps a
-- callback that receives the outcome of the fetch (an exception or a
-- value), a child-thread flag, and optional 'DataSourceStats'.
newtype ResultVar a =
  ResultVar (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
  -- The Bool here is True if result was returned by a child thread,
  -- rather than the main runHaxl thread.  see Note [tracking allocation in
  -- child threads]

-- | Builds a 'ResultVar' from the callback that consumes a fetch outcome.
-- The callback receives the result (or exception), a flag that is 'True'
-- when the result is delivered from a child thread, and optional
-- 'DataSourceStats'.
mkResultVar
  :: (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
  -> ResultVar a
mkResultVar = ResultVar

-- | Completes a 'ResultVar' with a failure.  The exception is wrapped in
-- 'SomeException' via 'except' and delivered with 'putResult'.
putFailure :: (Exception e) => ResultVar a -> e -> IO ()
putFailure r = putResult r . except

-- | Completes a 'ResultVar' with a successfully fetched value, delivered
-- with 'putResult'.
putSuccess :: ResultVar a -> a -> IO ()
putSuccess r = putResult r . Right

-- | Delivers the outcome of a fetch to a 'ResultVar'.  The child-thread
-- flag is 'False' and no 'DataSourceStats' are attached; see
-- 'putResultFromChildThread' and 'putResultWithStats' for the variants.
putResult :: ResultVar a -> Either SomeException a -> IO ()
putResult (ResultVar io) res = io res False Nothing

-- | Like 'putResult', but also hands the given 'DataSourceStats' to the
-- 'ResultVar' callback.  The child-thread flag is 'False'.
putResultWithStats
  :: ResultVar a -> Either SomeException a -> DataSourceStats -> IO ()
putResultWithStats (ResultVar io) res st = io res False (Just st)

-- | Like 'putResultWithStats', but sets the child-thread flag to 'True';
-- see 'putResultFromChildThread' for when that is appropriate.
putResultWithStatsFromChildThread
  :: ResultVar a -> Either SomeException a -> DataSourceStats -> IO ()
putResultWithStatsFromChildThread (ResultVar io) res st =
  io res True (Just st)

-- | Like 'putResult', but used to get correct accounting when work is
-- being done in child threads.  This is particularly important for
-- data sources that are using 'BackgroundFetch'.  The allocation performed
-- in the child thread up to this point will be propagated back to the
-- thread that called 'runHaxl'.
--
-- Note: if you're doing multiple 'putResult' calls in the same thread
-- ensure that only the /last/ one is 'putResultFromChildThread'.  If you
-- make multiple 'putResultFromChildThread' calls, the allocation will be
-- counted multiple times.
--
-- If you are reusing a thread for multiple fetches, you should call
-- @System.Mem.setAllocationCounter 0@ after
-- 'putResultFromChildThread', so that allocation is not counted
-- multiple times.
putResultFromChildThread :: ResultVar a -> Either SomeException a -> IO ()
putResultFromChildThread (ResultVar io) res = io res True Nothing
  -- see Note [tracking allocation in child threads]

-- | Function for easily setting a fetch to a particular exception.  The
-- exception is computed from the blocked request and stored in its
-- 'ResultVar' with 'putFailure'.
setError :: (Exception e) => (forall a. r a -> e) -> BlockedFetch r -> IO ()
setError e (BlockedFetch req m) = putFailure m (e req)

-- | Wraps an exception as a failed result: the exception is converted with
-- 'toException' and placed in 'Left'.
except :: (Exception e) => e -> Either SomeException a
except = Left . toException


-- -----------------------------------------------------------------------------
-- Fetch templates

-- | A 'fetch' implementation that performs no fetching: it is a
-- 'SyncFetch' that fails every blocked request with the exception the
-- given function computes from that request (see 'setError').  The state,
-- flags and user environment are ignored.
stubFetch
  :: (Exception e) => (forall a. r a -> e)
  -> State r -> Flags -> u -> PerformFetch r
stubFetch e _state _flags _si = SyncFetch $ mapM_ (setError e)

-- | Common implementation templates for 'fetch' of 'DataSource'.
--
-- Example usage:
--
-- > fetch = syncFetch MyDS.withService MyDS.retrieve
-- >   $ \service request -> case request of
-- >     This x -> MyDS.fetchThis service x
-- >     That y -> MyDS.fetchThat service y
--
-- 'asyncFetchWithDispatch' builds an 'AsyncFetch' with separate dispatch
-- and wait steps: inside the service wrapper every request is enqueued,
-- the batch is dispatched, the scheduler's action is run, and only then
-- are the results waited for and delivered.
--
asyncFetchWithDispatch
  :: ((service -> IO ()) -> IO ())
  -- ^ Wrapper to perform an action in the context of a service.

  -> (service -> IO ())
  -- ^ Dispatch all the pending requests

  -> (service -> IO ())
  -- ^ Wait for the results

  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
  -- ^ Enqueue an individual request to the service.  The returned action
  -- is run after waiting, to obtain that request's result.

  -> State request
  -- ^ Currently unused.

  -> Flags
  -- ^ Currently unused.

  -> u
  -- ^ Currently unused.

  -> PerformFetch request

-- | 'asyncFetch' builds an 'AsyncFetch': it submits every request, runs
-- the scheduler's action, then calls the second argument to wait before
-- delivering the results.  'syncFetch' builds a 'SyncFetch': it submits
-- every request, calls the second argument, then delivers the results.
asyncFetch, syncFetch
  :: ((service -> IO ()) -> IO ())
  -- ^ Wrapper to perform an action in the context of a service.

  -> (service -> IO ())
  -- ^ Dispatch all the pending requests and wait for the results

  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
  -- ^ Submits an individual request to the service.  The returned action
  -- is run afterwards to obtain that request's result.

  -> State request
  -- ^ Currently unused.

  -> Flags
  -- ^ Currently unused.

  -> u
  -- ^ Currently unused.

  -> PerformFetch request

-- Within the service wrapper: enqueue every request (collecting one
-- result-delivery action per request), dispatch the batch, run the
-- scheduler's @inner@ action while the fetch is in flight, wait for the
-- service, and finally deliver each result to its 'ResultVar'.
asyncFetchWithDispatch
  withService dispatch wait enqueue _state _flags _si =
  AsyncFetch $ \requests inner -> withService $ \service -> do
    getResults <- mapM (submitFetch service enqueue) requests
    dispatch service
    inner
    wait service
    sequence_ getResults

-- Within the service wrapper: submit every request (collecting one
-- result-delivery action per request), run the scheduler's @inner@
-- action, wait for the service, and then deliver each result to its
-- 'ResultVar'.  Unlike 'asyncFetchWithDispatch' there is no separate
-- dispatch step.
asyncFetch withService wait enqueue _state _flags _si =
  AsyncFetch $ \requests inner -> withService $ \service -> do
    getResults <- mapM (submitFetch service enqueue) requests
    inner
    wait service
    sequence_ getResults

-- Within the service wrapper: submit every request (collecting one
-- result-delivery action per request), dispatch the batch, and then
-- deliver each result to its 'ResultVar'.  Nothing else runs in between,
-- so the fetch is fully synchronous.
syncFetch withService dispatch enqueue _state _flags _si =
  SyncFetch $ \requests -> withService $ \service -> do
    getResults <- mapM (submitFetch service enqueue) requests
    dispatch service
    sequence_ getResults

-- | 'BackgroundFetch' templates built from a function that runs a single
-- request.  'backgroundFetchSeq' forks one thread per batch and runs the
-- requests in it one after another; 'backgroundFetchPar' forks one thread
-- per request.  Threads are forked with 'forkOn', on the capability of the
-- thread that issues the fetch.
backgroundFetchSeq, backgroundFetchPar
  :: (forall a. request a -> IO (Either SomeException a))
  -- ^ Run one request, will be run in a background thread

  -> State request
  -- ^ Currently unused.

  -> Flags
  -- ^ Currently unused.

  -> u
  -- ^ Currently unused.

  -> PerformFetch request

-- Forks a single thread, pinned to the caller's capability, that runs the
-- requests of the batch in order.  The fork happens under 'mask' and the
-- work itself under @restore@, so an exception escaping the work is
-- always caught and handed to 'rethrowFromBg' for the whole batch.
backgroundFetchSeq run _state _flags _si =
  BackgroundFetch $ \requests -> do
    (cap, _) <- threadCapability =<< myThreadId
    mask $ \restore -> void $ forkOn cap $ do
      let rethrow = rethrowFromBg requests
      restore (mapM_ runOne requests) `catch` rethrow
  where
    -- Run one request and deliver its outcome from the background thread.
    runOne (BlockedFetch request result) = do
      res <- run request
      putResultFromBg result res

-- Forks one thread per request, each pinned to the caller's capability.
-- Each fork happens under 'mask' and the work itself under @restore@, so
-- an exception escaping the work is always caught and handed to
-- 'rethrowFromBg' for that single request.
backgroundFetchPar run _state _flags _si =
  BackgroundFetch $ \requests -> do
    (cap, _) <- threadCapability =<< myThreadId
    mapM_ (runOneInThread cap) requests
  where
    -- Fork a thread on the given capability to serve one blocked request.
    runOneInThread cap request =
      mask $ \restore -> void $ forkOn cap $ do
        let rethrow = rethrowFromBg [request]
        restore (runOne request) `catch` rethrow

    -- Run one request and deliver its outcome from the background thread.
    runOne (BlockedFetch request result) = do
      res <- run request
      putResultFromBg result res


{- |
A version of 'asyncFetch' (actually 'asyncFetchWithDispatch') that
handles exceptions correctly.  You should use this instead of
'asyncFetch' or 'asyncFetchWithDispatch'.  The danger with
'asyncFetch' is that if an exception is thrown by @withService@, the
@inner@ action won't be executed, and we'll drop some data-fetches in
the same round.

'asyncFetchAcquireRelease' behaves like the following:

> asyncFetchAcquireRelease acquire release dispatch wait enqueue =
>   AsyncFetch $ \requests inner ->
>     bracket acquire release $ \service -> do
>       getResults <- mapM (submitFetch service enqueue) requests
>       dispatch service
>       inner
>       wait service
>       sequence_ getResults

except that @inner@ is run even if @acquire@, @enqueue@, or @dispatch@ throws,
/unless/ an async exception is received.
-}

asyncFetchAcquireRelease
  :: IO service
  -- ^ Resource acquisition for this datasource

  -> (service -> IO ())
  -- ^ Resource release

  -> (service -> IO ())
  -- ^ Dispatch all the pending requests

  -> (service -> IO ())
  -- ^ Wait for the results

  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
  -- ^ Submits an individual request to the service.  The returned action
  -- is run after waiting, to obtain that request's result.

  -> State request
  -- ^ Currently unused.

  -> Flags
  -- ^ Currently unused.

  -> u
  -- ^ Currently unused.

  -> PerformFetch request

-- The whole fetch runs under 'mask'; only @inner@ and the per-service work
-- are run with async exceptions restored.
--
-- NOTE(review): 'tryWithRethrow' is defined outside this block; it is
-- assumed to return synchronous exceptions as 'Left' and to rethrow
-- asynchronous ones -- confirm against its definition.
asyncFetchAcquireRelease
  acquire release dispatch wait enqueue _state _flags _si =
  AsyncFetch $ \requests inner -> mask $ \restore -> do
    r1 <- tryWithRethrow acquire
    case r1 of
      -- Acquisition failed: still run the scheduler's action, then fail.
      Left err -> do restore inner; throwIO (err :: SomeException)
      Right service -> do
        -- The service is released however the body below terminates.
        flip finally (release service) $ restore $ do
          r2 <- tryWithRethrow $ do
            getResults <- mapM (submitFetch service enqueue) requests
            dispatch service
            return getResults
          inner  --  we assume this cannot throw, ensured by performFetches
          case r2 of
            -- Enqueue/dispatch failed: rethrow only after @inner@ has run.
            Left err -> throwIO (err :: SomeException)
            Right getResults -> do wait service; sequence_ getResults

-- | Used by 'asyncFetch' and 'syncFetch' to retrieve the results of
-- requests to a service.
--
-- Submits the request of a 'BlockedFetch' through the given function and
-- returns an action that, when run later, obtains the request's result
-- and stores it in the 'ResultVar' with 'putResult'.
submitFetch
  :: service
  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
  -> BlockedFetch request
  -> IO (IO ())
submitFetch service fetchFn (BlockedFetch request result)
  = (putResult result =<<) <$> fetchFn service request

-- Delivers a result from a background thread owned by this module, then
-- resets that thread's allocation counter.
putResultFromBg :: ResultVar a -> Either SomeException a -> IO ()
putResultFromBg result r = do
  -- See comment on putResultFromChildThread
  -- We must set the allocation counter to 0 here in case there are more
  -- results in the batch.
  -- This is safe as we own this thread, and know that there
  -- is no allocation limits set.
  putResultFromChildThread result r
  setAllocationCounter 0

-- Exception handler for the background fetch threads: stores the
-- exception as the result of every given blocked request, then passes it
-- to 'rethrowAsyncExceptions'.
--
-- NOTE(review): 'rethrowAsyncExceptions' is defined outside this block;
-- by its name it rethrows the exception only when it is asynchronous --
-- confirm against its definition.
rethrowFromBg :: [BlockedFetch req] -> SomeException -> IO ()
rethrowFromBg requests e = do
  mapM_ (rethrow1bg e) requests
  rethrowAsyncExceptions e
  where
    -- Fail a single blocked request with the exception.
    rethrow1bg exn (BlockedFetch _ result) =
      putResultFromBg result (Left exn)


-- | A fetch template that acquires and releases a service around the
-- fetch, and in which the dispatch step is given an 'MVar' to trigger
-- when the results are ready.
backgroundFetchAcquireReleaseMVar
  :: IO service
  -- ^ Resource acquisition for this datasource

  -> (service -> IO ())
  -- ^ Resource release

  -> (service -> Int -> MVar () -> IO ())
  -- ^ Dispatch all the pending requests and when ready trigger the given mvar
  -- (NOTE(review): the 'Int' is presumably a capability number -- confirm
  -- against the implementation).

  -> (service -> IO ())
  -- ^ Process all requests

  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
  -- ^ Submits an individual request to the service.

  -> State request
  -- ^ Currently unused.

  -> Flags
  -- ^ Currently unused.

  -> u
  -- ^ Currently unused.

  -> PerformFetch request

backgroundFetchAcquireReleaseMVar :: IO service
-> (service -> IO ())
-> (service -> Int -> MVar () -> IO ())
-> (service -> IO ())
-> (forall a.
    service -> request a -> IO (IO (Either SomeException a)))
-> State request
-> Flags
-> u
-> PerformFetch request
backgroundFetchAcquireReleaseMVar
  IO service
acquire service -> IO ()
release service -> Int -> MVar () -> IO ()
dispatch service -> IO ()
process forall a. service -> request a -> IO (IO (Either SomeException a))
enqueue State request
_state Flags
_flags u
_si =
  ([BlockedFetch request] -> IO ()) -> PerformFetch request
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
BackgroundFetch (([BlockedFetch request] -> IO ()) -> PerformFetch request)
-> ([BlockedFetch request] -> IO ()) -> PerformFetch request
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch request]
requests -> do
    MVar ()
mvar <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    ((forall a. IO a -> IO a) -> IO ()) -> IO ()
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO ()) -> IO ())
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
      (Int
cap, Bool
_) <- ThreadId -> IO (Int, Bool)
threadCapability (ThreadId -> IO (Int, Bool)) -> IO ThreadId -> IO (Int, Bool)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO ThreadId
myThreadId
      service
service <- IO service
acquire
      IO ()
getResults <- (do
        [IO ()]
results <- IO [IO ()] -> IO [IO ()]
forall a. IO a -> IO a
restore (IO [IO ()] -> IO [IO ()]) -> IO [IO ()] -> IO [IO ()]
forall a b. (a -> b) -> a -> b
$ (BlockedFetch request -> IO (IO ()))
-> [BlockedFetch request] -> IO [IO ()]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (service -> BlockedFetch request -> IO (IO ())
submit service
service) [BlockedFetch request]
requests
        -- dispatch takes ownership of mvar, so we call it under `mask` to
        -- ensure that it can safely manage that resource.
        service -> Int -> MVar () -> IO ()
dispatch service
service Int
cap MVar ()
mvar
        IO () -> IO (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ([IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ [IO ()]
results)) IO (IO ()) -> IO () -> IO (IO ())
forall a b. IO a -> IO b -> IO a
`onException` service -> IO ()
release service
service
      -- now spawn off a background thread to wait on the dispatch to finish
      ThreadId
_tid <- Int -> IO () -> IO ThreadId
forkOn Int
cap (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
        MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
mvar
          -- todo: it is possible that we would want to do
          -- this processResults on the main scheduler thread for performance
          -- which might reduce thread switching, especially for large batches
          -- but for now this seems to work just fine
        let rethrow :: SomeException -> IO ()
rethrow = [BlockedFetch request] -> SomeException -> IO ()
forall (req :: * -> *).
[BlockedFetch req] -> SomeException -> IO ()
rethrowFromBg [BlockedFetch request]
requests
        ()
_ <- IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
finally
          (IO () -> IO ()
forall a. IO a -> IO a
restore (service -> IO ()
process service
service IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
getResults) IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` SomeException -> IO ()
rethrow)
          (service -> IO ()
release service
service IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` SomeException -> IO ()
rethrow)
        () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
      () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
  where
    submit :: service -> BlockedFetch request -> IO (IO ())
submit service
service (BlockedFetch request a
request ResultVar a
result) =
      (ResultVar a -> Either SomeException a -> IO ()
forall a. ResultVar a -> Either SomeException a -> IO ()
putResultFromBg ResultVar a
result (Either SomeException a -> IO ())
-> IO (Either SomeException a) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<) (IO (Either SomeException a) -> IO ())
-> IO (IO (Either SomeException a)) -> IO (IO ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> service -> request a -> IO (IO (Either SomeException a))
forall a. service -> request a -> IO (IO (Either SomeException a))
enqueue service
service request a
request


{- |
A version of 'backgroundFetchAcquireReleaseMVar' where the dispatch function
is given a 'StablePtr PrimMVar' which is more useful for C based APIs.

A new stable pointer is created with 'newStablePtrPrimMVar' on every dispatch.
This function never frees it.
NOTE(review): presumably the dispatch callee releases it (for example through
@hs_try_putmvar@) -- confirm against the data sources that use this.
-}
backgroundFetchAcquireRelease
  :: IO service
  -- ^ Resource acquisition for this datasource

  -> (service -> IO ())
  -- ^ Resource release

  -> (service -> Int -> StablePtr PrimMVar -> IO ())
  -- ^ Dispatch all the pending requests and when ready trigger the given mvar

  -> (service -> IO ())
  -- ^ Process all requests

  -> (forall a. service -> request a -> IO (IO (Either SomeException a)))
  -- ^ Submits an individual request to the service.

  -> State request
  -- ^ Currently unused.

  -> Flags
  -- ^ Currently unused.

  -> u
  -- ^ Currently unused.

  -> PerformFetch request

backgroundFetchAcquireRelease a r dispatch = backgroundFetchAcquireReleaseMVar
                 a
                 r
                 (\s c mvar -> do
                     -- Wrap the MVar so it can be handed across the FFI.
                     sp <- newStablePtrPrimMVar mvar
                     dispatch s c sp)