-----------------------------------------------------------------------------
-- |
-- Module      :  Conjure.STM.PeerCtrl
-- Copyright   :  (c) Lemmih 2006
-- License     :  BSD-like
--
-- Maintainer  :  lemmih@gmail.com
-- Stability   :  experimental
-- Portability :  non-portable (requires STM)
--
-- STM actions for controlling a connected peer: queueing outgoing
-- messages, recording transfer timings, tracking helper threads and
-- maintaining the choke/interest flags.
--
-----------------------------------------------------------------------------
module Conjure.STM.PeerCtrl
    ( -- * Peer actions
      disconnectPeer
    , sendMessage
    , sendPiece
    , cancelUpload
    , cancelUploads
    , sendCancel
    , sendRequest
      -- * Network timings
    , recordTiming
    , getTimingAge
    , getTiming
      -- * Utilities for peer helper threads
    , registerThread
    , unregisterThread
      -- * Peer state
    , getLocalChoke
    , setLocalChoke
    , getRemoteChoke
    , setRemoteChoke
    , getLocalInterest
    , getRemoteInterest
    , setRemoteInterest
      -- * Peer interest calculations
    , rescanInterest
    , recalculateInterest
    ) where

import Data.List
import Data.Array.Base

import Conjure.Types
import Conjure.Piecemap
import Conjure.Protocol.PWP.Types

import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import GHC.Conc

import qualified Data.Set as Set

-- | Queue a 'Disconnect' on the peer's message channel.
disconnectPeer :: ConnectedPeer -> STM ()
disconnectPeer peer = writeTChan (cpMsgChan peer) Disconnect

-- | Queue a protocol message on the peer's outgoing channel.
sendMessage :: ConnectedPeer -> Message -> STM ()
sendMessage connPeer msg =
    writeTChan (cpOutChan connPeer) (SendMessage msg)

-- | Queue a 'SendPiece' command (piece index, offset, length) on the
-- peer's outgoing channel.
sendPiece :: ConnectedPeer -> Int -> Int -> Int -> STM ()
sendPiece connPeer idx offset len =
    writeTChan (cpOutChan connPeer) (SendPiece idx offset len)

-- | Queue a 'ClearPiece' command (piece index, offset, length) on the
-- peer's outgoing channel.
cancelUpload :: ConnectedPeer -> Int -> Int -> Int -> STM ()
cancelUpload connPeer idx offset len =
    writeTChan (cpOutChan connPeer) (ClearPiece idx offset len)

-- | Queue a 'ClearPieces' command on the peer's outgoing channel.
cancelUploads :: ConnectedPeer -> STM ()
cancelUploads connPeer = writeTChan (cpOutChan connPeer) ClearPieces

-- | Send a 'Cancel' message for the given block.
sendCancel :: ConnectedPeer -> Block -> STM ()
sendCancel connPeer (piece, offset, len) =
    sendMessage connPeer (Cancel (pIndex piece) offset len)

-- | Send a 'Request' message for the given block.
sendRequest :: ConnectedPeer -> Block -> STM ()
sendRequest connPeer (piece, offset, len) =
    sendMessage connPeer (Request (pIndex piece) offset len)

-- | Maximum number of timing samples retained by 'recordTiming'.
maxTimings :: Int
maxTimings = 40

-- | Record a new @(date, size)@ sample at the front of the timing list
-- and remember @date@ as the time of the last recording.  Only the
-- 'maxTimings' most recent samples are kept.
recordTiming :: TVar Timing -> Integer -> Int -> STM ()
recordTiming tvar date size = do
    Timing _ tmings <- readTVar tvar
    let trimmed = take maxTimings ((date, size) : tmings)
    -- Force the spine of the trimmed list before storing it.  Otherwise
    -- every recording would wrap the previous unevaluated 'take' in a new
    -- one, and the TVar would hold an ever-growing thunk chain until
    -- somebody finally reads the timings.
    length trimmed `seq` writeTVar tvar (Timing date trimmed)
    --bps <- getTiming date 20000 tvar
    --stmDebug $ " b/s: " ++ show bps ++ ", kb/s: " ++ show (bps `div` 1024)

-- | Time elapsed between the last recording and @now@.
getTimingAge :: Integer -> TVar Timing -> STM Integer
getTimingAge now tvar = do
    Timing lastRecording _ <- readTVar tvar
    return (now - lastRecording)

-- | Average transfer rate over the recent timing samples.
--
-- Only samples strictly newer than @now - limit@ are considered.  The
-- rate is the sum of their sizes divided by the time, in seconds,
-- between the oldest such sample and @now@.
--
-- Returns 0 when there are no recent samples, and also when the oldest
-- recent sample is not older than @now@: a zero (or negative) time span
-- would otherwise yield an infinite, NaN or negative rate that cannot be
-- meaningfully rounded to an 'Int'.
getTiming :: Integer     -- ^ Current date in milliseconds.
          -> Integer     -- ^ Age limit in milliseconds.
                         --   20000 => only consider timings from the last 20 seconds.
          -> TVar Timing -- ^ TVar containing the timings.
          -> STM Int     -- ^ Rate per second (presumably bytes; depends on
                         --   the unit of the recorded sizes).
getTiming now limit tvar = do
    Timing _ tmings <- readTVar tvar
    let recent = filter ((> now - limit) . fst) tmings
        -- Samples are stored newest first, so 'last' is the oldest one.
        -- Laziness plus the short-circuiting '||' below guarantee this is
        -- never evaluated for an empty list.
        spanMs = fromIntegral (now - fst (last recent)) :: Double
        total  = fromIntegral (sum (map snd recent)) :: Double
    return $
        if null recent || spanMs <= 0
            then 0
            else round (total / (spanMs / 1000))

-- | Add a helper thread to the peer's set of registered threads.
registerThread :: ConnectedPeer -> ThreadId -> STM ()
registerThread peer tid = do
    threads <- readTVar (cpThreads peer)
    writeTVar (cpThreads peer) (Set.insert tid threads)

-- | Remove a helper thread from the peer's set of registered threads.
-- The thread is expected to be registered; this is checked with
-- 'assert' (i.e. only when assertions are enabled).
unregisterThread :: ConnectedPeer -> ThreadId -> STM ()
unregisterThread peer tid = do
    threads <- readTVar (cpThreads peer)
    assert (Set.member tid threads) $
        writeTVar (cpThreads peer) (Set.delete tid threads)

-- | Read the peer's local choke flag.
getLocalChoke :: ConnectedPeer -> STM Bool
getLocalChoke = readTVar . cpLocalChoke

-- | Overwrite the peer's local choke flag.
setLocalChoke :: ConnectedPeer -> Bool -> STM ()
setLocalChoke connPeer val = writeTVar (cpLocalChoke connPeer) val

-- | Read the peer's remote choke flag.
getRemoteChoke :: ConnectedPeer -> STM Bool
getRemoteChoke = readTVar . cpRemoteChoke

-- | Overwrite the peer's remote choke flag.
setRemoteChoke :: ConnectedPeer -> Bool -> STM ()
setRemoteChoke connPeer val = writeTVar (cpRemoteChoke connPeer) val

-- | Read the peer's local interest flag.
getLocalInterest :: ConnectedPeer -> STM Bool
getLocalInterest = readTVar . cpLocalInterest

-- | Overwrite the peer's local interest flag.  Not exported: local
-- interest is only changed by the interest calculations below, which
-- also notify the peer.
setLocalInterest :: ConnectedPeer -> Bool -> STM ()
setLocalInterest connPeer val = writeTVar (cpLocalInterest connPeer) val

-- | Read the peer's remote interest flag.
getRemoteInterest :: ConnectedPeer -> STM Bool
getRemoteInterest = readTVar . cpRemoteInterest

-- | Overwrite the peer's remote interest flag.
setRemoteInterest :: ConnectedPeer -> Bool -> STM ()
setRemoteInterest connPeer val = writeTVar (cpRemoteInterest connPeer) val

{-
 Local interest \ piece origins.
               | Local | Remote |
Interested     | True  | False  |
Not interested | False | True   |
-}
-- | Can a newly obtained piece change our interest in the peer?
--
-- A piece obtained locally can only end our interest, so it matters only
-- while we are interested.  A piece obtained by the remote side can only
-- start our interest, so it matters only while we are not interested.
possibleStateChange :: Bool {- local piece -} -> Bool {- local interest -} -> Bool
possibleStateChange localPiece lInterest = localPiece == lInterest

{-
  Perform a complete interest rescan. This is necessary after we
  receive bitfields.
-}
rescanInterest :: ActiveTorrent -> ConnectedPeer -> STM ()
rescanInterest at connPeer = do
    lInterest <- getLocalInterest connPeer
    -- The remote piece set changed, so treat it as a remote piece event.
    when (possibleStateChange False lInterest) $
        rescanInterest' lInterest at connPeer

{-
  Perform a minimal interest re-calculation. Only a single bit has changed
  so we may be able to tell our interest without costly calculations.

  The first argument says whether the piece was obtained locally (True)
  or by the remote peer (False); the second is the index of that piece.
-}
recalculateInterest :: Bool -> Int -> ActiveTorrent -> ConnectedPeer -> STM ()
recalculateInterest local idx at connPeer = do
    lInterest <- getLocalInterest connPeer
    when (possibleStateChange local lInterest) $ do
        -- Look the piece up in the *other* side's piecemap: the side that
        -- just obtained the piece is already known to have it.
        piecemap <- readTVar $
            if local then cpPiecemap connPeer else atPiecemap at
        -- NOTE(review): 'unsafeAt' performs no bounds check; this assumes
        -- callers have already validated idx against the piecemap -- confirm.
        case (unsafeAt piecemap idx, local) of
            -- We received a new piece and he has it too. Are we still interested?
            (True, True) -> rescanInterest' lInterest at connPeer
            -- He received a new piece which we don't have. Assert our interest.
            (False, False) -> do
                setLocalInterest connPeer True
                sendMessage connPeer Interested
            -- Either he received a piece that we have or we received a piece
            -- that he hasn't. No interest change possible.
            _ -> return ()

-- | Recompute our interest from both piecemaps.  The first argument is
-- the current local interest; the flag is updated and the peer is
-- notified only when the computed interest differs from it.
rescanInterest' :: Bool -> ActiveTorrent -> ConnectedPeer -> STM ()
rescanInterest' lInterest at connPeer = do
    rPiecemap <- readTVar (cpPiecemap connPeer)
    lPiecemap <- readTVar (atPiecemap at)
    -- We are interested exactly when 'findNewPieces' reports something
    -- (presumably pieces the peer has and we lack).
    let wanted = not (null (findNewPieces lPiecemap rPiecemap))
    when (wanted /= lInterest) $ do
        setLocalInterest connPeer wanted
        sendMessage connPeer (if wanted then Interested else NotInterested)