-- | Core type definitions for the workflow engine: the workflow value
-- itself, per-node state, the shared runtime state together with its
-- lenses, the monad stack in which processors run, and the run options.
module Scientific.Workflow.Types
( Workflow(..)
, PID
, NodeState(..)
, SpecialMode(..)
, ProcState
, WorkflowState(..)
, database
, procStatus
, procParaControl
, remote
, logServer
, Processor
, RunMode(..)
, RunOpt(..)
, defaultRunOpt
, Parallel(..)
, WorkflowConfig
) where
import Control.Concurrent.Async.Lifted (concurrently)
import Control.Concurrent.MVar (MVar)
import Control.Exception (SomeException)
import Control.Lens (makeLenses)
import Control.Monad.Reader (ReaderT)
import Control.Monad.Trans.Except (ExceptT)
import Data.Graph.Inductive.Graph (labEdges, labNodes,
mkGraph)
import Data.Graph.Inductive.PatriciaTree (Gr)
import qualified Data.Map as M
import qualified Data.Serialize as S
import Data.Serialize.Text ()
import qualified Data.Text as T
import qualified Language.Haskell.TH.Lift as T
import Network.Socket (Socket)
import Scientific.Workflow.Internal.Builder.Types (Attribute)
import Scientific.Workflow.Internal.DB (WorkflowDB (..))
-- | Identifier of a workflow process (a node of the workflow graph); a plain
-- alias for 'T.Text'.
type PID = T.Text
-- | Status of a single process, as stored per 'PID' in the workflow state.
data NodeState
    = Success
      -- ^ NOTE(review): presumably the node finished without error — confirm.
    | Fail SomeException
      -- ^ Failure, carrying the exception involved.
    | Scheduled
      -- ^ NOTE(review): presumably selected to run but not finished — confirm.
    | Special SpecialMode
      -- ^ The node is handled in one of the 'SpecialMode's.
-- | Non-standard ways of handling a node.
--
-- NOTE(review): the meaning of each constructor is inferred from its name
-- only; confirm against the code that interprets them.
data SpecialMode
    = Skip
    | FetchData
    | WriteData FilePath
      -- ^ Carries one file path.
    | EXE FilePath FilePath
      -- ^ Carries two file paths (their roles are not visible here).
-- | Shared runtime state that processors read through 'ProcState'.
data WorkflowState = WorkflowState
    { _database        :: WorkflowDB
      -- ^ The workflow database.
    , _procStatus      :: M.Map PID (MVar NodeState, Attribute)
      -- ^ For each process: a mutable cell holding its 'NodeState', paired
      -- with its 'Attribute'.
    , _procParaControl :: MVar ()
      -- ^ NOTE(review): presumably a lock/token used to limit parallel
      -- execution — confirm against its users.
    , _remote          :: Bool
      -- ^ NOTE(review): presumably whether jobs are run remotely — confirm.
    , _logServer       :: Maybe Socket
      -- ^ Optional socket of a log server.
    }

-- Generates the exported lenses 'database', 'procStatus', 'procParaControl',
-- 'remote' and 'logServer' (the field names without the leading underscore).
makeLenses ''WorkflowState
-- | Monad in which processors run: a reader over 'WorkflowState', on top of
-- 'ExceptT' whose error type is a @('PID', 'SomeException')@ pair, on top of
-- 'WorkflowConfig'.
--
-- The synonym takes no result parameter so that @ProcState config@ can be
-- used as a monad and applied to a result type, e.g. @ProcState config r@.
type ProcState config =
    ReaderT WorkflowState (ExceptT (PID, SomeException) (WorkflowConfig config))
-- | Base of the monad stack: a reader over a @config@ value on top of 'IO'.
-- It is left without a result parameter because 'ProcState' uses
-- @WorkflowConfig config@ as a monad (the inner monad of its 'ExceptT').
type WorkflowConfig config = ReaderT config IO
-- | A workflow step: a function from an input @a@ to a 'ProcState' action
-- that produces a @b@.
type Processor config a b = a -> ProcState config b
-- | A workflow: its graph, a map of node attributes, and a processor.
--
-- NOTE(review): the @_worflow_@ prefix (missing \"k\") looks like a typo. The
-- fields are exported through @Workflow(..)@, so renaming them would break
-- callers; they are kept as-is.
data Workflow config = Workflow
    { _worflow_dag       :: Gr PID Int
      -- ^ Graph whose nodes are labelled with 'PID's and whose edges are
      -- labelled with 'Int's.
    , _worflow_pidToAttr :: M.Map T.Text Attribute
      -- ^ 'Attribute' lookup keyed by text (presumably the 'PID' — confirm).
    , _workflow          :: Processor config () ()
      -- ^ Processor taking and returning unit (presumably the entry point
      -- that runs the whole workflow — confirm).
    }
-- | Options controlling a run. See 'defaultRunOpt' for the default values.
--
-- NOTE(review): field descriptions marked \"presumably\" are inferred from
-- names and types only; confirm against the code that consumes them.
data RunOpt = RunOpt
    { dbFile        :: FilePath
      -- ^ Path of the database file.
    , nThread       :: Int
      -- ^ Presumably the number of threads / concurrent jobs allowed.
    , runOnRemote   :: Bool
      -- ^ Presumably whether to run jobs remotely.
    , runMode       :: RunMode
      -- ^ The 'RunMode' to run in.
    , configuration :: [FilePath]
      -- ^ Presumably paths of configuration files.
    , selected      :: Maybe [PID]
      -- ^ Optional list of process IDs (presumably restricting which nodes
      -- are run).
    , logServerAddr :: Maybe String
      -- ^ Optional address of a log server.
    }
-- | Default options: 'Master' mode, a single thread, no remote execution,
-- database file @sciflow.db@, no configuration files, no node selection and
-- no log server.
defaultRunOpt :: RunOpt
defaultRunOpt = RunOpt
    { runMode       = Master
    , nThread       = 1
    , runOnRemote   = False
    , dbFile        = "sciflow.db"
    , configuration = []
    , selected      = Nothing
    , logServerAddr = Nothing
    }
-- | Mode in which the program is run.
--
-- NOTE(review): what each mode does is not visible in this module; only the
-- payload of each constructor is documented below.
data RunMode
    = Master
      -- ^ The mode used by 'defaultRunOpt'.
    | Slave PID FilePath FilePath
      -- ^ Carries a process ID and two file paths.
    | Review PID
      -- ^ Carries a process ID.
    | Replace PID FilePath
      -- ^ Carries a process ID and one file path.
-- | Lift a graph into a Template Haskell expression by lifting its labelled
-- node list and labelled edge list, and rebuilding the graph from that pair
-- with 'mkGraph' in the generated code.
instance (T.Lift a, T.Lift b) => T.Lift (Gr a b) where
    lift graph = [| uncurry mkGraph $(T.lift components) |]
      where
        -- Everything 'mkGraph' needs: (labelled nodes, labelled edges).
        components = (labNodes graph, labEdges graph)
-- | Wrapper around a 'ProcState' action. It exists to carry an alternative
-- 'Applicative' instance whose '<*>' runs both operands with 'concurrently'.
newtype Parallel config r = Parallel
    { runParallel :: ProcState config r
      -- ^ Unwrap to the underlying 'ProcState' action.
    }
-- | Maps over the result of the wrapped 'ProcState' action.
instance Functor (Parallel config) where
    fmap f = Parallel . fmap f . runParallel
-- | 'pure' wraps a pure value in the underlying monad. '<*>' runs the
-- function-producing action and the argument-producing action with
-- 'concurrently', then applies the resulting function to the resulting
-- argument.
instance Applicative (Parallel config) where
    pure = Parallel . pure
    Parallel mf <*> Parallel mx =
        Parallel $ uncurry ($) <$> concurrently mf mx
-- | Orphan 'S.Serialize' instance for graphs whose nodes are labelled with
-- @('PID', 'Attribute')@ pairs and whose edges are labelled with 'Int's.
-- No methods are defined, so the class's default implementations are used.
-- NOTE(review): presumably these are the Generic-based defaults, which would
-- require 'Generic' instances for the graph and label types — confirm.
instance S.Serialize (Gr (PID, Attribute) Int)