module Scientific.Workflow.Internal.Builder
( node
, node'
, nodeS
, nodeP
, nodeP'
, nodePS
, nodeSharedP
, nodeSharedP'
, nodeSharedPS
, link
, (~>)
, path
, namespace
, buildWorkflow
, buildWorkflowPart
, mkDAG
, mkProc
) where
import Control.Monad.Identity (runIdentity)
import Data.Monoid ((<>))
import Control.Lens ((^.), (%~), _1, _2, _3, (&))
import Control.Monad.Trans.Except (throwE)
import Control.Monad.State (lift, liftIO, (>=>), foldM_, execState, modify, State)
import Control.Monad.Reader (ask)
import Control.Concurrent.MVar
import Control.Concurrent (forkIO)
import qualified Data.Text as T
import Data.List.Split (chunksOf)
import Data.Yaml (ToJSON)
import Data.Graph.Inductive.Graph ( mkGraph, lab, labNodes, outdeg, nmap
, lpre, labnfilter, nfilter, gmap, suc )
import Data.List (sortBy, foldl')
import Data.Maybe (fromJust, fromMaybe)
import qualified Data.ByteString as B
import Data.Ord (comparing)
import qualified Data.Map as M
import Control.Concurrent.Async.Lifted (mapConcurrently)
import Language.Haskell.TH
import Control.Monad.Catch (try)
import Scientific.Workflow.Types
import Scientific.Workflow.Internal.Builder.Types
import Scientific.Workflow.Internal.DB
import Scientific.Workflow.Internal.Utils (sendLog, Log(..), runRemote, RemoteOpts(..))
-- | Register a node using an explicit 'FunctionConfig'.
--
-- The attribute modifier is run against 'defaultAttribute' (with its
-- function configuration preset to @conf@); the resulting node is prepended
-- to the node list kept in the first slot of the builder state.
nodeWith :: ToExpQ function
         => FunctionConfig
         -> PID
         -> function
         -> State Attribute ()
         -> Builder ()
nodeWith conf pid fn setAttr = modify $ \st -> st & _1 %~ (entry :)
  where
    baseAttr = defaultAttribute{ _functionConfig = conf }
    entry    = Node pid (toExpQ fn) (execState setAttr baseAttr)
-- | Declare a node configured as @FunctionConfig None IOAction@: the
-- function is treated as an IO action and its input is not split.
node :: ToExpQ fun
     => PID
     -> fun
     -> State Attribute ()
     -> Builder ()
node pid fn setAttr = nodeWith (FunctionConfig None IOAction) pid fn setAttr
-- | Declare a node configured as @FunctionConfig None Pure@: the function
-- is treated as a pure function and its input is not split.
node' :: ToExpQ fun
      => PID
      -> fun
      -> State Attribute ()
      -> Builder ()
node' pid fn setAttr = nodeWith (FunctionConfig None Pure) pid fn setAttr
-- | Declare a node configured as @FunctionConfig None Stateful@: the
-- function is treated as a stateful computation and its input is not split.
nodeS :: ToExpQ fun
      => PID
      -> fun
      -> State Attribute ()
      -> Builder ()
nodeS pid fn setAttr = nodeWith (FunctionConfig None Stateful) pid fn setAttr
-- | Declare a node configured as @FunctionConfig (Standard n) IOAction@.
-- With 'Standard', the generated processor works on a list input that is
-- split into chunks of @n@ elements.
nodeP :: ToExpQ fun
      => Int
      -> PID
      -> fun
      -> State Attribute ()
      -> Builder ()
nodeP n pid fn setAttr =
    nodeWith (FunctionConfig (Standard n) IOAction) pid fn setAttr
-- | Like 'nodeP', but the function is configured as 'Pure'.
nodeP' :: ToExpQ fun => Int -> PID -> fun -> State Attribute () -> Builder ()
nodeP' n pid fn setAttr =
    nodeWith (FunctionConfig (Standard n) Pure) pid fn setAttr
-- | Like 'nodeP', but the function is configured as 'Stateful'.
nodePS :: ToExpQ fun => Int -> PID -> fun -> State Attribute () -> Builder ()
nodePS n pid fn setAttr =
    nodeWith (FunctionConfig (Standard n) Stateful) pid fn setAttr
-- | Declare a node configured as @FunctionConfig (ShareData n) IOAction@.
-- With 'ShareData', the generated processor takes a 'ContextData' holding a
-- shared context plus a list; the list is split into chunks of @n@ elements
-- and the context is paired with each element.
nodeSharedP :: ToExpQ fun
            => Int
            -> PID
            -> fun
            -> State Attribute ()
            -> Builder ()
nodeSharedP n pid fn setAttr =
    nodeWith (FunctionConfig (ShareData n) IOAction) pid fn setAttr
-- | Like 'nodeSharedP', but the function is configured as 'Pure'.
nodeSharedP' :: ToExpQ fun => Int -> PID -> fun -> State Attribute () -> Builder ()
nodeSharedP' n pid fn setAttr =
    nodeWith (FunctionConfig (ShareData n) Pure) pid fn setAttr
-- | Like 'nodeSharedP', but the function is configured as 'Stateful'.
nodeSharedPS :: ToExpQ fun => Int -> PID -> fun -> State Attribute () -> Builder ()
nodeSharedPS n pid fn setAttr =
    nodeWith (FunctionConfig (ShareData n) Stateful) pid fn setAttr
-- | Add one edge from every source in @xs@ to the target @t@.
--
-- Each edge carries the position (counting from 0) of its source within
-- @xs@. The new edges are prepended to the edge list kept in the second
-- slot of the builder state.
link :: [PID] -> PID -> Builder ()
link xs t = modify $ _2 %~ (incoming ++)
  where
    incoming = zipWith (\from ord -> Edge from t ord) xs [0..]
-- | Infix synonym for 'link'.
(~>) :: [PID] -> PID -> Builder ()
xs ~> t = link xs t
-- | Connect the given nodes into a linear chain: each node is linked to the
-- one that follows it in the list.
--
-- An empty list or a single node adds no edges. The empty case used to
-- crash through 'head' / 'tail'; it is now a no-op.
path :: [PID] -> Builder ()
path []           = return ()
path (start:rest) = foldM_ step start rest
  where
    -- Link the previous node to the next one and carry the next one forward.
    step from to = link [from] to >> return to
-- | Run a builder in isolation, prefix every node id and both edge
-- endpoints it produced with @prefix@ followed by an underscore, and merge
-- the result in front of the enclosing builder's state.
namespace :: T.Text -> Builder () -> Builder ()
namespace prefix builder = modify (scoped <>)
  where
    -- The inner builder starts from an empty (nodes, edges) state.
    scoped = execState (builder >> modify qualify) ([], [])
    qualify (nodes, edges) = (map qualifyNode nodes, map qualifyEdge edges)
    qualifyNode x = x{ _nodePid = withPrefix (_nodePid x) }
    qualifyEdge x = x{ _edgeFrom = withPrefix (_edgeFrom x)
                     , _edgeTo   = withPrefix (_edgeTo x) }
    withPrefix name = prefix <> "_" <> name
-- | Turn a builder into the Template Haskell declaration of a workflow
-- bound to the given name: the builder is converted to a DAG with 'mkDAG'
-- and handed to 'mkWorkflow'.
buildWorkflow :: String
              -> Builder ()
              -> Q [Dec]
buildWorkflow workflowName builder = mkWorkflow workflowName (mkDAG builder)
-- | Like 'buildWorkflow', but first trims the DAG with 'trimDAG'.
--
-- Every key found in the database at @dbPath@ is treated as a node in state
-- 'Success'. The database is read at compile time through 'runIO'.
buildWorkflowPart :: FilePath
                  -> String
                  -> Builder ()
                  -> Q [Dec]
buildWorkflowPart dbPath wfName b = do
    finished <- runIO completedNodes
    mkWorkflow wfName (trimDAG finished (mkDAG b))
  where
    completedNodes = do
        db <- openDB dbPath
        ks <- getKeys db
        return $ M.fromList [ (k, Success) | k <- ks ]
-- | Build the workflow graph described by a builder.
--
-- Nodes are numbered by their position in the builder's node list, and
-- edges are translated from PIDs to those numbers. Errors are raised lazily
-- (when the offending id is forced) if an edge mentions an undeclared PID
-- or if a PID is declared more than once.
mkDAG :: Builder () -> DAG
mkDAG builder = mkGraph labelledNodes labelledEdges
  where
    (ns, es) = execState builder ([], [])
    labelledNodes = map (\x -> (pid2nid $ _nodePid x, x)) ns
    labelledEdges =
        map (\Edge{..} -> (pid2nid _edgeFrom, pid2nid _edgeTo, _edgeOrd)) es

    -- PID -> node id table. It is a standalone binding so it is constructed
    -- once and shared by all lookups, not rebuilt per 'pid2nid' call.
    nodeIds :: M.Map T.Text Int
    nodeIds = M.fromListWithKey
        (\k _ _ -> error $ "Multiple declaration for: " ++ T.unpack k) $
        zip (map _nodePid ns) [0..]

    pid2nid pid = M.findWithDefault
        (error $ "mkDAG: cannot identify node: " ++ T.unpack pid) pid nodeIds
-- | Restrict a DAG according to the known node states in @st@.
--
-- A node is kept when it is not recorded as 'Success', or when at least one
-- of its direct successors is not. In the filtered graph, a node that is
-- not recorded as 'Success' and whose context has no incoming links is left
-- untouched; every other node has its function composed after a step that
-- accepts @()@ and yields 'undefined'.
trimDAG :: (M.Map T.Text NodeState) -> DAG -> DAG
trimDAG st dag = gmap revise (labnfilter keep dag)
  where
    -- Anything not explicitly marked 'Success' still has to be built.
    shallBuild pid
        | Just Success <- M.lookup pid st = False
        | otherwise                       = True

    keep (i, x) = shallBuild (_nodePid x) || any shallBuild childPids
      where
        childPids = [ _nodePid (fromJust (lab dag c)) | c <- suc dag i ]

    revise context@(linkTo, _, nodeLabel, _) =
        if shallBuild (_nodePid nodeLabel) && null linkTo
            then context
            else context & _3 %~ stub
      where
        stub l = l{ _nodeFunction = feedEmptyInput (_nodeFunction l) }
        feedEmptyInput x = [| (\() -> undefined) >=> $(x) |]
-- | Generate the declaration @workflowName = Workflow dag' pids main@.
--
-- @dag'@ is the DAG relabelled with node ids only, @pids@ maps every node
-- id to its attributes, and @main@ is an expression assembled by walking
-- backwards from the sink nodes (out-degree 0) through their parents.
mkWorkflow :: String
           -> DAG -> Q [Dec]
mkWorkflow workflowName dag =
    [d| $(varP $ mkName workflowName) = Workflow dag' pids $workflowMain |]
  where
    dag' = nmap _nodePid dag
    pids = M.fromList [ (_nodePid nd, _nodeAttr nd) | (_, nd) <- labNodes dag ]
    workflowMain = connect sinks [| const $ return () |]
    sinks = labNodes (nfilter isSink dag)
    isSink v = outdeg dag v == 0

    -- Expression for one node: its parents' expressions (ordered by edge
    -- label) feeding the node's own processor.
    backTrack (i, Node{..}) = connect parents [| $mkP $fun |]
      where
        parents = [ (p, fromJust (lab dag p))
                  | (p, _) <- sortBy (comparing snd) (lpre dag i) ]
        -- Lift the user function according to its declared kind.
        fun = case _nodeAttr^.functionConfig of
            FunctionConfig _ Pure     -> [| return . $_nodeFunction |]
            FunctionConfig _ IOAction -> [| liftIO . $_nodeFunction |]
            FunctionConfig _ Stateful -> [| (lift . lift) . $_nodeFunction |]
        -- Pick the processor constructor according to the parallel mode.
        mkP = case _nodeAttr^.functionConfig of
            FunctionConfig None _          -> [| mkProc _nodePid |]
            FunctionConfig (Standard n) _  -> [| mkProcListN n _nodePid |]
            FunctionConfig (ShareData n) _ -> [| mkProcListNWithContext n _nodePid |]

    -- No source: just the sink. One source: plain Kleisli composition.
    -- Several sources: combine them applicatively under 'Parallel' into a
    -- tuple, then continue with the sink.
    connect sources sink = case sources of
        []  -> sink
        [s] -> [| $(backTrack s) >=> $sink |]
        _   -> [| fmap runParallel $expq >=> $sink |]
      where
        expq = foldl' g e0 sources
        e0 = [| (pure. pure) $(conE (tupleDataName $ length sources)) |]
        g acc x = [| ((<*>) . fmap (<*>)) $acc $ fmap Parallel $(backTrack x) |]
-- | Build a processor that passes the whole input to the function in one
-- piece (the input is wrapped in, and unwrapped from, the identity functor).
mkProc :: (DBData a, DBData b, ToJSON config)
       => PID -> (a -> (ProcState config) b) -> (Processor config a b)
mkProc pid f = mkProcWith (return, runIdentity) pid f
-- | Build a processor over a list input: the list is split into chunks of
-- @n@ elements, the function is mapped over each chunk, and the per-chunk
-- results are concatenated.
mkProcListN :: (DBData a, DBData b, ToJSON config)
            => Int
            -> PID
            -> (a -> (ProcState config) b)
            -> (Processor config [a] [b])
mkProcListN n pid f = mkProcWith (chunksOf n, concat) pid (mapM f)
-- | Like 'mkProcListN', but for an input that carries a shared context.
--
-- The list inside the 'ContextData' is split into chunks of @n@ elements,
-- each chunk keeps the same context, and the function is applied to every
-- element paired with that context. Per-chunk results are concatenated.
mkProcListNWithContext :: (DBData a, DBData b, DBData c, ToJSON config)
                       => Int -> PID
                       -> (ContextData c a -> (ProcState config) b)
                       -> (Processor config (ContextData c [a]) [b])
mkProcListNWithContext n pid f = mkProcWith (toChunks, concat) pid runChunk
  where
    runChunk (ContextData c xs) = mapM (f . ContextData c) xs
    toChunks (ContextData c xs) = map (ContextData c) (chunksOf n xs)
-- | Build a processor for node @pid@ from a function and a pair of
-- functions that split the input into pieces (@box@) and merge the
-- per-piece results (@unbox@).
--
-- What the processor does depends on the node's current state:
--
-- * @Fail ex@: rethrows @(pid, ex)@ through 'throwE'.
-- * @Success@: returns the result stored in the database.
-- * @Scheduled@: runs the function (locally or remotely), stores the
--   result, and records the new state.
-- * @Special mode@: delegates to 'handleSpecialMode'.
mkProcWith :: (Traversable t, DBData a, DBData b, ToJSON config)
=> (a -> t a, t b -> b) -> PID
-> (a -> (ProcState config) b)
-> (Processor config a b)
mkProcWith (box, unbox) pid f = \input -> do
wfState <- ask
-- Look up this node's state MVar and attributes; a missing entry is
-- treated as a programming error.
let (pSt, attr) = M.findWithDefault (error "Impossible") pid $ wfState^.procStatus
-- Take the node's state MVar. Each branch below puts a value back.
pStValue <- liftIO $ takeMVar pSt
case pStValue of
-- Already failed: restore the state and propagate the stored exception.
(Fail ex) -> liftIO (putMVar pSt pStValue) >> lift (throwE (pid, ex))
-- Already done: restore the state and read the saved result.
Success -> liftIO $ do
putMVar pSt pStValue
fmap deserialize $ readData pid $ wfState^.database
Scheduled -> do
-- Take procParaControl before running; it is put back (from a forked
-- thread) once the node finishes, whether it failed or succeeded.
-- NOTE(review): presumably this caps the number of concurrently running
-- nodes -- confirm where procParaControl is created.
_ <- liftIO $ takeMVar $ wfState^.procParaControl
liftIO $ sendLog (wfState^.logServer) $ Running pid
config <- lift $ lift ask
-- The node's own submitToRemote attribute, when set, overrides the
-- workflow-wide remote setting.
let sendToRemote = fromMaybe (wfState^.remote) (attr^.submitToRemote)
remoteOpts = RemoteOpts
{ extraParams = attr^.remoteParam
, environment = config
}
input' = box input
-- Remote pieces are run concurrently; local pieces are run one after
-- another with mapM. Exceptions are captured by 'try'.
result <- try $ unbox <$> if sendToRemote
then liftIO $ mapConcurrently (runRemote remoteOpts pid) input'
else mapM f input'
case result of
-- Failure: record the exception, put procParaControl back, log, rethrow.
Left ex -> do
_ <- liftIO $ do
putMVar pSt $ Fail ex
_ <- forkIO $ putMVar (wfState^.procParaControl) ()
sendLog (wfState^.logServer) $ Warn pid "Failed!"
lift (throwE (pid, ex))
-- Success: persist the result first, then mark the node as done.
-- NOTE(review): if saveData throws, pSt and procParaControl are never
-- refilled on this path -- confirm whether that can happen.
Right r -> liftIO $ do
saveData pid (serialize r) $ wfState^.database
putMVar pSt Success
_ <- forkIO $ putMVar (wfState^.procParaControl) ()
sendLog (wfState^.logServer) $ Complete pid
return r
-- Special modes refill pSt themselves inside handleSpecialMode.
Special mode -> handleSpecialMode mode wfState pSt pid f
-- | Run a node that is in a special mode.
--
-- * 'Skip': produce 'undefined' without running anything.
-- * 'EXE': read the serialized input from a file, run the function, and
--   write the serialized result to the output file.
-- * 'FetchData': read the node's stored result from the database and print
--   it as YAML on standard output.
-- * 'WriteData': parse a YAML file and store it as the node's result via
--   'updateData'.
--
-- In every mode the node's state MVar is refilled with @Special Skip@.
handleSpecialMode :: (DBData a, DBData b)
                  => SpecialMode
                  -> WorkflowState
                  -> MVar NodeState -> PID
                  -> (a -> (ProcState config) b)
                  -> (ProcState config) b
handleSpecialMode mode wfState nodeSt pid fn = case mode of
    Skip -> liftIO $ do
        markSkipped
        return undefined
    EXE inputData output -> do
        c <- liftIO $ B.readFile inputData
        r <- fn $ deserialize c
        liftIO $ do
            B.writeFile output (serialize r)
            markSkipped
        return r
    FetchData -> liftIO $ do
        r <- deserialize <$> readData pid (wfState^.database)
        B.putStr (showYaml r)
        markSkipped
        return r
    WriteData inputData -> do
        c <- liftIO $ B.readFile inputData
        -- 'asTypeOf' only pins the result type to that of @fn@; @fn@ is
        -- never actually run here.
        r <- return (readYaml c) `asTypeOf` fn undefined
        liftIO $ do
            updateData pid (serialize r) (wfState^.database)
            markSkipped
        return r
  where
    markSkipped = putMVar nodeSt (Special Skip)