{-# LANGUAGE CPP               #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell   #-}

-- | Entry points that turn a SciFlow 'Builder' into a command-line program.
--
-- 'defaultMain' and 'mainWith' are Template Haskell generators that emit the
-- compiled workflow together with a @main@ function; 'runWorkflow' executes a
-- compiled 'Workflow' against the workflow database.
module Scientific.Workflow.Main
    ( defaultMain
    , defaultMainOpts
    , mainWith
    , MainOpts(..)
    , runWorkflow
    ) where

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception (bracket, displayException, finally, onException)
import Control.Monad (replicateM_)
import Control.Monad.Reader (runReaderT)
import Control.Monad.Trans.Except (runExceptT)
import qualified Data.ByteString.Char8 as B
import Data.Default.Class (Default (..))
import Data.Graph.Inductive.Graph (lab, labNodes, nmap)
import Data.Graph.Inductive.PatriciaTree (Gr)
import Data.Graph.Inductive.Query.DFS (rdfs)
import qualified Data.Map as M
import Data.Maybe (fromJust, fromMaybe)
import Data.Serialize (encode)
import qualified Data.Set as S
import qualified Data.Text as T
import qualified Data.Text.Lazy.IO as T
import Data.Tuple (swap)
import Data.Version (showVersion)
import Data.Yaml (FromJSON, decodeEither)
#ifdef DRMAA_ENABLED
import DRMAA (withSession)
#endif
import Language.Haskell.TH
import qualified Language.Haskell.TH.Lift as T
import Network.Socket (Family (..), SockAddr (..), Socket,
                       SocketType (Stream), close, connect,
                       defaultProtocol, isConnected, socket)
import Options.Applicative hiding (Success)
import Paths_SciFlow (version)
import Text.Printf (printf)

import Scientific.Workflow.Internal.Builder
import Scientific.Workflow.Internal.Builder.Types
import Scientific.Workflow.Internal.DB
import Scientific.Workflow.Internal.Utils
import Scientific.Workflow.Main.Options (CMD (..), GlobalOpts (..), argsParser)
import Scientific.Workflow.Types
import Scientific.Workflow.Visualize

-- | Options controlling the @main@ function generated by 'mainWith'.
data MainOpts = MainOpts
    { preAction     :: Name
      -- ^ Name of an action to be executed around the workflow, e.g. some
      -- initialization process. It must have type @'IO' () -> 'IO' ()@.
      -- It is only applied by the @Run@ command when its remote flag is set.
    , programHeader :: String
      -- ^ Header text handed to the command-line parser ('argsParser').
    }

T.deriveLift ''MainOpts

-- | Default options: no pre-action ('id') and a @SciFlow-<version>@ header.
defaultMainOpts :: MainOpts
defaultMainOpts = MainOpts
    { preAction     = 'id
    , programHeader = printf "SciFlow-%s" (showVersion version)
    }

-- | Generate the workflow and a @main@ function using 'defaultMainOpts'.
defaultMain :: Builder () -> Q [Dec]
defaultMain = mainWith defaultMainOpts

-- | Generate the compiled workflow (bound to @sciFlowDefaultMain@) and a
-- @main@ function that runs it through 'mainFunc'.
mainWith :: MainOpts -> Builder () -> Q [Dec]
mainWith opts builder = do
    wf_q <- buildWorkflow wfName builder
    main_q <- [d| main = mainFunc $(varE $ preAction opts) dag
                      $(varE $ mkName wfName) (programHeader opts) |]
    return $ wf_q ++ main_q
  where
    wfName = "sciFlowDefaultMain"
    -- Graph labelled with (pid, attribute); embedded in the generated main.
    dag = nmap (\x -> (_nodePid x, _nodeAttr x)) $ mkDAG builder
{-# INLINE mainWith #-}

-- | Runtime behind the generated @main@: parse the command line with
-- 'argsParser' and dispatch on the resulting 'CMD'.
mainFunc :: (Default config, FromJSON config)
         => (IO () -> IO ())         -- ^ initialization function
         -> Gr (PID, Attribute) Int  -- ^ workflow graph (printed by @View@)
         -> Workflow config          -- ^ compiled workflow
         -> String                   -- ^ program header
         -> IO ()
mainFunc initialize dag wf h = execParser (argsParser h) >>= execute
  where
    execute cmd = case cmd of
        Run opts n r s logS ->
            let runOpts = defaultRunOpt
                    { dbFile        = dbPath opts
                    , runOnRemote   = True
                    , nThread       = n
                    , configuration = fromMaybe [] $ configFile opts
                    , selected      = fmap (map T.pack) s
                    , logServerAddr = logS
                    }
            -- Only remote runs are wrapped in the user-supplied
            -- initialization (and, if compiled in, a DRMAA session).
            in if r
#ifdef DRMAA_ENABLED
                then initialize $ withSession $ runWorkflow wf runOpts
#else
                then initialize $ runWorkflow wf runOpts
#endif
                else runWorkflow wf runOpts{runOnRemote = False}
        View isRaw
            | isRaw     -> B.putStr $ encode dag
            | otherwise -> T.putStrLn $ drawWorkflow dag
        Cat opts pid -> runWorkflow wf $ nodeOpt opts $ Review $ T.pack pid
        Write opts pid input ->
            runWorkflow wf $ nodeOpt opts $ Replace (T.pack pid) input
        Delete opts pid ->
            bracket (openDB $ dbPath opts) closeDB (delRecord $ T.pack pid)
        Call opts pid inputFl outputFl ->
            runWorkflow wf $ nodeOpt opts $ Slave (T.pack pid) inputFl outputFl
        Recover _ _ ->
            error "Scientific.Workflow.Main: the Recover command is not implemented."
        DumpDB _ _ ->
            error "Scientific.Workflow.Main: the DumpDB command is not implemented."

    -- Settings shared by the single-node commands (Cat, Write, Call).
    nodeOpt opts mode = defaultRunOpt
        { dbFile        = dbPath opts
        , nThread       = 4
        , runMode       = mode
        , configuration = fromMaybe [] $ configFile opts
        }

{- Disabled implementation of the Recover command, kept for reference:

recoverExe (Recover opts dir) (Workflow _ ft _) = do
    fls <- shelly $ lsT $ fromText $ T.pack dir
    shelly $ rm_f $ fromText $ T.pack $ dbPath opts
    db <- openDB $ dbPath opts
    forM_ fls $ \fl -> do
        let pid = snd $ T.breakOnEnd "/" fl
        case M.lookup (T.unpack pid) ft of
            Just (DynFunction fn) -> do
                printf "Recovering node: %s.\n" pid
                c <- B.readFile $ T.unpack fl
                dat <- return (readYaml c) `asTypeOf` fn undefined
                saveData pid dat db
            Nothing -> printf "Cannot identify node: %s. Skipped.\n" pid
-}

{- Disabled implementation of the DumpDB command, kept for reference:

dumpDBExe (DumpDB opts dir) (Workflow _ ft _) = do
    shelly $ mkdir_p $ fromText $ T.pack dir
    db <- openDB $ dbPath opts
    nodes <- getKeys db
    forM_ nodes $ \pid -> do
        let fl = dir ++ "/" ++ T.unpack pid
        case M.lookup (T.unpack pid) ft of
            Just (DynFunction fn) -> do
                printf "Saving node: %s.\n" pid
                dat <- readData pid db `asTypeOf` fn undefined
                B.writeFile fl $ showYaml dat
            Nothing -> return ()
-}

-- | Execute a compiled workflow.
--
-- The workflow database (and, if 'logServerAddr' is set, a log-server
-- socket) is held open for the duration of the run. Every node gets an
-- initial state derived from 'runMode':
--
--   * @Master@: nodes outside the selection are skipped; nodes whose key is
--     already in the database are marked 'Success'; the rest are 'Scheduled'.
--   * @Slave@, @Review@, @Replace@: only the named node gets a special state;
--     every other node is skipped.
--
-- 'nThread' tokens are fed into an 'MVar' stored in the 'WorkflowState'
-- (presumably used to throttle concurrent nodes — confirm in the builder).
-- Configuration is decoded from the concatenated YAML files in
-- 'configuration', or 'def' when none are given. A node failure is reported
-- to the log server rather than rethrown.
runWorkflow :: (Default config, FromJSON config)
            => Workflow config -> RunOpt -> IO ()
runWorkflow (Workflow gr pids wf) opts =
    bracket (mkConnection opts) cleanUp $ \(db, logS) -> do
        ks <- S.fromList <$> getKeys db

        -- Selected nodes plus all nodes reached from them by following edges
        -- backwards ('rdfs'); 'Nothing' means the whole workflow.
        let selection = case selected opts of
                Nothing -> Nothing
                Just xs ->
                    let nodeMap = M.fromList $ map swap $ labNodes gr
                        lookupNode x = M.findWithDefault
                            (error $ "Unknown node: " ++ T.unpack x) x nodeMap
                        nds = map lookupNode xs
                    in Just $ S.fromList $ map (fromJust . lab gr) $ rdfs nds gr

            initialState pid = case runMode opts of
                Master -> case S.member pid <$> selection of
                    Just False -> Special Skip
                    _ | pid `S.member` ks -> Success
                      | otherwise         -> Scheduled
                Slave i input output | pid == i -> Special $ EXE input output
                Review i | pid == i             -> Special FetchData
                Replace i input | pid == i      -> Special $ WriteData input
                _                               -> Special Skip

        -- '$!' forces each state now, so problems such as an unknown selected
        -- node surface during setup rather than later inside a worker.
        pidStateMap <- flip M.traverseWithKey pids $ \pid attr -> do
            v <- newMVar $! initialState pid
            return (v, attr)

        availableThreads <- newEmptyMVar
        _ <- forkIO $ replicateM_ (nThread opts) $ putMVar availableThreads ()
        let initState = WorkflowState db pidStateMap availableThreads
                            (runOnRemote opts) logS

        config <- case configuration opts of
            [] -> return def
            fls -> do
                r <- decodeEither . B.unlines <$> mapM B.readFile fls
                either error return r

        result <- runReaderT (runExceptT $ runReaderT (wf ()) initState) config
        case result of
            Right _ -> return ()
            Left (pid, ex) -> sendLog logS $ Error $ printf
                "\"%s\" failed. The error was: %s." pid (displayException ex)

-- | Open the workflow database and, when 'logServerAddr' is set, connect to
-- the log server over a Unix domain socket.
--
-- This is the acquire step of the 'bracket' in 'runWorkflow', so 'cleanUp'
-- does not run if it fails; anything opened before a failure is released
-- here before the exception propagates.
mkConnection :: RunOpt -> IO (WorkflowDB, Maybe Socket)
mkConnection opts = do
    db <- openDB $ dbFile opts
    logS <- connectLogServer (logServerAddr opts) `onException` closeDB db
    return (db, logS)
  where
    connectLogServer :: Maybe String -> IO (Maybe Socket)
    connectLogServer Nothing = return Nothing
    connectLogServer (Just addr) = do
        sock <- socket AF_UNIX Stream defaultProtocol
        let attempt = do
                connect sock $ SockAddrUnix addr
                connected <- isConnected sock
                if connected
                    then return $ Just sock
                    else error "Could not connect to socket!"
        attempt `onException` close sock

-- | Release what 'mkConnection' acquired: send 'Exit' to the log server,
-- close its socket, then close the database. Each later step still runs if
-- an earlier one throws.
cleanUp :: (WorkflowDB, Maybe Socket) -> IO ()
cleanUp (db, sock) =
    (sendLog sock Exit `finally` mapM_ close sock) `finally` closeDB db