----------------------------------------------------------------------------- -- | -- Module : Conjure.Network.Client -- Copyright : (c) Lemmih 2005-2006 -- License : BSD-like -- -- Maintainer : lemmih@gmail.com -- Stability : experimental -- Portability : non-portable (requires STM) -- ----------------------------------------------------------------------------- module Conjure.Network.Client ( runClient , shutdownClient , shutdownAll , shutdownClient' ) where import Conjure.Types import Conjure.Utils import Conjure.Torrent import Conjure.STM.PeerCtrl import Conjure.Piecemap ( scanTorrent, drawPiecemap, emptyUsecount ) import Conjure.Network.Peer import qualified Data.ByteString.Char8 as BS import Data.ByteString (ByteString) import Conjure.Protocol.THP import Control.Concurrent import Control.Exception import Control.Concurrent.STM import Control.Monad import System.Random ( randomRIO ) import Network import Text.Printf ( printf ) import qualified Data.IntMap as IntMap import qualified Data.Map as Map import Data.Map ( Map ) import Data.Array.Diff ( assocs ) import Data.List ( nubBy ) import Prelude hiding (catch) newActiveTorrent :: Torrent -> Backend -> PeerId -> ThreadId -> Int -> IO ActiveTorrent newActiveTorrent torrent backend myPeerId tid nPieces = do piecemap <- scanTorrent torrent backend putStrLn ("Piecemap: " ++ drawPiecemap piecemap "") let incomplete_pieces = [ i | (i,False) <- assocs piecemap ] atomically $ do usecount <- newTVar (emptyUsecount nPieces) piecemapVar <- newTVar piecemap pieces <- newTVar (IntMap.empty) peers <- newTVar [] downloaded <- newTVar 0 uploaded <- newTVar 0 left <- newTVar $ fromIntegral $ sum $ map (pieceLength torrent) incomplete_pieces return (ActiveTorrent { atTorrent = torrent, atClient = tid, atBackend = backend , atPeerId = myPeerId, atUsecount = usecount , atPiecemap = piecemapVar, atPieces = pieces, atPeers = peers , atDownloaded = downloaded, atUploaded = uploaded, atLeft = left}) sendTrackerRequest :: PortNumber -> Maybe Event -> ActiveTorrent -> IO Response sendTrackerRequest portNum event at = do up <- atomically $ readTVar (atUploaded at) down <- atomically $ readTVar (atDownloaded at) left <- atomically $ readTVar (atLeft at) queryTracker torrent myPeerId Nothing portNum up down left event `catch` \e -> return $ Error $ show e where torrent = atTorrent at myPeerId = atPeerId at -- FIXME: we need some kind of mechanism to implement "try to maintain N peers, but no more than M, N TVar [ConnectedPeer] -- Use a Set? -> TVar (Map ByteString ActiveTorrent) -> Torrent -> Backend -> IO () runClient portNum connectedPeers torrentMap torrent backend = do myPeerIdStr <- liftM BS.pack $ replicateM 20 (randomRIO ('a','z')) -- ('\0','\255')) let myPeerId = PeerId myPeerIdStr tid <- myThreadId at <- newActiveTorrent torrent backend myPeerId tid nPieces atomically $ modifyTVar_ torrentMap $ Map.insert (tInfoHash torrent) at let loop event = do putStrLn "Sending request to tracker" resp <- sendTrackerRequest portNum event at case resp of Error str -> do putStrLn $ "Error: " ++ str threadDelay (60 * 10^(6::Int)) Info { rspInterval = interval , rspPeers = peers} -> do flip mapM_ (distinct peers) $ \peer -> forkIO $ do peerHandle <- connectTo (iPeerIp peer) (PortNumber (fromIntegral (iPeerPort peer))) printf " %s:%d\t %s\n" (iPeerIp peer) (iPeerPort peer) (show peerHandle) connectionToPeer at connectedPeers peerHandle `catch` \e -> print e >> return () putStrLn $ "Delaying " ++ show interval ++ "secs" replicateM_ 10 (threadDelay (interval * 10^(5::Int))) loop Nothing loop (Just Started) `catch` clientExceptionHandler connectedPeers torrentMap torrent at where nPieces = infoNumPieces torrent distinct peers = nubBy equalHostAndPort peers equalHostAndPort peer1 peer2 = (iPeerIp peer1, iPeerPort peer1) == (iPeerIp peer2, iPeerPort peer2) shutdownClient :: Torrent -> TVar (Map ByteString ActiveTorrent) -> IO () shutdownClient torrent torrentMap = do mbAt <- atomically $ fmap (Map.lookup (tInfoHash torrent)) (readTVar torrentMap) case mbAt of Nothing -> error "Conjure.Network.Client.shutdownClient: torrent not found." Just at -> do shutdownClient' at atomically $ do m <- readTVar torrentMap when (Map.member (tInfoHash torrent) m) retry shutdownAll :: TVar (Map ByteString ActiveTorrent) -> IO () shutdownAll torrentMap = do ats <- atomically $ readTVar torrentMap mapM_ shutdownClient' (Map.elems ats) atomically $ do ats' <- readTVar torrentMap unless (Map.null ats') retry shutdownClient' :: ActiveTorrent -> IO () shutdownClient' at = throwTo (atClient at) (ErrorCall "shutdown") clientExceptionHandler :: TVar [ConnectedPeer] -> TVar (Map ByteString ActiveTorrent) -> Torrent -> ActiveTorrent -> Exception -> IO () clientExceptionHandler peers torrentMap torrent at _ = do sendTrackerRequest 0 (Just Stopped) at atomically $ do mapM_ disconnectPeer =<< readTVar peers modifyTVar_ torrentMap $ Map.delete (tInfoHash torrent) return ()