--------------------------------------------------------------------------
--
-- Module : Transient.Move.Internals
-- Copyright :
-- License : MIT
--
-- Maintainer : agocorona@gmail.com
-- Stability :
-- Portability :
--
--
-----------------------------------------------------------------------------
{-# LANGUAGE DeriveDataTypeable , ExistentialQuantification, OverloadedStrings,FlexibleInstances, UndecidableInstances
,ScopedTypeVariables, StandaloneDeriving, RecordWildCards, FlexibleContexts, CPP
,GeneralizedNewtypeDeriving #-}
module Transient.Move.Internals where
import Prelude hiding(drop,length)
import Transient.Internals
import Transient.Parse
import Transient.Logged
import Transient.Indeterminism
import Transient.Mailboxes
import Data.Typeable
import Control.Applicative
import System.Random
import Data.String
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BS
import System.Time
import Data.ByteString.Builder
#ifndef ghcjs_HOST_OS
import Network
--- import Network.Info
import Network.URI
--import qualified Data.IP as IP
import qualified Network.Socket as NS
import qualified Network.BSD as BSD
import qualified Network.WebSockets as NWS -- S(RequestHead(..))
import qualified Network.WebSockets.Connection as WS
import Network.WebSockets.Stream hiding(parse)
import qualified Data.ByteString as B(ByteString)
import qualified Data.ByteString.Lazy.Internal as BLC
import qualified Data.ByteString.Lazy as BL
import Network.Socket.ByteString as SBS(sendMany,sendAll,recv)
import qualified Network.Socket.ByteString.Lazy as SBSL
import Data.CaseInsensitive(mk,CI)
import Data.Char
import Data.Aeson
import qualified Data.ByteString.Base64.Lazy as B64
-- import System.Random
#else
import JavaScript.Web.WebSocket
import qualified JavaScript.Web.MessageEvent as JM
import GHCJS.Prim (JSVal)
import GHCJS.Marshal(fromJSValUnchecked)
import qualified Data.JSString as JS
-- import Data.Text.Encoding
import JavaScript.Web.MessageEvent.Internal
import GHCJS.Foreign.Callback.Internal (Callback(..))
import qualified GHCJS.Foreign.Callback as CB
--import Data.JSString (JSString(..), pack,drop,length)
#endif
import Control.Monad.State
import Control.Monad.Fail
import Control.Exception hiding (onException,try)
import Data.Maybe
--import Data.Hashable
import System.IO.Unsafe
import Control.Concurrent.STM as STM
import Control.Concurrent.MVar
import Data.Monoid
import qualified Data.Map as M
import Data.List (partition,union,(\\),length, nubBy,isPrefixOf) -- (nub,(\\),intersperse, find, union, length, partition)
--import qualified Data.List(length)
import Data.IORef
import Control.Concurrent
import System.Mem.StableName
import Unsafe.Coerce
import System.Environment
{- TODO
timeout for closures: little smaller in sender than in receiver
-}
--import System.Random
-- | Shorthand for 'BS.pack': convert a 'String' to a lazy 'BS.ByteString'.
pk :: String -> BS.ByteString
pk = BS.pack

-- | Shorthand for 'BS.unpack': convert a lazy 'BS.ByteString' to a 'String'.
up :: BS.ByteString -> String
up = BS.unpack
#ifdef ghcjs_HOST_OS
-- Under GHCJS the Network package is not available, so provide local
-- stand-ins for the two network identifiers this module needs.
type HostName = String
newtype PortID = PortNumber Int deriving (Read, Show, Eq, Typeable)
#endif
-- | A node of the distributed computation: its location plus the state
-- needed to communicate with it.
data Node= Node{ nodeHost     :: HostName           -- ^ host name or IP address
               , nodePort     :: Int                -- ^ listening port
               , connection   :: Maybe (MVar Pool)  -- ^ pool of open connections to this node, if any
               , nodeServices :: [Service]          -- ^ services published by the node
               }
         deriving (Typeable)

-- Relies on 'Loggable''s default method implementations for serialization.
instance Loggable Node
-- | Nodes are ordered by their (host, port) pair; the connection pool and
-- the service list take no part in the comparison.
instance Ord Node where
    compare a b = key a `compare` key b
      where key n = (nodeHost n, nodePort n)
-- | The Cloud monad is a thin layer over Transient in order to make sure that
-- the type system forces the logging of intermediate results, so closures can
-- be recovered on remote nodes.
--
-- Fix: the conditional below used @#ifdef MIN_VERSION_base(4,11,0)@, which is
-- a malformed directive (@#ifdef@ takes a bare macro name, so the version
-- arguments were ignored and only the macro's existence was tested).
-- @#if@ performs the intended version test.
newtype Cloud a= Cloud {runCloud' ::TransIO a} deriving (AdditionalOperators,Functor,
#if MIN_VERSION_base(4,11,0)
                   Semigroup,
#endif
                   Monoid ,Applicative, Alternative,MonadFail, Monad, Num, Fractional, MonadState EventF)
{-
instance Applicative Cloud where
pure a = Cloud $ return a
Cloud mf <*> Cloud mx = do
-- bp <- getData `onNothing` error "no backpoint"
-- local $ onExceptionPoint bp $ \(CloudException _ _ _) -> continue
r1 <- onAll . liftIO $ newIORef Nothing
r2 <- onAll . liftIO $ newIORef Nothing
onAll $ fparallel r1 r2 <|> xparallel r1 r2
where
fparallel r1 r2= do
f <- mf
liftIO $ (writeIORef r1 $ Just f)
mr <- liftIO (readIORef r2)
case mr of
Nothing -> empty
Just x -> return $ f x
xparallel r1 r2 = do
mr <- liftIO (readIORef r1)
case mr of
Nothing -> do
p <- gets execMode
if p== Serial then empty else do
x <- mx
liftIO $ (writeIORef r2 $ Just x)
mr <- liftIO (readIORef r1)
case mr of
Nothing -> empty
Just f -> return $ f x
Just f -> do
x <- mx
liftIO $ (writeIORef r2 $ Just x)
return $ f x
-}
-- Proxy configuration, parsed from the http_proxy/https_proxy environment
-- variables by 'getHTTProxyParams'.
type UPassword= BS.ByteString           -- user:password@ credentials (may be empty)
type Host= BS.ByteString
type ProxyData= (UPassword,Host,Int)    -- (credentials, host, port)
rHTTPProxy= unsafePerformIO $ newIORef (Nothing :: Maybe (Maybe ProxyData, Maybe ProxyData))
-- | Return the proxy parameters for HTTP (argument @False@) or HTTPS
-- (argument @True@), reading and caching the @http_proxy@/@https_proxy@
-- environment variables on first use.
getHTTProxyParams t= do
   mp <- liftIO $ readIORef rHTTPProxy
   case mp of
     Just (p1,p2) -> return $ if t then p2 else p1
     Nothing -> do
        -- first call: parse both variables, cache the result and retry
        ps <- (,) <$> getp "http" <*> getp "https"
        liftIO $ writeIORef rHTTPProxy $ Just ps
        getHTTProxyParams t
   where
   -- parse one <scheme>_proxy variable; accepted form:
   -- [scheme//][user:password@]host:port
   getp t= do
      let var= t ++ "_proxy"
      p <- liftIO $ lookupEnv var
      tr ("proxy",p)
      case p of
        Nothing -> return Nothing
        Just hp -> do
           pr <- withParseString (BS.pack hp) $ do
                     tDropUntilToken (BS.pack "//") <|> return ()
                     (,,) <$> getUPass <*> tTakeWhile' (/=':') <*> int
           return $ Just pr
   -- credentials up to '@', or empty when there are none
   getUPass= tTakeUntilToken "@" <|> return ""
-- | Execute a distributed computation inside a TransIO computation.
-- All the computations in the TransIO monad that enclose the cloud
-- computation must be `logged`. The remote-closure state present before the
-- call is restored when the computation finishes.
runCloud :: Cloud a -> TransIO a
runCloud cloudComp = do
    savedClosure <- getState <|> return (Closure 0)
    runCloud' cloudComp <*** setState savedClosure
--instance Monoid a => Monoid (Cloud a) where
-- f mappend x y = mappend <$> x <*> y
-- mempty= return mempty
#ifndef ghcjs_HOST_OS
--- Empty hooks for TLS; they are overwritten when TLS support is initialized.
-- NOINLINE is mandatory: this is a top-level IORef created with unsafePerformIO.
{-# NOINLINE tlsHooks #-}
tlsHooks ::IORef (Bool
                 ,SData -> BS.ByteString -> IO ()
                 ,SData -> IO B.ByteString
                 ,NS.Socket -> BS.ByteString -> TransIO ()
                 ,String -> NS.Socket -> BS.ByteString -> TransIO ()
                 ,SData -> IO ())
tlsHooks= unsafePerformIO $ newIORef
                 ( False
                 , notneeded
                 , notneeded
                 , \_ i -> tlsNotSupported i
                 , \_ _ _-> return()
                 , \_ -> return())
  where
  notneeded= error "TLS hook function called"

  -- If the peer starts a TLS handshake (first byte 0x16) but TLS support is
  -- not compiled in, answer with an HTTP 525 error.
  -- Fix: the Content-Length header line was terminated with a bare '\n';
  -- HTTP header lines must end with CRLF.
  tlsNotSupported input = do
     if ((not $ BL.null input) && BL.head input == 0x16)
       then do
         conn <- getSData
         sendRaw conn $ BS.pack $ "HTTP/1.0 525 SSL Handshake Failed\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
       else return ()

-- Read the hook tuple once; the fields are overwritten before use when the
-- TLS package installs its real implementations.
(isTLSIncluded,sendTLSData,recvTLSData,maybeTLSServerHandshake,maybeClientTLSHandshake,tlsClose)= unsafePerformIO $ readIORef tlsHooks
#endif
-- | Means that this computation will be executed in the current node. The
-- result will be logged so the closure will be recovered if the computation
-- is translated to another node by means of primitives like `beamTo`,
-- `forkTo`, `runAt`, `teleport`, `clustered`, `mclustered` etc.
local :: Loggable a => TransIO a -> Cloud a
local comp = Cloud (logged comp)
--stream :: Loggable a => TransIO a -> Cloud (StreamVar a)
--stream= Cloud . transport
-- #ifndef ghcjs_HOST_OS
-- | Run a distributed computation inside the IO monad. Enables asynchronous
-- console input (see 'keep').
runCloudIO :: Typeable a => Cloud a -> IO (Maybe a)
runCloudIO = keep . runCloud'
-- | Run a distributed computation inside the IO monad with no console input.
runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a)
runCloudIO' = keep' . runCloud'
-- #endif
-- | Alternative to `local`. It means that if the computation is translated to
-- another node this will be executed again if it has not been executed inside
-- a `local` computation.
--
-- > onAll foo
-- > local foo'
-- > local $ do
-- >       bar
-- >       runCloud $ do
-- >               onAll baz
-- >               runAt node ....
-- > runAt node' .....
--
-- foo, bar and baz will be executed locally.
-- But foo will be executed remotely also in node' while foo', bar and baz don't.
--
onAll :: TransIO a -> Cloud a
onAll =  Cloud   -- no logging wrapper: the result is not recovered from the log
-- | Only executes if the result is demanded. It is useful when the computation
-- result is only used in the remote node, but it is not serializable. All the
-- state changes executed in the argument with `setData` `setState` etc. are
-- lost.
lazy :: TransIO a -> Cloud a
lazy mx= onAll $ do
        st <- get
        -- defer execution with unsafePerformIO under a pure `return`: the
        -- computation runs only when the value is demanded, and any state it
        -- sets is discarded. Assumes mx yields a result (`fromJust` fails
        -- otherwise).
        return $ fromJust $ unsafePerformIO $ runStateT (runTrans mx) st >>= return .fst
-- | Execute a non-serializable action in the remote node, whose result can
-- be used by subsequent remote invocations.
fixRemote mx= do
    result <- lazy mx
    fixClosure
    return result
-- | Subsequent remote invocations will send logs to this closure; therefore
-- logs will be shorter. Also, non-serializable statements before it will not
-- be re-executed. It performs one empty remote round-trip to establish the
-- closure on the other side.
fixClosure= atRemote $ local $ async $ return ()
-- | Log the result of a cloud computation. Like `logged`, this erases all the
-- log produced by computations inside and substitutes it for that single
-- result when the computation is completed.
loggedc :: Loggable a => Cloud a -> Cloud a
loggedc (Cloud mx)= Cloud $ do
     -- save the remote closure and any localFix data; restore them after the
     -- computation produces its result (the local name `fixRemote` shadows
     -- the top-level function of the same name here)
     closRemote <- getState <|> return (Closure 0 )
     (fixRemote :: Maybe LocalFixData) <- getData
     logged mx <*** do setData closRemote
                       when (isJust fixRemote) $ setState (fromJust fixRemote)
-- Like 'loggedc', but only preserves the localFix data across the
-- computation, not the remote closure.
loggedc' :: Loggable a => Cloud a -> Cloud a
loggedc' (Cloud mx)= Cloud $ do
     fixRemote :: Maybe LocalFixData <- getData
     logged mx <*** (when (isJust fixRemote) $ setState (fromJust fixRemote))
-- | The `Cloud` monad has no `MonadIO` instance; this is the equivalent:
-- run the IO action in the local node and log its result.
lliftIO :: Loggable a => IO a -> Cloud a
lliftIO ioAction = local (liftIO ioAction)
-- | Run an IO action in the local node and log its result.
-- Synonym: `localIO = lliftIO`.
localIO :: Loggable a => IO a -> Cloud a
localIO= lliftIO
-- | Continue the execution in a new node.
beamTo :: Node -> Cloud ()
beamTo node =  wormhole node teleport
-- | Execute in the remote node a process with the same execution state,
-- while also continuing locally (the `<|> return ()` alternative).
forkTo :: Node -> Cloud ()
forkTo node= beamTo node <|> return()
-- | Open a wormhole to another node and execute an action on it.
-- Currently, by default, it keeps the connection open to receive additional
-- requests and responses (streaming).
callTo :: Loggable a => Node -> Cloud a -> Cloud a
callTo node remoteProc= wormhole' node $ atRemote remoteProc
#ifndef ghcjs_HOST_OS
-- | A connectionless version of callTo for long-running remote calls:
-- travel to the remote node, run the action there and travel back.
callTo' :: (Show a, Read a,Typeable a) => Node -> Cloud a -> Cloud a
callTo' node remoteProc= do
    homeNode <- local $ Prelude.head <$> getNodes
    beamTo node
    result <- remoteProc
    beamTo homeNode
    return result
#endif
-- | Within a connection to a node opened by `wormhole`, run the computation
-- in the remote node and return the result back to the original node.
--
-- If `atRemote` is executed in the remote node, then the computation is
-- executed in the original node.
--
-- > wormhole node2 $ do
-- >     t <- atRemote $ do
-- >           r <- foo              -- executed in node2
-- >           s <- atRemote bar r   -- executed in the original node
-- >           baz s                 -- in node2
-- >     bat t                      -- in the original node
atRemote :: Loggable a => Cloud a -> Cloud a
atRemote proc= loggedc' $ do
     --modify $ \s -> s{execMode=Parallel}
     teleport                                       -- !> "teleport 1111"
     -- back from a remote send: continue serially unless already parallel
     modify $ \s -> s{execMode= if execMode s== Parallel then Parallel else Serial}  -- modifyData' f1 Serial

     -- mark Remote after the body has produced its result, then send it back
     r <- loggedc $ proc <** modify (\s -> s{execMode= Remote})   -- setData Remote

     teleport                                       -- !> "teleport 2222"
     return r
-- | Execute a computation in the node that initiated the connection.
--
-- if the sequence of connections is n1 -> n2 -> n3 then `atCallingNode $ atCallingNode foo` in n3
-- would execute `foo` in n1, -- while `atRemote $ atRemote foo` would execute it in n3
-- atCallingNode :: Loggable a => Cloud a -> Cloud a
-- atCallingNode proc= connectCaller $ atRemote proc
-- | Synonym of `callTo`.
runAt :: Loggable a => Node -> Cloud a -> Cloud a
runAt= callTo
-- | Run a single thread with that action for each connection created.
-- When the same action is re-executed within that connection, all the
-- threads generated by the previous execution are killed.
--
-- >  box <-  foo
-- >  r <- runAt node . local . single $ getMailbox box
-- >  localIO $ print r
--
-- If foo returns different mailbox identifiers, the above code would print
-- the messages of the last one. Without single, it would print the messages
-- of all of them, since each call would install a new `getMailBox` for each
-- one of them.
--
-- Fixes: the local binding `id` shadowed 'Prelude.id', and the stable name
-- was recomputed inside the continuation on every result; the hash computed
-- up front is reused instead (stable names of the same evaluated value are
-- equal).
single :: TransIO a -> TransIO a
single f= do
   cutExceptions
   Connection{closChildren=rmap} <- getSData <|> error "single: only works within a connection"
   mapth <- liftIO $ readIORef rmap
   -- identify the action by the hash of its stable name; force it first so
   -- the name does not change once the action has been evaluated
   ident <- liftIO $ f `seq` makeStableName f >>= return . hashStableName

   -- kill the threads left by a previous execution of the same action
   case M.lookup ident mapth of
         Just tv -> liftIO $ killBranch' tv
         Nothing -> return ()

   tv <- get
   f <** (liftIO $ modifyIORef rmap $ \m -> M.insert ident tv m)
-- | Run a unique continuation for each connection. The first thread that
-- executes `unique` is the one executed for that connection; subsequent ones
-- are ignored (they return `empty`).
unique :: TransIO a -> TransIO a
unique f= do
   Connection{closChildren=rmap} <- getSData <|> error "unique: only works within a connection. Use wormhole"
   mapth <- liftIO $ readIORef rmap
   -- identity of the action: hash of its stable name (forced first so the
   -- name is stable)
   id <- liftIO $ f `seq` makeStableName f >>= return . hashStableName

   let mx = M.lookup id mapth
   case mx of
         Just _ -> empty
         Nothing -> do
              tv <- get
              liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth
              f
-- | A wormhole opens a connection with another node anywhere in a computation.
-- `teleport` uses this connection to translate the computation back and forth
-- between the two nodes connected. If the connection fails, it searches the
-- network for suitable relay nodes to reach the destination node.
wormhole node comp= do
    -- on connection failure with this very node, find a relay and retry
    onAll $ onException $ \(e@(ConnectionError "no connection" nodeerr)) ->
                 if nodeerr== node then do runCloud' $ findRelay node ; continue else return ()
    wormhole' node comp
    where
    findRelay node = do
        -- a suitable relay is a node that knows the target and holds a
        -- connection to it
        relaynode <- exploreNetUntil $ do
                  nodes <- local getNodes
                  let thenode= filter (== node) nodes
                  if not (null thenode) && isJust(connection $ Prelude.head thenode ) then return $ Prelude.head nodes else empty
        -- re-register the target node annotated with the relay to reach it
        local $ addNodes [node{nodeServices= {-nodeServices node ++ -} [[("relay", show (nodeHost (relaynode :: Node),nodePort relaynode ))]]}]
-- | When the first teleport has been sent within a wormhole, the log sent
-- afterwards should be only the segment not sent in the previous teleport.
-- This per-wormhole flag records whether that first send has happened.
newtype DialogInWormholeInitiated= DialogInWormholeInitiated Bool
-- | Wormhole without searching for relay nodes.
wormhole' :: Loggable a => Node -> Cloud a -> Cloud a
wormhole' node (Cloud comp) = local $ Transient $ do
   -- save the surrounding connection/closure/dialog state; it is restored
   -- when the computation inside the wormhole finishes
   moldconn <- getData :: StateIO (Maybe Connection)
   mclosure <- getData :: StateIO (Maybe Closure)
   mdialog  <- getData :: StateIO (Maybe ( Ref DialogInWormholeInitiated))

   labelState $ "wormhole" <> BC.pack (show node)
   log <- getLog

   if not $ recover log
     -- not recovering: we are the calling side; open (or reuse) a connection
     then runTrans $ (do
            conn <- mconnect node
            liftIO $ writeIORef (remoteNode conn) $ Just node
            -- inherit the synchronous flag from the enclosing connection
            setData  conn{synchronous= maybe False id $ fmap synchronous moldconn, calling= True}
            setState $ Closure 0
            newRState $ DialogInWormholeInitiated False
            comp )
          <*** do
            -- restore the previous state
            -- (<** is not enough since comp may be reactive)
            when (isJust moldconn) . setData $ fromJust moldconn
            when (isJust mclosure) . setData $ fromJust mclosure
            when (isJust mdialog)  . setData $ fromJust mdialog

     -- recovering in the remote node: reuse the connection already in state
     else do
          let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn
          setData $ conn{calling= False}
          runTrans $ comp
                     <*** do when (isJust mclosure) . setData $ fromJust mclosure
-- #ifndef ghcjs_HOST_OS
-- type JSString= String
-- pack= id
-- #endif
-- | Exception produced in a remote node, carrying the node where it
-- happened, the closure being executed, and the rendered original exception.
data CloudException = CloudException Node IdClosure String deriving (Typeable, Show, Read)

instance Exception CloudException
-- | Set remote invocations synchronous.
-- This is necessary when data is transferred very fast from node to node in a
-- stream non-deterministically, in order to keep the continuation of the
-- calling node unchanged until the arrival of the response, since all the
-- calls share a single continuation in the calling node.
--
-- If there is no response from the remote node, the streaming is interrupted.
--
-- >  main= keep $ initNode $  onBrowser $  do
-- >      local $ setSynchronous True
-- >      line  <- local $  threads 0 $ choose[1..10::Int]
-- >      localIO $ print ("1",line)
-- >      atRemote $ localIO $ print line
-- >      localIO $ print ("2", line)
setSynchronous :: Bool -> TransIO ()
setSynchronous sync=
    void $ modifyData' (\con -> con{synchronous=sync}) (error "setSynchronous: no communication data")
-- | Set synchronous mode for remote calls within a cloud computation and also
-- avoid unnecessary thread creation.
syncStream :: Cloud a -> Cloud a
syncStream proc=  do
    -- switch the connection to synchronous, keeping the mode returned by
    -- modifyData' so it can be restored afterwards
    sync <- local $ do
       Connection{synchronous= synchronous} <- modifyData'(\con -> con{synchronous=True}) err
       return synchronous
    -- run with no extra threads; restore the saved mode when done
    Cloud $ threads 0 $ runCloud' proc <***  modifyData'(\con -> con{synchronous=sync}) err
  where err= error "syncStream: no communication data"
-- | Move the computation to the other end of the current wormhole: in the
-- calling node it sends the pending log segment (so the remote node can
-- reconstruct the continuation); in the remote (recovering) node it simply
-- lets the computation continue.
teleport :: Cloud ()
teleport =  do
  modify $ \s -> s{execMode=if execMode s == Remote then Remote else Parallel}
  local $ do
    conn@Connection{connData=contype, synchronous=synchronous, localClosures= localClosures} <- getData
                             `onNothing` error "teleport: No connection defined: use wormhole"
    --  onException $ \(e :: IOException ) -> do  -- to retry the connection in case of failure
    --      tr ("teleport:", e)                   -- should be three tries at most
    --      liftIO $ writeIORef contype Nothing
    --      mclose conn      -- msend will open a new connection. move that open here?
    --      continue
    Transient $ do
     labelState  "teleport"
     cont <- get
     log <- getLog

     if not $ recover log    -- !> ("teleport rec,loc fulLog=",rec,log,fulLog)
       -- if not recovering in the remote node then this side is active
       then  do
        -- when a node calls itself, there is no need of socket communication
        ty <- liftIO $ readIORef contype
        case ty of
         Just Self -> runTrans $ do
               modify $ \s -> s{execMode= Parallel}     -- setData  Parallel
               abduce                                   -- !> "SELF" -- call himself
               liftIO $ do
                  remote <- readIORef $ remoteNode conn
                  writeIORef (myNode conn) $ fromMaybe (error "teleport: no connection?") remote

         _ -> do
          -- read this closure
          DialogInWormholeInitiated initiated <- getRData `onNothing` return(DialogInWormholeInitiated True)
          -- detects whether something has already been sent within this same
          -- wormhole
          -- how to detect that without using Closure?
          -- an R-flag in the execution state
          (closRemote',tosend) <- if initiated
                  -- for localFix
                  then do
                    -- dialog already initiated: send only the log built since
                    -- the last teleport, addressed to the stored remote closure
                    Closure closRemote  <- getData `onNothing`  return (Closure 0 )
                    tr ("REMOTE CLOSURE",closRemote)
                    return (closRemote, buildLog log)
                  else do
                    -- first send in this wormhole: look for a localFix closure
                    -- that the remote node may already have, to drop the log
                    -- prefix it already knows
                    mfix <- getData      -- check globalFix
                    tr ("mfix", mfix)
                    let droplog  Nothing= return (0, fulLog log)
                        droplog  (Just localfix)= do
                          -- record this connection as a creator of the fixed
                          -- closure; `sent` tells whether it was already known
                          sent <- liftIO $ atomicModifyIORef' (fixedConnections localfix) $ \list -> do
                                    let n= idConn conn
                                    if n `Prelude.elem` list
                                       then  (list, True)
                                       else  (n:list,False)
                          tr ("LOCALFIXXXXXXXXXX",localfix)
                          -- drop the log prefix that the remote node already has
                          let dropped= lazyByteString $ BS.drop (fromIntegral $ lengthFix localfix) $ toLazyByteString $ fulLog log
                          if sent then return (closure localfix, dropped)
                          else if isService localfix then return (0, dropped)
                          else droplog  $ prevFix localfix    -- look for other previous closure sent

                    droplog  mfix

          let closLocal= hashClosure log
          map <- liftIO $ readMVar localClosures
          let mr = M.lookup closLocal map
          pair <- case mr of
            -- for synchronous streaming: wait until the previous response has
            -- been processed before reusing the closure entry
            Just (chs,clos,mvar,_) -> do
                 when synchronous $ liftIO $ takeMVar mvar
                 return (children ${- fromJust $ parent -} cont,clos,mvar,cont)

            _ -> liftIO $ do mv <- newEmptyMVar; return ( children $ fromJust $ parent cont,closRemote',mv,cont)

          liftIO $ modifyMVar_ localClosures $ \map -> return $ M.insert closLocal pair map

          -- the log sent is in the order of execution, with the closure ids
          -- at the head
          runTrans $ msend conn $ SMore $ ClosureData closRemote' closLocal tosend
          return Nothing

       else  return $ Just ()
{- |
One problem of forwarding closures for streaming is that it could transport not only the data but extra information that reconstruct the closure in the destination node. In a single in-single out interaction It may not be a problem, but think, for example, when I have to synchronize N editors by forwarding small modifications, or worst of all, when transmitting packets of audio or video. But the size of the closure, that is, the amount of variables that I have to transport increases when the code is more complex. But transient build closures upon closures, so It has to send only what has changed since the last interaction.
In one-to-one interactions within a wormhole, this is automatic, but when there are different wormholes involved, it is necessary
to tell explicitly what is the closure that will continue the execution. this is what `localFix` does. otherwise it will use the closure 0.
> main= do
> filename <- local input
> source <- atServer $ local $ readFile filename
> local $ render source inEditor
> -- send upto here one single time please, so I only stream the deltas
> localFix
> delta <- react onEachChange
> forallNodes $ update delta
if forwardChanges send to all the nodes editing the document, the data necessary to reconstruct the
closure would include even the source code of the file on EACH change.
Fortunately it is possible to fix a closure that will not change in all the remote nodes, so after that
I only have to send the one necessary variable, the delta. This is as efficient as a hand-made
socket write/forkThread/readSocket loop for each node.
-}
localFix= localFixServ False False
type ConnectionId= Int   -- connection identifier (see idConn)
type HasClosed= Bool     -- whether the connection has been closed
-- for each connection, the list of closures fixed and the list of connections which created that closure in the remote node
-- TODO: unify this for all connections
-- but how do we know whether a global closure applies to a send after a disconnection?
-- the program has to pass through that globalClosure:
-- if only the connection was lost, it still has state and can use the closure
-- if it restarted, it has executed up to that point and must rebuild its localFix state
globalFix = unsafePerformIO $ newIORef (M.empty :: M.Map ConnectionId (HasClosed,[(IdClosure, IORef [ConnectionId ])]))
-- how to signal that it was closed?
-- | Bookkeeping for a `localFix` point.
data LocalFixData= LocalFixData{ isService :: Bool
                                     -- ^ if True the closure is invoked even when localFix is re-executed
                               , lengthFix :: Int
                                     -- ^ length of the log prefix covered by this fix
                               , closure :: Int
                                     -- ^ hash of the fixed closure
                               , fixedConnections :: IORef [ConnectionId]
                                     -- ^ list of connections that created that closure in the remote node
                               , prevFix :: Maybe LocalFixData
                                     -- ^ previous fix in the chain, if any
                               } deriving Show
-- Orphan instance, used for debug traces only: showing an IORef reads its
-- current content with unsafePerformIO.
instance Show a => Show (IORef a) where
    show r= show $ unsafePerformIO $ readIORef r
-- data LocalFixData= LocalFixData Bool Int Int (IORef (M.Map Int Int))
-- first flag=True assumes that the localFix closure has been created otherwise
-- the first request invoke closure 0 and create the localFix closure
-- further request will invoque this closure
--
-- the second flag creates a closure that is invoked ever, even if localfix is re-executed.
--If this second flag is false,
-- a reexecution of localFix will recreate the remote closure, perhaps with different variables.
-- Create a localFix point. `isService` makes the closure survive
-- re-executions of localFix; `isGlobal` shares the fix registry across
-- connections through 'globalFix'.
localFixServ isService isGlobal= Cloud $ noTrans $ do
   log <- getLog
   Connection{..} <- getData `onNothing` error "teleport: No connection set: use initNode"

   if recover log
     then do
       -- recovering in the remote node: register the closure so incoming
       -- requests can continue from it
       cont <- get
       mv <- liftIO  newEmptyMVar
       liftIO $ modifyMVar_ localClosures $ \map -> return $ M.insert (hashClosure log) ( children $ fromJust $ parent cont,0,mv,cont) map

     else do
       -- calling side: create (or, if global, look up/create in the shared
       -- registry) the list of connections that already have this closure
       mprevFix <- getData
       ref <- liftIO $ if not $ isGlobal then newIORef [] else do
                  map <- readIORef globalFix
                  return $ do
                      (_,l) <- M.lookup idConn map
                      lookup (hashClosure log) l
                `onNothing` do
                      ref <- newIORef []
                      modifyIORef globalFix $ \map ->
                          let (closed,l)= fromMaybe (False,[]) $ M.lookup idConn map
                          in  M.insert idConn (closed,(hashClosure log, ref):l) map
                      return ref

       -- chain to the previous fix only when this one has already been used
       mmprevFix <- liftIO $ readIORef ref >>= \l -> return $ if Prelude.null l then Nothing else mprevFix
       let newfix =LocalFixData{ isService = isService
                               , lengthFix = fromIntegral $ BS.length $ toLazyByteString $ fulLog log
                               , closure = hashClosure log
                               , fixedConnections = ref
                               , prevFix = mmprevFix}
       setState newfix
          !> ("SET LOCALFIX", newfix )
-- | Forward exceptions back to the calling node.
--
-- Fix: the "no closRemote" error message said "teleport:" (copy-pasted from
-- `teleport`), mis-attributing the failure; it now names this function.
reportBack :: TransIO ()
reportBack= onException $ \(e :: SomeException) -> do
    conn <- getData `onNothing` error "reportBack: No connection defined: use wormhole"
    Closure closRemote <- getData `onNothing` error "reportBack: no closRemote"
    node <- getMyNode
    -- the exception is double-`show`n — presumably so the receiver can `read`
    -- it back as a String containing the rendered CloudException; TODO confirm
    -- against the receiving end
    let msg= SError $ toException $ ErrorCall $ show $ show $ CloudException node closRemote $ show e
    msend conn msg  !> "MSEND"
-- | Copy a session data variable from the local to the remote node.
-- If there is none set in the local node, the parameter acts as the default
-- value; in that case the default value is also set in the local node.
copyData def = do
    value <- local getSData <|> return def
    onAll $ setData value
    return value
-- | execute a Transient action in each of the nodes connected.
--
-- The response of each node is received by the invoking node and processed by the rest of the procedure.
-- By default, each response is processed in a new thread. To restrict the number of threads
-- use the thread control primitives.
--
-- this snippet receive a message from each of the simulated nodes:
--
-- > main = keep $ do
-- > let nodes= map createLocalNode [2000..2005]
-- > addNodes nodes
-- > (foldl (<|>) empty $ map listen nodes) <|> return ()
-- >
-- > r <- clustered $ do
-- > Connection (Just(PortNumber port, _, _, _)) _ <- getSData
-- > return $ "hi from " ++ show port++ "\n"
-- > liftIO $ putStrLn r
-- > where
-- > createLocalNode n= createNode "localhost" (PortNumber n)
-- Collect the responses of all nodes non-deterministically (see the example
-- above).
clustered :: Loggable a => Cloud a -> Cloud a
clustered = callNodes (<|>) empty
-- | A variant of `clustered` that waits for all the responses and `mappend`s
-- them.
mclustered :: (Monoid a, Loggable a) => Cloud a -> Cloud a
mclustered = callNodes (<>) mempty
-- Invoke `proc` on every node with the same service list as this one,
-- combining the invocations with `combine` starting from `zero`.
callNodes combine zero proc= loggedc' $ do
    nodeList <- local getEqualNodes
    callNodes' nodeList combine zero proc
callNodes' nodes op init proc= loggedc' $ Prelude.foldr op init $ Prelude.map (\node -> runAt node proc) nodes
-----
#ifndef ghcjs_HOST_OS
-- Send a raw bytestring over the connection, reconnecting to the remote node
-- first if the connection data has been dropped. On any exception the
-- connection data is cleared, so the next attempt reconnects.
sendRawRecover con r= do
      c <- liftIO $ readIORef $ connData con
      con' <- case c of
          Nothing -> do
             tr "CLOSED CON"
             n <- liftIO $ readIORef $ remoteNode con
             case n of
                 Nothing -> error "connection closed by caller"
                 Just node -> do
                      r <- mconnect' node
                      return r
          Just _ -> return con

      sendRaw con' r
   `whileException` \(SomeException _)-> liftIO$ writeIORef (connData con) Nothing
-- Send a raw bytestring over whatever transport the connection uses.
-- Concurrent senders are serialized through the `isBlocked` MVar, which also
-- stores the time of the last send.
sendRaw con r= do
    let blocked= isBlocked con
    c <- liftIO $ readIORef $ connData con
    liftIO $ modifyMVar_ blocked $ const $ do
        tr ("sendRaw",r)
        case c of
            Just (Node2Web  sconn )   -> liftIO $  WS.sendTextData sconn  r
            Just (Node2Node _ sock _) -> SBS.sendMany sock (BL.toChunks r )
            Just (TLSNode2Node  ctx ) -> sendTLSData ctx  r
            -- NOTE(review): "stablished" is a typo in this runtime string;
            -- kept as-is since changing it would alter behavior
            _ -> error "No connection stablished"
        TOD time _ <- getClockTime
        return $ Just time
{-
sendRaw (Connection _ _ _ (Just (Node2Web sconn )) _ _ _ _ _ _ _) r=
liftIO $ WS.sendTextData sconn r -- !> ("NOde2Web",r)
sendRaw (Connection _ _ _ (Just (Node2Node _ sock _)) _ _ blocked _ _ _ _) r=
liftIO $ withMVar blocked $ const $ SBS.sendMany sock
(BL.toChunks r ) -- !> ("NOde2Node",r)
sendRaw (Connection _ _ _(Just (TLSNode2Node ctx )) _ _ blocked _ _ _ _) r=
liftIO $ withMVar blocked $ const $ sendTLSData ctx r !> ("TLNode2Web",r)
-}
#else
-- Browser-side raw send: the only transport is the native websocket.
sendRaw con r= do
   c <- liftIO $ readIORef $ connData con
   case c of
     Just (Web2Node sconn) ->
              JavaScript.Web.WebSocket.send   r sconn
     -- NOTE(review): "stablished" is a typo in this runtime string; kept as-is
     _ -> error "No connection stablished"
{-
sendRaw (Connection _ _ _ (Just (Web2Node sconn)) _ _ blocked _ _ _ _) r= liftIO $
withMVar blocked $ const $ JavaScript.Web.WebSocket.send r sconn -- !> "MSEND SOCKET"
-}
#endif
-- | Message interchanged between nodes: remote closure id, local closure id
-- and the log segment needed to reconstruct the continuation.
data NodeMSG= ClosureData IdClosure IdClosure Builder   deriving (Read, Show)

instance Loggable NodeMSG where
   -- wire format: <remote closure>/<local closure>/<log>
   serialize (ClosureData clos clos' build)= intDec clos <> "/" <> intDec clos' <> "/" <> build
   deserialize= ClosureData <$> (int <* tChar '/') <*> (int <* tChar '/') <*> restOfIt
      where
      -- the remainder of the input is the raw log
      restOfIt= lazyByteString <$> giveParseString
-- Orphan instance: render a Builder by materializing it to a lazy bytestring.
instance Show Builder where
   show b= BS.unpack $ toLazyByteString b
-- Orphan instance: read back a Builder from a shown String.
-- NOTE(review): it `read`s the whole input as a String and reports an empty
-- remainder, so it only works on input that is exactly one shown String.
instance Read Builder where
   readsPrec _ str= [(lazyByteString $ BS.pack $ read str,"")]
-- Serialization of streamed values: a constructor tag followed by the
-- serialized payload.
instance Loggable a => Loggable (StreamData a) where
    serialize (SMore x)= byteString "SMore/" <> serialize x
    serialize (SLast x)= byteString "SLast/" <> serialize x
    serialize SDone= byteString "SDone"
    serialize (SError e)= byteString "SError/" <> serialize e

    deserialize = smore <|> slast <|> sdone <|> serror
     where
     smore = symbol "SMore/" >> (SMore <$> deserialize)
     slast = symbol "SLast/" >> (SLast <$> deserialize)
     sdone = symbol "SDone"  >> return SDone
     serror= symbol "SError/" >> (SError <$> deserialize)
{-
in msend, write the packet length and then the packet
in mread, take the length and then the message
data Packet= Packet Int BS.ByteString deriving (Read,Show)
instance Loggable Packet where
serialize (Packet len msg) = intDec len <> lazyByteString msg
deserialize = do
len <- int
Packet len <$> tTake (fromIntegral len)
-}
msend :: Connection -> StreamData NodeMSG -> TransIO ()
-- msend (Connection _ _ _ (Just Self) _ _ _ _ _ _ _) r= return ()
#ifndef ghcjs_HOST_OS
-- Send a message to the remote node over whatever transport the connection
-- uses, reconnecting first if the connection data has been dropped. Senders
-- are serialized through the `isBlocked` MVar, which also records the time
-- of the last send.
msend con r= do
  tr ("MSEND --------->------>", r)
  c <- liftIO $ readIORef $ connData con
  -- reconnect when the connection was closed and the remote node is known
  con' <- case c of
     Nothing -> do
         tr "CLOSED CON"
         n <- liftIO $ readIORef $ remoteNode con
         case n of
              Nothing -> error "connection closed by caller"
              Just node -> do
                  r <- mconnect node
                  return r
                  --case r of
                  --   Nothing -> error $ "can not reconnect with " ++ show n
                  --   Just c -> return c
     Just _ -> return con

  let blocked= isBlocked con'
  c <- liftIO $ readIORef $ connData con'
  let bs = toLazyByteString $ serialize r
  do
    case c of
       -- node-to-node transports prefix the payload with its length
       Just (TLSNode2Node ctx) -> liftIO $ modifyMVar_ blocked $ const $ do
           tr "TLSSSSSSSSSSS SEND"
           sendTLSData  ctx $ toLazyByteString $ int64Dec $ BS.length bs
           sendTLSData  ctx bs
           TOD time _ <- getClockTime
           return $ Just time

       Just (Node2Node _ sock _) -> liftIO $ modifyMVar_ blocked $ const $ do
           tr "NODE2NODE SEND"
           SBSL.send sock $ toLazyByteString $ int64Dec $ BS.length bs
           SBSL.sendAll sock bs
           TOD time _ <- getClockTime
           return $ Just time

       -- HTTP transports terminate the payload with CRLF instead
       Just (HTTP2Node _ sock _) -> liftIO $ modifyMVar_ blocked $ const $ do
           tr "HTTP2NODE SEND"
           SBSL.sendAll sock $ bs <> "\r\n"
           TOD time _ <- getClockTime
           return $ Just time

       Just (HTTPS2Node ctx) -> liftIO $ modifyMVar_ blocked $ const $ do
           tr "HTTPS2NODE SEND"
           sendTLSData  ctx $ bs <> "\r\n"
           TOD time _ <- getClockTime
           return $ Just time

       -- websockets do their own framing; no length prefix and no blocking
       Just (Node2Web sconn) -> do
           tr "NODE2WEB"
           liftIO $ do
              let bs = toLazyByteString $ serialize r
              tr "ANTES SEND"
              WS.sendTextData sconn  bs
              tr "AFTER SEND"

       Just Self -> error "connection to the same node shouldn't happen, file a bug please"

       _ -> error "msend out of connection context: use wormhole to connect"
-- return()
{-
msend (Connection _ _ _ (Just (Node2Node _ sock _)) _ _ blocked _ _ _ _) r=do
liftIO $ withMVar blocked $ const $ do
let bs = toLazyByteString $ serialize r
SBSL.send sock $ toLazyByteString $ int64Dec $ BS.length bs
SBSL.sendAll sock bs -- !> ("N2N SEND", bd)
msend (Connection _ _ _ (Just (HTTP2Node _ sock _)) _ _ blocked _ _ _ _) r=do
liftIO $ withMVar blocked $ const $ do
let bs = toLazyByteString $ serialize r
let len= BS.length bs
lenstr= toLazyByteString $ int64Dec $ len
SBSL.send sock $ "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nContent-Length: "
<> lenstr
-- <>"\r\n" <> "Set-Cookie:" <> "cookie=" <> cook -- <> "\r\n"
<>"\r\n\r\n"
SBSL.sendAll sock bs -- !> ("N2N SEND", bd)
msend (Connection _ _ _ (Just (TLSNode2Node ctx)) _ _ _ _ _ _ _) r= liftIO $ do
let bs = toLazyByteString $ serialize r
sendTLSData ctx $ toLazyByteString $ int64Dec $ BS.length bs
sendTLSData ctx bs -- !> "TLS SEND"
msend (Connection _ _ _ (Just (Node2Web sconn)) _ _ _ _ _ _ _) r=
-- {-withMVar blocked $ const $ -} WS.sendTextData sconn $ serialize r -- BS.pack (show r) !> "websockets send"
liftIO $ do
let bs = toLazyByteString $ serialize r
WS.sendTextData sconn $ toLazyByteString $ int64Dec $ BS.length bs
WS.sendTextData sconn bs -- !> ("N2N SEND", bd)
-}
#else
-- | GHCJS (browser) variant of 'msend': serialize @r@ and push it through the
-- browser-side WebSocket held in the connection.  Only 'Web2Node' connections
-- are valid here; anything else means the caller is not inside a wormhole.
msend con r= do
   tr ("MSEND --------->------>", r)
   let blocked= isBlocked con
   -- NOTE(review): 'blocked' is bound but unused below (the locking code is
   -- commented out); kept for parity with the native version.
   c <- liftIO $ readIORef $ connData con
   case c of
     Just (Web2Node sconn) -> liftIO $ do
          tr "MSEND BROWSER"
          --modifyMVar_ (isBlocked con) $ const $ do
          let bs = toLazyByteString $ serialize r
          -- String round-trip through pack/unpack; flagged below as a point
          -- to optimize.
          JavaScript.Web.WebSocket.send (JS.pack $ BS.unpack bs) sconn -- TODO OPTIMIZE THAT!
          tr "AFTER MSEND"
          --TOD time _ <- getClockTime
          --return $ Just time
     _ -> error "msend out of connection context: use wormhole to connect"
{-
msend (Connection _ _ remoten (Just (Web2Node sconn)) _ _ blocked _ _ _ _) r= liftIO $ do
withMVar blocked $ const $ do -- JavaScript.Web.WebSocket.send (serialize r) sconn -- (JS.pack $ show r) sconn !> "MSEND SOCKET"
let bs = toLazyByteString $ serialize r
JavaScript.Web.WebSocket.send (toLazyByteString $ int64Dec $ BS.length bs) sconn
JavaScript.Web.WebSocket.send bs sconn
-}
#endif
#ifdef ghcjs_HOST_OS
-- | GHCJS (browser) variant of 'mread': read the next message from the
-- browser-side WebSocket of the connection.
mread con= do
   labelState "mread"
   sconn <- liftIO $ readIORef $ connData con
   case sconn of
     Just (Web2Node sconn) -> wsRead sconn
     Nothing -> error "connection not opened"
     -- NOTE(review): 'Just Self' is not matched here; presumably a browser
     -- never reads from a Self connection — confirm.
--mread (Connection _ _ _ (Just (Web2Node sconn)) _ _ _ _ _ _ _)= wsRead sconn
-- | Wait reactively for the next WebSocket message and deserialize it.
-- Only string payloads are handled; blob and array-buffer payloads error out.
wsRead :: Loggable a => WebSocket -> TransIO a
wsRead ws= do
   dat <- react (hsonmessage ws) (return ())
   tr "received"
   case JM.getData dat of
     JM.StringData ( text) -> do
          -- feed the text into the parser state, then deserialize from it
          setParseString $ BS.pack . JS.unpack $ text -- TODO OPTIMIZE THAT
          --len <- integer
          tr ("Browser webSocket read", text) !> "<------<----<----<------"
          deserialize -- return (read' $ JS.unpack str)
     JM.BlobData blob -> error " blob"
     JM.ArrayBufferData arrBuffer -> error "arrBuffer"
-- | Open a WebSocket to the given URL and wait (reactively) until its
-- @onopen@ callback fires before returning it.
wsOpen :: JS.JSString -> TransIO WebSocket
wsOpen url= do
   ws <- liftIO $ js_createDefault url -- !> ("wsopen",url)
   react (hsopen ws) (return ()) -- !!> "react"
   return ws -- !!> "AFTER ReACT"
-- JavaScript FFI: pieces of @window.location@ and WebSocket callback wiring.
foreign import javascript safe
   "window.location.hostname"
   js_hostname :: JSVal

foreign import javascript safe
   "window.location.pathname"
   js_pathname :: JSVal

foreign import javascript safe
   "window.location.protocol"
   js_protocol :: JSVal

-- port of the current page URL; defaults to 80 when none is present
foreign import javascript safe
   "(function(){var res=window.location.href.split(':')[2];if (res === undefined){return 80} else return res.split('/')[0];})()"
   js_port :: JSVal

-- install $2 as the onmessage handler of WebSocket $1
foreign import javascript safe
   "$1.onmessage =$2;"
   js_onmessage :: WebSocket -> JSVal -> IO ()
-- | Node describing the web server this browser page was loaded from,
-- built from @window.location@ (hostname and port).
getWebServerNode :: TransIO Node
getWebServerNode = liftIO $ do
   hostName <- fromJSValUnchecked js_hostname
   portNum  <- fromJSValUnchecked js_port :: IO Int
   createNode hostName (fromIntegral portNum)
-- | Install a Haskell handler as the @onmessage@ callback of a WebSocket.
hsonmessage ::WebSocket -> (MessageEvent ->IO()) -> IO ()
hsonmessage ws hscb= do
   cb <- makeCallback1 MessageEvent hscb
   js_onmessage ws cb
-- install $2 as the onopen handler of WebSocket $1
foreign import javascript safe
   "$1.onopen =$2;"
   js_open :: WebSocket -> JSVal -> IO ()

foreign import javascript safe
   "$1.readyState"
   js_readystate :: WebSocket -> Int

-- | Wrapper for the raw JS event passed to an @onopen@ callback.
newtype OpenEvent = OpenEvent JSVal deriving Typeable
-- | Install a Haskell handler as the @onopen@ callback of a WebSocket.
hsopen :: WebSocket -> (OpenEvent ->IO()) -> IO ()
hsopen ws hscb= do
   cb <- makeCallback1 OpenEvent hscb
   js_open ws cb
-- | Wrap a one-argument Haskell handler as a JS callback value; the raw
-- 'JSVal' argument is first converted with the supplied constructor.
makeCallback1 :: (JSVal -> a) -> (a -> IO ()) -> IO JSVal
makeCallback1 f g = do
   Callback cb <- CB.syncCallback1 CB.ContinueAsync (g . f)
   return cb

-- makeCallback ::  IO () -> IO ()
-- | Wrap a zero-argument Haskell action as a JS callback value.
-- NOTE(review): the commented-out signature above looks wrong — this returns
-- the callback 'JSVal', not @IO ()@; confirm before reinstating it.
makeCallback f = do
   Callback cb <- CB.syncCallback CB.ContinueAsync f
   return cb
-- create a new browser WebSocket connected to the given URL
foreign import javascript safe
   "new WebSocket($1)" js_createDefault :: JS.JSString -> IO WebSocket
#else
-- | Native variant of 'mread': read a stream of length-prefixed packets from
-- a socket-based (plain or TLS) connection.
mread conn= do
   cc <- liftIO $ readIORef $ connData conn
   case cc of
     Just (Node2Node _ _ _) -> parallelReadHandler conn
     Just (TLSNode2Node _ ) -> parallelReadHandler conn
     -- NOTE(review): other 'ConnectionData' shapes are not matched; the
     -- comments below say they are managed by 'listenNew' — a stray call with
     -- a websocket connection would fail with a pattern-match error.
-- the rest of the cases are managed by listenNew
-- Just (Node2Web sconn ) -> do
-- ss <- parallel $ receiveData' conn sconn
-- case ss of
-- SDone -> empty
-- SMore s -> do
-- tr ("WEBSOCKET RECEIVED", s)
-- setParseString s
-- --integer
-- TOD t _ <- liftIO getClockTime
-- liftIO $ modifyMVar_ (isBlocked conn) $ const $ Just <$> return t
-- deserialize
-- where
-- perform timeouts and cleanup of the server when connections
receiveData' a b= NWS.receiveData b
-- receiveData' :: Connection -> NWS.Connection -> IO BS.ByteString
-- receiveData' c conn = do
-- msg <- WS.receive conn
-- tr ("RECEIVED",msg)
-- case msg of
-- NWS.DataMessage _ _ _ am -> return $ NWS.fromDataMessage am
-- NWS.ControlMessage cm -> case cm of
-- NWS.Close i closeMsg -> do
-- hasSentClose <- readIORef $ WS.connectionSentClose conn
-- unless hasSentClose $ WS.send conn msg
-- writeIORef (connData c) Nothing
-- cleanConnectionData c
-- empty
-- NWS.Pong _ -> do
-- TOD t _ <- liftIO getClockTime
-- liftIO $ modifyMVar_ (isBlocked c) $ const $ Just <$> return t
-- receiveData' c conn
-- --NWS.connectionOnPong (WS.connectionOptions conn)
-- --NWS.receiveDataMessage conn
-- NWS.Ping pl -> do
-- TOD t _ <- liftIO getClockTime
-- liftIO $ modifyMVar_ (isBlocked c) $ const $ Just <$> return t
-- WS.send conn (NWS.ControlMessage (NWS.Pong pl))
-- receiveData' c conn
-- --WS.receiveDataMessage conn
{-
mread (Connection _ _ _ (Just (Node2Node _ _ _)) _ _ _ _ _ _ _) = parallelReadHandler -- !> "mread"
mread (Connection _ _ _ (Just (TLSNode2Node _ )) _ _ _ _ _ _ _) = parallelReadHandler
-- parallel $ do
-- s <- recvTLSData ctx
-- return . read' $ BC.unpack s
mread (Connection _ _ _ (Just (Node2Web sconn )) _ _ _ _ _ _ _)= do
s <- waitEvents $ WS.receiveData sconn
setParseString s
integer
deserialize
{-
parallel $ do
s <- WS.receiveData sconn
return . read' $ BS.unpack s
!> ("WS MREAD RECEIVED ----<----<------<--------", s)
-}
-}
many' p= p <|> many' p
-- | Continuously parse length-prefixed packets from the connection's input
-- stream, handing each packet to a fresh thread ('abduce') for
-- deserialization.
parallelReadHandler :: Loggable a => Connection -> TransIO (StreamData a)
parallelReadHandler conn= do
    -- any IO error on the stream silently ends this reader
    onException $ \(e:: IOError) -> empty
    many' extractPacket
    where
    extractPacket= do
        -- each packet is preceded by its length as a decimal integer
        len <- integer <|> (do s <- getParseBuffer; if BS.null s then empty else error $ show $ ("malformed data received: expected Int, received: ", BS.take 10 s))
        str <- tTake (fromIntegral len)
        tr ("MREAD <-------<-------",str)
        -- record the time of this receive for the keep-alive bookkeeping
        TOD t _ <- liftIO $ getClockTime
        liftIO $ modifyMVar_ (isBlocked conn) $ const $ Just <$> return t
        abduce
        setParseString str
        deserialize
{-
parallelReadHandler= do
str <- giveParseString :: TransIO BS.ByteString
r <- choose $ readStream str
return r
!> ("parallel read handler read", r)
!> "<-------<----------<--------<----------"
where
readStream :: (Typeable a, Read a) => BS.ByteString -> [StreamData a]
readStream s= readStream1 $ BS.unpack s
where
readStream1 s=
let [(x,r)] = reads s
in x : readStream1 r
-}
-- | In the native runtime the "web server node" is the first node of the
-- node list.
-- NOTE(review): 'Prelude.head' is partial — this errors when no nodes have
-- been set; confirm callers guarantee a non-empty node list.
getWebServerNode :: TransIO Node
getWebServerNode = Prelude.head <$> getNodes
#endif
mclose :: MonadIO m => Connection -> m ()
#ifndef ghcjs_HOST_OS
-- | Close the physical transport of a connection and clean its cached state.
mclose con= do
   --c <- liftIO $ readIORef $ connData con
   -- atomically take the connection data out, so no other thread can use it
   c <- liftIO $ atomicModifyIORef (connData con) $ \c -> (Nothing,c)
   case c of
      Just (TLSNode2Node ctx) -> liftIO $ withMVar (isBlocked con) $ const $ liftIO $ tlsClose ctx
      Just (Node2Node _ sock _ ) -> liftIO $ withMVar (isBlocked con) $ const $ liftIO $ NS.close sock !> "SOCKET CLOSE"
      Just (Node2Web sconn ) -> liftIO $ WS.sendClose sconn ("closemsg" :: BS.ByteString) !> "WEBSOCkET CLOSE"
      _ -> return()
   cleanConnectionData con
{-
mclose (Connection _ _ _
(Just (Node2Node _ sock _ )) _ _ _ _ _ _ _)= liftIO $ NS.close sock
mclose (Connection _ _ _
(Just (Node2Web sconn ))
_ _ _ _ _ _ _)=
liftIO $ WS.sendClose sconn ("closemsg" :: BS.ByteString)
-}
#else
-- | GHCJS variant: close the browser-side WebSocket, if any.
mclose con= do
   --c <- liftIO $ readIORef $ connData con
   -- atomically take the connection data out, so no other thread can use it
   c <- liftIO $ atomicModifyIORef (connData con) $ \c -> (Nothing,c)
   case c of
     Just (Web2Node sconn)->
         liftIO $ JavaScript.Web.WebSocket.close Nothing Nothing sconn
     -- fixed: the case was non-exhaustive; closing an already-closed
     -- connection (Nothing) or a 'Self' connection crashed with a
     -- pattern-match failure.  The native 'mclose' has this catch-all too.
     _ -> return ()
{-
mclose (Connection _ _ _ (Just (Web2Node sconn)) _ _ blocked _ _ _ _)=
liftIO $ JavaScript.Web.WebSocket.close Nothing Nothing sconn
-}
#endif
#ifndef ghcjs_HOST_OS
-- connection cookie
rcookie= unsafePerformIO $ newIORef $ BS.pack "cookie1"
#endif
-- | Global lock serializing connection setup.
-- NOINLINE is mandatory for a top-level 'unsafePerformIO' 'MVar'; without it
-- GHC may duplicate the definition, yielding several independent locks.
{-# NOINLINE conSection #-}
conSection= unsafePerformIO $ newMVar ()

-- | Run an action while holding the global connection lock.
-- NOTE(review): not exception-safe — if @mx@ throws or backtracks with
-- 'empty', the lock is never put back; confirm whether that is intended in
-- the transient exception model before adding a bracket here.
exclusiveCon mx= do
   liftIO $ takeMVar conSection
   r <- mx
   liftIO $ putMVar conSection ()
   return r
-- check for cached connection and return it, otherwise tries to connect with connect1 without cookie check
-- | Return a cached connection to the node when one exists and is still
-- open; otherwise establish a new one with 'mconnect1' (no cookie check).
-- The whole lookup/connect runs under the global 'conSection' lock.
mconnect' :: Node -> TransIO Connection
mconnect' node'= exclusiveCon $ do
   conn <- do
      node <- fixNode node'
      nodes <- getNodes
      -- the node's pool caches previously opened connections
      let fnode = filter (==node) nodes
      case fnode of
        [] -> mconnect1 node -- !> "NO NODE"
        (node'@(Node _ _ pool _):_) -> do
            plist <- liftIO $ readMVar $ fromJust pool
            case plist of -- !> ("length", length plist,nodePort node) of
              (handle:_) -> do
                  c <- liftIO $ readIORef $ connData handle
                  if isNothing c -- was closed by timeout
                    then mconnect1 node
                    else return handle
                      !> ("REUSED!", nodeHost node, nodePort node)
              _ -> do
                  delNodes [node]
                  r <- mconnect1 node
                  tr "after mconnect1"
                  return r
   -- ctx <- liftIO $ readIORef $ istream conn
   -- modify $ \s -> s{parseContext= ctx}
   -- liftIO $ print "SET PARSECONTEXT"
   setState conn
   return conn
#ifndef ghcjs_HOST_OS
-- effective connect trough different methods
-- | Effective connect through the different available methods.  Tries, in
-- order: connecting to itself ('Self'), a direct node-to-node socket
-- (optionally tunneled through an HTTP proxy), a websocket connection, and
-- finally a relay node advertised by the target.  Registers the resulting
-- connection both in the node and in the session state.
mconnect1 (node@(Node host port _ services ))= do
     return () !> ("MCONNECT1",host,port,isTLSIncluded)
     {-
     onException $ \(ConnectionError msg node) -> do
        liftIO $ do
          putStr msg
          putStr " connecting "
          print node
        continue
        empty
     -}
     -- decide whether TLS is needed from the "type" entries of the node's
     -- service descriptors
     let types=mapMaybe (lookup "type") services -- need to look in all services
     needTLS <- if "HTTP" `elem` types then return False
                else if "HTTPS" `elem` types then
                        if not isTLSIncluded then error "no 'initTLS'. This is necessary for https connections. Please include it: main= do{ initTLS; keep ...."
                        else return True
                else return isTLSIncluded
     -- case lookup "type" services of
     --    Just "HTTP" -> return False;
     --    Just "HTTPS" ->
     --        if not isTLSIncluded then error "no 'initTLS'. This is necessary for https connections. Please include it: main= do{ initTLS; keep ...."
     --        else return True
     --    _ -> return isTLSIncluded
     tr ("NEED TLS",needTLS)
     (conn,parseContext) <- checkSelf node                                        <|>
                            timeout 10000000 (connectNode2Node host port needTLS) <|>
                            timeout 1000000 (connectWebSockets host port needTLS) <|>
                            timeout 1000000 (checkRelay needTLS)                  <|>
                            (throw $ ConnectionError "no connection" node)
     setState conn
     modify $ \s -> s{execMode=Serial,parseContext= parseContext}
     -- "write node connected in the connection"
     liftIO $ writeIORef (remoteNode conn) $ Just node
     -- "write connection in the node"
     liftIO $ modifyMVar_ (fromJust $ connection node) . const $ return [conn]
     addNodes [node]
     return conn
     where
     -- connecting to the local node itself: produce a 'Self' connection with
     -- no sockets involved
     checkSelf node= do
          tr "CHECKSELF"
          node' <- getMyNodeMaybe
          guard $ isJust (connection node')
          v <- liftIO $ readMVar (fromJust $ connection node') -- to force connection in case of calling a service of itself
          tr "IN CHECKSELF"
          if node /= node' || null v
            then empty
            else do
               conn<- case connection node of
                  Nothing -> error "checkSelf error"
                  Just ref -> do
                      rnode <- liftIO $ newIORef node'
                      cdata <- liftIO $ newIORef $ Just Self
                      conn <- defConnection >>= \c -> return c{myNode= rnode, connData= cdata}
                      liftIO $ withMVar ref $ const $ return [conn]
                      return conn
               return (conn, noParseContext)

     -- run a connection attempt with a time limit; a 'Nothing' result means
     -- the peer rejected our cookie
     timeout t proc= do
          r <- collect' 1 t proc
          case r of
            [] -> empty !> "TIMEOUT EMPTY"
            mr:_ -> case mr of
                      Nothing -> throw $ ConnectionError "Bad cookie" node
                      Just r -> return r

     -- connect through a relay node, if the target advertises one
     checkRelay needTLS= do
          case lookup "relay" $ map head (nodeServices node) of
            Nothing -> empty -- !> "NO RELAY"
            Just relayinfo -> do
              let (h,p)= read relayinfo
              connectWebSockets1 h p ("/relay/" ++ h ++ "/" ++ show p ++ "/") needTLS

     -- open a plain socket (reusing the cached connection record when
     -- possible) and optionally upgrade it to TLS
     connectSockTLS host port needTLS= do
          return () !> "connectSockTLS"
          let size=8192
          c@Connection{myNode=my,connData=rcdata} <- getSData <|> defConnection
          tr "BEFORE HANDSHAKE"
          sock <- liftIO $ connectTo' size host $ PortNumber $ fromIntegral port
          let cdata= (Node2Node u sock (error $ "addr: outgoing connection"))
          cdata' <- liftIO $ readIORef rcdata
          --input <- liftIO $ SBSL.getContents sock
          -- let pcontext= ParseContext (do mclose c; return SDone) input (unsafePerformIO $ newIORef False)
          pcontext <- makeParseContext $ SBSL.recv sock 4096
          conn' <- if isNothing cdata' -- lost connection, reconnect
                    then do
                       liftIO $ writeIORef rcdata $ Just cdata
                       liftIO $ writeIORef (istream c) pcontext
                       return c !> "RECONNECT"
                    else do
                       c <- defConnection
                       rcdata' <- liftIO $ newIORef $ Just cdata
                       liftIO $ writeIORef (istream c) pcontext
                       return c{myNode=my,connData= rcdata'} !> "CONNECT"
          setData conn'
          --modify $ \s ->s{parseContext=ParseContext (do NS.close sock ; return SDone) input} --throw $ ConnectionError "connection closed" node) input}
          modify $ \s ->s{execMode=Serial,parseContext=pcontext}
          --modify $ \s ->s{execMode=Serial,parseContext=ParseContext (SMore . BL.fromStrict <$> recv sock 1000) mempty}
          when (isTLSIncluded && needTLS) $ maybeClientTLSHandshake host sock mempty

     -- direct socket connection, tunneled through an HTTP CONNECT proxy when
     -- one is configured and the target is not local
     connectNode2Node host port needTLS= do
          -- onException $ \(e :: SomeException) -> empty
          tr "NODE 2 NODE"
          mproxy <- getHTTProxyParams needTLS
          let (upass,h',p) = case (mproxy) of
                               Just p -> p
                               _ -> ("",BS.pack host,port)
              h= BS.unpack h'
          if (isLocal host || h == host && p == port) then
             connectSockTLS h p needTLS
          else do
             let connect =
                   "CONNECT "<> pk host <> ":" <> pk (show port) <> " HTTP/1.1\r\n" <>
                   "Host: "<> pk host <> ":" <> BS.pack (show port) <> "\r\n" <>
                   "User-Agent: transient\r\n" <>
                   (if BS.null upass then "" else "Proxy-Authorization: Basic " <> (B64.encode upass)<> "\r\n") <>
                   "Proxy-Connection: Keep-Alive\r\n" <>
                   "\r\n"
             tr connect
             connectSockTLS h p False
             conn <- getSData <|> error "mconnect: no connection data"
             sendRaw conn $ connect
             first@(vers,code,_) <- getFirstLineResp -- tTakeUntilToken (BS.pack "\r\n\r\n")
             tr ("PROXY RESPONSE=",first)
             -- any non-2xx proxy answer is reported with its full body
             guard (BC.head code== '2')
               <|> do
                    headers <- getHeaders
                    Raw body <- parseBody headers
                    error $ show (headers,body) -- decode the body and print
             when (isTLSIncluded && needTLS) $ do
                Just(Node2Node{socket=sock}) <- liftIO $ readIORef $ connData conn
                maybeClientTLSHandshake h sock mempty
          conn <- getSData <|> error "mconnect: no connection data"
          --mynode <- getMyNode
          parseContext <- gets parseContext
          return $ Just(conn,parseContext)

     connectWebSockets host port needTLS= connectWebSockets1 host port "/" needTLS

     -- websocket connection; the peer answers "OK" when it accepts our cookie
     connectWebSockets1 host port verb needTLS= do
          -- onException $ \(e :: SomeException) -> empty
          tr "WEBSOCKETS"
          connectSockTLS host port needTLS -- a new connection
          never <- liftIO $ newEmptyMVar :: TransIO (MVar ())
          conn <- getSData <|> error "connectWebSockets: no connection"
          stream <- liftIO $ makeWSStreamFromConn conn
          co <- liftIO $ readIORef rcookie
          let hostport= host++(':': show port)
              headers= [("cookie", "cookie=" <> BS.toStrict co)] -- if verb =="/" then [("Host",fromString hostport)] else []
          onException $ \(NWS.CloseRequest code msg) -> do
                 conn <- getSData
                 cleanConnectionData conn
                 -- throw $ ConnectionError (BS.unpack msg) node
                 empty
          wscon <- react (NWS.runClientWithStream stream hostport verb
                            WS.defaultConnectionOptions headers)
                         (takeMVar never)
          msg <- liftIO $ WS.receiveData wscon
          tr "WS RECEIVED"
          case msg of
            ("OK" :: BS.ByteString) -> do
                tr "return connectWebSockets"
                cdata <- liftIO $ newIORef $ Just $ (Node2Web wscon)
                return $ Just (conn{connData= cdata}, noParseContext)
            _ -> do tr "RECEIVED CLOSE"; liftIO $ WS.sendClose wscon ("" ::BS.ByteString); return Nothing
-- | Heuristic: does the host name refer to the local network?  True for
-- "localhost", for addresses starting with a private/reserved IP prefix, and
-- for bare alphanumeric names without a dot (e.g. @"titan"@).
isLocal :: String -> Bool
isLocal host =
       host == "localhost"
    || any (`isPrefixOf` host) privatePrefixes
    || (isAlphaNum (head host) && '.' `notElem` host) -- not a dotted public name such as www.host.com
    where
    privatePrefixes = ["0.0","10.","100", "127", "169", "172", "192", "198", "203"]
-- >>> isLocal "titan"
-- True
--
-- | Build a 'ParseContext' around a receive action.  The context yields
-- 'SMore' chunks until the receiver returns an empty string or throws; the
-- @done@ flag latches so later reads keep returning 'SDone'.
makeParseContext rec= liftIO $ do
      done <- newIORef False
      let receive= liftIO $ do
              d <- readIORef done
              if d then return SDone
                else (do
                       r<- rec
                       if BS.null r then liftIO $ do writeIORef done True; return SDone
                         else return $ SMore r)
                  `catch` \(SomeException e) -> do liftIO $ writeIORef done True
                                                   putStr "Parse: "
                                                   print e
                                                   return SDone
      return $ ParseContext receive mempty done
#else
-- | GHCJS variant: connect from the browser, either to itself (host
-- "webnode" becomes a 'Self' connection) or to a server via WebSocket.
mconnect1 (node@(Node host port (Just pool) _))= do
     conn <- getSData <|> error "connect: listen not set for this node"
     if nodeHost node== "webnode"
       then do
            liftIO $ writeIORef (connData conn) $ Just Self
            return conn
       else do
            ws <- connectToWS host $ PortNumber $ fromIntegral port
            -- !> "CONNECTWS"
            liftIO $ writeIORef (connData conn) $ Just (Web2Node ws)
            -- !> ("websocker CONNECION")
            -- parsing of raw input is not supported in the browser
            let parseContext =
                  ParseContext (error "parsecontext not available in the browser")
                    "" (unsafePerformIO $ newIORef False)
            chs <- liftIO $ newIORef M.empty
            let conn'= conn{closChildren= chs}
            -- cache the connection in the node's pool
            liftIO $ modifyMVar_ pool $ \plist -> return $ conn':plist
            return conn'
#endif
u= undefined
-- | Error thrown when a node cannot be reached or rejects the connection.
data ConnectionError= ConnectionError String Node deriving (Show , Read)

instance Exception ConnectionError
-- check for cached connect, if not, it connects and check cookie with mconnect2
-- | Like 'mconnect'' but additionally verifies the connection cookie with
-- the remote node ('checkCookie') and registers the connection so that
-- 'listenResponses' watches it ('watchConnection').
mconnect node'= do
  node <- fixNode node'
  nodes <- getNodes
  let fnode = filter (==node) nodes
  case fnode of
    [] -> mconnect2 node -- !> "NO NODE"
    [node'@(Node _ _ pool _)] -> do
        plist <- liftIO $ readMVar $ fromJust pool
        case plist of -- !> ("length", length plist,nodePort node) of
          (handle:_) -> do
              c <- liftIO $ readIORef $ connData handle
              if isNothing c -- was closed by timeout
                then mconnect2 node
                else return handle
              -- !> ("REUSED!", node)
          _ -> do
              delNodes [node]
              mconnect2 node
  where
  -- connect and check for connection cookie among nodes
  mconnect2 node= do
      conn <- mconnect1 node
      --    `catcht` \(e :: SomeException) -> empty
      cd <- liftIO $ readIORef $ connData conn
      case cd of
#ifndef ghcjs_HOST_OS
        Just Self -> return()
        Just (TLSNode2Node _ ) -> do
            checkCookie conn
            watchConnection conn node
        Just (Node2Node _ _ _) -> do
            checkCookie conn
            watchConnection conn node
#endif
        _ -> watchConnection conn node
      return conn

#ifndef ghcjs_HOST_OS
  -- send our cookie in a CLOS request; close the socket if the peer rejects it
  checkCookie conn= do
      cookie <- liftIO $ readIORef rcookie
      mynode <- getMyNode
      sendRaw conn $ "CLOS " <> cookie <> --" b \r\nField: value\r\n\r\n" -- TODO put it standard: Set-Cookie:...
          " b \r\nHost: " <> BS.pack (nodeHost mynode) <> "\r\nPort: " <> BS.pack (show $ nodePort mynode) <> "\r\n\r\n"
      r <- liftIO $ readFrom conn
      case r of
        "OK" -> return ()
        _ -> do
            let Connection{connData=rcdata}= conn
            cdata <- liftIO $ readIORef rcdata
            -- NOTE(review): non-exhaustive match; other transports (or an
            -- already-cleared Nothing) would crash here — confirm they cannot
            -- occur after a failed cookie check.
            case cdata of
              Just(Node2Node _ s _) -> liftIO $ NS.close s -- since the HTTP firewall closes the connection
              Just(TLSNode2Node c) -> liftIO $ tlsClose c
            empty
#endif

  -- register the connection and tell 'listenResponses' to watch it for
  -- incoming responses
  watchConnection conn node= do
      liftIO $ atomicModifyIORef connectionList $ \m -> (conn:m,())
      parseContext <- gets parseContext -- getSData <|> error "NO PARSE CONTEXT"
                          :: TransIO ParseContext
      chs <- liftIO $ newIORef M.empty
      --whls <- liftIO $ newIORef []
      let conn'= conn{closChildren= chs} --, wormholes= whls}
      -- liftIO $ modifyMVar_ (fromJust pool) $ \plist -> do
      --     if not (null plist) then print "DUPLICATE" else return ()
      --     return $ conn':plist -- !> (node,"ADDED TO POOL")
      -- tell listenResponses to watch incoming responses
      putMailbox ((conn',parseContext,node) :: (Connection,ParseContext,Node))
      liftIO $ threadDelay 100000 -- give time to initialize listenResponses
#ifndef ghcjs_HOST_OS
-- | Close a socket immediately, discarding unsent data (SO_LINGER = 0).
close1 sock =
   NS.setSocketOption sock NS.Linger 0 >> NS.close sock
-- | Open a TCP connection to @hostname:port@ with the given socket buffer
-- size; the socket is closed if anything fails during setup.
connectTo' bufSize hostname (PortNumber port) = do
      proto <- BSD.getProtocolNumber "tcp"
      bracketOnError
          (NS.socket NS.AF_INET NS.Stream proto)
          (NS.close) -- only done if there's an error
          (\sock -> do
            NS.setSocketOption sock NS.RecvBuffer bufSize
            NS.setSocketOption sock NS.SendBuffer bufSize
            -- NS.setSocketOption sock NS.SendTimeOut 1000000 !> ("CONNECT",port)
            he <- BSD.getHostByName hostname
            NS.connect sock (NS.SockAddrInet port (BSD.hostAddress he))
            return sock)
#else
-- | GHCJS: open a WebSocket to the host, choosing ws:// or wss:// according
-- to the protocol of the current page.
connectToWS h (PortNumber p) = do
   protocol <- liftIO $ fromJSValUnchecked js_protocol
   pathname <- liftIO $ fromJSValUnchecked js_pathname
   tr ("PAHT",pathname)
   -- NOTE(review): non-exhaustive match — a protocol other than http:/https:
   -- (e.g. file:) would crash here.
   let ps = case (protocol :: JS.JSString)of "http:" -> "ws://"; "https:" -> "wss://"
   wsOpen $ JS.pack $ ps++ h++ ":"++ show p ++ pathname
#endif
-- last usage+ blocking semantics for sending
-- | Seconds-timestamp of the last use of the connection (for timeouts);
-- the MVar doubles as the per-connection send lock.
type Blocked= MVar (Maybe Integer)

-- | Socket buffer size in bytes.
type BuffSize = Int
-- | The physical transport behind a 'Connection'.
data ConnectionData=
#ifndef ghcjs_HOST_OS
    -- plain socket between two transient nodes
    Node2Node{port :: PortID
             ,socket ::Socket
             ,sockAddr :: NS.SockAddr
              }
    -- TLS session with another transient node
    | TLSNode2Node{tlscontext :: SData}
    -- TLS session serving an HTTPS client
    | HTTPS2Node{tlscontext :: SData}
    -- server side of a websocket connection
    | Node2Web{webSocket :: WS.Connection}
    -- plain socket serving an HTTP client
    | HTTP2Node{port :: PortID
               ,socket ::Socket
               ,sockAddr :: NS.SockAddr}
    -- the local node itself
    | Self
#else
    Self
    -- browser-side websocket to a server node
    | Web2Node{webSocket :: WebSocket}
#endif
-- deriving (Eq,Ord)
-- | A communication channel with another node, together with all the
-- per-connection bookkeeping (transport, parse stream, closures, children).
data Connection= Connection{idConn :: Int                         -- unique connection identifier
                           ,myNode :: IORef Node                  -- the local node
                           ,remoteNode :: IORef (Maybe Node)      -- the peer, once known
                           ,connData :: IORef (Maybe ConnectionData) -- transport; Nothing when closed
                           ,istream :: IORef ParseContext         -- input parse stream
                           ,bufferSize :: BuffSize
                           -- multiple wormhole/teleport use the same connection concurrently
                           ,isBlocked :: Blocked
                           ,calling :: Bool
                           ,synchronous :: Bool
                           -- local localClosures with his continuation and a blocking MVar
                           -- another MVar with the children created by the closure
                           -- also has the id of the remote closure connected with
                           ,localClosures :: MVar (M.Map IdClosure (MVar[EventF],IdClosure, MVar (),EventF))
                           -- for each remote closure that points to local closure 0,
                           -- a new container of child processes
                           -- in order to treat them separately
                           -- so that 'killChilds' do not kill unrelated processes
                           -- used by `single` and `unique`
                           ,closChildren :: IORef (M.Map Int EventF)}
                  deriving Typeable
-- | Global registry of open connections, scanned by 'connectionTimeouts'.
-- NOINLINE is mandatory for a top-level 'unsafePerformIO' 'IORef': without
-- it GHC may inline the definition and create several distinct registries.
connectionList :: IORef [Connection]
{-# NOINLINE connectionList #-}
connectionList= unsafePerformIO $ newIORef []
defConnection :: TransIO Connection
-- | Placeholder 'ParseContext' whose fields all raise an error when forced.
noParseContext= ParseContext err err err
    where
    err= error "parseContext not set"
-- #ifndef ghcjs_HOST_OS
-- | A fresh 'Connection' with default values: localhost node, no transport,
-- 8192-byte buffers, empty closure maps, placeholder parse context.
defConnection = do
  idc <- genGlobalId
  liftIO $ do
    my <- createNode "localhost" 0 >>= newIORef
    x <- newMVar Nothing
    y <- newMVar M.empty
    ref <- newIORef Nothing
    z <- newIORef M.empty
    noconn <- newIORef Nothing
    np <- newIORef noParseContext
    -- whls <- newIORef []
    return $ Connection idc my ref noconn np 8192
                  --(error "defConnection: accessing network events out of listen")
                  x False False y z -- whls
#ifndef ghcjs_HOST_OS
-- | Set the socket send/receive buffer size used for subsequent connections.
setBuffSize :: Int -> TransIO ()
setBuffSize size= do
   conn<- getData `onNothing` (defConnection !> "DEFF3")
   setData $ conn{bufferSize= size}
-- | Buffer size of the current connection, or the default 8192 when no
-- connection is present in the session state.
getBuffSize=
   (bufferSize <$> getSData) <|> return 8192
-- | Setup the node to start listening for incoming connections.
--
listen :: Node -> Cloud ()
-- | Set up this node as a listener: installs the 'Self' connection, registers
-- the node, forks the timeout/closure housekeeping threads, and then serves
-- incoming requests ('listenNew') in parallel with responses on outgoing
-- connections ('listenResponses').
listen (node@(Node _ port _ _ )) = onAll $ do
   labelState "listen"
   {-
   st <- get
   onException $ \(e :: SomeException) -> do
       case fromException e of
         Just (CloudException _ _ _) -> return()
         _ -> do
              cutExceptions
              liftIO $ print "EXCEPTION: KILLING"
              topState >>= showThreads
              -- liftIO $ killBranch' st
              -- Closure closRemote <- getData `onNothing` error "teleport: no closRemote"
              -- conn <- getData `onNothing` error "reportBack: No connection defined: use wormhole"
              -- liftIO $ putStrLn "Closing connection"
              -- mclose conn
              -- msend conn $ SError $ toException $ ErrorCall $ show $ show $ CloudException node closRemote $ show e
              empty
   -}
   -- ex <- exceptionPoint :: TransIO (BackPoint SomeException)
   -- setData ex
   -- connection failures just backtrack; they are not fatal for the listener
   onException $ \(ConnectionError msg node) -> empty
   --addThreads 2
   fork connectionTimeouts
   fork loopClosures
   setData $ Log{recover=False, buildLog= mempty, fulLog= mempty, lengthFull= 0, hashClosure= 0}
   conn' <- getSData <|> defConnection
   chs <- liftIO $ newIORef M.empty
   cdata <- liftIO $ newIORef $ Just Self
   let conn= conn'{connData=cdata,closChildren=chs}
   pool <- liftIO $ newMVar [conn]
   let node'= node{connection=Just pool}
   liftIO $ writeIORef (myNode conn) node'
   setData conn
   liftIO $ modifyMVar_ (fromJust $ connection node') $ const $ return [conn]
   addNodes [node']
   setRState(JobGroup M.empty) --used by resetRemote
   ex <- exceptionPoint :: TransIO (BackPoint SomeException)
   setData ex
   -- serve socket requests and, in parallel, responses arriving on
   -- connections that this node initiated
   mlog <- listenNew (fromIntegral port) conn <|> listenResponses :: TransIO (StreamData NodeMSG)
   execLog mlog
   --showNext "after listen" 10
   tr "END LISTEN"
-- listen incoming requests
-- listen incoming requests
-- | Accept connections on @port@, one thread per connection.  The first
-- request on a socket decides the protocol: a @CLOS@ request starts the
-- binary node-to-node protocol (after a cookie check), anything else is
-- treated as HTTP — REST API ("api"), relaying ("relay"), static pages plus
-- websocket upgrade, or direct closure invocation.
listenNew port conn'= do
   sock <- liftIO $ listenOn $ PortNumber port
   liftIO $ do
      let bufSize= bufferSize conn'
      NS.setSocketOption sock NS.RecvBuffer bufSize
      NS.setSocketOption sock NS.SendBuffer bufSize
   -- wait for connections. One thread per connection
   liftIO $ do putStr "Connected to port: "; print port
   (sock,addr) <- waitEvents $ NS.accept sock
   chs <- liftIO $ newIORef M.empty
   -- case addr of
   --   NS.SockAddrInet port host -> liftIO $ print("connection from", port, host)
   --   NS.SockAddrInet6 a b c d -> liftIO $ print("connection from", a, b,c,d)
   noNode <- liftIO $ newIORef Nothing
   id1 <- genGlobalId
   let conn= conn'{idConn=id1,closChildren=chs, remoteNode= noNode}
   --liftIO $ atomicModifyIORef connectionList $ \m -> (conn: m,()) -- TODO
   input <- liftIO $ SBSL.getContents sock
   --tr "SOME INPUT"
   -- cutExceptions
   -- onException $ \(e :: IOException) ->
   --   when (ioeGetLocation e=="Network.Socket.recvBuf") $ do
   --     liftIO $ putStr "listen: " >> print e
   --     let Connection{remoteNode=rnode,localClosures=localClosures,closChildren= rmap} = conn
   --     mnode <- liftIO $ readIORef rnode
   --     case mnode of
   --       Nothing -> return ()
   --       Just node -> do
   --                  liftIO $ putStr "removing1 node: " >> print node
   --                  nodes <- getNodes
   --                  setNodes $ nodes \\ [node]
   --     liftIO $ do
   --          modifyMVar_ localClosures $ const $ return M.empty
   --          writeIORef rmap M.empty
   --     -- topState >>= showThreads
   --     killBranch
   let nod = unsafePerformIO $ liftIO $ createNode "incoming connection" 0 in
       modify $ \s -> s{execMode=Serial,parseContext= (ParseContext
                  (liftIO $ NS.close sock >> throw (ConnectionError "connection closed" nod))
                  input (unsafePerformIO $ newIORef False)
              ::ParseContext )}
   cdata <- liftIO $ newIORef $ Just (Node2Node (PortNumber port) sock addr)
   let conn'= conn{connData=cdata}
   setState conn'
   liftIO $ atomicModifyIORef connectionList $ \m -> (conn': m,()) -- TODO
   maybeTLSServerHandshake sock input
   -- tr "AFTER HANDSHAKE"
   firstLine@(method, uri, vers) <- getFirstLine
   headers <- getHeaders
   setState $ HTTPHeaders firstLine headers
   -- tr ("HEADERS", headers)
   -- string "\r\n\r\n"
   -- tr (method, uri,vers)
   case (method, uri) of
     ("CLOS", hisCookie) -> do
          conn <- getSData
          tr "CONNECTING"
          -- the peer identifies itself with Host/Port headers; register it
          let host = BC.unpack $ fromMaybe (error "no host in header")$ lookup "Host" headers
              port = read $ BC.unpack $ fromMaybe (error "no port in header")$ lookup "Port" headers
          remNode' <- liftIO $ createNode host port
          rc <- liftIO $ newMVar [conn]
          let remNode= remNode'{connection= Just rc}
          liftIO $ writeIORef (remoteNode conn) $ Just remNode
          tr ("ADDED NODE", remNode)
          addNodes [remNode]
          myCookie <- liftIO $ readIORef rcookie
          if BS.toStrict myCookie /= hisCookie
            then do
               sendRaw conn "NOK"
               mclose conn
               error "connection attempt with bad cookie"
            else do
               sendRaw conn "OK" -- !> "CLOS detected"
               --async (return (SMore $ ClosureData 0 0[Exec])) <|> mread conn
               mread conn
     _ -> do
          -- it is a HTTP request
          -- process the current request in his own thread and then (<|>) any other request that arrive in the same connection
          cutBody method headers <|> many' cutHTTPRequest
          HTTPHeaders (method,uri,vers) headers <- getState <|> error "HTTP: no headers?"
          let uri'= BC.tail $ uriPath uri !> uriPath uri
          tr ("uri'", uri')
          case BC.span (/= '/') uri' of
            ("api",_) -> do
               -- if "api" `BC.isPrefixOf` uri'
               --then do
               --log <- return $ Exec:Exec: (Var $ IDyns $ up method):(map (Var . IDyns ) $ split $ BC.unpack $ BC.drop 4 uri')
               let log= exec <> lazyByteString method <> byteString "/" <> byteString (BC.drop 4 uri')
               maybeSetHost headers
               tr ("HEADERS", headers)
               str <- giveParseString <|> error "no api data"
               if lookup "Transfer-Encoding" headers == Just "chunked" then error $ "chunked not supported" else do
                  len <- (read <$> BC.unpack
                             <$> (Transient $ return (lookup "Content-Length" headers)))
                           <|> return 0
                  log' <- case lookup "Content-Type" headers of
                     Just "application/json" -> do
                          let toDecode= BS.take len str
                          -- tr ("TO DECODE", log <> lazyByteString toDecode)
                          setParseString $ BS.take len str
                          return $ log <> "/" <> lazyByteString toDecode -- [(Var $ IDynamic json)] -- TODO hande this serialization
                     Just "application/x-www-form-urlencoded" -> do
                          tr ("POST HEADERS=", BS.take len str)
                          setParseString $ BS.take len str
                          --postParams <- parsePostUrlEncoded <|> return []
                          return $ log <> lazyByteString ( BS.take len str) -- [(Var . IDynamic $ postParams)] TODO: solve deserialization
                     Just x -> do
                          tr ("POST HEADERS=", BS.take len str)
                          -- NOTE(review): this 'let' is self-referential (it
                          -- shadows the outer 'str' with a definition in terms
                          -- of itself) and would loop if forced; it looks like
                          -- the outer 'str' was intended — confirm.
                          let str= BS.take len str
                          return $ log <> lazyByteString str -- ++ [Var $ IDynamic str]
                     _ -> return $ log
                  setParseString $ toLazyByteString log'
                  return $ SMore $ ClosureData 0 0 log' !> ("APIIIII", log')
            --else if "relay" `BC.isPrefixOf` uri' then proxy sock method vers uri'
            ("relay",_) -> proxy sock method vers uri'
            (h,rest) -> do
              if BC.null rest || h== "file" then do
                 --headers <- getHeaders
                 maybeSetHost headers
                 let uri= if BC.null h || BC.null rest then uri' else BC.tail rest
                 tr (method,uri)
                 -- stay serving pages until a websocket request is received
                 servePages (method, uri, headers)
                 -- when servePages finish, is because a websocket request has arrived
                 conn <- getSData
                 sconn <- makeWebsocketConnection conn uri headers
                 -- websockets mode
                 -- para qué reiniciarlo todo????
                 rem <- liftIO $ newIORef Nothing
                 chs <- liftIO $ newIORef M.empty
                 cls <- liftIO $ newMVar M.empty
                 cme <- liftIO $ newIORef M.empty
                 cdata <- liftIO $ newIORef $ Just (Node2Web sconn)
                 let conn'= conn{connData= cdata
                                , closChildren=chs,localClosures=cls, remoteNode=rem} --,comEvent=cme}
                 setState conn' !> "WEBSOCKETS CONNECTION"
                 co <- liftIO $ readIORef rcookie
                 let receivedCookie= lookup "cookie" headers
                 tr ("cookie", receivedCookie)
                 rcookie <- case receivedCookie of
                    Just str-> Just <$> do
                          withParseString (BS.fromStrict str) $ do
                             tDropUntilToken "cookie="
                             tTakeWhile (not . isSpace)
                    Nothing -> return Nothing
                 tr ("RCOOKIE", rcookie)
                 if rcookie /= Nothing && rcookie /= Just co
                   then do
                      node <- getMyNode
                      --let msg= SError $ toException $ ErrorCall $ show $ show $ CloudException node 0 $ show $ ConnectionError "bad cookie" node
                      tr "SENDINg"
                      liftIO $ WS.sendClose sconn ("Bad Cookie" :: BS.ByteString) !> "SendClose Bad cookie"
                      empty
                   else do
                      liftIO $ WS.sendTextData sconn ("OK" :: BS.ByteString)
                      -- a void message is sent to the application signaling the beginning of a connection
                      -- async (return (SMore $ ClosureData 0 0[Exec])) <|> do
                      tr "WEBSOCKET"
                      -- onException $ \(e :: SomeException) -> do
                      --   liftIO $ putStr "listen websocket:" >> print e
                      --   -- liftIO $ mclose conn'
                      --   -- killBranch
                      --   -- empty
                      s <- waitEvents $ receiveData' conn' sconn :: TransIO BS.ByteString
                      setParseString s
                      tr ("WEBSOCKET RECEIVED <-----------",s)
                      -- integer
                      deserialize
                          -- a void message is sent to the application signaling the beginning of a connection
                          <|> (return $ SMore (ClosureData 0 0 (exec <> lazyByteString s)))
                else do
                   -- plain HTTP invocation of a closure: /remoteClosure/thisClosure/...
                   let uriparsed= BS.pack $ unEscapeString $ BC.unpack uri'
                   setParseString uriparsed !> ("uriparsed",uriparsed)
                   remoteClosure <- deserialize :: TransIO Int
                   tChar '/'
                   thisClosure <- deserialize :: TransIO Int
                   tChar '/'
                   --cdata <- liftIO $ newIORef $ Just (HTTP2Node (PortNumber port) sock addr)
                   conn <- getSData
                   liftIO $ atomicModifyIORef' (connData conn) $ \cdata -> case cdata of
                          Just(Node2Node port sock addr) -> (Just $ HTTP2Node port sock addr,())
                          Just(TLSNode2Node ctx) -> (Just $ HTTPS2Node ctx,())
                   --setState conn{connData=cdata}
                   s <- giveParseString
                   cook <- liftIO $ readIORef rcookie
                   liftIO $ SBSL.sendAll sock $ "HTTP/1.0 200 OK\r\nContent-Type: text/plain\r\n\r\n"
                   return $ SMore $ ClosureData remoteClosure thisClosure $ lazyByteString s
   where
   -- read one more pipelined HTTP request from the same socket
   cutHTTPRequest = do
         first@(method,_,_) <- getFirstLine
         -- tr ("after getfirstLine", method, uri, vers)
         headers <- getHeaders
         setState $ HTTPHeaders first headers
         cutBody method headers

   -- for POST requests, read the body (up to Content-Length) into the parse
   -- buffer and continue in a new thread ('abduce')
   cutBody method headers= do
         if method == "POST" then
             case fmap (read . BC.unpack) $ lookup "Content-Length" headers of
               Nothing -> return () -- most likely chunked encoding
               Just len -> do
                   str <- tTake (fromIntegral len)
                   abduce
                   setParseString str
         else abduce

   -- path component of the request URI (drops everything before first '/')
   uriPath = BC.dropWhile (/= '/')

   -- split a path into its '/'-separated components
   split []= []
   split ('/':r)= split r
   split s=
       let (h,t) = Prelude.span (/= '/') s
       in h: split t

   -- reverse proxy for urls that look like http://host:port/relay/otherhost/otherport/
   proxy sclient method vers uri' = do
       let (host:port:_)= split $ BC.unpack $ BC.drop 6 uri'
       tr ("RELAY TO",host, port)
       --liftIO $ threadDelay 1000000
       sserver <- liftIO $ connectTo' 4096 host $ PortNumber $ fromIntegral $ read port
       tr "CONNECTED"
       rawHeaders <- getRawHeaders
       tr ("RAWHEADERS",rawHeaders)
       let uri= BS.fromStrict $ let d x= BC.tail $ BC.dropWhile (/= '/') x in d . d $ d uri'
       let sent= method <> BS.pack " /"
                        <> uri
                        <> BS.cons ' ' vers
                        <> BS.pack "\r\n"
                        <> rawHeaders <> BS.pack "\r\n\r\n"
       tr ("SENT",sent)
       liftIO $ SBSL.send sserver sent
       -- Connection{connData=Just (Node2Node _ sclient _)} <- getState <|> error "proxy: no connection"
       -- pump bytes in both directions until either side closes
       (send sclient sserver <|> send sserver sclient)
         `catcht` \(e:: SomeException ) -> liftIO $ do
                      putStr "Proxy: " >> print e
                      NS.close sserver
                      NS.close sclient
                      empty
       empty
       where
       send f t= async $ mapData f t
       mapData from to = do
            content <- recv from 4096
            tr (" proxy received ", content)
            if not $ BC.null content
              then sendAll to content >> mapData from to
              else finish
            where
            finish= NS.close from >> NS.close to
            -- throw $ Finish "finish"

   -- on the first external request, rewrite the local node's host/port from
   -- the Host header so that other nodes can reach us back
   maybeSetHost headers= do
     setHost <- liftIO $ readIORef rsetHost
     when setHost $ do
       mnode <- liftIO $ do
         let mhost= lookup "Host" headers
         case mhost of
           Nothing -> return Nothing
           Just host -> atomically $ do
              -- set the first node (local node) as is called from outside
              nodes <- readTVar nodeList
              let (host1,port)= BC.span (/= ':') host
                  hostnode= (Prelude.head nodes){nodeHost= BC.unpack host1
                                      ,nodePort= if BC.null port then 80
                                           else read $ BC.unpack $ BC.tail port}
              writeTVar nodeList $ hostnode : Prelude.tail nodes
              return $ Just hostnode -- !> (host1,port)
       when (isJust mnode) $ do
         conn <- getState
         liftIO $ writeIORef (myNode conn) $fromJust mnode
       liftIO $ writeIORef rsetHost False -- !> "HOSt SET"
{-#NOINLINE rsetHost #-}
rsetHost= unsafePerformIO $ newIORef True
--instance Read PortNumber where
-- readsPrec n str= let [(n,s)]= readsPrec n str in [(fromIntegral n,s)]
--deriving instance Read PortID
--deriving instance Typeable PortID
-- | filter out HTTP requests: when the current connection has been switched to
-- HTTP(S) mode, answer 403 Forbidden, close the transport and (for plain
-- sockets) stop the computation with 'empty'; otherwise let it continue.
noHTTP= onAll $ do
   conn <- getState
   cdata <- liftIO $ readIORef $ connData conn
   case cdata of
     Just (HTTPS2Node ctx) -> do
        liftIO $ sendTLSData ctx $ "HTTP/1.1 403 Forbidden\r\nConnection: close\r\nContent-Length: 11\r\n\r\nForbidden\r\n"
        -- NOTE(review): unlike the plain-socket branch below, this branch does
        -- not end in 'empty' here -- confirm whether that is intended.
        liftIO $ tlsClose ctx
     Just (HTTP2Node _ sock _) -> do
        liftIO $ SBSL.sendAll sock $ "HTTP/1.1 403 Forbidden\r\nConnection: close\r\nContent-Length: 11\r\n\r\nForbidden\r\n"
        liftIO $ NS.close sock
        empty
     _ -> return ()
{-
-- filter out WebSockets connections(usually coming from a web node)
noWebSockets= onAll $ do
conn <- getState
cdata <- liftIO $ readIORef $ connData conn
case cdata of
Just (Web2Node _) -> empty
_ -> return()
-}
#endif
-- | Wait in the \"listen responses\" mailbox for connections announced by the
-- networking layer and stream the messages read from them ('mread').  The
-- monad state is switched to the received connection's parse context and to
-- serial execution before reading.
listenResponses :: Loggable a => TransIO (StreamData a)
listenResponses= do
      labelState "listen responses"
      (conn, parsecontext, node) <- getMailbox :: TransIO (Connection,ParseContext,Node)
      labelState . fromString $ "listen from: "++ show node
      setData conn
      tr ("CONNECTION RECEIVED","listen from: "++ show node)
      -- reading a response is sequential: switch to Serial and adopt the
      -- parse context that came with the connection
      modify $ \s-> s{execMode=Serial,parseContext = parsecontext}
      -- cutExceptions
      -- onException $ \(e:: SomeException) -> do
      --      liftIO $ putStr "ListenResponses: " >> print e
      --      liftIO $ putStr "removing node: " >> print node
      --      nodes <- getNodes
      --      setNodes $ nodes \\ [node]
      --      -- topState >>= showThreads
      --      killChilds
      --      let Connection{localClosures=localClosures}= conn
      --      liftIO $ modifyMVar_ localClosures $ const $ return M.empty
      --      empty
      mread conn

-- | Identifier of a closure (stored continuation) within a connection.
type IdClosure= Int

-- The remote closure ids for each node connection
newtype Closure= Closure IdClosure deriving (Read,Show,Typeable)

-- | A closure located in another node: the node plus the closure id there.
type RemoteClosure= (Node, IdClosure)

-- | Named remote jobs: maps a job identifier to the remote closure running it.
newtype JobGroup= JobGroup (M.Map BC.ByteString RemoteClosure) deriving Typeable
-- | If there is a remote job identified by the string identifier, it stops that job, and sets the
-- current remote operation (if any) as the current remote job for this identifier.
-- The purpose is to have a single remote job per identifier.
-- to identify the remote job, it should be used after the `wormhole` and before the remote call:
--
-- > r <- wormhole node $ do
-- > stopRemoteJob "streamlog"
-- > atRemote myRemotejob
--
-- So:
--
-- > runAtUnique ident node job= wormhole node $ do stopRemoteJob ident; atRemote job
-- This program receives a stream of "hello" from a second node when the option "hello" is entered at the keyboard.
-- If you enter "world", the "hello" stream from the second node
-- will stop and it will print a stream of "world" from a third node.
-- Entering "hello" again will stream "hello" again from the second node, and so on:
-- > main= keep $ initNode $ inputNodes <|> do
-- >
-- > local $ option "init" "init"
-- > nodes <- local getNodes
-- > r <- proc (nodes !! 1) "hello" <|> proc (nodes !! 2) "world"
-- > localIO $ print r
-- > return ()
-- >
-- > proc node par = do
-- > v <- local $ option par par
-- > runAtUnique "name" node $ local $ do
-- > abduce
-- > r <- threads 0 $ choose $ repeat v
-- > liftIO $ threadDelay 1000000
-- > return r
-- the nodes could be started from the command line as such in different terminals:
-- > program -p start/localhost/8000
-- > program -p start/localhost/8002
-- > program -p start/localhost/8001/add/localhost/8000/n/add/localhost/8002/n/init
-- The third is the one which has the other two connected and can execute the two options.
stopRemoteJob :: BC.ByteString -> Cloud ()

instance Loggable Closure

stopRemoteJob ident = do
    -- first kill whatever remote job was previously registered under this identifier
    resetRemote ident
    Closure closr <- local $ getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole"
    tr ("CLOSRRRRRRRR", closr)
    fixClosure
    local $ do
        Closure closr <- getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole"
        conn <- getData `onNothing` error "stopRemoteJob: Connection not set, use wormhole"
        remote <- liftIO $ readIORef $ remoteNode conn
        return (remote,closr) !> ("REMOTE",remote)
        -- record (remote node, closure id) as the current job for this identifier
        JobGroup map <- getRState <|> return (JobGroup M.empty)
        setRState $ JobGroup $ M.insert ident (fromJust remote,closr) map
-- |kill the remote job. Usually, before starting a new one.
resetRemote :: BC.ByteString -> Cloud ()
resetRemote ident = do
    -- look the identifier up in the registered job group, if any
    mj <- local $ do
            JobGroup map <- getRState <|> return (JobGroup M.empty)
            return $ M.lookup ident map
    when (isJust mj) $ do
        let (remote,closr)= fromJust mj
        --do -- when (closr /= 0) $ do
        -- on the remote node: remove the stored closure and kill its thread branch
        runAt remote $ local $ do
            conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set"
            mcont <- liftIO $ modifyMVar localClosures $ \map -> return ( M.delete closr map, M.lookup closr map)
            case mcont of
              Nothing -> error $ "closure not found: " ++ show closr
              Just (_,_,_,cont) -> do
                 topState >>= showThreads
                 liftIO $ killBranch' cont
                 return ()
-- | Dispatch one message received from the network: look up the addressed
-- local closure and resume its continuation with the received log, or handle
-- closure 0 (the initial wormhole dialog).  Remote 'CloudException's are
-- re-thrown through the backtracking mechanism.
execLog :: StreamData NodeMSG -> TransIO ()
execLog mlog =Transient $ do
       tr "EXECLOG"
       case mlog of
           SError e -> do
             return() !> ("SERROR",e)
             -- NOTE(review): the 'Nothing' alternative of fromException is not
             -- handled; a non-ErrorCall exception here would be a pattern
             -- failure -- confirm upstream only sends ErrorCall.
             case fromException e of
               Just (ErrorCall str) -> do
                 -- remote CloudExceptions travel 'show'n inside an ErrorCall
                 case read str of
                   (e@(CloudException _ closl err)) -> do
                     process closl (error "closr: should not be used") (Left e) True

           SDone -> runTrans(back $ ErrorCall "SDone") >> return Nothing -- TODO remove closure?
           SMore (ClosureData closl closr log) -> process closl closr (Right log) False
           SLast (ClosureData closl closr log) -> process closl closr (Right log) True
   where
   process :: IdClosure -> IdClosure -> (Either CloudException Builder) -> Bool -> StateIO (Maybe ())
   process closl closr mlog deleteClosure= do
      conn@Connection {localClosures=localClosures} <- getData `onNothing` error "Listen: myNode not set"
      if closl== 0 then do
        -- closure 0: beginning of a dialog, no stored continuation to resume
        case mlog of
          Left except -> do
            setData emptyLog
            tr "Exception received from network 1"
            runTrans $ throwt except
            empty
          Right log -> do
            tr ("CLOSURE 0",log)
            setData Log{recover= True, buildLog= mempty, fulLog= log, lengthFull= 0, hashClosure= 0} --Log True [] []
            setState $ Closure closr
            setRState $ DialogInWormholeInitiated True
            -- setParseString $ toLazyByteString log -- not needed it is has the log from the request, still not parsed
            return $ Just () -- !> "executing top level closure"
      else do
         -- fetch the stored closure; delete it when this is the last message
         mcont <- liftIO $ modifyMVar localClosures
                    $ \map -> return (if deleteClosure then
                                        M.delete closl map
                                      else map, M.lookup closl map)
                                      -- !> ("localClosures=", M.size map)
         case mcont of
           Nothing -> do
              node <- liftIO $ readIORef (remoteNode conn) `onNothing` error "mconnect: no remote node?"
              let e = "request received for non existent closure. Perhaps the connection was closed by timeout and reopened"
              let err= CloudException node closl $ show e
              throw err
           -- execute the closure
           Just (chs,closLocal, mv,cont) -> do
              when deleteClosure $ do
                -- liftIO $ killChildren chs
                empty -- last message received
              -- unblock any waiter on this closure
              liftIO $ tryPutMVar mv ()
              void $ liftIO $ runStateT (case mlog of
                Right log -> do
                  -- Log _ _ fulLog hashClosure <- getData `onNothing` return (Log True [] [] 0)
                  Log{fulLog=fulLog, hashClosure=hashClosure} <- getLog
                  -- return() !> ("fullog in execlog", reverse fulLog)
                  -- append the received log and replay the continuation with it
                  let nlog= fulLog <> log -- let nlog= reverse log ++ fulLog
                  setData $ Log{recover= True, buildLog= mempty, fulLog= nlog, lengthFull=error "lengthFull TODO", hashClosure= hashClosure} -- TODO hashClosure must change?
                  setState $ Closure closr
                  setRState $ DialogInWormholeInitiated True
                  setParseString $ toLazyByteString log
                  --restrs <- giveParseString
                  --tr ("rs' in execlog =", fmap (BS.take 4) restrs)
                  runContinuation cont ()
                Left except -> do
                  setData emptyLog
                  tr ("Exception received from the network", except)
                  runTrans $ throwt except) cont
              return Nothing
#ifdef ghcjs_HOST_OS
-- | Browser-side listen: registers the local node, initializes the job group
-- and a connection record, then processes every response received through it.
listen node = onAll $ do
        addNodes [node]
        setRState(JobGroup M.empty)
        -- ex <- exceptionPoint :: TransIO (BackPoint SomeException)
        -- setData ex
        events <- liftIO $ newIORef M.empty
        rnode <- liftIO $ newIORef node
        conn <- defConnection >>= \c -> return c{myNode=rnode} -- ,comEvent=events}
        liftIO $ atomicModifyIORef connectionList $ \m -> (conn: m,())
        setData conn
        r <- listenResponses
        execLog r
#endif

-- | The set of connections open towards one node.
type Pool= [Connection]
type SKey= String
type SValue= String
-- | A service description: a list of key/value attributes.
type Service= [(SKey, SValue)]
-- | Look a key up across a list of association lists; return the first value
-- found, or Nothing when no sublist contains the key.
-- (Replaces the previous @null@/@head@ combination with the total
-- 'listToMaybe', same behavior without the partial function.)
lookup2 key doubleList= listToMaybe $ mapMaybe (lookup key) doubleList
filter2 key doubleList= mapMaybe(lookup key ) doubleList
--------------------------------------------
#ifndef ghcjs_HOST_OS
-- maybeRead line= unsafePerformIO $ do
-- let [(v,left)] = reads line
---- print v
-- (v `seq` return [(v,left)])
-- `catch` (\(e::SomeException) -> do
-- liftIO $ print $ "******readStream ERROR in: "++take 100 line
-- maybeRead left)
{-
readFrom Connection{connData= Just(TLSNode2Node ctx)}= recvTLSData ctx
readFrom Connection{connData= Just(Node2Node _ sock _)} = toStrict <$> loop
readFrom _ = error "readFrom error"
-}
-- | Read a strict chunk from a connection: directly via the TLS context when
-- the connection is TLS, otherwise lazily from the socket.
readFrom con= do
   cd <- readIORef $ connData con
   case cd of
     Just(TLSNode2Node ctx) -> recvTLSData ctx
     Just(Node2Node _ sock _) -> BS.toStrict <$> loop sock
     _ -> error "readFrom error"
   where
   bufSize= 4098
   loop sock = loop1
      where
      loop1 :: IO BL.ByteString
      -- lazily accumulate socket chunks; stop after a partial buffer
      -- (NOTE(review): a recv that fills the buffer exactly keeps reading,
      -- which blocks until more data or EOF -- confirm intended)
      loop1 = unsafeInterleaveIO $ do
          s <- SBS.recv sock bufSize
          if BC.length s < bufSize
            then return $ BLC.Chunk s mempty
            else BLC.Chunk s `liftM` loop1

-- toStrict= B.concat . BS.toChunks
-- | Build a websockets 'Stream' on top of an existing connection, reading
-- with 'readFrom' and writing with 'sendRaw'.
makeWSStreamFromConn conn= do
   tr "WEBSOCKETS request"
   let rec= readFrom conn
       send= sendRaw conn
   makeStream
      (do
          bs <- rec -- SBS.recv sock 4098
          return $ if BC.null bs then Nothing else Just bs)
      (\mbBl -> case mbBl of
          Nothing -> return ()
          Just bl -> send bl) -- SBS.sendMany sock (BL.toChunks bl) >> return()) -- !!> show ("SOCK RESP",bl)

-- | Accept a websocket upgrade over an existing connection and return the
-- accepted server-side connection; a ping thread keeps it alive every 30s.
makeWebsocketConnection conn uri headers= liftIO $ do
   stream <- makeWSStreamFromConn conn
   let
      pc = WS.PendingConnection
        { WS.pendingOptions = WS.defaultConnectionOptions -- {connectionOnPong=xxx}
        , WS.pendingRequest = NWS.RequestHead uri headers False -- RequestHead (BC.pack $ show uri)
                                              -- (map parseh headers) False
        , WS.pendingOnAccept = \_ -> return ()
        , WS.pendingStream = stream
        }
   sconn <- WS.acceptRequest pc -- !!> "accept request"
   WS.forkPingThread sconn 30
   return sconn
-- | Serve a static file from @./static/out.jsexe@ for a plain HTTP request.
-- WebSockets upgrade requests pass through untouched; any other request is
-- answered directly and the computation stops with 'empty'.
servePages (method,uri, headers) = do
   -- return () !> ("HTTP request",method,uri, headers)
   conn <- getSData <|> error " servePageMode: no connection"
   if isWebSocketsReq headers
     then return ()
     else do
        let file= if BC.null uri then "index.html" else uri

        {- TODO rendering in server
         NEEDED: recodify View to use blaze-html in server. wlink to get path in server
         does file exist?
         if exist, send else do
            store path, execute continuation
            get the rendering
            send trough HTTP
         - put this logic as independent alternative programmer options
            serveFile dirs <|> serveApi apis <|> serveNode nodeCode
        -}
        mcontent <- liftIO $ (Just <$> BL.readFile ( "./static/out.jsexe/"++ BC.unpack file) )
                      `catch` (\(e:: SomeException) -> return Nothing)
        -- FIX(review): the fallback text below had spilled out of its comment
        -- marker and broke compilation; restored as a comment:
        -- return "Not found file: index.html please compile with ghcjs: ghcjs program.hs -o static/out"
        case mcontent of
          Just content -> do
             cook <- liftIO $ readIORef rcookie
             liftIO $ sendRaw conn $
                 "HTTP/1.0 200 OK\r\nContent-Type: text/html\r\nConnection: close\r\nContent-Length: "
                 <> BS.pack (show $ BL.length content) <>"\r\n"
                 <> "Set-Cookie:" <> "cookie=" <> cook -- <> "\r\n"
                 <>"\r\n\r\n" <> content
          Nothing -> liftIO $ sendRaw conn $ BS.pack $
                 "HTTP/1.0 404 Not Found\nContent-Length: 13\r\nConnection: close\r\n\r\nNot Found 404"
        empty
-- | forward all the result of the Transient computation to the opened connection
api :: TransIO BS.ByteString -> Cloud ()
api w= Cloud $ do
    log <- getLog
    -- only runs when replaying a received request, never while building the log
    if not $ recover log then empty else do
        HTTPHeaders (_,_,vers) hdrs <- getState <|> error "api: no HTTP headers???"
        let closeit= lookup "Connection" hdrs == Just "close"
        conn <- getState <|> error "api: Need a connection opened with initNode, listen, simpleWebApp"
        let send= sendRaw conn
        r <- w
        tr ("response",r)
        send r
        tr (vers, hdrs)
        -- close the connection for HTTP/1.0, for an explicit Connection: close
        -- request header, or when the produced response itself asks to close
        when (vers == http10 ||
              BS.isPrefixOf http10 r ||
              lookup "Connection" hdrs == Just "close" ||
              closeInResponse r)
           $ liftIO $ mclose conn
    where
    -- does the response payload itself carry "Connection: close"?
    closeInResponse r=
        let rest= findSubstring "Connection:" r
            rest' = BS.dropWhile (==' ') rest
        in if BS.isPrefixOf "close" rest' then True else False
        where
        findSubstring sub str
          | BS.null str = str
          | BS.isPrefixOf sub str = BS.drop (BS.length sub) str
          | otherwise= findSubstring sub (BS.tail str)

http10= "HTTP/1.0"
-- | True when the request headers contain a @Sec-WebSocket-Key@ entry, i.e.
-- the client is asking for a WebSockets upgrade.
-- (@not . null . filter p@ replaced by the equivalent, short-circuiting 'any'.)
isWebSocketsReq = any ((== mk "Sec-WebSocket-Key") . fst)
-- | HTTP request methods understood by the listener.
data HTTPMethod= GET | POST deriving (Read,Show,Typeable,Eq)

instance Loggable HTTPMethod

-- | Parse the HTTP request line into (method, URI, version).
getFirstLine= (,,) <$> getMethod <*> (BS.toStrict <$> getUri) <*> getVers
    where
    getMethod= parseString
    getUri= parseString
    getVers= parseString

-- | Consume and return the raw header block up to (and dropping) the blank
-- line "\r\n\r\n" that terminates it.
getRawHeaders= dropSpaces >> (withGetParseString $ \s -> return $ scan mempty s)
    where
    -- NOTE(review): scan uses BS.head/BS.tail with no empty-string guard; an
    -- input lacking "\r\n\r\n" would run off the end -- confirm upstream
    -- always supplies a complete header block.
    scan res str
       | "\r\n\r\n" `BS.isPrefixOf` str = (res, BS.drop 4 str)
       | otherwise= scan ( BS.snoc res $ BS.head str) $ BS.tail str

--   line= do
--     dropSpaces
--     tTakeWhile (not . endline)

-- | Decoded application/x-www-form-urlencoded POST parameters.
type PostParams = [(BS.ByteString, String)]

parsePostUrlEncoded :: TransIO PostParams
parsePostUrlEncoded= do
   dropSpaces
   many $ (,) <$> param <*> value
   where
   param= tTakeWhile' ( /= '=') !> "param"
   value= unEscapeString <$> BS.unpack <$> tTakeWhile' (/= '&' )

-- | Parse the header block into (case-insensitive name, value) pairs,
-- consuming the terminating "\r\n\r\n".
getHeaders = manyTill paramPair (string "\r\n\r\n") -- !> (method, uri, vers)
    where
    paramPair= (,) <$> (mk <$> getParam) <*> getParamValue
    getParam= do
        dropSpaces
        r <- tTakeWhile (\x -> x /= ':' && not (endline x))
        if BS.null r || r=="\r" then empty else anyChar >> return (BS.toStrict r)
        where
        endline c= c== '\r' || c =='\n'
    getParamValue= BS.toStrict <$> ( dropSpaces >> tTakeWhile (\x -> not (endline x)))
        where
        endline c= c== '\r' || c =='\n'
#endif
#ifdef ghcjs_HOST_OS
-- | Browser build: always running in the browser; 'api' is server-only.
isBrowserInstance= True
api _= empty
#else
-- | Returns 'True' if we are running in the browser.
isBrowserInstance= False
#endif
{-# NOINLINE emptyPool #-}
-- | A fresh, empty connection pool.
emptyPool :: MonadIO m => m (MVar Pool)
emptyPool= liftIO $ newMVar []

-- | Create a node from a hostname (or IP address), port number and a list of
-- services.
createNodeServ :: HostName -> Int -> [Service] -> IO Node
createNodeServ h p svs= return $ Node h p Nothing svs

-- | 'createNodeServ' with no services.
createNode :: HostName -> Int -> IO Node
createNode h p= createNodeServ h p []

-- | A placeholder node representing a browser peer; the port is a random
-- number (NOTE(review): randomIO may yield any Int, including negatives --
-- confirm it is used only as an identifier).
createWebNode :: IO Node
createWebNode= do
  pool <- emptyPool
  port <- randomIO
  return $ Node "webnode" port (Just pool) [[("webnode","")]]

-- | Node identity is host+port only; services and connections are ignored.
instance Eq Node where
    Node h p _ _ ==Node h' p' _ _= h==h' && p==p'

instance Show Node where
    show (Node h p _ servs )= show (h,p, servs)

-- | Reads the tuple produced by 'show'; the connection pool is left unset.
instance Read Node where
    readsPrec n s=
        let r= readsPrec n s
        in case r of
          -- NOTE(review): a multi-result parse would be a pattern failure --
          -- confirm readsPrec on the tuple never yields more than one result.
          [] -> []
          [((h,p,ss),s')] -> [(Node h p Nothing ss ,s')]
-- | Global registry of known cluster nodes; the local node is kept first.
-- The NOINLINE pragma is required for a top-level 'unsafePerformIO' global:
-- without it GHC may inline/duplicate the thunk and create several distinct
-- registries (the sibling global @rsetHost@ already carries the pragma).
{-# NOINLINE nodeList #-}
nodeList :: TVar [Node]
nodeList = unsafePerformIO $ newTVarIO []
deriving instance Ord PortID
--myNode :: Int -> DBRef MyNode
--myNode= getDBRef $ key $ MyNode undefined
errorMyNode f= error $ f ++ ": Node not set. initialize it with connect, listen, initNode..."
-- | Return the local node i.e. the node where this computation is running.
getMyNode :: TransIO Node
getMyNode = do
    Connection{myNode= node} <- getSData <|> errorMyNode "getMyNode" :: TransIO Connection
    liftIO $ readIORef node

-- | empty if the node is not set
getMyNodeMaybe= do
    Connection{myNode= node} <- getSData
    liftIO $ readIORef node

-- | Return the list of nodes in the cluster.
getNodes :: MonadIO m => m [Node]
getNodes = liftIO $ atomically $ readTVar nodeList

-- | get the nodes that have the same service definition that the calling node
getEqualNodes = do
    nodes <- getNodes
    -- NOTE(review): Prelude.head is partial here; relies on the invariant
    -- that the local node is always registered first -- confirm.
    let srv= nodeServices $ Prelude.head nodes
    case srv of
      [] -> return $ filter (null . nodeServices) nodes
      (srv:_) -> return $ filter (\n -> (not $ null $ nodeServices n) && Prelude.head (nodeServices n) == srv ) nodes
-- | All known nodes registered with the placeholder host name \"webnode\"
-- (browser peers).
getWebNodes :: MonadIO m => m [Node]
getWebNodes = filter isWeb <$> getNodes
  where
    isWeb n = nodeHost n == "webnode"
-- | For each known node, the subset of its services satisfying the predicate.
matchNodes f = do
      nodes <- getNodes
      return $ Prelude.map (\n -> filter f $ nodeServices n) nodes

-- | Add a list of nodes to the list of existing nodes known locally.
-- If a node is already present, its services are merged into the present one;
-- services whose first element (usually the "name" field) matches are substituted.
addNodes :: [Node] -> TransIO ()
addNodes nodes= liftIO $ do
  -- the local node should be the first
  nodes' <- mapM fixNode nodes
  atomically $ mapM_ insert nodes'
  where
  insert node= do
    prevnodes <- readTVar nodeList -- !> ("ADDNODES", nodes)
    let mn = filter(==node) prevnodes
    case mn of
      [] -> do tr "NUEVO NODO"; writeTVar nodeList $ (prevnodes) ++ [node]
      [n] ->do
          -- merge services; on equal head the new node's entry wins (nubBy keeps the first)
          let nservices= nubBy (\s s' -> head s== head s') $ nodeServices node++ nodeServices n
          writeTVar nodeList $ ((prevnodes) \\ [node]) ++ [n{nodeServices=nservices}]
      _ -> error $ "duplicated node: " ++ show node
  --writeTVar nodeList $ (prevnodes \\ nodes') ++ nodes'

-- | Remove the given nodes from the known-node list.
delNodes nodes= liftIO $ atomically $ do
  nodes' <- readTVar nodeList
  writeTVar nodeList $ nodes' \\ nodes

-- | Make sure a node carries a connection pool, adding an empty one if absent.
fixNode n= case connection n of
  Nothing -> do
      pool <- emptyPool
      return n{connection= Just pool}
  Just _ -> return n

-- | set the list of nodes
setNodes nodes= liftIO $ do
     nodes' <- mapM fixNode nodes
     atomically $ writeTVar nodeList nodes'

-- | Shuffle the list of cluster nodes and return the shuffled list.
-- NOTE(review): despite the name this rotates the list (head moved to the
-- end) rather than randomizing it, and it is partial on an empty node list
-- -- confirm intended.
shuffleNodes :: MonadIO m => m [Node]
shuffleNodes= liftIO . atomically $ do
  nodes <- readTVar nodeList
  let nodes'= Prelude.tail nodes ++ [Prelude.head nodes]
  writeTVar nodeList nodes'
  return nodes'
--getInterfaces :: TransIO TransIO HostName
--getInterfaces= do
-- host <- logged $ do
-- ifs <- liftIO $ getNetworkInterfaces
-- liftIO $ mapM_ (\(i,n) ->putStrLn $ show i ++ "\t"++ show (ipv4 n) ++ "\t"++name n)$ zip [0..] ifs
-- liftIO $ putStrLn "Select one: "
-- ind <- input ( < length ifs)
-- return $ show . ipv4 $ ifs !! ind
-- #ifndef ghcjs_HOST_OS
--instance Read NS.SockAddr where
-- readsPrec _ ('[':s)=
-- let (s',r1)= span (/=']') s
-- [(port,r)]= readsPrec 0 $ tail $ tail r1
-- in [(NS.SockAddrInet6 port 0 (IP.toHostAddress6 $ read s') 0, r)]
-- readsPrec _ s=
-- let (s',r1)= span(/= ':') s
-- [(port,r)]= readsPrec 0 $ tail r1
-- in [(NS.SockAddrInet port (IP.toHostAddress $ read s'),r)]
-- #endif
-- | add this node to the list of known nodes in the remote node connected by a `wormhole`.
--  This is useful when the node is called back by the remote node.
-- In the case of web nodes with webSocket connections, this is the way to add it to the list of
-- known nodes in the server.
addThisNodeToRemote= do
    n <- local getMyNode
    atRemote $ local $ do
      n' <- setConnectionIn n
      addNodes [n']

-- | Attach the current connection as the node's pool, so the remote side can
-- call this node back through it.
setConnectionIn node=do
    conn <- getState <|> error "addThisNodeToRemote: connection not found"
    ref <- liftIO $ newMVar [conn]
    return node{connection=Just ref}
-- | Add a node (first parameter) to the cluster using a node that is already
-- part of the cluster (second parameter). The added node starts listening for
-- incoming connections and the rest of the computation is executed on this
-- newly added node.
connect :: Node -> Node -> Cloud ()
#ifndef ghcjs_HOST_OS
connect node remotenode = do
    -- start listening locally (or continue if already listening)
    listen node <|> return ()
    connect' remotenode

-- | Reconcile the list of nodes in the cluster using a remote node already
-- part of the cluster. Reconciliation end up in each node in the cluster
-- having the same list of nodes.
connect' :: Node -> Cloud ()
connect' remotenode= loggedc $ do
    nodes <- local getNodes
    localIO $ putStr "connecting to: " >> print remotenode
    -- send the local node list, receive the remote one
    newNodes <- runAt remotenode $ interchange nodes
    --return () !> "interchange finish"
    -- add the new nodes to the local nodes in all the nodes connected previously
    let toAdd=remotenode:Prelude.tail newNodes
    callNodes' nodes (<>) mempty $ local $ do
        liftIO $ putStr "New nodes: " >> print toAdd !> "NEWNODES"
        addNodes toAdd
    where
    -- receive new nodes and send their own
    interchange nodes=
        do
           newNodes <- local $ do
               conn@Connection{remoteNode=rnode} <- getSData <|>
                 error ("connect': need to be connected to a node: use wormhole/connect/listen")
               -- if is a websockets node, add only this node
               -- let newNodes = case cdata of
               --      Node2Web _ -> [(head nodes){nodeServices=[("relay",show remotenode)]}]
               --      _ -> nodes
               let newNodes= nodes -- map (\n -> n{nodeServices= nodeServices n ++ [("relay",show (remotenode,n))]}) nodes
               callingNode<- fixNode $ Prelude.head newNodes
               -- remember the caller and store this connection in its pool
               liftIO $ writeIORef rnode $ Just callingNode
               liftIO $ modifyMVar_ (fromJust $ connection callingNode) $ const $ return [conn]
               -- onException $ \(e :: SomeException) -> do
               --      liftIO $ putStr "connect:" >> print e
               --      liftIO $ putStrLn "removing node: " >> print callingNode
               --      -- topState >>= showThreads
               --      nodes <- getNodes
               --      setNodes $ nodes \\ [callingNode]
               return newNodes
           oldNodes <- local $ getNodes
           -- broadcast the received nodes across the already-connected cluster
           mclustered . local $ do
               liftIO $ putStrLn "New nodes: " >> print newNodes
               addNodes newNodes
           localIO $ atomically $ do
               -- set the first node (local node) as is called from outside
               -- tr "HOST2 set"
               nodes <- readTVar nodeList
               let nodes'= (Prelude.head nodes){nodeHost=nodeHost remotenode
                                               ,nodePort=nodePort remotenode}:Prelude.tail nodes
               writeTVar nodeList nodes'
           return oldNodes

#else
connect _ _= empty
connect' _ = empty
#endif
#ifndef ghcjs_HOST_OS
------------------------------- HTTP client ---------------

-- | JSON values serialize with aeson 'encode'.  Deserialization first
-- delimits the JSON text with a lightweight scanner ('jsElem') and only then
-- runs 'eitherDecode' on the delimited string.
instance {-# Overlapping #-} Loggable Value where
   serialize= return . lazyByteString =<< encode
   deserialize = decodeIt
      where
      jsElem :: TransIO BS.ByteString -- just delimites the json string, do not parse it
      jsElem= dropSpaces >> (jsonObject <|> array <|> atom)
      atom= elemString
      array= (brackets $ return "[" <> return "{}" <> chainSepBy mappend (return "," <> jsElem) (tChar ',')) <> return "]"
      jsonObject= (braces $ return "{" <> chainMany mappend jsElem) <> return "}"
      elemString= do
          dropSpaces
          tTakeWhile (\c -> c /= '}' && c /= ']' )
      decodeIt= do
          s <- jsElem
          tr ("decode",s)
          case eitherDecode s !> "DECODE" of
            Right x -> return x
            Left err -> empty

-- | The HTTP request/status line (three fields) plus the parsed headers.
data HTTPHeaders= HTTPHeaders (BS.ByteString, B.ByteString, BS.ByteString) [(CI BC.ByteString,BC.ByteString)] deriving Show
-- | Perform a raw HTTP request against a node and deserialize the response
-- body.  Handles connection reuse (with a liveness probe and one retry on a
-- fresh connection), proxies, HTTP/1.0 and @Connection: close@ semantics,
-- and both Content-Length and chunked transfer encodings.
rawHTTP :: Loggable a => Node -> String -> TransIO a
rawHTTP node restmsg = do
  abduce -- is a parallel operation
  tr ("***********************rawHTTP",nodeHost node)
  --sock <- liftIO $ connectTo' 8192 (nodeHost node) (PortNumber $ fromIntegral $ nodePort node)
  mcon <- getData :: TransIO (Maybe Connection)
  c <- do
         c <- mconnect' node
         tr "after mconnect'"
         sendRawRecover c $ BS.pack restmsg

         c <- getState <|> error "rawHTTP: no connection?"
         let blocked= isBlocked c -- TODO: the same flag is used now for sending and receiving
         tr "before blocked"
         liftIO $ takeMVar blocked
         tr "after blocked"
         ctx <- liftIO $ readIORef $ istream c
         liftIO $ writeIORef (done ctx) False
         modify $ \s -> s{parseContext= ctx} -- actualize the parse context
         return c
       `while` \c ->do
         is <- isTLS c
         px <- getHTTProxyParams is
         tr ("PX=", px)
         -- probe the connection: a readable char means it is still usable;
         -- otherwise close it and signal a retry with a fresh connection
         (if isJust px then return True else do c <- anyChar ; tPutStr $ BS.singleton c; tr "anyChar"; return True) <|> do
              TOD t _ <- liftIO $ getClockTime
              -- ("PUTMVAR",nodeHost node)
              liftIO $ putMVar (isBlocked c) $ Just t
              liftIO (writeIORef (connData c) Nothing)
              mclose c
              tr "CONNECTION EXHAUSTED,RETRYING WITH A NEW CONNECTION"
              return False
  modify $ \s -> s{execMode=Serial}
  let blocked= isBlocked c -- TODO: the same flag is used now for sending and receiving
  tr "after send"
  --showNext "NEXT" 100
  --try (do r <-tTake 10;liftIO $ print "NOTPARSED"; liftIO $ print r; empty) <|> return()
  first@(vers,code,_) <- getFirstLineResp <|> do
                 r <- notParsed
                 error $ "No HTTP header received:\n"++ up r
  tr ("FIRST line",first)
  headers <- getHeaders
  let hdrs= HTTPHeaders first headers
  setState hdrs
  --tr ("HEADERS", first, headers)

  -- any non-2xx status is reported with the decoded body
  guard (BC.head code== '2')
     <|> do Raw body <- parseBody headers
            error $ show (hdrs,body) -- decode the body and print

  result <- parseBody headers

  -- HTTP/1.0 or Connection: close means the server closes after the response
  when (vers == http10
        -- || BS.isPrefixOf http10 str
        || lookup "Connection" headers == Just "close" )
        $ do
            TOD t _ <- liftIO $ getClockTime
            liftIO $ putMVar blocked $ Just t
            liftIO $ mclose c
            liftIO $ takeMVar blocked
            return()
  --tr ("result", result)
  --when (not $ null rest) $ error "THERE WERE SOME REST"
  ctx <- gets parseContext
  -- "SET PARSECONTEXT PREVIOUS"
  liftIO $ writeIORef (istream c) ctx

  TOD t _ <- liftIO $ getClockTime
  -- ("PUTMVAR",nodeHost node)
  liftIO $ putMVar blocked $ Just t
  -- restore the previously-current connection (if any) in the state
  if (isJust mcon) then setData (fromJust mcon) else delData c
  return result
  where
  -- whether the connection data indicates TLS
  isTLS c= liftIO $ do
     cdata <- readIORef $ connData c
     case cdata of
        Just(TLSNode2Node _) -> return True
        _ -> return False

  -- run act; if the check on its result fails, run act once more
  while act fix= do r <- act; b <- fix r; if b then return r else act

  -- decode the response body according to the transfer headers
  parseBody headers= case lookup "Transfer-Encoding" headers of
     Just "chunked" -> dechunk |- deserialize
     _ -> case fmap (read . BC.unpack) $ lookup "Content-Length" headers of
        Just length -> do
             msg <- tTake length
             tr ("GOT", length)
             withParseString msg deserialize
        _ -> do
             str <- notParsed -- TODO: must be strict to avoid premature close
             BS.length str `seq` withParseString str deserialize

  -- parse the status line into (version, status code, message)
  getFirstLineResp= do
      -- showNext "getFirstLineResp" 20
      (,,) <$> httpVers <*> (BS.toStrict <$> getCode) <*> getMessage
      where
      httpVers= tTakeUntil (BS.isPrefixOf "HTTP" ) >> parseString
      getCode= parseString
      getMessage= tTakeUntilToken ("\r\n")

  --con<- getState <|> error "rawHTTP: no connection?"
  --mclose con xxx
  --maybeClose vers headers c str

  -- stream chunked-encoded pieces as SMore, terminating with SDone
  dechunk= do
           n<- numChars
           if n== 0 then do string "\r\n"; return SDone else do
              r <- tTake $ fromIntegral n !> ("numChars",n)
              --tr ("message", r)
              trycrlf
              tr "SMORE1"
              return $ SMore r
        <|> return SDone !> "SDone in dechunk"
      where
      trycrlf= try (string "\r\n" >> return()) <|> return ()
      numChars= do l <- hex ; tDrop 2 >> return l
#endif
-- | crawl the nodes executing the same action in each node and accumulate the results using a binary operator
foldNet :: Loggable a => (Cloud a -> Cloud a -> Cloud a) -> Cloud a -> Cloud a -> Cloud a
foldNet op init action = atServer $ do
    ref <- onAll $ liftIO $ newIORef Nothing -- eliminate additional results due to unneded parallelism when using (<|>)
    r <- exploreNetExclude []
    -- keep only the first result that reaches this point
    v <-localIO $ atomicModifyIORef ref $ \v -> (Just r, v)
    case v of
      Nothing -> return r
      Just _ -> empty
    where
    exploreNetExclude nodesexclude = loggedc $ do
       local $ tr "EXPLORENETTTTTTTTTTTT"
       action `op` otherNodes
       where
       otherNodes= do
           node <- local getMyNode
           nodes <- local getNodes'
           tr ("NODES to explore",nodes)
           -- never revisit: exclude myself and the already-visited nodes
           let nodesToExplore= Prelude.tail nodes \\ (node:nodesexclude)
           callNodes' nodesToExplore op init $
               exploreNetExclude (union (node:nodesexclude) nodes)

    getNodes'= getEqualNodes -- if isBrowserInstance then return <$>getMyNode -- getEqualNodes
                  -- else (++) <$> getEqualNodes <*> getWebNodes

-- | Crawl the network combining the per-node results monoidally.
exploreNet :: (Loggable a,Monoid a) => Cloud a -> Cloud a
exploreNet = foldNet mappend mempty

-- | Crawl the network combining the per-node results with '(<|>)'.
exploreNetUntil :: (Loggable a) => Cloud a -> Cloud a
exploreNetUntil = foldNet (<|>) empty
-- | only execute if the program is executing in the browser. The code inside can contain calls to the server.
-- Otherwise return empty (stopping the computation, so alternative computations may execute).
onBrowser :: Cloud a -> Cloud a
onBrowser x= do
    r <- local $ return isBrowserInstance
    if r then x else empty

-- | only executes the computation if it is in the server, but the computation can call the browser. Otherwise return empty
onServer :: Cloud a -> Cloud a
onServer x= do
    r <- local $ return isBrowserInstance
    if not r then x else empty

-- | If the computation is running in the server, translates it to the browser and return back.
-- If it is already in the browser, just execute it
atBrowser :: Loggable a => Cloud a -> Cloud a
atBrowser x= do
    r <- local $ return isBrowserInstance
    if r then x else atRemote x

-- | If the computation is running in the browser, translates it to the server and return back.
-- If it is already in the server, just execute it
atServer :: Loggable a => Cloud a -> Cloud a
atServer x= do
    r <- local $ return isBrowserInstance
    tr ("AT SERVER",r)
    if not r then x else atRemote x
------------------- timeouts -----------------------
-- delete connections.
-- delete receiving closures before sending closures

-- | Seconds of inactivity after which a connection is considered stale.
delta= 60 -- 3*60

-- | Every 10 seconds, drop from the global connection list the connections
-- idle for more than 'delta' seconds, close them and reset their closures.
connectionTimeouts :: TransIO ()
connectionTimeouts= do
    labelState "loop connections"
    threads 0 $ waitEvents $ return () --loop
    liftIO $ threadDelay 10000000
    -- tr "timeouts"
    TOD time _ <- liftIO $ getClockTime
    -- partition: the first component (busy or recently used) is kept as the
    -- new list, the second is closed below
    toClose <- liftIO $ atomicModifyIORef connectionList $ \ cons ->
        Data.List.partition (\con ->
            let mc= unsafePerformIO $ readMVar $ isBlocked con
            in isNothing mc || -- check that is not doing some IO
               ((time - fromJust mc) < delta) ) cons -- !> Data.List.length cons
               -- time etc are in a IORef
    forM_ toClose $ \c -> liftIO $ do
        tr "close "
        tr $ idConn c
        when (calling c) $ mclose c
        cleanConnectionData c -- websocket connections close everithing on timeout

-- | Forget every closure registered for the connection and reset its entry in
-- the global fix map.
cleanConnectionData c= liftIO $ do
    -- reset the remote accessible closures
    modifyIORef globalFix $ \m -> M.insert (idConn c) (False,[]) m
    modifyMVar_ (localClosures c) $ const $ return M.empty
    modifyIORef globalFix $ \m -> M.insert (idConn c) (True,[]) m

-- | Every 5 seconds, walk the known nodes' connections and remove the local
-- closures whose request threads have all finished, notifying the remote node
-- with a final (SLast) message.
loopClosures= do
    labelState "loop closures"
    threads 0 $ do -- in the current thread
        waitEvents $ threadDelay 5000000 -- every 5 seconds
        nodes <- getNodes -- get the nodes known
        node <- choose $ tail nodes -- walk trough them, except my own node
        guard (isJust $ connection node) -- when a node has connections
        nc <- liftIO $ readMVar $ fromJust (connection node) -- get them
        conn <- choose nc -- and walk trough them
        lcs <- liftIO $ readMVar $ localClosures conn -- get the local endpoints of this node for that connection
        (closLocal,(mv,clos,_,cont)) <- choose $ M.toList lcs -- walk trough them
        chs <- liftIO $ readMVar $ children $ fromJust $ parent cont -- get the threads spawned by requests to this endpoint
        return()
        --return ("NUMBER=",length chs)
        guard (null chs) -- when there is no activity
        tr ("REMOVING", closLocal)
        liftIO $ modifyMVar (localClosures conn) $ \lcs -> return $ (M.delete closLocal lcs,()) -- remove the closure
        msend conn $ SLast $ ClosureData clos closLocal mempty -- notify the remote node
        -- tr ("THREADS ***************", length chs)