----------------------------------------------------------------------------- -- | -- Module : Conjure.Network.Peer -- Copyright : (c) Lemmih 2005 -- License : BSD-like -- -- Maintainer : lemmih@gmail.com -- Stability : experimental -- Portability : non-portable (requires STM) -- ----------------------------------------------------------------------------- -- FIXME: refactor module Conjure.Network.Peer ( connectionFromPeer , connectionToPeer ) where import Conjure.Utils.Transaction import Conjure.Types import Conjure.Utils import Conjure.Torrent ( infoNumPieces ) import Conjure.Piecemap ( mkPiecemap, setPiecemapBit , emptyPiecemap, fromPiecemap , addPiecemap, addPiece, delPiecemap ) import Conjure.Logic.BlockManagement import Conjure.Logic.QueueManager import Conjure.Protocol.PWP.Types as PWP import Conjure.Protocol.PWP.Printer import Conjure.Protocol.PWP.Parser import Conjure.Debug import Conjure.STM.PeerCtrl import Control.Concurrent import Control.Concurrent.STM import Control.Exception ( Exception (..), catch, throw, finally, assert , bracket ) import Control.Monad ( when, mplus, forever ) import System.IO ( Handle, hClose ) import qualified Data.Map as Map import Data.Map (Map) import qualified Data.Set as Set import Data.List ( delete ) import qualified Data.ByteString as BS import Data.ByteString (ByteString) import Prelude hiding (catch) -------------------------------------------------------------- -- Interface -------------------------------------------------------------- connectionFromPeer :: TVar [ConnectedPeer] -> TVar (Map ByteString ActiveTorrent) -> Handle -> IO () connectionFromPeer connectedPeers torrentMap handle = do Handshake infohash peerid <- hGetHandshake handle torrents <- atomically $ readTVar torrentMap at <- case Map.lookup infohash torrents of Nothing -> disconnect Just at -> return at hPutHandshake handle (Handshake infohash (atPeerId at)) doConnection peerid at connectedPeers handle `catchDisconnect` handle connectionToPeer :: ActiveTorrent -> TVar [ConnectedPeer] -> Handle -> IO () connectionToPeer at connectedPeers handle = do left <- atomically $ readTVar (atLeft at) when (left == 0) disconnect hPutHandshake handle (Handshake myInfohash myPeerId) Handshake infohash peerid <- hGetHandshake handle when (infohash /= myInfohash) disconnect doConnection peerid at connectedPeers handle `catchDisconnect` handle where myInfohash = tInfoHash torrent myPeerId = atPeerId at torrent = atTorrent at withConnectedPeer :: PeerId -> ActiveTorrent -> TVar [ConnectedPeer] -> (ConnectedPeer -> IO a) -> IO a withConnectedPeer remotePeerId at connectedPeers = bracket createPeer destroyPeer where createPeer = atomically $ do connPeer <- registerConnectedPeer connectedPeers remotePeerId (atTorrent at) modifyTVar_ (atPeers at) (connPeer:) return connPeer destroyPeer peer = do atomically $ do piecemap <- readTVar (cpPiecemap peer) modifyTVar_ (atUsecount at) (delPiecemap piecemap) removePeer connectedPeers remotePeerId removePeer (atPeers at) remotePeerId tids <- atomically (readTVar (cpThreads peer)) mapM_ killThread (Set.toList tids) doConnection :: PeerId -> ActiveTorrent -> TVar [ConnectedPeer] -> Handle -> IO () doConnection remotePeerId at connectedPeers handle = withConnectedPeer remotePeerId at connectedPeers $ \connPeer -> do piecemap <- atomically (readTVar (atPiecemap at)) hPutMessage handle (BitField (fromPiecemap piecemap)) handlePeer handle at connPeer -------------------------------------------------------------- -- Network utility functions -------------------------------------------------------------- -- FIXME: refactor handleNewInput :: Handle -> ActiveTorrent -> ConnectedPeer -> Message -> Transaction () handleNewInput _ at connPeer (RequestedPiece idx offset blockData) = registerBlock at connPeer (idx, offset, BS.length blockData) blockData handleNewInput _ at connPeer msg = stm $ case msg of Choke -> do setRemoteChoke connPeer True abandonBlocks connPeer Unchoke -> setRemoteChoke connPeer False Interested -> setRemoteInterest connPeer True NotInterested -> setRemoteInterest connPeer False BitField fs -> do let piecemap = mkPiecemap nPieces fs writeTVar (cpPiecemap connPeer) piecemap modifyTVar_ (atUsecount at) (addPiecemap piecemap) rescanInterest at connPeer Have num -> do setPiecemapBit (cpPiecemap connPeer) num True modifyTVar_ (atUsecount at) (addPiece num) recalculateInterest False num at connPeer Request idx offset len -> do status <- queryUploadLegality connPeer case status of UploadIllegal -> stmDebug "Illegal download request." >> disconnect UploadIgnore -> stmDebug "Ignoring download request." >> return () UploadGrant -> sendPiece connPeer idx offset len -- ^ Tell the upload handler to add this piece to the upload queue. Cancel idx offset len -> cancelUpload connPeer idx offset len KeepAlive -> return () _ -> stmDebug $ "Unhandled message: " ++ show msg where nPieces = infoNumPieces (cpTorrent connPeer) keepAliveThread :: Handle -> ConnectedPeer -> IO () keepAliveThread _ peer = loop where loop = do threadDelay (30 * 10^(6::Int)) atomically $ do lChoke <- getLocalChoke peer rChoke <- getRemoteChoke peer lInterest <- getLocalInterest peer rInterest <- getRemoteInterest peer let download = not rChoke && lInterest upload = not lChoke && rInterest if not download && not upload then sendMessage peer KeepAlive else retry loop forkThread :: String -> ConnectedPeer -> IO () -> IO () forkThread _ connPeer action = do --debug $ "Forking thread: " ++ name block <- newEmptyMVar tid <- forkIO (do tid <- readMVar block action `finally` do atomically $ unregisterThread connPeer tid ) atomically $ registerThread connPeer tid putMVar block tid -- FIXME: refactor handlePeer :: Handle -> ActiveTorrent -> ConnectedPeer -> IO () handlePeer handle at connPeer = do forkThread "outputThread" connPeer (vital outputThread) forkThread "inputThread" connPeer (vital inputThread) forkThread "keepAliveThread" connPeer (keepAliveThread handle connPeer) forkThread "queueManager" connPeer (queueManager at connPeer) let loop = forever $ runT $ do stm $ stmTransactionName "handlePeer" action <- stm $ readTChan (cpMsgChan connPeer) case action of NewInput msg -> handleNewInput handle at connPeer msg Disconnect -> disconnect SockError e -> do stm $ stmDebug $ "SockError: " ++ show e disconnect loop `finally` atomically (abandonBlocks connPeer) where vital action = action `catch` \e -> atomically $ writeTChan (cpMsgChan connPeer) (SockError e) outputThread = do msgList <- atomically $ newTVar [] forever $ runT $ do action <- stm $ readTChan (cpOutChan connPeer) case action of SendMessage msg -> onCommit (hPutMessage handle msg) SendPiece idx offset len -> stm $ modifyTVar_ msgList ((idx,offset,len):) ClearPiece idx offset len -> stm $ do pieces <- readTVar msgList assert ((idx,offset,len) `elem` pieces) $ return () writeTVar msgList (delete (idx,offset,len) pieces) ClearPieces -> stm (writeTVar msgList []) `mplus` do msg <- stm $ readTVar msgList case msg of [] -> stm $ retry ((idx,offset,len):xs) -> do onCommit $ do payload <- readBlock (atBackend at) idx offset len assert (BS.length payload == len) $ hPutMessage handle (RequestedPiece idx offset payload) now <- getCurrentTime atomically $ do recordTiming (cpUploadTimings connPeer) now len modifyTVar_ (atUploaded at) (+ (fromIntegral len)) stm $ writeTVar msgList xs reporter now bps = atomically $ recordTiming (cpDownloadTimings connPeer) now bps inputThread = do msg <- hGetMessage reporter handle atomically $ writeTChan (cpMsgChan connPeer) (NewInput msg) inputThread abandonBlocks :: ConnectedPeer -> STM () abandonBlocks connPeer = do blocks <- readTVar (cpPendingBlocks connPeer) mapM_ (abandonBlock connPeer) (Map.elems blocks) writeTVar (cpPendingBlocks connPeer) Map.empty -------------------------------------------------------------- -- ConnectedPeer utility functions -------------------------------------------------------------- newConnectedPeer :: PeerId -> Torrent -> STM ConnectedPeer newConnectedPeer peerid torrent = do threads <- newTVar Set.empty ctrlChan <- newTChan outChan <- newTChan piecemap <- newTVar (emptyPiecemap (infoNumPieces torrent)) localChoke <- newTVar True remoteChoke <- newTVar True localInterest <- newTVar False remoteInterest <- newTVar False uploadTimings <- newTVar (Timing 0 []) downloadTimings <- newTVar (Timing 0 []) pBlocks <- newTVar Map.empty qLength <- newTVar 2 -- isDownloading <- newTVar False -- isUploading <- newTVar False return $ ConnectedPeer { cpThreads = threads , cpMsgChan = ctrlChan , cpOutChan = outChan , cpPeerId = peerid , cpTorrent = torrent , cpPiecemap = piecemap , cpLocalChoke = localChoke , cpRemoteChoke = remoteChoke , cpLocalInterest = localInterest , cpRemoteInterest = remoteInterest , cpUploadTimings = uploadTimings , cpDownloadTimings = downloadTimings , cpPendingBlocks = pBlocks , cpQueueLength = qLength } registerConnectedPeer :: TVar [ConnectedPeer] -> PeerId -> Torrent -> STM ConnectedPeer registerConnectedPeer connectedPeers peerid torrent = do when (BS.null $ unPeerId peerid) disconnect -- Null PeerID means that this is handshake from -- tracker which tries to check whether we are -- behind the NAT. We should disconnect, because -- tracker closes connection immediately after this -- handshake peers <- readTVar connectedPeers -- FIXME: use Sets let torrentPeers = [ cpPeerId peer | peer <- peers , tInfoHash (cpTorrent peer) == tInfoHash torrent] when (peerid `elem` torrentPeers) disconnect connPeer <- newConnectedPeer peerid torrent writeTVar connectedPeers (connPeer:peers) return connPeer -- FIXME: move this data UploadLegality = UploadIllegal | UploadIgnore | UploadGrant queryUploadLegality :: ConnectedPeer -> STM UploadLegality queryUploadLegality connPeer = do sc <- getLocalChoke connPeer ri <- getRemoteInterest connPeer case (sc,ri) of (_,False) -> return UploadIllegal (True,_) -> return UploadIgnore _ -> return UploadGrant removePeer :: TVar [ConnectedPeer] -> PeerId -> STM () removePeer peerLst peerid = modifyTVar_ peerLst $ \lst -> assert (length (filter (\p -> cpPeerId p == peerid) lst) == 1) $ filter (\p -> cpPeerId p /= peerid) lst -------------------------------------------------------------- -- Other utility functions -------------------------------------------------------------- disconnect :: Monad m => m a disconnect = error "disconnect" catchDisconnect :: IO () -> Handle -> IO () catchDisconnect action h = action `catch` \e -> hClose h >> handle e where handle (ErrorCall "disconnect") = putStrLn $ "Disconnecting from: " ++ show h handle (ErrorCall errMsg) = throw (ErrorCall ("Error: " ++ errMsg)) handle e = throw e