-----------------------------------------------------------------------------
-- |
-- Module      :  Conjure.Logic.BlockManagement
-- Copyright   :  (c) Lemmih 2005
-- License     :  BSD-like
--
-- Maintainer  :  lemmih@gmail.com
-- Stability   :  experimental
-- Portability :  non-portable (requires STM)
--
-- The module serves as an intermediary between the backend (hard disk)
-- and the frontend (network), providing relevant blocks and
-- validating completed pieces.
--
-- A block can be either 'Active' or 'Downloaded'.
-- When a peer finishes a block, the blockmanager sends out a cancel signal
-- to the other peers downloading the same block. Once a piece is fully
-- downloaded, the blockmanager spawns a validator thread. If the piece is
-- successfully validated then the local piecemap is updated. If the
-- validation fails then the piece's in-progress record is dropped, so all
-- of its blocks start over as (Active []).
--
-- The piece picker works by overlapping the pieces we want with the pieces
-- the remote peer has, sorting that list by rarest first and then picking
-- the block with the lowest score. Score is the number of peers engaged in
-- active download of the block.
-----------------------------------------------------------------------------
module Conjure.Logic.BlockManagement
    ( findInterestingPieces -- :: ActiveTorrent -> ConnectedPeer -> STM [Int]
    , registerNewBlock      -- :: ConnectedPeer -> Torrent -> Piece -> Int -> STM (Int,Int)
    , requestNewBlock       -- :: ActiveTorrent -> ConnectedPeer -> STM (Maybe Block)
    , abandonBlock          -- :: ConnectedPeer -> Block -> STM ()
    , registerBlock         -- :: ActiveTorrent -> ConnectedPeer -> (Int,Int,Int) -> ByteString -> Transaction ()
    ) where

import Conjure.Types
import Conjure.Utils.Transaction
import Conjure.Utils
import Conjure.Protocol.PWP.Types
import Conjure.Debug
import Conjure.STM.PeerCtrl
import Conjure.Torrent ( pieceLength, pieceCheckSum )
import Conjure.Constants ( blockSize )
import Conjure.Piecemap ( findNewPieces, setPiecemapBit
                        , orderPieces )
import Conjure.Utils.SHA1 ( sha1 )

import Control.Concurrent ( forkIO )
import Control.Concurrent.STM
import Control.Exception ( assert )
import Control.Monad ( when )
import System.Random ( mkStdGen )
import Data.List
import Data.Ord ( comparing )
import Data.Array.IArray ( (!) )
import qualified Data.IntMap as IntMap
import qualified Data.ByteString as BS
import Data.ByteString (ByteString)
import qualified Data.Map as Map ( delete, lookup )

--------------------------------------------------------------
--------------------------------------------------------------

-- FIXME: We never update the usecount.
-- | List the piece indices worth requesting from the given peer.
--   Pieces that are already in progress locally and that the remote
--   peer has come first; they are followed by the result of
--   'orderPieces' applied to the output of 'findNewPieces'.
--   The second part may repeat indices from the first part.
findInterestingPieces :: ActiveTorrent -> ConnectedPeer -> STM [Int]
findInterestingPieces at (ConnectedPeer {cpPiecemap = remotePiecemap}) = do
    started   <- readTVar (atPieces at)
    usecount  <- readTVar (atUsecount at)
    ourpieces <- readTVar (atPiecemap at)
    remote    <- readTVar remotePiecemap
    let -- prefer already started pieces
        inProgress = filter (remote !) (IntMap.keys started)
        -- FIXME: constant stdGen.
        fresh      = orderPieces (mkStdGen 42) usecount
                                 (findNewPieces ourpieces remote)
    return (inProgress ++ fresh)

-- | Record that the peer is now downloading block number @blockIdx@ of
--   the piece. Returns the byte offset of the block inside the piece
--   together with the block's length; the length is capped so the block
--   never extends past the end of the piece.
--
--   The stored status raises an 'error' (when it is evaluated) if the
--   index is past the last block or the block is already 'Downloaded'.
registerNewBlock :: ConnectedPeer -> Torrent -> Piece -> Int -> STM (Int,Int)
registerNewBlock connPeer torrent piece blockIdx = do
    modifyTVar_ (pStatus piece) (worker blockIdx)
    let offset    = blockIdx * blockSize
        remaining = pieceLength torrent (pIndex piece) - offset
    return (offset, min blockSize remaining)
  where
    -- Walk to the requested block and add the peer to its downloaders.
    worker _ []                = error "Block index overflow"
    worker 0 (Downloaded:_)    = error "Tried to start download at an already downloaded block"
    worker 0 (Active peers:xs) = Active (connPeer:peers):xs
    worker n (x:xs)            = x:worker (n-1) xs

-- | Apply a pure function to the current block statuses of a piece.
withPieceStatus :: ([BlockStatus] -> a) -> Piece -> STM a
withPieceStatus ac piece = fmap ac (readTVar (pStatus piece))

-- | Find the most attractive block of a piece for the given peer.
--   The result is @(block index, score)@ where the score is the number
--   of peers currently downloading that block. Blocks that are already
--   downloaded, or that this peer is already fetching, score 'maxBound'.
getPieceScore :: ConnectedPeer -> Piece -> STM (Int,Int)
getPieceScore peer = withPieceStatus pickBest
  where
    pickBest = minimumBy (comparing snd) . zip [0..] . map getScore
    getScore Downloaded = maxBound
    getScore (Active lst)
        | peer `elem` lst = maxBound
        | otherwise       = length lst

-- | True when every block of the piece is 'Downloaded'.
isCompletedPiece :: Piece -> STM Bool
isCompletedPiece = withPieceStatus (all isDownloaded)
  where
    isDownloaded Downloaded = True
    isDownloaded _          = False

-- | Pick a block for the peer to download and register the peer on it.
--
--   The interesting pieces are scanned in order. A piece that has not
--   been started yet is created and its first block is taken
--   immediately; so is any block nobody else is downloading (score 0).
--   Otherwise the lowest-scoring block seen during the scan is used.
--   Returns 'Nothing' when no candidate scores below 'maxBound'.
requestNewBlock :: ActiveTorrent -> ConnectedPeer -> STM (Maybe Block)
requestNewBlock at connPeer = do
    pieces   <- readTVar (atPieces at)
    pieceLst <- findInterestingPieces at connPeer
    search pieces Nothing pieceLst
  where
    -- The best candidate so far is carried as a Maybe, so there is no
    -- placeholder value that could be forced by accident.
    search _ best [] =
        case best of
          Nothing                   -> return Nothing
          Just (piece, _, blockIdx) -> fmap Just (registerFromPiece piece blockIdx)
    search pieces best (x:xs) =
        case IntMap.lookup x pieces of
          Nothing -> do
              piece <- mkNewPiece pieces x
              fmap Just (registerFromPiece piece 0)
          Just piece -> do
              (blockIdx, score) <- getPieceScore connPeer piece
              case score of
                0 -> fmap Just (registerFromPiece piece blockIdx)
                _ | score < bestScore best
                              -> search pieces (Just (piece, score, blockIdx)) xs
                  | otherwise -> search pieces best xs

    -- Score of the current candidate; 'maxBound' when there is none.
    bestScore :: Maybe (Piece, Int, Int) -> Int
    bestScore Nothing              = maxBound
    bestScore (Just (_, score, _)) = score

    -- Create the bookkeeping for a piece nobody has started yet.
    mkNewPiece pieces idx = do
        status <- newTVar (emptyStatus idx)
        let piece = Piece idx status
        writeTVar (atPieces at) (IntMap.insert idx piece pieces)
        return piece

    -- One (Active []) entry per block; the block count is rounded up so
    -- a trailing partial block is included.
    emptyStatus idx = replicate nBlocks (Active [])
      where
        nBlocks = (pieceLength (atTorrent at) idx + blockSize - 1) `div` blockSize

    -- 'registerNewBlock' returns a byte offset, not a block index.
    registerFromPiece piece blockIdx = do
        (offset, blockLen) <- registerNewBlock connPeer (atTorrent at) piece blockIdx
        return (piece, offset, blockLen)

--------------------------------------------------------------
--------------------------------------------------------------

-- | Claim the completed block for your own and tell the other
--   downloaders to cancel their download.
--
--   Returns 'True' if the block was still 'Active' (this peer delivered
--   it first) and 'False' if it had already been marked 'Downloaded'.
claimCompletedBlock :: ConnectedPeer -> Block -> STM Bool
claimCompletedBlock connPeer block@(piece, offset, _) = do
    status <- readTVar (pStatus piece)
    let (status', peers, first) = mkNewStatus 0 [] status
    writeTVar (pStatus piece) status'
    mapM_ cancelFor (filter (/=connPeer) peers)
    return first
  where
    -- Tell a competing peer to stop and forget its pending request.
    cancelFor peer = do
        sendCancel peer block
        modifyTVar_ (cpPendingBlocks peer) (Map.delete (pIndex piece, offset))

    -- Walk the statuses, tracking the byte offset of each block, and
    -- mark the block at the wanted offset as Downloaded.
    mkNewStatus _ _ [] = error "Walked through piece status without hitting the wanted offset: mkNewStatus"
    mkNewStatus n ac (x:xs)
        | n == offset = case x of
            Active peers -> (reverse ac ++ Downloaded:xs, peers, True)
            _            -> (reverse ac ++ Downloaded:xs, [], False)
        | otherwise = mkNewStatus (n+blockSize) (x:ac) xs

-- | Remove the peer from the downloaders of a block it had requested.
--   The stored status raises an 'error' (when it is evaluated) if the
--   offset does not match any block or the block is not 'Active'.
abandonBlock :: ConnectedPeer -> Block -> STM ()
abandonBlock connPeer (piece, offset, _)
    = modifyTVar_ (pStatus piece) $ abandonPeer 0
  where
    abandonPeer _ [] = error "Walked through piece status without hitting the wanted offset: abandonBlock"
    abandonPeer n (x:xs)
        | n == offset = case x of
            Active peers -> assert (connPeer `elem` peers) $
                            Active (filter (/=connPeer) peers):xs
            _ -> error "Tried to abort download of a block which wasn't being downloaded"
        | otherwise = x:abandonPeer (n+blockSize) xs
--------------------------------------------------------------
--------------------------------------------------------------

-- | Register a block as downloaded and, if possible, validate the
--   complete piece.
--
--   The block is looked up among the peer's pending requests by
--   @(piece index, offset)@; a block that was never requested is only
--   logged. Otherwise the block is claimed and removed from the pending
--   set. If this peer delivered it first, the data is written to the
--   backend once the transaction commits, and a validator thread is
--   started when that write completes the piece.
registerBlock :: ActiveTorrent -> ConnectedPeer -> (Int,Int,Int) -> ByteString -> Transaction ()
registerBlock at connPeer (idx, offset, _) blockData = do
    blocks <- stm $ readTVar (cpPendingBlocks connPeer)
    case Map.lookup (idx,offset) blocks of
      Nothing    -> onCommit (debug "Received unexpected block, ignored.")
      Just block -> do
          first <- stm $ claimCompletedBlock connPeer block
          stm $ writeTVar (cpPendingBlocks connPeer) (Map.delete (idx,offset) blocks)
          when first $ do
              let (piece,_,_) = block
              isComplete <- stm $ isCompletedPiece piece
              onCommit $ do
                  writeBlock (atBackend at) idx offset blockData
                  -- Only spawn a thread when there is a piece to validate,
                  -- instead of forking a no-op thread for every block.
                  when isComplete $
                      forkIO (validatePiece at idx offset) >> return ()

-- | Read a finished piece back from the backend and compare its SHA1
--   against the checksum recorded in the torrent.
--
--   On success the piece is committed, the local piecemap bit is set,
--   the in-progress record is dropped, the downloaded/left counters are
--   adjusted by the piece length, and every peer is sent a @Have@
--   message. On failure the in-progress record is dropped and the
--   failure is logged. The third argument is ignored.
validatePiece :: ActiveTorrent -> Int -> Int -> IO ()
validatePiece at idx _ = do
    pieceData <- readPiece (atBackend at) idx
    let pieceLen = fromIntegral (BS.length pieceData)
    if sha1 pieceData == pieceCheckSum torrent idx
       then do
           commitPiece (atBackend at) idx
           atomically $ do
               setPiecemapBit (atPiecemap at) idx True
               modifyTVar_ (atPieces at) (IntMap.delete idx)
               modifyTVar_ (atDownloaded at) (+ pieceLen)
               modifyTVar_ (atLeft at) (subtract pieceLen)
               peers <- readTVar (atPeers at)
               mapM_ announce peers
       else do
           atomically $ modifyTVar_ (atPieces at) (IntMap.delete idx)
           debug $ "Piece failed validation: " ++ show idx
  where
    torrent = atTorrent at
    -- Advertise the new piece to a peer and refresh its interest state.
    announce peer = do
        sendMessage peer (Have idx)
        recalculateInterest True idx at peer