-- Copyright (c) 2014-present, Facebook, Inc.
-- All rights reserved.
-- This source code is distributed under the terms of a BSD license,
-- found in the LICENSE file.

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveDataTypeable #-}

-- | Implementation of data-fetching operations.  Most users should
-- import "Haxl.Core" instead.
module Haxl.Core.Fetch
  ( dataFetch
  , dataFetchWithShow
  , uncachedRequest
  , cacheResult
  , dupableCacheRequest
  , cacheResultWithShow
  , cacheRequest
  , performFetches
  , performRequestStore
  , ShowReq
  ) where

import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.Either
import Data.Hashable
import Data.IORef
import Data.Int
import Data.List
#if __GLASGOW_HASKELL__ < 804
import Data.Monoid
import Data.Proxy
import Data.Typeable
import Data.Text (Text)
import qualified Data.Text as Text
import Text.Printf
import GHC.Stack

import Haxl.Core.DataSource
import Haxl.Core.DataCache as DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Profile
import Haxl.Core.RequestStore
import Haxl.Core.ShowP
import Haxl.Core.Stats
import Haxl.Core.StateStore
import Haxl.Core.Util

-- -----------------------------------------------------------------------------
-- Data fetching and caching

-- | Possible responses when checking the cache.
data CacheResult u w a
  -- | The request hadn't been seen until now.
  = Uncached
       (ResultVar a)
       {-# UNPACK #-} !(IVar u w a)
       {-# UNPACK #-} !CallId

  -- | The request has been seen before, but its result has not yet been
  -- fetched.
  | CachedNotFetched
      {-# UNPACK #-} !(IVar u w a)
       {-# UNPACK #-} !CallId

  -- | The request has been seen before, and its result has already been
  -- fetched.
  | Cached (ResultVal a w)
           {-# UNPACK #-} !CallId

-- | Show functions for request and its result.
type ShowReq r a = (r a -> String, a -> String)

-- Note [showFn]
-- Occasionally, for tracing purposes or generating exceptions, we need to
-- call 'show' on the request in a place where we *cannot* have a Show
-- dictionary. (Because the function is a worker which is called by one of
-- the *WithShow variants that take explicit show functions via a ShowReq
-- argument.) None of the functions that does this is exported, so this is
-- hidden from the Haxl user.

  :: forall r a u w.
     (DataSource u r, Typeable (r a))
  => (r a -> String)    -- See Note [showFn]
  -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
  -> Env u w -> r a -> IO (CacheResult u w a)
cachedWithInsert showFn insertFn env@Env{..} req = do
    doFetch = do
      ivar <- newIVar
      k <- nextCallId env
      let !rvar = stdResultVar ivar completions submittedReqsRef flags
            (Proxy :: Proxy r)
      insertFn req (DataCacheItem ivar k) dataCache
      return (Uncached rvar ivar k)
  mbRes <- DataCache.lookup req dataCache
  case mbRes of
    Nothing -> doFetch
    Just (DataCacheItem i@IVar{ivarRef = cr} k) -> do
      e <- readIORef cr
      case e of
        IVarEmpty _ -> do
          ivar <- withCurrentCCS i
          return (CachedNotFetched ivar k)
        IVarFull r -> do
          ifTrace flags 3 $ putStrLn $ case r of
            ThrowIO{} -> "Cached error: " ++ showFn req
            ThrowHaxl{} -> "Cached error: " ++ showFn req
            Ok{} -> "Cached request: " ++ showFn req
          return (Cached r k)

-- | Make a ResultVar with the standard function for sending a CompletionReq
-- to the scheduler. This is the function will be executed when the fetch
-- completes.
  :: forall r a u w. (DataSourceName r, Typeable r)
  => IVar u w a
  -> TVar [CompleteReq u w]
  -> IORef ReqCountMap
  -> Flags
  -> Proxy r
  -> ResultVar a
stdResultVar ivar completions ref flags p =
  mkResultVar $ \r isChildThread -> do
    allocs <- if isChildThread
        -- In a child thread, return the current allocation counter too,
        -- for correct tracking of allocation.
        return 0
      (LogicBug (ReadingCompletionsFailedFetch (dataSourceName p))) $ do
      cs <- readTVar completions
      writeTVar completions (CompleteReq r ivar allocs : cs)
    -- Decrement the counter as request has finished. Do this after updating the
    -- completions TVar so that if the scheduler is tracking what was being
    -- waited on it gets a consistent view.
    ifReport flags 1 $
      atomicModifyIORef' ref (\m -> (subFromCountMap p 1 m, ()))
{-# INLINE stdResultVar #-}

-- | Record the call stack for a data fetch in the Stats.  Only useful
-- when profiling.
logFetch :: Env u w -> (r a -> String) -> r a -> CallId -> IO ()
logFetch env showFn req fid = do
  ifReport (flags env) 5 $ do
    stack <- currentCallStack
    modifyIORef' (statsRef env) $ \(Stats s) ->
      Stats (FetchCall (showFn req) stack fid : s)
logFetch _ _ _ _ = return ()

-- | Performs actual fetching of data for a 'Request' from a 'DataSource'.
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u w a
dataFetch = dataFetchWithInsert show DataCache.insert

-- | Performs actual fetching of data for a 'Request' from a 'DataSource', using
-- the given show functions for requests and their results.
  :: (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
  => ShowReq r a
  -> r a -> GenHaxl u w a
dataFetchWithShow (showReq, showRes) = dataFetchWithInsert showReq
  (DataCache.insertWithShow showReq showRes)

-- | Performs actual fetching of data for a 'Request' from a 'DataSource', using
-- the given function to insert requests in the cache.
  :: forall u w r a
   . (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
  => (r a -> String)    -- See Note [showFn]
  -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
  -> r a
  -> GenHaxl u w a
dataFetchWithInsert showFn insertFn req =
  GenHaxl $ \env@Env{..} -> do
  -- First, check the cache
  res <- cachedWithInsert showFn insertFn env req
  case res of
    -- This request has not been seen before
    Uncached rvar ivar fid -> do
      logFetch env showFn req fid
      ifProfiling flags $ addProfileFetch env req fid False
      -- Check whether the data source wants to submit requests
      -- eagerly, or batch them up.
      let blockedFetch = BlockedFetch req rvar
      let blockedFetchI = BlockedFetchInternal fid
      case schedulerHint userEnv :: SchedulerHint r of
        SubmitImmediately ->
          performFetches env [BlockedFetches [blockedFetch] [blockedFetchI]]
        TryToBatch ->
          -- add the request to the RequestStore and continue
          modifyIORef' reqStoreRef $ \bs ->
            addRequest blockedFetch blockedFetchI bs
      return $ Blocked ivar (Return ivar)

    -- Seen before but not fetched yet.  We're blocked, but we don't have
    -- to add the request to the RequestStore.
    CachedNotFetched ivar fid -> do
      ifProfiling flags $ addProfileFetch env req fid True
      return $ Blocked ivar (Return ivar)

    -- Cached: either a result, or an exception
    Cached r fid -> do
      ifProfiling flags $ addProfileFetch env req fid True
      done r

-- | A data request that is not cached.  This is not what you want for
-- normal read requests, because then multiple identical requests may
-- return different results, and this invalidates some of the
-- properties that we expect Haxl computations to respect: that data
-- fetches can be arbitrarily reordered, and identical requests can be
-- commoned up, for example.
-- 'uncachedRequest' is useful for performing writes, provided those
-- are done in a safe way - that is, not mixed with reads that might
-- conflict in the same Haxl computation.
-- if we are recording or running a test, we fallback to using dataFetch
-- This allows us to store the request in the cache when recording, which
-- allows a transparent run afterwards. Without this, the test would try to
-- call the datasource during testing and that would be an exception.
 :: forall a u w (r :: * -> *). (DataSource u r, Request r a)
 => r a -> GenHaxl u w a
uncachedRequest req = do
  flg <- env flags
  subRef <- env submittedReqsRef
  if recording flg /= 0
    then dataFetch req
    else GenHaxl $ \e@Env{..} -> do
      ivar <- newIVar
      k <- nextCallId e
      let !rvar = stdResultVar ivar completions subRef flg (Proxy :: Proxy r)
      modifyIORef' reqStoreRef $ \bs ->
        addRequest (BlockedFetch req rvar) (BlockedFetchInternal k) bs
      return $ Blocked ivar (Return ivar)

-- | Transparently provides caching. Useful for datasources that can
-- return immediately, but also caches values.  Exceptions thrown by
-- the IO operation (except for asynchronous exceptions) are
-- propagated into the Haxl monad and can be caught by 'catch' and
-- 'try'.
cacheResult :: Request r a => r a -> IO a -> GenHaxl u w a
cacheResult = cacheResultWithInsert show DataCache.insert

-- | Transparently provides caching in the same way as 'cacheResult', but uses
-- the given functions to show requests and their results.
  :: (Eq (r a), Hashable (r a), Typeable (r a))
  => ShowReq r a -> r a -> IO a -> GenHaxl u w a
cacheResultWithShow (showReq, showRes) = cacheResultWithInsert showReq
  (DataCache.insertWithShow showReq showRes)

-- Transparently provides caching, using the given function to insert requests
-- into the cache.
  :: Typeable (r a)
  => (r a -> String)    -- See Note [showFn]
  -> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
  -> r a
  -> IO a
  -> GenHaxl u w a
cacheResultWithInsert showFn insertFn req val = GenHaxl $ \e@Env{..} -> do
  mbRes <- DataCache.lookup req dataCache
  case mbRes of
    Nothing -> do
      eitherResult <- Exception.try val
      case eitherResult of
        Left e -> rethrowAsyncExceptions e
        _ -> return ()
      let result = eitherToResultThrowIO eitherResult
      ivar <- newFullIVar result
      k <- nextCallId e
      insertFn req (DataCacheItem ivar k) dataCache
      done result
    Just (DataCacheItem IVar{ivarRef = cr} _) -> do
      e <- readIORef cr
      case e of
        IVarEmpty _ -> corruptCache
        IVarFull r -> done r
    corruptCache = raise . DataSourceError $ Text.concat
      [ Text.pack (showFn req)
      , " has a corrupted cache value: these requests are meant to"
      , " return immediately without an intermediate value. Either"
      , " the cache was updated incorrectly, or you're calling"
      , " cacheResult on a query that involves a blocking fetch."

-- | Inserts a request/result pair into the cache. Throws an exception
-- if the request has already been issued, either via 'dataFetch' or
-- 'cacheRequest'.
-- This can be used to pre-populate the cache when running tests, to
-- avoid going to the actual data source and ensure that results are
-- deterministic.
  :: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
cacheRequest request result = GenHaxl $ \e@Env{..} -> do
  mbRes <- DataCache.lookup request dataCache
  case mbRes of
    Nothing -> do
      cr <- newFullIVar (eitherToResult result)
      k <- nextCallId e
      DataCache.insert request (DataCacheItem cr k) dataCache
      return (Done ())

    -- It is an error if the request is already in the cache.
    -- We can't test whether the cached result is the same without adding an
    -- Eq constraint, and we don't necessarily have Eq for all results.
    _other -> raise $
      DataSourceError "cacheRequest: request is already in the cache"

-- | Similar to @cacheRequest@ but doesn't throw an exception if the key
-- already exists in the cache.
-- If this function is called twice to cache the same Haxl request, the first
-- value will be discarded and overwritten with the second value.
-- Useful e.g. for unit tests
  :: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
dupableCacheRequest request result = GenHaxl $ \e@Env{..} -> do
  cr <- newFullIVar (eitherToResult result)
  k <- nextCallId e
  DataCache.insert request (DataCacheItem cr k) dataCache
  return (Done ())

   :: forall u w. Env u w -> RequestStore u -> IO ()
performRequestStore env reqStore =
  performFetches env (contents reqStore)

-- | Issues a batch of fetches in a 'RequestStore'. After
-- 'performFetches', all the requests in the 'RequestStore' are
-- complete, and all of the 'ResultVar's are full.
  :: forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches env@Env{flags=f, statsRef=sref, statsBatchIdRef=sbref} jobs = do
  t0 <- getTimestamp

  ifTrace f 3 $
    forM_ jobs $ \(BlockedFetches reqs _) ->
      forM_ reqs $ \(BlockedFetch r _) -> putStrLn (showp r)

    applyFetch i bfs@(BlockedFetches (reqs :: [BlockedFetch r]) _) =
      case stateGet (states env) of
        Nothing ->
          return (FetchToDo reqs (SyncFetch (mapM_ (setError e))))
           e :: ShowP req => req a -> DataSourceError
           e req = DataSourceError $ "data source not initialized: " <> dsName
                  <> ": "
                  <> Text.pack (showp req)
        Just state ->
          return $ FetchToDo reqs
            $ (if report f >= 2
                then wrapFetchInStats
                        (userEnv env)
                        (length reqs)
                else id)
            $ wrapFetchInTrace i (length reqs) dsName
            $ wrapFetchInCatch reqs
            $ fetch state f (userEnv env)
        dsName = dataSourceName (Proxy :: Proxy r)

  fetches <- zipWithM applyFetch [0..] jobs

  scheduleFetches fetches (submittedReqsRef env) (flags env)

  t1 <- getTimestamp
  let roundtime = fromIntegral (t1 - t0) / 1000000 :: Double

  ifTrace f 1 $
    printf "Batch data fetch done (%.4fs)\n" (realToFrac roundtime :: Double)

data FetchToDo where
    :: forall (req :: * -> *). (DataSourceName req, Typeable req)
    => [BlockedFetch req] -> PerformFetch req -> FetchToDo

-- Catch exceptions arising from the data source and stuff them into
-- the appropriate requests.  We don't want any exceptions propagating
-- directly from the data sources, because we want the exception to be
-- thrown by dataFetch instead.
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch reqs fetch =
  case fetch of
    SyncFetch f ->
      SyncFetch $ \reqs -> f reqs `Exception.catch` handler
    AsyncFetch f ->
      AsyncFetch $ \reqs io -> f reqs io `Exception.catch` handler
      -- this might be wrong: if the outer 'fio' throws an exception,
      -- then we don't know whether we have executed the inner 'io' or
      -- not.  If not, then we'll likely get some errors about "did
      -- not set result var" later, because we haven't executed some
      -- data fetches.  But we can't execute 'io' in the handler,
      -- because we might have already done it.  It isn't possible to
      -- do it completely right here, so we have to rely on data
      -- sources themselves to catch (synchronous) exceptions.  Async
      -- exceptions aren't a problem because we're going to rethrow
      -- them all the way to runHaxl anyway.
    BackgroundFetch f ->
      BackgroundFetch $ \reqs -> f reqs `Exception.catch` handler
    handler :: SomeException -> IO ()
    handler e = do
      rethrowAsyncExceptions e
      mapM_ (forceError e) reqs

    -- Set the exception even if the request already had a result.
    -- Otherwise we could be discarding an exception.
    forceError e (BlockedFetch _ rvar) =
      putResult rvar (except e)

data FailureCount = FailureCount
  { failureCountStandard :: {-# UNPACK #-} !Int
  , failureCountIgnored :: {-# UNPACK #-} !Int

#if __GLASGOW_HASKELL__ >= 804
instance Semigroup FailureCount where
  (<>) = mappend

instance Monoid FailureCount where
  mempty = FailureCount 0 0
  mappend (FailureCount s1 i1) (FailureCount s2 i2)
    = FailureCount (s1+s2) (i1+i2)

  :: DataSource u req
  => u
  -> IORef Stats
  -> IORef Int
  -> Text
  -> Int
  -> BlockedFetches u
  -> PerformFetch req
  -> PerformFetch req
  (BlockedFetches _reqs reqsI)
  perform = do
  case perform of
    SyncFetch f ->
      SyncFetch $ \reqs -> do
        bid <- newBatchId
        fail_ref <- newIORef mempty
        (t0,t,alloc,_) <- statsForIO (f (map (addFailureCount u fail_ref) reqs))
        failures <- readIORef fail_ref
        updateFetchStats bid allFids t0 t alloc batchSize failures
    AsyncFetch f -> do
      AsyncFetch $ \reqs inner -> do
        bid <- newBatchId
        inner_r <- newIORef (0, 0)
        fail_ref <- newIORef mempty
        let inner' = do
              (_,t,alloc,_) <- statsForIO inner
              writeIORef inner_r (t,alloc)
            reqs' = map (addFailureCount u fail_ref) reqs
        (t0, totalTime, totalAlloc, _) <- statsForIO (f reqs' inner')
        (innerTime, innerAlloc) <- readIORef inner_r
        failures <- readIORef fail_ref
        updateFetchStats bid allFids t0 (totalTime - innerTime)
          (totalAlloc - innerAlloc) batchSize failures
    BackgroundFetch io -> do
      BackgroundFetch $ \reqs -> do
        bid <- newBatchId
        startTime <- getTimestamp
        io (zipWith (addTimer u bid startTime) reqs reqsI)
    allFids = map (\(BlockedFetchInternal k) -> k) reqsI
    newBatchId = atomicModifyIORef' batchIdRef $ \x -> (x+1,x+1)
    statsForIO io = do
      prevAlloc <- getAllocationCounter
      (t0,t,a) <- time io
      postAlloc <- getAllocationCounter
      return (t0,t, fromIntegral $ prevAlloc - postAlloc, a)

    calcFailure _u _r (Right _) = mempty
    calcFailure u r (Left e) = case classifyFailure u r e of
      StandardFailure -> mempty { failureCountStandard = 1 }
      IgnoredForStatsFailure -> mempty { failureCountIgnored = 1 }

      (BlockedFetch req (ResultVar fn))
      (BlockedFetchInternal fid) =
        BlockedFetch req $ ResultVar $ \result isChildThread -> do
          t1 <- getTimestamp
          -- We cannot measure allocation easily for BackgroundFetch. Here we
          -- just attribute all allocation to the last
          -- `putResultFromChildThread` and use 0 for the others.
          -- While the individual allocations may not be correct,
          -- the total sum and amortized allocation are still meaningful.
          -- see Note [tracking allocation in child threads]
          allocs <- if isChildThread then getAllocationCounter else return 0
          updateFetchStats bid [fid] t0 (t1 - t0)
            (negate allocs)
            1 -- batch size: we don't know if this is a batch or not
            (calcFailure u req result) -- failures
          fn result isChildThread

      :: Int
      -> [CallId]
      -> Timestamp
      -> Microseconds
      -> Int64
      -> Int
      -> FailureCount
      -> IO ()
    updateFetchStats bid fids start time space batch FailureCount{..} = do
      let this = FetchStats { fetchDataSource = dataSource
                            , fetchBatchSize = batch
                            , fetchStart = start
                            , fetchDuration = time
                            , fetchSpace = space
                            , fetchFailures = failureCountStandard
                            , fetchIgnoredFailures = failureCountIgnored
                            , fetchBatchId = bid
                            , fetchIds = fids }
      atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ())

    addFailureCount :: DataSource u r
      => u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
    addFailureCount u ref (BlockedFetch req (ResultVar fn)) =
      BlockedFetch req $ ResultVar $ \result isChildThread -> do
        let addFailures r = (r <> calcFailure u req result, ())
        when (isLeft result) $ atomicModifyIORef' ref addFailures
        fn result isChildThread

  :: Int
  -> Int
  -> Text
  -> PerformFetch req
  -> PerformFetch req
wrapFetchInTrace i n dsName f =
  case f of
    SyncFetch io -> SyncFetch (wrapF "Sync" io)
    AsyncFetch fio -> AsyncFetch (wrapF "Async" . fio . unwrapF "Async")
    d = Text.unpack dsName
    wrapF :: String -> IO a -> IO a
    wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
                        (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
    unwrapF :: String -> IO a -> IO a
    unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
                          (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
wrapFetchInTrace _ _ _ f = f

time :: IO a -> IO (Timestamp,Microseconds,a)
time io = do
  t0 <- getTimestamp
  a <- io
  t1 <- getTimestamp
  return (t0, t1 - t0, a)

-- | Start all the async fetches first, then perform the sync fetches before
-- getting the results of the async fetches.
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
scheduleFetches fetches ref flags = do
  -- update ReqCountmap for these fetches
  ifReport flags 1 $ sequence_
    [ atomicModifyIORef' ref $
        \m -> (addToCountMap (Proxy :: Proxy r) (length reqs) m, ())
    | FetchToDo (reqs :: [BlockedFetch r]) _f <- fetches

  ifTrace flags 1 $ printf "Batch data fetch round: %s\n" $
     intercalate (", "::String) $
        map (\(c, n, ds) -> printf "%s %s %d" n ds c) stats

  async_fetches sync_fetches

  fetchName :: forall req . PerformFetch req -> String
  fetchName (BackgroundFetch _) = "background"
  fetchName (AsyncFetch _) = "async"
  fetchName (SyncFetch _) = "sync"

  srcName :: forall req . (DataSourceName req) => [BlockedFetch req] -> String
  srcName _ = Text.unpack $ dataSourceName (Proxy :: Proxy req)

  stats = [(length reqs, fetchName f, srcName reqs)
          | FetchToDo reqs f <- fetches]

  fully_async_fetches :: IO ()
  fully_async_fetches = sequence_
    [f reqs | FetchToDo reqs (BackgroundFetch f) <- fetches]

  async_fetches :: IO () -> IO ()
  async_fetches = compose
    [f reqs | FetchToDo reqs (AsyncFetch f) <- fetches]

  sync_fetches :: IO ()
  sync_fetches = sequence_
    [f reqs | FetchToDo reqs (SyncFetch f) <- fetches]

-- | An exception thrown when reading from datasources fails
data ReadingCompletionsFailedFetch = ReadingCompletionsFailedFetch Text
  deriving Show

instance Exception ReadingCompletionsFailedFetch