module Spark.Core.Internal.ContextInternal(
FinalResult,
inputSourcesRead,
prepareComputation,
buildComputationGraph,
performGraphTransforms,
getTargetNodes,
getObservables,
insertSourceInfo,
updateCache
) where
import Control.Monad.State(get, put)
import Control.Monad(when)
import Data.Text(pack)
import Data.Maybe(mapMaybe, catMaybes)
import Data.Either(isRight)
import Data.Foldable(toList)
import Control.Arrow((&&&))
import Formatting
import qualified Data.Map.Strict as M
import qualified Data.HashMap.Strict as HM
import Spark.Core.Dataset
import Spark.Core.Try
import Spark.Core.Row
import Spark.Core.Types
import Spark.Core.StructuresInternal(NodeId, NodePath, ComputationID(..))
import Spark.Core.Internal.Caching
import Spark.Core.Internal.CachingUntyped
import Spark.Core.Internal.ContextStructures
import Spark.Core.Internal.Client
import Spark.Core.Internal.ComputeDag
import Spark.Core.Internal.PathsUntyped
import Spark.Core.Internal.Pruning
import Spark.Core.Internal.OpFunctions(hdfsPath, updateSourceStamp)
import Spark.Core.Internal.OpStructures(HdfsPath(..), DataInputStamp)
import Spark.Core.Internal.Paths()
import Spark.Core.Internal.DAGFunctions(buildVertexList, graphMapVertices)
import Spark.Core.Internal.DAGStructures
import Spark.Core.Internal.DatasetFunctions
import Spark.Core.Internal.DatasetStructures
import Spark.Core.Internal.Utilities
-- | Result reported for a single node of a computation: 'Left' carries the
-- failure ('NodeComputationFailure'), 'Right' the successful result
-- ('NodeComputationSuccess').
type FinalResult = Either NodeComputationFailure NodeComputationSuccess
-- | Transforms the given compute graph (see 'performGraphTransforms') and
-- packages it into a 'Computation' for the current session.
--
-- The session's command counter is incremented only when this succeeds; on
-- failure the session state is left untouched and the error is returned.
prepareComputation ::
  ComputeGraph ->
  SparkStatePure (Try Computation)
prepareComputation cg = get >>= \session ->
  let attempt = performGraphTransforms session cg >>= _buildComputation session
  in when (isRight attempt) _increaseCompCounter >> return attempt
-- | Updates the source stamp of every vertex that reads from an HDFS path.
--
-- The association list maps each HDFS path to its 'DataInputStamp'. If a
-- path appears more than once, the last entry wins. A vertex whose HDFS path
-- has no entry produces an error (see '_updateVertex'). Vertices without an
-- HDFS path are left unchanged.
insertSourceInfo :: ComputeGraph -> [(HdfsPath, DataInputStamp)] -> Try ComputeGraph
insertSourceInfo cg l =
  graphToComputeGraph <$> graphMapVertices (computeGraphToGraph cg) (_updateVertex2 stamps)
  where
    stamps = M.fromList l
-- | Collects the HDFS paths read by the vertices of the graph, in the order
-- the vertices appear in 'cdVertices'. Vertices whose operator has no HDFS
-- path contribute nothing.
inputSourcesRead :: ComputeGraph -> [HdfsPath]
inputSourcesRead cg =
  [ path
  | vertex <- toList (cdVertices cg)
  , Just path <- [hdfsPath (nodeOp (vertexData vertex))]
  ]
-- | Builds the compute graph for the given node (via 'buildCGraph' on its
-- untyped form) and then runs 'assignPathsUntyped' on the result.
buildComputationGraph :: ComputeNode loc a -> Try ComputeGraph
buildComputationGraph ld =
  tryEither (buildCGraph (untyped ld)) >>= assignPathsUntyped
-- | Runs the graph transformations that precede execution, in order:
--
--  1. 'tieNodes' on the compute graph;
--  2. 'pruneGraphDefault' against the session's node cache, but only when
--     'confUseNodePrunning' is enabled in the session configuration;
--  3. 'fillAutoCache' with 'cachingType' and 'autocacheGen';
--  4. 'checkCaching' on the resulting graph.
--
-- Fails if any step fails, or if 'checkCaching' reports a non-empty list of
-- problems.
performGraphTransforms :: SparkSession -> ComputeGraph -> Try ComputeGraph
performGraphTransforms session cg = do
  cachedGraph <- tryEither (fillAutoCache cachingType autocacheGen (maybePrune baseGraph))
  failures <- tryEither (checkCaching cachedGraph cachingType)
  if null failures
    then return (graphToComputeGraph cachedGraph)
    else tryError $ sformat ("Found some caching errors: "%sh) failures
  where
    baseGraph = computeGraphToGraph (tieNodes cg)
    maybePrune g
      | confUseNodePrunning (ssConf session) = pruneGraphDefault (ssNodeCache session) g
      | otherwise = g
-- | Packages a transformed compute graph into a 'Computation'.
--
-- The computation id is the session's current command counter rendered with
-- 'show'. All vertices of the graph are included as nodes. The graph must
-- have exactly one output vertex: its path becomes the single terminal path,
-- and its id is the terminal node id.
--
-- Any other number of outputs is reported as a programming error. The error
-- names this function and gives the number of outputs actually found; the
-- previous message referred to a non-existent @_build1@.
_buildComputation :: SparkSession -> ComputeGraph -> Try Computation
_buildComputation session cg =
  case terminalNodePaths of
    [p] -> return $ Computation sid cid allNodes [p] p terminalNodeIds
    _ -> tryError $ sformat
      ( "Programming error in _buildComputation: expected exactly one terminal node, got "
          % sh % " in cg=" % sh )
      (length terminalNodePaths) cg
  where
    sid = ssId session
    cid = ComputationID . pack . show . ssCommandCounter $ session
    allNodes = vertexData <$> toList (cdVertices cg)
    terminalNodes = vertexData <$> toList (cdOutputs cg)
    terminalNodePaths = nodePath <$> terminalNodes
    terminalNodeIds = nodeId <$> terminalNodes
-- | Replaces the source stamp of a node that reads from an HDFS path.
--
-- Nodes whose operator has no HDFS path are returned unchanged. For a node
-- with an HDFS path, the path must be a key of the map; otherwise an error is
-- returned. When it is found, the operator is updated with
-- 'updateSourceStamp' and written back into the node with 'updateNodeOp'.
_updateVertex :: M.Map HdfsPath DataInputStamp -> UntypedNode -> Try UntypedNode
_updateVertex m un = maybe (pure un) restamp (hdfsPath op)
  where
    op = nodeOp un
    restamp p = case M.lookup p m of
      Nothing -> tryError $ "_updateVertex: Expected to find path " <> show' p
      Just dis -> updateNodeOp un <$> updateSourceStamp op dis
-- | Adapter giving '_updateVertex' the three-argument shape passed to
-- 'graphMapVertices' in 'insertSourceInfo'. The third argument, a list of
-- (node, edge) pairs, is ignored.
_updateVertex2 ::
  M.Map HdfsPath DataInputStamp ->
  UntypedNode ->
  [(UntypedNode, StructureEdge)] ->
  Try UntypedNode
_updateVertex2 stamps node _edges = _updateVertex stamps node
-- | Increments the session's command counter by one.
_increaseCompCounter :: SparkStatePure ()
_increaseCompCounter = do
  session <- get
  put session { ssCommandCounter = ssCommandCounter session + 1 }
-- | Returns the vertex list that 'buildVertexList' produces from the untyped
-- form of the given local data.
-- NOTE(review): not referenced in the visible part of this module.
_gatherNodes :: LocalData a -> Try [UntypedNode]
_gatherNodes ld = tryEither (buildVertexList (untyped ld))
-- | Converts the result reported for a node into a 'Cell' of the given type.
--
-- A failure becomes an error containing the shown failure value. A success
-- has its 'ncsData' converted with 'jsonToCell' for the requested type.
_extract1 :: FinalResult -> DataType -> Try Cell
_extract1 result dt = either failed decode result
  where
    failed nf = tryError $ sformat ("got an error "%shown) nf
    decode ncs = tryEither $ jsonToCell dt (ncsData ncs)
-- | Resolves the terminal nodes of a computation into local observables.
--
-- Each path in 'cTerminalNodes' is matched against the 'nodePath' of the
-- nodes in 'cNodes'. This function calls 'failure' in two cases: a terminal
-- path has no matching node, or the matching node cannot be cast to a local
-- observable.
getTargetNodes :: (HasCallStack) => Computation -> [UntypedLocalData]
getTargetNodes comp = toObservable . findNode <$> cTerminalNodes comp
  where
    byPath = M.fromList [(nodePath node, node) | node <- cNodes comp]
    findNode path = case M.lookup path byPath of
      Just node -> node
      Nothing -> failure $ sformat ("Could not find "%sh%" in "%sh) path byPath
    toObservable :: (HasCallStack) => UntypedNode -> UntypedLocalData
    toObservable n = case asLocalObservable <$> castLocality n of
      Right (Right x) -> x
      err -> failure $ sformat ("_getNodes:fun2: err="%shown%" n="%shown) err n
-- | Returns every node of the computation that can be cast to a local
-- observable, in the order of 'cNodes'. Nodes that cannot be cast are
-- skipped silently.
--
-- Uses 'mapMaybe' instead of @catMaybes . fmap@, and 'Just' instead of a
-- monad-generic 'return'.
getObservables :: Computation -> [UntypedLocalData]
getObservables comp = mapMaybe asObservable (cNodes comp)
  where
    asObservable n = case asLocalObservable <$> castLocality n of
      Right (Right x) -> Just x
      _ -> Nothing
-- | Applies a batch of node status reports to the session's node cache, one
-- report at a time and in input order (see '_updateCache1').
--
-- Returns two lists, both in input order:
--
--  * the extracted results for nodes that finished, either successfully with
--    a payload or with a failure;
--  * the (path, status) pairs whose cached status actually changed.
updateCache :: ComputationID -> [(NodeId, NodePath, DataType, PossibleNodeStatus)] -> SparkStatePure ([(NodeId, Try Cell)], [(NodePath, NodeCacheStatus)])
updateCache c l = do
  updates <- traverse (_updateCache1 c) l
  return (mapMaybe fst updates, mapMaybe snd updates)
-- | Applies one node status report to the session's node cache.
--
-- * Success with a payload: the cache entry is set to 'NodeCacheSuccess' and
--   the payload is converted with '_extract1'.
-- * Failure: the entry is set to 'NodeCacheError', and the failure is passed
--   to '_extract1', which turns it into an error.
-- * Running: the entry is set to 'NodeCacheRunning'; no result is produced.
-- * Any other status, including success without a payload: nothing is
--   changed.
--
-- The second component is the (path, status) pair only when
-- '_insertCacheUpdate' reports that the cached status changed.
_updateCache1 :: ComputationID -> (NodeId, NodePath, DataType, PossibleNodeStatus) -> SparkStatePure (Maybe (NodeId, Try Cell), Maybe (NodePath, NodeCacheStatus))
_updateCache1 cid (nid, p, dt, status) =
  case status of
    NodeFinishedSuccess (Just s) _ ->
      withResult NodeCacheSuccess (_extract1 (pure s) dt)
    NodeFinishedFailure e ->
      withResult NodeCacheError (_extract1 (Left e) dt)
    NodeRunning ->
      (,) Nothing <$> markAs NodeCacheRunning
    _ -> return (Nothing, Nothing)
  where
    -- Writes the status to the cache and tags a reported change with the path.
    markAs :: NodeCacheStatus -> SparkStatePure (Maybe (NodePath, NodeCacheStatus))
    markAs st = fmap ((,) p) <$> _insertCacheUpdate cid nid p st
    -- Updates the cache, then pairs the node's extracted result with the change.
    withResult st res = do
      changed <- markAs st
      return (Just (nid, res), changed)
-- | Records status @s@ for node @nid@ in the session's node cache, together
-- with the computation id and the node path.
--
-- If the cached entry already has status @s@, nothing is written (even if
-- the computation id or path differ) and 'Nothing' is returned. Otherwise
-- the entry is inserted or overwritten and @Just s@ is returned.
_insertCacheUpdate :: ComputationID -> NodeId -> NodePath -> NodeCacheStatus -> SparkStatePure (Maybe NodeCacheStatus)
_insertCacheUpdate cid nid p s = do
  session <- get
  let cache = ssNodeCache session
      unchanged = (nciStatus <$> HM.lookup nid cache) == Just s
  if unchanged
    then return Nothing
    else do
      let info = NodeCacheInfo
            { nciStatus = s
            , nciComputation = cid
            , nciPath = p
            }
      put session { ssNodeCache = HM.insert nid info cache }
      return (Just s)