module Control.CUtils.NetChan (NetSend, NetRecv, localHost, newNetChan, newNetSend, newNetRecv, send, recv, recvSend, sendRecv, recvRecv, activateSend, activateRecv) where
import System.IO
import System.Process
import Data.List (find, isPrefixOf, isInfixOf, (\\))
import Network
import Network.Socket (socketToHandle, SockAddr(..))
import Network.BSD
import Control.Concurrent
import Control.Monad
import Data.ByteString.Lazy (ByteString, hGet, hPut, length, fromChunks, append, empty)
import qualified Data.ByteString as B
import Data.Binary
import qualified Data.Map as M
import Data.Maybe
import Data.Char
import Data.IORef
import Data.Bits
import Control.Exception
import System.IO.Unsafe
import Prelude hiding (lookup, length, catch)
import Control.CUtils.Split
type Ident = ByteString
serverup = unsafePerformIO (newMVar False)
table :: MVar (M.Map Ident (ByteString -> IO ()))
table = unsafePerformIO (newMVar (M.singleton empty (\_ -> return ())))
data ChannelFibre t = ChannelFibre (MVar Bool) Handle
data NetSend t = NetSend HostName Ident (MVar [HostName]) (MVar [ChannelFibre t])
data NetRecv t = NetRecv Ident (NetSend t) (NetSend HostName) (Chan t)
instance Eq (ChannelFibre t) where
ChannelFibre _ hdl == ChannelFibre _ hdl2 = hdl == hdl2
instance Eq (NetSend t) where
NetSend _ ident _ _ == NetSend _ ident2 _ _ = ident == ident2
instance Eq (NetRecv t) where
NetRecv ident _ _ _ == NetRecv ident2 _ _ _ = ident == ident2
port = 2999
getIPAddress :: String -> Word32
getIPAddress ip = shiftL n4 24 .|. shiftL n3 16 .|. shiftL n2 8 .|. n1 where
[n1,n2,n3,n4] = map read $ split '.' ip
localHost = liftM (drop 39 . head . dropWhile (not . isPrefixOf " IPv4") . lines) $ readProcess "ipconfig" [] []
identifier :: String -> Word32 -> Ident
identifier ip entry = encode (entry, getIPAddress ip)
newNetChan :: (Binary t) => IO (NetRecv t, NetSend t)
newNetChan = do
mp <- readMVar table
host <- localHost
let ident = identifier host (fromIntegral (M.size mp))
liftM2 (,) (__newNetRecv True Nothing ident) (__newNetSend True host ident)
modifyIdent b ident = append (fromChunks [B.pack $ map (fromIntegral . ord) $ if b then "main" else "back"]) ident
__emptyNetSend :: Bool -> NetSend HostName -> HostName -> Ident -> IO (NetSend t)
__emptyNetSend b backDown hostName ident = do
let ident' = modifyIdent b ident
buffer <- newMVar []
if b then do
backR <- __newNetRecv False (Just backDown) ident
let loop = do
host <- recv backR
modifyMVar_ buffer (return . (host:))
loop
forkIO loop
else
return undefined
mvar <- newMVar []
return (NetSend hostName ident' buffer mvar)
__addConnection s@(NetSend _ ident buffer mvar) hostName = do
mvar2 <- newMVar False
hdl <- withSocketsDo $ connectTo hostName (PortNumber port)
hSetBuffering hdl (BlockBuffering (Just 1024))
hPut hdl ident
upstreams <- readMVar buffer
let bs = encode (hostName : upstreams)
hPut hdl $ encode $ length bs
hPut hdl bs
hFlush hdl
modifyMVar_ mvar (return . (ChannelFibre mvar2 hdl:))
__newNetSend b hostName ident = do
s <- if b then
__emptyNetSend False undefined "" ident
else
return undefined
s <- __emptyNetSend b s hostName ident
__addConnection s hostName
return s
newNetSend hostName = __newNetSend True hostName (identifier hostName 0)
readLoop f hdl = do
n <- liftM decode (hGet hdl 8)
bs <- hGet hdl n
f bs
readLoop f hdl
server socket = withSocketsDo $ do
let loop = do
(hdl, host, _) <- accept socket
ident <- hGet hdl 12
may <- liftM (M.lookup ident) $ readMVar table
maybe
(hPutStrLn stderr ("The host " ++ host ++ " used an invalid Ident: " ++ show ident))
(\f -> forkIO (withSocketsDo (readLoop f hdl)) >> return ())
may
loop
loop
__newNetRecv :: (Binary t) => Bool -> Maybe (NetSend t) -> Ident -> IO (NetRecv t)
__newNetRecv b may ident = do
chan <- newChan
backS <- if b then
__emptyNetSend False undefined "" ident
else
return undefined
downstream <- maybe
(__emptyNetSend b backS "" ident)
return
may
let ident' = modifyIdent b ident
gotUpstreams <- newIORef False
let listener bs = do
got <- readIORef gotUpstreams
if got then do
let x = decode bs
writeChan chan x
send downstream x
else do
writeIORef gotUpstreams True
let x:xs = decode bs
when b $ do
let NetSend _ _ buffer _ = backS
modifyMVar_ buffer (\_ -> return xs)
__addConnection backS x
modifyMVar_ table (return . M.insert ident' listener)
modifyMVar_ serverup (\b -> unless b (withSocketsDo $ listenOn (PortNumber port) >>= forkIO . server >> return ()) >> return True)
return (NetRecv ident' downstream backS chan)
newNetRecv :: (Binary t) => IO (NetRecv t)
newNetRecv = localHost >>= \host -> __newNetRecv True Nothing (identifier host 0)
routeAround fib s@(NetSend _ ident buffer mvar) = do
hosts <- modifyMVar buffer (\ls -> return ([], ls))
mapM_ (__addConnection s) hosts
modifyMVar_ mvar (return . (\\[fib]))
send :: (Binary t) => NetSend t -> t -> IO ()
send snd@(NetSend _ ident _ mvar) x = readMVar mvar >>= mapM_ (\fib@(ChannelFibre mvar hdl) -> do
b <- modifyMVar mvar (\b -> let s = encode x in
s `seq` catch (hPut hdl (encode (length s)) >> hPut hdl s) (\(_ :: SomeException) -> routeAround fib snd >> send snd x)
>> return (True, b))
unless b $ void $ forkIO $ do
threadDelay 100000
modifyMVar_ mvar (\_ -> return False)
catch (hFlush hdl) (\(_ :: SomeException) -> routeAround fib snd >> send snd x))
recv (NetRecv _ _ _ chan) = readChan chan
recvSend r = recv r >>= activateSend
sendRecv s@(NetSend hostName _ _ mvar) r@(NetRecv ident s2 backS _) = do
send s r
__addConnection s2 hostName
send backS hostName
recvRecv r = recv r >>= activateRecv
instance Binary (NetSend t) where
put (NetSend hostName ident _ _) = put hostName >> put ident
get = liftM2 (\x y -> NetSend x y undefined undefined) get get
instance Binary (NetRecv t) where
put (NetRecv ident _ _ _) = put ident
get = liftM (\x -> NetRecv x undefined undefined undefined) get
activateSend :: NetSend t -> IO (NetSend t)
activateSend (NetSend hostName ident _ _) = __newNetSend True hostName ident
activateRecv :: (Binary t) => NetRecv t -> IO (NetRecv t)
activateRecv (NetRecv x _ _ _) = __newNetRecv True Nothing x