{-# LANGUAGE CPP #-}
-- | Concurrency for Shellmate programs.
module Control.Shell.Concurrent
  ( Future, ThreadId
  , Control.Shell.Concurrent.forkIO, Control.Shell.Concurrent.killThread
  , fork2, fork3
  , future, await, check
  , parallel, parallel_
  , chunks
  ) where

import Control.Concurrent as CC
import Control.Monad
import Control.Shell
import Control.Shell.Internal (inEnv)
import Data.IORef
import System.Process

-- | Exists only so that there is something reliable to hang a future's
--   weak reference (and thereby its finalizer) on.
type FinalizerHandle = IORef ThreadId

-- | A future is a computation which runs in parallel with a program's main
--   thread and which may, at some later point, finish and yield a value.
--
--   The computation behind a future is killed when its @Future@ is garbage
--   collected. A future should therefore *always* be 'await'ed at some
--   point, or otherwise kept alive, to make sure the computation gets to
--   finish.
--
--   Any code called in a future using 'unsafeLiftIO' must refrain from
--   using environment variables, standard input/output, relative paths and
--   the current working directory, in order to avoid race conditions.
--   Code within the 'Shell' monad, code imported using 'liftIO' and
--   external processes spawned from within 'Shell' are perfectly safe.
data Future a = Future !FinalizerHandle !(MVar (Either ExitReason a))

-- | Create a future value: the given computation is started on a new thread
--   using the caller's current shell environment, and its outcome is stored
--   for later retrieval with 'await' or 'check'.
future :: Shell a -> Shell (Future a)
future action = do
  env <- getShellEnv
  unsafeLiftIO $ do
    result <- newEmptyMVar
    worker <- CC.forkIO (putMVar result =<< runSh env action)
    -- The finalizer must be attached to something the worker itself does not
    -- reference, so that the worker can be killed once the result is no
    -- longer reachable. A fresh IORef holding the thread id serves as well
    -- as anything.
    keepAlive <- newIORef worker
    _ <- mkWeakIORef keepAlive (CC.killThread worker)
    pure (Future keepAlive result)

-- | Wait for a future value.
--
--   Blocks until the outcome is available and hands it to 'joinResult'.
--   The outcome is read without being removed, so the same future can be
--   awaited again. The finalizer handle is read after the outcome,
--   presumably to keep it reachable while waiting.
await :: Future a -> Shell a
await (Future keepAlive result) =
  joinResult . unsafeLiftIO $ readMVar result <* readIORef keepAlive

-- | Check whether a future value has arrived or not.
check :: Future a -> Shell (Maybe a)
check (Future h v) = do
  -- Non-blocking peek at the outcome; the finalizer handle is read afterwards
  -- and forced below, presumably to keep it reachable for the duration.
  mx <- unsafeLiftIO $ tryReadMVar v <* readIORef h
  case h `seq` mx of
    Just x -> Just <$> joinResult (pure x)
    _      -> pure Nothing

-- | Perform the given computations in parallel.
--   All computations are started before the first result is waited for;
--   results are returned in the order of the input list.
--   The race condition warning for 'Future' when modifying environment
--   variables or using relative paths still applies.
parallel :: [Shell a] -> Shell [a]
parallel = mapM future >=> mapM await

-- | Like 'parallel', but discards any return values.
parallel_ :: [Shell a] -> Shell ()
parallel_ = mapM future >=> mapM_ await

-- | Break a list into chunks of at most @n@ elements each; only the last
--   chunk may be shorter, and an empty list yields no chunks at all.
--   This is quite useful for when performing *every* computation in parallel
--   is too much. For instance, to download a list of files three at a time,
--   one would do
--   @mapM_ (parallel_ . map downloadFile) (chunks 3 files)@.
--
--   A chunk size below 1 is treated as 1, so the result is always finite for
--   finite input. The list is consumed lazily, one chunk at a time.
chunks :: Int -> [a] -> [[a]]
chunks n = go
  where
    -- Clamp so that every step consumes at least one element.
    size = max 1 n

    go [] = []
    go xs = case splitAt size xs of
      (chunk, rest) -> chunk : go rest

-- | Run a computation in a separate thread. If the thread throws an error,
--   it will simply die; the error will not propagate to its parent thread.
--   The new thread runs with the shell environment of the caller, and the
--   outcome of the computation is discarded.
forkIO :: Shell () -> Shell ThreadId
forkIO m = do
  env <- getShellEnv
  unsafeLiftIO $ do
    CC.forkIO $ void $ runSh env m

-- | Run a computation in a separate thread, with its standard input and output
--   provided by the first and second handles returned respectively.
--   The handles are line buffered by default.
--
--   Two pipes are created: the returned handles are the write end of the
--   computation's standard input and the read end of its standard output.
--   This function does not close either end of the pipes itself.
fork2 :: Shell () -> Shell (Handle, Handle, ThreadId)
fork2 m = do
  env <- getShellEnv
  (ri, wi) <- unsafeLiftIO createPipe
  (ro, wo) <- unsafeLiftIO createPipe
  -- Line buffering is set on the write ends of both pipes.
  mapM_ (flip hSetBuffering LineBuffering) [wi,wo]
  tid <- Control.Shell.Concurrent.forkIO $ do
    inEnv (env {envStdOut = wo, envStdIn = ri}) m
  return (wi, ro, tid)

-- | Like 'fork2', but adds a third handle for standard error.
fork3 :: Shell () -> Shell (Handle, Handle, Handle, ThreadId)
fork3 child = do
  parentEnv <- getShellEnv
  -- One pipe per standard stream; each yields (read end, write end).
  (stdinRead, stdinWrite) <- unsafeLiftIO createPipe
  (stdoutRead, stdoutWrite) <- unsafeLiftIO createPipe
  (stderrRead, stderrWrite) <- unsafeLiftIO createPipe
  -- Line buffering is set on the write end of every pipe.
  mapM_ (\h -> hSetBuffering h LineBuffering)
        [stdinWrite, stdoutWrite, stderrWrite]
  -- The computation reads from the stdin pipe and writes to the other two.
  let childEnv =
        parentEnv
          { envStdOut = stdoutWrite
          , envStdErr = stderrWrite
          , envStdIn = stdinRead
          }
  -- Forking inside the modified environment makes the new thread pick that
  -- environment up as its own.
  worker <- inEnv childEnv (Control.Shell.Concurrent.forkIO child)
  -- The caller gets the opposite end of each pipe, plus the thread id.
  pure (stdinWrite, stdoutRead, stderrRead, worker)

-- | Terminate a thread spawned by 'Control.Shell.Concurrent.forkIO'.
killThread :: ThreadId -> Shell ()
killThread tid = liftIO (CC.killThread tid)