{- P2P protocol over HTTP, client
 -
 - https://git-annex.branchable.com/design/p2p_protocol_over_http/
 -
 - Copyright 2024 Joey Hess
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds, TypeApplications #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE CPP #-}

module P2P.Http.Client (
    module P2P.Http.Client,
    module P2P.Http.Types,
    Validity(..),
) where

import Types
import P2P.Http.Types
import P2P.Protocol hiding (Offset, Bypass, auth, FileSize)
import Utility.Metered
import Utility.FileSize
import Types.NumCopies
import Types.Remote
import Annex.Common
import qualified Git
import qualified Annex
import Annex.UUID
import Annex.Url
import P2P.Http
import P2P.Http.Url
import Annex.Concurrent
import Utility.Url (BasicAuth(..))
import Utility.HumanTime
import Utility.STM
import qualified Utility.FileIO as F
import qualified Git.Credential as Git

import Servant hiding (BasicAuthData(..))
import Servant.Client.Streaming
import qualified Servant.Types.SourceT as S
import Network.HTTP.Types.Status
import Network.HTTP.Client
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy.Internal as LI
import qualified Data.Map as M
import Control.Concurrent.Async
import Control.Concurrent
import System.IO.Unsafe
import Data.Time.Clock.POSIX
import qualified Data.ByteString.Lazy as L

-- | A request made to the HTTP server. It is given the servant client
-- environment, the protocol version to speak, the server's and the
-- client's UUIDs, a list of bypass UUIDs, and optional authentication,
-- and yields either a 'ClientError' or the result.
type ClientAction a
    = ClientEnv
    -> ProtocolVersion
    -> B64UUID ServerSide
    -> B64UUID ClientSide
    -> [B64UUID Bypass]
    -> Maybe Auth
    -> Annex (Either ClientError a)

-- | Runs a 'ClientAction' against a remote, allowing every protocol
-- version.
--
-- The fallback is given an error message. It is used for the failures
-- handled by 'p2pHttpClientVersions'', and also, with a "missing an
-- endpoint" message, when that returns Nothing.
p2pHttpClient
    :: Remote
    -> (String -> Annex a)
    -> ClientAction a
    -> Annex a
p2pHttpClient rmt fallback clientaction =
    p2pHttpClientVersions (const True) rmt fallback clientaction >>= \case
        Just res -> return res
        Nothing -> fallback "git-annex HTTP API server is missing an endpoint"

-- | Like 'p2pHttpClient', but only protocol versions accepted by the
-- predicate are tried. Gets the remote's repo with getRepo and passes
-- everything on to 'p2pHttpClientVersions''.
p2pHttpClientVersions
    :: (ProtocolVersion -> Bool)
    -> Remote
    -> (String -> Annex a)
    -> ClientAction a
    -> Annex (Maybe a)
p2pHttpClientVersions allowedversion rmt fallback clientaction = do
    rmtrepo <- getRepo rmt
    p2pHttpClientVersions' allowedversion rmt rmtrepo fallback clientaction

-- | Tries the action with each allowed protocol version, in the order of
-- allProtocolVersions, handling credentials along the way.
--
-- A credential found in the in-memory credential cache for the
-- remote's credential base url is used from the start. Otherwise the
-- first attempt is made without authentication.
--
-- Returns Nothing only when the list of versions to try is empty.
-- Note that a 404 response to the last version in the list is passed to
-- the fallback, rather than resulting in Nothing.
--
-- Calls 'error' when the remote has no P2P HTTP url configured.
p2pHttpClientVersions'
    :: (ProtocolVersion -> Bool)
    -> Remote
    -> Git.Repo
    -> (String -> Annex a)
    -> ClientAction a
    -> Annex (Maybe a)
p2pHttpClientVersions' allowedversion rmt rmtrepo fallback clientaction =
    case p2pHttpBaseUrl <$> remoteAnnexP2PHttpUrl (gitconfig rmt) of
        Nothing -> error "internal"
        Just baseurl -> do
            mgr <- httpManager <$> getUrlOptions Nothing
            let clientenv = mkClientEnv mgr baseurl
            ccv <- Annex.getRead Annex.gitcredentialcache
            Git.CredentialCache cc <- liftIO $ atomically $
                readTMVar ccv
            case M.lookup (Git.CredentialBaseURL credentialbaseurl) cc of
                Nothing -> go clientenv Nothing False Nothing versions
                Just cred -> go clientenv (Just cred) True (credauth cred) versions
  where
    versions = filter allowedversion allProtocolVersions

    -- mcred is the credential in use, if any, and credcached tells
    -- whether it came from the credential cache. mauth is the
    -- authentication sent with the request.
    go clientenv mcred credcached mauth (v:vs) = do
        myuuid <- getUUID
        res <- catchclienterror $ clientaction clientenv v
            (B64UUID (uuid rmt))
            (B64UUID myuuid)
            []
            mauth
        case res of
            Right resp -> do
                -- The credential worked, so remember it, unless it
                -- was already cached.
                unless credcached $ cachecred mcred
                return (Just resp)
            Left (FailureResponse _ resp)
                -- Not found; try the next version if there is one.
                | statusCode (responseStatusCode resp) == 404 && not (null vs) ->
                    go clientenv mcred credcached mauth vs
                -- Unauthorized. Without a credential, prompt for
                -- one and retry the same version. With one, it
                -- is rejected and the fallback is used.
                | statusCode (responseStatusCode resp) == 401 ->
                    case mcred of
                        Nothing -> authrequired clientenv (v:vs)
                        Just cred -> do
                            inRepo $ Git.rejectUrlCredential cred
                            Just <$> fallback (showstatuscode resp)
                | otherwise -> Just <$> fallback (showstatuscode resp)
            Left (ConnectionError ex) -> case fromException ex of
                Just (HttpExceptionRequest _ (ConnectionFailure err)) -> Just <$> fallback
                    ("unable to connect to HTTP server: " ++ show err)
                _ -> Just <$> fallback (show ex)
            Left clienterror -> Just <$> fallback
                ("git-annex HTTP API server returned an unexpected response: " ++ show clienterror)
    go _ _ _ _ [] = return Nothing

    -- A ClientError thrown as an exception by the action is handled the
    -- same as one that it returned.
    catchclienterror a = a `catch` \(ex :: ClientError) -> pure (Left ex)

    -- Gets a credential using Git.getUrlCredential and retries with it.
    -- It is passed to go as not cached, so it gets cached on success.
    authrequired clientenv vs = do
        cred <- prompt $ inRepo $
            Git.getUrlCredential credentialbaseurl
        go clientenv (Just cred) False (credauth cred) vs

    -- The numeric status code, a space, and the status message.
    showstatuscode resp =
        show (statusCode (responseStatusCode resp))
            ++ " " ++
            decodeBS (statusMessage (responseStatusCode resp))

    -- The url that credentials are requested and cached for. When
    -- isP2PHttpSameHost holds for the P2P HTTP url and the remote's
    -- repo, the repo's location is used, otherwise the P2P HTTP url.
    credentialbaseurl = case remoteAnnexP2PHttpUrl (gitconfig rmt) of
        Just p2phttpurl
            | isP2PHttpSameHost p2phttpurl rmtrepo ->
                Git.repoLocation rmtrepo
            | otherwise ->
                p2pHttpUrlString p2phttpurl
        Nothing -> error "internal"

    -- Converts a credential to an Auth, when Git.credentialBasicAuth
    -- yields a basic auth for it.
    credauth cred = do
        ba <- Git.credentialBasicAuth cred
        return $ Auth
            (encodeBS (basicAuthUser ba))
            (encodeBS (basicAuthPassword ba))

    -- Passes the credential to Git.approveUrlCredential and adds it to
    -- the in-memory credential cache under credentialbaseurl.
    cachecred mcred = case mcred of
        Just cred -> do
            inRepo $ Git.approveUrlCredential cred
            ccv <- Annex.getRead Annex.gitcredentialcache
            liftIO $ atomically $ do
                Git.CredentialCache cc <- takeTMVar ccv
                putTMVar ccv $ Git.CredentialCache $
                    M.insert (Git.CredentialBaseURL credentialbaseurl) cred cc
        Nothing -> noop

-- | Requests the content of a key, passing the response body to the
-- consumer as a lazy ByteString.
--
-- The result is Valid when the data length in the response header
-- equals the number of bytes the consumer reports having processed, and
-- Invalid otherwise. Calls 'error' when the response has no data length
-- header.
clientGet
    :: Key
    -> AssociatedFile
    -> (L.ByteString -> IO BytesProcessed)
    -- ^ Must consume the entire ByteString before returning its
    -- total size.
    -> Maybe FileSize
    -- ^ Size of existing file, when resuming.
    -> ClientAction Validity
clientGet k af consumer startsz clientenv (ProtocolVersion ver) su cu bypass auth = liftIO $ do
    -- When resuming, the size of the existing file is sent as the offset.
    let offset = fmap (Offset . fromIntegral) startsz
    withClientM (cli (B64Key k) cu bypass baf offset auth) clientenv $ \case
        Left err -> return (Left err)
        Right respheaders -> do
            b <- S.unSourceT (getResponse respheaders) gather
            BytesProcessed len <- consumer b
            let DataLength dl = case lookupResponseHeader @DataLengthHeader' respheaders of
                    Header hdr -> hdr
                    _ -> error "missing data length header"
            return $ Right $
                if dl == len then Valid else Invalid
  where
    -- Picks the handler for the protocol version being used.
    cli = case ver of
        4 -> v4 su V4
        3 -> v3 su V3
        2 -> v2 su V2
        1 -> v1 su V1
        0 -> v0 su V0
        _ -> error "unsupported protocol version"

    -- The handlers are matched by their position in client p2pHttpAPI.
    v4 :<|> v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI

    -- Builds a lazy ByteString out of the stream. unsafeInterleaveIO
    -- defers running each step of the stream until the ByteString is
    -- consumed that far. An error in the stream is thrown with giveup.
    gather = unsafeInterleaveIO . gather'
    gather' S.Stop = return LI.Empty
    gather' (S.Error err) = giveup err
    gather' (S.Skip s) = gather' s
    gather' (S.Effect ms) = ms >>= gather'
    gather' (S.Yield v s) = LI.Chunk v <$> unsafeInterleaveIO (gather' s)

    baf = associatedFileToB64FilePath af

-- | Asks the server if the key is present, unwrapping the
-- CheckPresentResult to a Bool.
clientCheckPresent :: Key -> ClientAction Bool
clientCheckPresent key clientenv (ProtocolVersion ver) su cu bypass auth =
    liftIO $ withClientM (cli su (B64Key key) cu bypass auth) clientenv $ \case
        Left err -> return (Left err)
        Right (CheckPresentResult res) -> return (Right res)
  where
    cli = case ver of
        4 -> flip v4 V4
        3 -> flip v3 V3
        2 -> flip v2 V2
        1 -> flip v1 V1
        0 -> flip v0 V0
        _ -> error "unsupported protocol version"

    -- The wildcards skip the handlers that come before the ones
    -- used here.
    _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        v4 :<|> v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI

-- | Removes a key from the remote. Similar to P2P.Protocol.remove.
--
-- When the proof has an end time, the server's timestamp is requested,
-- using only protocol version 3 and above, and is passed along with the
-- end time to canRemoveBefore. When that yields a time, the key is
-- removed with 'clientRemoveBefore' using that time, and otherwise the
-- unabletoremove action is run.
--
-- Without a proof, or when the proof has no end time, 'clientRemove'
-- is used. Failures are thrown with giveup.
clientRemoveWithProof
    :: Maybe SafeDropProof
    -> Key
    -> Annex RemoveResultPlus
    -> Remote
    -> Annex RemoveResultPlus
clientRemoveWithProof proof k unabletoremove remote =
    case safeDropProofEndTime =<< proof of
        Nothing -> removeanytime
        Just endtime -> removebefore endtime
  where
    removeanytime = p2pHttpClient remote giveup (clientRemove k)

    removebefore endtime =
        p2pHttpClientVersions useversion remote giveup clientGetTimestamp >>= \case
            Just (GetTimestampResult (Timestamp remotetime)) ->
                removebefore' endtime remotetime
            -- Peer is too old to support REMOVE-BEFORE.
            Nothing -> removeanytime

    removebefore' endtime remotetime =
        canRemoveBefore endtime remotetime (liftIO getPOSIXTime) >>= \case
            Just remoteendtime -> p2pHttpClient remote giveup $
                clientRemoveBefore k (Timestamp remoteendtime)
            Nothing -> unabletoremove

    useversion v = v >= ProtocolVersion 3

-- | Asks the server to remove a key.
--
-- The results of the version 1 and 0 handlers are converted with plus
-- (presumably to the same result type the later versions use; plus is
-- defined elsewhere).
clientRemove :: Key -> ClientAction RemoveResultPlus
clientRemove k clientenv (ProtocolVersion ver) su cu bypass auth =
    liftIO $ withClientM cli clientenv return
  where
    bk = B64Key k

    cli = case ver of
        4 -> v4 su V4 bk cu bypass auth
        3 -> v3 su V3 bk cu bypass auth
        2 -> v2 su V2 bk cu bypass auth
        1 -> plus <$> v1 su V1 bk cu bypass auth
        0 -> plus <$> v0 su V0 bk cu bypass auth
        _ -> error "unsupported protocol version"

    -- The wildcards skip the handlers that come before the ones
    -- used here.
    _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        v4 :<|> v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI

-- | Asks the server to remove a key, passing it a timestamp.
-- (Presumably the server only removes the key before that time.)
--
-- Only protocol versions 4 and 3 have a handler for this; any other
-- version results in a call to 'error'.
clientRemoveBefore
    :: Key
    -> Timestamp
    -> ClientAction RemoveResultPlus
clientRemoveBefore k ts clientenv (ProtocolVersion ver) su cu bypass auth =
    liftIO $ withClientM (cli su (B64Key k) cu bypass ts auth) clientenv return
  where
    cli = case ver of
        4 -> flip v4 V4
        3 -> flip v3 V3
        _ -> error "unsupported protocol version"

    -- The wildcards skip the handlers that come before the ones
    -- used here.
    _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        v4 :<|> v3 :<|> _ = client p2pHttpAPI

-- | Asks the server for its current timestamp.
--
-- Only protocol versions 4 and 3 have a handler for this; any other
-- version results in a call to 'error'.
clientGetTimestamp :: ClientAction GetTimestampResult
clientGetTimestamp clientenv (ProtocolVersion ver) su cu bypass auth =
    liftIO $ withClientM (cli su cu bypass auth) clientenv return
  where
    cli = case ver of
        4 -> flip v4 V4
        3 -> flip v3 V3
        _ -> error "unsupported protocol version"

    -- The wildcards skip the handlers that come before the ones
    -- used here.
    _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|>
        v4 :<|> v3 :<|> _ = client p2pHttpAPI

-- | Sends the content of a key to the server, streaming it from the
-- content file, starting at the offset if one is provided.
--
-- The data length sent to the server is the content file size minus
-- the offset.
--
-- The validity check is run in a separate thread, once the last chunk
-- of the file is about to be sent (or when the request fails). When the
-- check fails and the amount of data sent would still equal the data
-- length, the final chunk is truncated by one byte.
clientPut
    :: MeterUpdate
    -> Key
    -> Maybe Offset
    -> AssociatedFile
    -> OsPath
    -> FileSize
    -> Annex Bool
    -- ^ Called after sending the file to check if it's valid.
    -> Bool
    -- ^ Set data-present parameter and do not actually send data
    -- (v4+ only)
    -> ClientAction PutResultPlus
clientPut meterupdate k moffset af contentfile contentfilesize validitycheck datapresent clientenv (ProtocolVersion ver) su cu bypass auth
    -- With data-present, the request is made with an empty body, and
    -- neither the content file nor the validity check is used.
    | datapresent = liftIO $ withClientM (cli mempty) clientenv return
    | otherwise = do
        -- checkv is filled to request the validity check, and
        -- checkresultv is filled with its result.
        checkv <- liftIO newEmptyTMVarIO
        checkresultv <- liftIO newEmptyTMVarIO
        let checker = do
                liftIO $ atomically $ takeTMVar checkv
                validitycheck >>= liftIO . atomically . putTMVar checkresultv
        -- forkState is defined elsewhere; the forked action yields
        -- an Annex action, which is run below after waiting on the
        -- thread. (Presumably it merges the thread's state back.)
        checkerthread <- liftIO . async =<< forkState checker
        v <- liftIO $ F.withBinaryFile contentfile ReadMode $ \h -> do
            when (offset /= 0) $
                hSeek h AbsoluteSeek offset
            withClientM (cli (stream h checkv checkresultv)) clientenv return
        case v of
            Left err -> do
                -- The stream may not have gotten as far as requesting
                -- the validity check, so request it here, in order for
                -- the checker thread to finish.
                void $ liftIO $ atomically $ tryPutTMVar checkv ()
                join $ liftIO (wait checkerthread)
                return (Left err)
            Right res -> do
                join $ liftIO (wait checkerthread)
                return (Right res)
  where
    -- Streams the file's content from the handle. The MVar holds the
    -- number of bytes sent so far and the chunks remaining to be sent,
    -- with empty chunks filtered out.
    stream h checkv checkresultv = S.SourceT $ \a -> do
        bl <- hGetContentsMetered h meterupdate
        v <- newMVar (0, filter (not . B.null) (L.toChunks bl))
        a (go v)
      where
        -- Each step yields the next chunk. B.null is passed to
        -- S.fromActionStep, and an empty chunk is yielded once
        -- no chunks remain.
        go v = S.fromActionStep B.null $ modifyMVar v $ \case
            -- The last chunk. The validity check is run before
            -- sending it.
            (n, (b:[])) -> do
                let !n' = n + B.length b
                ifM (checkvalid n')
                    ( return ((n', []), b)
                    -- The key's content is invalid, but
                    -- the amount of data is the same as
                    -- the DataLengthHeader indicates.
                    -- Truncate the stream by one byte to
                    -- indicate to the server that it's
                    -- not valid.
                    , return
                        ( (n' - 1, [])
                        , B.take (B.length b - 1) b
                        )
                    )
            -- Nothing left to send. The validity check is still
            -- requested, but its result is not used here.
            (n, []) -> do
                void $ checkvalid n
                return ((n, []), mempty)
            (n, (b:bs)) ->
                let !n' = n + B.length b
                in return ((n', bs), b)

        -- Requests the validity check and waits for its result.
        -- When the content is not valid, this is True only when
        -- the number of bytes differs from the data length.
        checkvalid n = do
            void $ liftIO $ atomically $ tryPutTMVar checkv ()
            valid <- liftIO $ atomically $ readTMVar checkresultv
            if not valid
                then return (n /= fromIntegral nlen)
                else return True

    baf = case af of
        AssociatedFile Nothing -> Nothing
        AssociatedFile (Just f) -> Just (B64FilePath f)

    len = DataLength nlen

    -- The amount of data that is to be sent.
    nlen = contentfilesize - offset

    offset = case moffset of
        Nothing -> 0
        Just (Offset o) -> fromIntegral o

    bk = B64Key k

    -- Only the version 4 handler takes the data-present parameter.
    -- The results of the version 1 and 0 handlers are converted
    -- with plus.
    cli src = case ver of
        4 -> v4 su V4 (if datapresent then Just True else Nothing)
            len bk cu bypass baf moffset src auth
        3 -> v3 su V3 len bk cu bypass baf moffset src auth
        2 -> v2 su V2 len bk cu bypass baf moffset src auth
        1 -> plus <$> v1 su V1 len bk cu bypass baf moffset src auth
        0 -> plus <$> v0 su V0 len bk cu bypass baf moffset src auth
        _ -> error "unsupported protocol version"

    -- The wildcards skip the handlers that come before the ones
    -- used here.
    _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|>
        v4 :<|> v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI

-- | Asks the server for the offset to use when sending a key's content
-- to it.
--
-- With protocol version 0, no request is made, and the result is
-- always an offset of 0.
clientPutOffset
    :: Key
    -> ClientAction PutOffsetResultPlus
clientPutOffset k clientenv (ProtocolVersion ver) su cu bypass auth
    | ver == 0 = return (Right (PutOffsetResultPlus (Offset 0)))
    | otherwise = liftIO $ withClientM cli clientenv return
  where
    bk = B64Key k

    cli = case ver of
        4 -> v4 su V4 bk cu bypass auth
        3 -> v3 su V3 bk cu bypass auth
        2 -> v2 su V2 bk cu bypass auth
        1 -> plus <$> v1 su V1 bk cu bypass auth
        _ -> error "unsupported protocol version"

    -- The wildcards skip the handlers that come before the ones
    -- used here. There is no version 0 handler.
    _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|>
        v4 :<|> v3 :<|> v2 :<|> v1 :<|> _ = client p2pHttpAPI

-- | Asks the server to lock the content of a key, returning the
-- server's LockResult as-is.
clientLockContent
    :: Key
    -> ClientAction LockResult
clientLockContent k clientenv (ProtocolVersion ver) su cu bypass auth =
    liftIO $ withClientM (cli (B64Key k) cu bypass auth) clientenv return
  where
    cli = case ver of
        4 -> v4 su V4
        3 -> v3 su V3
        2 -> v2 su V2
        1 -> v1 su V1
        0 -> v0 su V0
        _ -> error "unsupported protocol version"

    -- The wildcards skip the handlers that come before the ones
    -- used here.
    _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|>
        v4 :<|> v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI

-- | Runs the callback while a streaming keep-locked request, made in a
-- separate thread, is in progress.
--
-- The request body is a stream of UnlockRequests. The callback is run
-- only once that stream has been read past its first item. After the
-- callback finishes, a final UnlockRequest True is sent, and the request
-- thread is waited on.
--
-- When the request finishes before that point, the result is either
-- its error, or the unablelock value.
clientKeepLocked
    :: LockID
    -> UUID
    -> a
    -> (VerifiedCopy -> Annex a)
    -- ^ Callback is run only after successfully connecting to the http
    -- server. The lock will remain held until the callback returns,
    -- and then will be dropped.
    -> ClientAction a
clientKeepLocked lckid remoteuuid unablelock callback clientenv (ProtocolVersion ver) su cu bypass auth = do
    -- readyv is filled with Right True by the request body stream,
    -- or with Right False or an error when the request finishes.
    readyv <- liftIO newEmptyTMVarIO
    -- keeplocked is filled with False to make the request body stream
    -- send the final unlock request.
    keeplocked <- liftIO newEmptyTMVarIO
    let cli' = cli lckid (Just cu) bypass auth
            (Just connectionKeepAlive) (Just keepAlive)
            (S.fromStepT (unlocksender readyv keeplocked))
    starttime <- liftIO getPOSIXTime
    tid <- liftIO $ async $ withClientM cli' clientenv $ \case
        Right (LockResult _ _) ->
            atomically $ writeTMVar readyv (Right False)
        Left err ->
            atomically $ writeTMVar readyv (Left err)
    let releaselock = liftIO $ do
            atomically $ putTMVar keeplocked False
            wait tid
    liftIO (atomically $ takeTMVar readyv) >>= \case
        Left err -> do
            liftIO $ wait tid
            return (Left err)
        Right False -> do
            liftIO $ wait tid
            return (Right unablelock)
        Right True -> do
            -- The checker passed to withVerifiedCopy yields the time
            -- the request was started plus the retention duration.
            let checker = return $ Left $ starttime + retentionduration
            Right
                <$> withVerifiedCopy LockedCopy remoteuuid checker callback
                `finally` releaselock
  where
    retentionduration = fromIntegral $
        durationSeconds p2pDefaultLockContentRetentionDuration

    -- The request body. After yielding UnlockRequest False, it tries to
    -- fill readyv with Right True, and then blocks until keeplocked is
    -- filled. When that is filled with False, it yields UnlockRequest
    -- True and stops, and otherwise it starts over.
    unlocksender readyv keeplocked =
        S.Yield (UnlockRequest False) $ S.Effect $ do
            return $ S.Effect $ do
                liftIO $ atomically $ void $
                    tryPutTMVar readyv (Right True)
                stilllocked <- liftIO $ atomically $ takeTMVar keeplocked
                return $ if stilllocked
                    then unlocksender readyv keeplocked
                    else S.Yield (UnlockRequest True) S.Stop

    cli = case ver of
        4 -> v4 su V4
        3 -> v3 su V3
        2 -> v2 su V2
        1 -> v1 su V1
        0 -> v0 su V0
        _ -> error "unsupported protocol version"

    -- The wildcards skip the handlers that come before the ones
    -- used here.
    _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|> _ :<|> _ :<|>
        _ :<|> _ :<|> _ :<|>
        v4 :<|> v3 :<|> v2 :<|> v1 :<|> v0 :<|> _ = client p2pHttpAPI