module Transient.Move where
import Transient.Base
import Transient.Logged
import Data.Typeable
import Control.Applicative
import Network
import Control.Monad.IO.Class
import Control.Monad.State
import System.IO
import Control.Exception
import Data.Maybe
import Unsafe.Coerce
import System.Process
import System.Directory
import Control.Monad
import Network.Info
import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Control.Concurrent.MVar
import Data.Monoid
import qualified Data.Map as M
import Data.List (nub,(\\),find)
import Data.IORef
import qualified Network.Socket as NS
import qualified Network.BSD as BSD
import qualified Data.ByteString.Char8 as BS
import System.IO
import Control.Concurrent
installService (node@(Node _ port _)) servport package= do
beamTo node
liftIO $ do
let packagename= name package
exist <- doesDirectoryExist packagename
when (not exist) $ do
runCommand $ "git clone "++ package
runCommand $ "cd "++ packagename
runCommand "cabal install"
return ()
createProcess $ shell $ "./dist/build/"++ packagename++"/"++packagename
++ " " ++ show port
return()
where
name path=
let x= dropWhile (/= '/') path
in if x== "" then tail path else name $ tail x
beamTo :: Node -> TransientIO ()
beamTo node = do
Log rec log _ <- getSData <|> return (Log False [][])
if rec then return () else do
Connection _ bufSize <- getSData <|> return (Connection Nothing 8192)
h <- assign bufSize node
liftIO $ hPutStrLn h (show $ SMore $ reverse log) >> hFlush h
release node h
let log'= WaitRemote: log
setSData $ Log rec log' log'
stop
forkTo :: Node -> TransientIO ()
forkTo node= do
Log rec log _<- getSData <|> return (Log False [][])
if rec then return () else do
Connection _ bufSize <- getSData <|> return (Connection Nothing 8192)
h <-assign bufSize node
liftIO $ hPutStrLn h (show $ SMore $ reverse log) >> hFlush h
release node h
callTo :: Loggable a => Node -> TransIO a -> TransIO a
callTo n p = streamFrom n (SMore <$> p) >>= \(SMore x) -> return x
runAt :: Loggable a => Node -> TransIO a -> TransIO a
runAt= callTo
streamFrom :: Loggable a => Node -> TransIO (StreamData a) -> TransIO (StreamData a)
streamFrom node remoteProc= logged $ Transient $ do
Log rec log fulLog <- getSessionData `onNothing` return (Log False [][])
if rec
then
runTrans $ do
rnum <- liftIO $ newMVar (0 :: Int)
Connection (Just (_, h, sock, blocked)) _ <- getSData <|> error "callTo: no hander"
r <- remoteProc !> "executing remoteProc" !> "CALLTO REMOTE"
n <- liftIO $ do
withMVar blocked $ const $ hPutStrLn h (show r) `catch` (\(e::SomeException) -> sClose sock)
setSData WasRemote
stop
else do
Connection _ bufSize <- getSessionData `onNothing` return (Connection Nothing 8192)
h <- assign bufSize node
liftIO $ hSetBuffering h LineBuffering
liftIO $ hPutStrLn h ( show $ SLast $ reverse fulLog) !> "CALLTO LOCAL"
let log'= WaitRemote:tail log
setSessionData $ Log rec log' log'
runTrans $ do
r<- parallel $ do
r <- readHandler h
case r of
SDone -> release node h >> return SDone
other -> return other
case r of
SDone -> empty
other -> return other
callTo' :: (Show a, Read a,Typeable a) => Node -> TransIO a -> TransIO a
callTo' node remoteProc= logged $ do
mynode <- logged getMyNode
beamTo node
r <- logged remoteProc
beamTo mynode
return r
type Blocked= MVar ()
type BuffSize = Int
data Connection= Connection (Maybe(PortID, Handle, Socket, Blocked)) BuffSize
deriving Typeable
setBufSize :: Int -> TransIO ()
setBufSize size= Transient $ do
Connection c _ <- getSessionData `onNothing` return (Connection Nothing size)
setSessionData $ Connection c size
return $ Just ()
readHandler h= do
line <- hGetLine h
let [(v,left)]= readsPrec 0 line
return v
`catch` (\(e::SomeException) -> do
hClose h
liftIO $ do
putStr "readHandler: "
print e
return SDone)
where
connectTo' bufSize hostname (PortNumber port) = do
proto <- BSD.getProtocolNumber "tcp"
bracketOnError
(NS.socket NS.AF_INET NS.Stream proto)
(sClose)
(\sock -> do
NS.setSocketOption sock NS.RecvBuffer bufSize
NS.setSocketOption sock NS.SendBuffer bufSize
he <- BSD.getHostByName hostname
NS.connect sock (NS.SockAddrInet port (BSD.hostAddress he))
NS.socketToHandle sock ReadWriteMode
)
listen :: Node -> TransIO ()
listen (Node _ port _) = do
addThreads 1
setSData $ Log False [] []
Connection _ bufSize <- getSData <|> return (Connection Nothing 8192)
sock <- liftIO $ withSocketsDo $ listenOn port
liftIO $ do NS.setSocketOption sock NS.RecvBuffer bufSize
NS.setSocketOption sock NS.SendBuffer bufSize
SMore(h,host,port1) <- parallel $ SMore <$> accept sock
`catch` (\(e::SomeException) -> print "socket exception" >> sClose sock >> throw e)
setSData $ Connection (Just (port, h, sock, unsafePerformIO $ newMVar ())) bufSize
liftIO $ hSetBuffering h LineBuffering
mlog <- parallel $ readHandler h
case mlog of
SError e -> do
liftIO $ do
hClose h
putStr "listen: "
print e
stop
SDone -> liftIO (hClose h) >> stop
SMore log -> setSData $ Log True log (reverse log)
SLast log -> setSData $ Log True log (reverse log)
beamInit :: Node -> TransIO a -> IO a
beamInit node program= keep $ do
listen node <|> return ()
program
instance Read PortNumber where
readsPrec n str= let [(n,s)]= readsPrec n str in [(fromIntegral n,s)]
deriving instance Read PortID
deriving instance Typeable PortID
data Pool= Pool{free :: [Handle], pending :: Int}
data Node= Node{host :: HostName, port :: PortID, connection :: IORef Pool} deriving Typeable
release (Node h p rpool) hand= liftIO $ do
mhs <- atomicModifyIORef rpool $
\(Pool hs pend) ->
if pend==0
then (Pool [] 0,Just hs)
else (Pool (hand:hs) pend,Nothing)
case mhs of
Nothing -> return ()
Just hs -> mapM_ hClose hs
assign bufSize (Node h p pool)= liftIO $ do
mh <- atomicModifyIORef pool $
\(Pool hs p) -> if null hs then (Pool hs p, Nothing)
else (Pool (tail hs) p, Just(head hs)) !> "REUSED"
case mh of
Just handle -> liftIO (putStrLn "REUSED!") >> return handle
Nothing -> liftIO $ do
h <- connectTo' bufSize h p !> "REOPEN"
hSetBuffering h LineBuffering
return h
emptyPool :: MonadIO m => m (IORef Pool)
emptyPool= liftIO $ newIORef $ Pool [] 0
createNode :: HostName -> Integer -> Node
createNode h p= Node h ( PortNumber $ fromInteger p) (unsafePerformIO emptyPool)
instance Eq Node where
Node h p _ ==Node h' p' _= h==h' && p==p'
instance Show Node where show (Node h p _)= show (h,p)
instance Read Node where
readsPrec _ s=
let [((h,p),s')]= readsPrec 0 s
in [(Node h p empty,s')]
where
empty= unsafePerformIO emptyPool
nodeList :: TVar [Node]
nodeList = unsafePerformIO $ newTVarIO []
deriving instance Ord PortID
myNode= unsafePerformIO $ newIORef Nothing
setMyNode node= liftIO $ writeIORef myNode $ Just node
getMyNode= Transient $ liftIO $ readIORef myNode
getNodes :: MonadIO m => m [Node]
getNodes = liftIO $ atomically $ readTVar nodeList
addNodes :: MonadIO m => [Node] -> m ()
addNodes nodes= liftIO . atomically $ do
prevnodes <- readTVar nodeList
writeTVar nodeList $ nub $ nodes ++ prevnodes
shuffleNodes :: MonadIO m => m [Node]
shuffleNodes= liftIO . atomically $ do
nodes <- readTVar nodeList
let nodes'= tail nodes ++ [head nodes]
writeTVar nodeList nodes'
return nodes'
clustered :: Loggable a => TransIO a -> TransIO a
clustered proc= logged $ do
nodes <- step getNodes
logged $ foldr (<|>) empty $ map (\node -> callTo node proc) nodes !> "fold"
clustered' proc= logged $ do
nodes <- getNodes
logged $ mapM (\node -> callTo' node proc) $ nodes
mclustered :: (Monoid a, Loggable a) => TransIO a -> TransIO a
mclustered proc= logged $ do
nodes <- step getNodes
logged $ foldr (<>) mempty $ map (\node -> callTo node proc) nodes !> "fold"
connect :: Node -> Node -> TransientIO ()
connect (node@(Node h port _)) remotenode= do
listen node <|> return ()
logged $ do
logged $ do
setMyNode node
addNodes [node]
liftIO $ putStrLn $ "connecting to: "++ show remotenode
newnode <- logged $ return node
port <- logged $ return port
nodes <- callTo remotenode $ do
clustered $ addNodes [newnode]
r <- getNodes
liftIO $ putStrLn $ "Connected to modes: " ++ show r
return r
logged $ addNodes nodes