module Transient.Move where
import Transient.Base
import Transient.Logged
import Data.Typeable
import Control.Applicative
import Network
import Network.HTTP
import Control.Monad.IO.Class
import System.IO
import Control.Exception
import Data.Maybe
import Unsafe.Coerce
import System.Process
import System.Directory
import Control.Monad
import Network.Info
import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Data.Monoid
import qualified Data.Map as M
import Data.List (nub,(\\))
import Data.IORef
-- | Move the computation to @node@:@port@ with 'beamTo' and then make sure
-- the service stored in the git repository @package@ is cloned, built with
-- @cabal install@ and started with @port@ as its argument.
--
-- Nothing is run when a directory named after the last path component of
-- @package@ already exists in the current directory.  @servport@ is accepted
-- but not used by this implementation.
--
-- NOTE(review): @package@ is interpolated into a shell command unescaped;
-- callers must not pass untrusted input.
installService node port servport package= do
  beamTo node port
  liftIO $ do
    let packagename= name package
    exist <- doesDirectoryExist packagename
    -- Everything runs in ONE shell, chained with @&&@.  'runCommand' returns
    -- without waiting for the process and a @cd@ issued in its own shell does
    -- not affect later commands, so issuing the steps as separate commands
    -- would race the build against the clone and build in the wrong directory.
    unless exist $ void $ runCommand $
         "git clone "++ package
      ++ " && cd "++ packagename
      ++ " && cabal install"
      ++ " && ./dist/build/"++ packagename++"/"++packagename
      ++ " " ++ show port
  where
    -- Last @/@-separated component of the path (the whole path when it has no
    -- @/@; the empty string when the path ends in @/@).
    name path= case dropWhile (/= '/') path of
      ""       -> path
      (_:rest) -> name rest
-- | Send the current log to the node listening at @host@:@port@ and stop the
-- local computation.
--
-- The 'Log' is taken from the session data (defaulting to @Log False [] []@).
-- When its first field is True nothing is done.  Otherwise the reversed log
-- is serialised with 'show', written as one line to a fresh connection, the
-- connection is closed and the local computation ends with 'stop'.
beamTo :: HostName -> PortID -> TransientIO ()
beamTo host port= do
  Log rec log _ <- getSData <|> return (Log False [][])
  unless rec $ do
    -- 'bracket' closes the connection even when writing the log throws.
    -- The (already closed) handle is returned only because 'delSData'
    -- takes it as an argument.
    h <- liftIO $ bracket (connectTo host port) hClose $ \conn -> do
           hSetBuffering conn LineBuffering
           hPutStrLn conn (show $ reverse log) >> hFlush conn
           return conn
    -- NOTE(review): presumably removes the session entry of the handle's
    -- type; confirm against Transient.Base.
    delSData h
    stop
-- | Send the current log to the node listening at @host@:@port@ and keep
-- running locally.
--
-- The 'Log' is taken from the session data (defaulting to @Log False [] []@).
-- When its first field is True nothing is done.  Otherwise the reversed log
-- is serialised with 'show', written as one line to a fresh connection and
-- the connection is closed.  Unlike 'beamTo', the local computation is not
-- stopped afterwards.
forkTo :: HostName -> PortID -> TransientIO ()
forkTo host port= do
  Log rec log _<- getSData <|> return (Log False [][])
  unless rec $ do
    -- 'bracket' closes the connection even when writing the log throws.
    -- The (already closed) handle is returned only because 'delSData'
    -- takes it as an argument.
    h <- liftIO $ bracket (connectTo host port) hClose $ \conn -> do
           hSetBuffering conn LineBuffering
           hPutStrLn conn (show $ reverse log) >> hFlush conn
           return conn
    -- NOTE(review): presumably removes the session entry of the handle's
    -- type; confirm against Transient.Base.
    delSData h
-- | Run @remoteProc@ on the node at @host@:@port@ and deliver its results to
-- the caller.
--
-- The behaviour depends on the first field of the session 'Log':
--
-- * True (the state 'listen' stores after receiving a log): the
--   'Connection' is taken from the session data, @remoteProc@ is executed,
--   its result is written with 'show' as one line to the connection handle,
--   'WasRemote' is stored in the session and the computation stops.
--
-- * False: a connection to @host@:@port@ is opened, the reversed full log is
--   sent as one line, the session log is replaced by @WaitRemote@ followed
--   by the tail of the current log, and every line subsequently received on
--   the connection is parsed with 'read' and returned through 'waitEvents'.
callTo :: (Show a, Read a,Typeable a) => HostName -> PortID -> TransIO a -> TransIO a
callTo host port remoteProc= logged $ Transient $ do
  Log recovering curLog fullLog <- getSessionData `onNothing` return (Log False [][])
  if recovering
    then
      -- Serving side: execute the procedure and answer on the connection.
      runTrans $ do
        Connection _ conn _ <- getSData <|> error "callto: no hander"
        result <- remoteProc !> "executing remoteProc" !> "CALLTO REMOTE"
        liftIO $ hPutStrLn conn (show result)
        setSData WasRemote
        stop
    else do
      -- Calling side: ship the log, then wait for answers.
      conn <- liftIO $ connectTo host port
      liftIO $ hPutStrLn conn (show $ reverse fullLog) >> hFlush conn !> "CALLTO LOCAL"
      let waitingLog= WaitRemote:tail curLog
      setSessionData $ Log recovering waitingLog waitingLog
      runTrans $ waitEvents $ do
        liftIO $ hSetBuffering conn LineBuffering
        line <- hGetLine conn
        let reply = read line
        return reply !> "read: " ++ line ++" response type= "++show( typeOf reply)
-- | Variant of 'callTo' built on 'beamTo': move to @rhost@:@rport@, run
-- @remoteProc@ there, then move back to the address read with 'getMyNode'
-- and return the result.
--
-- Every step is wrapped in 'logged'.  The computation yields nothing when no
-- address has been stored with 'setMyNode'.
callTo' :: (Show a, Read a,Typeable a) => HostName -> PortID -> TransIO a -> TransIO a
callTo' rhost rport remoteProc= logged $ do
  (homeHost,homePort) <- getMyNode
  logged (beamTo rhost rport)
  result <- logged remoteProc
  logged (beamTo homeHost homePort)
  return result
-- | Session entry describing an accepted connection.  'listen' stores it as
-- @Connection port h sock@: the port being listened on, the handle of the
-- accepted connection and the listening socket.
data Connection
  = Connection PortID Handle Socket
  deriving Typeable
-- | Listen on @port@ and, for every accepted connection, continue the
-- computation with the log received from the peer.
--
-- Steps: store an empty @Log False [] []@ in the session, open the listening
-- socket, accept connections through 'parallel', read one line from the
-- accepted handle, store the 'Connection' in the session and finally store
-- @Log True log (reverse log)@ where @log@ is the line parsed with 'read'.
--
-- If accepting fails the listening socket is closed and the exception is
-- rethrown.  If the first line cannot be read, @"ERR"@ is printed, the
-- accepted handle is closed and the computation yields nothing.
listen :: PortID -> TransIO ()
listen port = do
  setSData $ Log False [] []
  sock <- liftIO $ withSocketsDo $ listenOn port
  (h,_,_) <- parallel $ Right <$> accept sock
               `catch` (\(e::SomeException) -> sClose sock >> throwIO e)
  liftIO $ hSetBuffering h LineBuffering
  -- The handle is unreachable once this step yields Nothing, so it is closed
  -- in the handler instead of being leaked.
  slog <- Transient $ liftIO $ (Just <$> hGetLine h)
            `catch` (\(e::SomeException) -> print "ERR" >> hClose h >> return Nothing)
  setSData $ Connection port h sock
  -- NOTE(review): 'read' is partial; a malformed line raises when the log is
  -- first forced.
  let log= read slog
  setSData $ Log True log (reverse log)
-- | Entry point for a node: under 'keep', run @listen port <|> return ()@
-- and then @program@.
beamInit :: PortID -> TransIO a -> IO b
beamInit port program=
  keep $ (listen port <|> return ()) >> program
-- | Parse a 'PortNumber' by reading an 'Int' and converting it with
-- 'fromIntegral'.
--
-- Written as a list comprehension so that unparsable input yields @[]@, as
-- 'readsPrec' requires, instead of failing on a pattern match.  The
-- precedence argument is passed through unchanged.
instance Read PortNumber where
  readsPrec d str= [(fromIntegral (n :: Int), rest) | (n, rest) <- readsPrec d str]
-- Standalone-derived 'Read' and 'Typeable' instances for 'PortID', a type
-- that is not defined in this module (presumably imported from "Network";
-- confirm).
deriving instance Read PortID
deriving instance Typeable PortID
-- | A node of the cluster: its address plus an optional connection.
data Node= Node
  { host       :: HostName
  , port       :: PortID
  , connection :: Maybe(Handle,Socket,HostName,PortID)
  } deriving (Eq,Typeable)

-- | Only the address is shown, as the pair @(host,port)@; the connection is
-- omitted.
instance Show Node where show (Node h p _)= show (h,p)

-- | Inverse of the 'Show' instance: reads a @(host,port)@ pair and builds a
-- node without a connection.  Unparsable input yields @[]@ instead of
-- failing on a pattern match.
instance Read Node where
  readsPrec d s= [(Node h p Nothing, rest) | ((h,p), rest) <- readsPrec d s]
-- | Process-wide list of known nodes, initially empty.
--
-- Created with 'unsafePerformIO', so it must never be inlined: an inlined
-- copy would allocate a second, independent 'TVar'.
nodeList :: TVar [Node]
nodeList = unsafePerformIO $ newTVarIO []
{-# NOINLINE nodeList #-}
-- Standalone-derived 'Ord' instance for 'PortID', a type that is not defined
-- in this module.
deriving instance Ord PortID
-- | Process-wide cell holding this node's own address; 'Nothing' until
-- 'setMyNode' is called.
--
-- Created with 'unsafePerformIO', so it must never be inlined (an inlined
-- copy would allocate a second, independent 'IORef') and its type is stated
-- explicitly so that the reference is monomorphic.
myNode :: IORef (Maybe (HostName, PortID))
myNode= unsafePerformIO $ newIORef Nothing
{-# NOINLINE myNode #-}

-- | Record @(h,p)@ as this node's own address.
setMyNode h p= liftIO $ writeIORef myNode $ Just (h,p)

-- | Address stored by 'setMyNode'; the computation yields nothing while no
-- address has been stored.
getMyNode= Transient $ liftIO $ readIORef myNode
-- | Current contents of 'nodeList'.
getNodes :: TransIO [Node]
getNodes = Transient . liftIO $ Just <$> readTVarIO nodeList
-- | Append @nodes@ to 'nodeList' in a single STM transaction, removing
-- duplicates with 'nub'.
addNodes nodes= Transient . liftIO . atomically $ do
  known <- readTVar nodeList
  let merged= nub (known ++ nodes)
  writeTVar nodeList merged
  return (Just ())
-- | Run @proc@ on every node returned by 'getNodes' through 'callTo' and
-- combine the per-node computations with '<>', starting from 'mempty'.
clustered :: (Typeable a, Show a, Read a) => Monoid a => TransIO a -> TransIO a
clustered proc= logged $ do
  nodes <- step getNodes
  let callNode (Node h p _)= callTo h p proc
  logged $ foldr (<>) mempty $ map callNode nodes !> "fold"
-- | Run @proc@ on every node returned by 'getNodes' through 'callTo'', one
-- node after another, and collect the results in a list.
clustered' proc= logged $ do
  nodes <- step getNodes
  logged $ forM nodes $ \(Node h p _) -> callTo' h p proc
-- | Join the cluster reachable at @remotehost@:@remoteport@, announcing this
-- node as @host@:@port@.
--
-- The node starts listening on @port@, records its own address with
-- 'setMyNode' and 'addNodes', then uses 'callTo' to have the remote node add
-- this node on every node it knows ('clustered') and send back its node list,
-- which is finally merged into the local list.
--
-- Each step is wrapped in 'logged', so the order and number of the steps is
-- kept exactly as is.
connect :: HostName -> PortID -> HostName -> PortID -> TransientIO ()
connect host port remotehost remoteport= do
  listen port <|> return ()
  logged $ do
    let self= Node host port Nothing
    logged $ do
      setMyNode host port
      addNodes [self]
      liftIO . putStrLn $ "connecting to: "++ show (remotehost,remoteport)
    newnode <- logged $ return self
    -- The result is unused, but the step still goes through 'logged'.
    _ <- logged $ return port
    nodes <- callTo remotehost remoteport $ do
      clustered $ addNodes [newnode]
      known <- getNodes
      liftIO . putStrLn $ "Connected to modes: " ++ show known
      return known
    logged $ addNodes nodes