{- git over XMPP
 -
 - Copyright 2012 Joey Hess
 -
 - Licensed under the GNU GPL version 3 or higher.
 -}

{-# LANGUAGE CPP #-}

module Assistant.XMPP.Git where

import Assistant.Common
import Assistant.NetMessager
import Assistant.Types.NetMessager
import Assistant.XMPP
import Assistant.XMPP.Buddies
import Assistant.DaemonStatus
import Assistant.Alert
import Assistant.MakeRemote
import Assistant.Sync
import qualified Command.Sync
import qualified Annex.Branch
import Annex.UUID
import Logs.UUID
import Annex.TaggedPush
import Annex.CatFile
import Config
import Git
import qualified Git.Branch
import Config.Files
import qualified Types.Remote as Remote
import qualified Remote as Remote
import Remote.List
import Utility.FileMode
import Utility.Shell
import Utility.Env

import Network.Protocol.XMPP
import qualified Data.Text as T
import System.Posix.Types
import System.Process (std_in, std_out, std_err)
import Control.Concurrent
import System.Timeout
import qualified Data.ByteString as B
import qualified Data.Map as M

{- Largest chunk of data to send in a single XMPP message.
 -
 - Used as the read size for every relay loop in this module, so it also
 - bounds the payload of each SendPackOutput / ReceivePackOutput message. -}
chunkSize :: Int
chunkSize = 4096

{- How long to wait for an expected message before assuming the other side
 - has gone away and canceling a push.
 -
 - This needs to be long enough to allow a message of up to 2+ times
 - chunkSize to propagate up to a XMPP server, perhaps across to another
 - server, and back down to us. On the other hand, other XMPP pushes can be
 - delayed for running until the timeout is reached, so it should not be
 - excessive.
-}
xmppTimeout :: Int
xmppTimeout = 120000000 -- 120 seconds

{- Finishes pairing with the buddy identified by the JID.
 -
 - Runs makeXMPPGitRemote for the buddy's base JID under alertWhile, using
 - the alert built by pairRequestAcknowledgedAlert. The Bool results are
 - discarded. -}
finishXMPPPairing :: JID -> UUID -> Assistant ()
finishXMPPPairing jid u = void $ alertWhile alert $
    makeXMPPGitRemote buddy (baseJID jid) u
  where
    buddy = T.unpack $ buddyName jid
    alert = pairRequestAcknowledgedAlert buddy Nothing

{- The git url used for a JID: "xmpp::" followed by the formatted base
 - JID. -}
gitXMPPLocation :: JID -> String
gitXMPPLocation jid = "xmpp::" ++ T.unpack (formatJID $ baseJID jid)

{- Adds a git remote named buddyname that points at the xmpp:: location
 - of the JID, stores the UUID under the remote's "uuid" config, refreshes
 - the remote list, and then syncs with the newly added remote.
 -
 - Always returns True; calls error if the remote cannot be looked up by
 - name after it was added. -}
makeXMPPGitRemote :: String -> JID -> UUID -> Assistant Bool
makeXMPPGitRemote buddyname jid u = do
    remote <- liftAnnex $ addRemote $
        makeGitRemote buddyname $ gitXMPPLocation jid
    liftAnnex $ storeUUID (remoteConfig (Remote.repo remote) "uuid") u
    liftAnnex $ void remoteListRefresh
    -- Look the remote up again by name once the list has been refreshed.
    remote' <- liftAnnex $ fromMaybe (error "failed to add remote")
        <$> Remote.byName (Just buddyname)
    syncRemote remote'
    return True

{- Pushes over XMPP, communicating with a specific client.
 - Runs an arbitrary IO action to push, which should run git-push with
 - an xmpp:: url.
 -
 - To handle xmpp:: urls, git push will run git-remote-xmpp, which is
 - injected into its PATH, and in turn runs git-annex xmppgit. The
 - dataflow then becomes:
 -
 - git push <--> git-annex xmppgit <--> xmppPush <-------> xmpp
 -                                                          |
 - git receive-pack <--> xmppReceivePack <---------------> xmpp
 -
 - The pipe between git-annex xmppgit and us is set up and communicated
 - using two environment variables, relayIn and relayOut, that are set
 - to the file descriptors to use. Another, relayControl, is used to
 - propagate the exit status of git receive-pack.
 -
 - We listen at the other end of the pipe and relay to and from XMPP.
-}
xmppPush :: ClientID -> (Git.Repo -> IO Bool) -> Assistant Bool
xmppPush cid gitpush = do
    u <- liftAnnex getUUID
    sendNetMessage $ Pushing cid (StartingPush u)

    -- Three pipes. For each one, the raw Fd end is advertised to
    -- git-annex xmppgit through the environment, and the other end is
    -- kept here and turned into a Handle below.
    (Fd inf, writepush) <- liftIO createPipe
    (readpush, Fd outf) <- liftIO createPipe
    (Fd controlf, writecontrol) <- liftIO createPipe

    tmpdir <- gettmpdir
    installwrapper tmpdir

    env <- liftIO getEnvironment
    path <- liftIO getSearchPath
    -- M.union is left-biased, so the entries listed here take precedence
    -- over any variables of the same name in the inherited environment.
    let myenv = M.fromList
            [ ("PATH", intercalate [searchPathSeparator] $ tmpdir:path)
            , (relayIn, show inf)
            , (relayOut, show outf)
            , (relayControl, show controlf)
            ]
            `M.union` M.fromList env

    inh <- liftIO $ fdToHandle readpush
    outh <- liftIO $ fdToHandle writepush
    controlh <- liftIO $ fdToHandle writecontrol

    t1 <- forkIO <~> toxmpp 0 inh
    t2 <- forkIO <~> fromxmpp outh controlh

    {- This can take a long time to run, so avoid running it in the
     - Annex monad. Also, override environment. -}
    g <- liftAnnex gitRepo
    r <- liftIO $ gitpush $ g { gitEnv = Just $ M.toList myenv }

    liftIO $ do
        mapM_ killThread [t1, t2]
        mapM_ hClose [inh, outh, controlh]
        mapM_ closeFd [Fd inf, Fd outf, Fd controlf]

    return r
  where
    {- Reads what git push produced, in pieces of at most chunkSize, and
     - sends each piece as a SendPackOutput message. The first message
     - gets sequence number 1. On EOF the thread kills itself. -}
    toxmpp seqnum inh = do
        b <- liftIO $ B.hGetSome inh chunkSize
        if B.null b
            then liftIO $ killThread =<< myThreadId
            else do
                let seqnum' = succ seqnum
                sendNetMessage $ Pushing cid $
                    SendPackOutput seqnum' b
                toxmpp seqnum' inh

    {- Feeds messages received for the SendPack side back to git push:
     - receive-pack output goes to outh, and the exit code of
     - receive-pack is printed to controlh. -}
    fromxmpp outh controlh = withPushMessagesInSequence cid SendPack handle
      where
        handle (Just (Pushing _ (ReceivePackOutput _ b))) =
            liftIO $ writeChunk outh b
        handle (Just (Pushing _ (ReceivePackDone exitcode))) =
            liftIO $ do
                hPrint controlh exitcode
                hFlush controlh
        handle (Just _) = noop
        handle Nothing = do
            debug ["timeout waiting for git receive-pack output via XMPP"]
            -- Send a synthetic exit code to git-annex
            -- xmppgit, which will exit and cause git push
            -- to die.
            liftIO $ do
                hPrint controlh (ExitFailure 1)
                hFlush controlh
                killThread =<< myThreadId

    {- Writes an executable git-remote-xmpp script into tmpdir that
     - execs this program with the xmppgit parameter. -}
    installwrapper tmpdir = liftIO $ do
        createDirectoryIfMissing True tmpdir
        let wrapper = tmpdir </> "git-remote-xmpp"
        program <- readProgramFile
        writeFile wrapper $ unlines
            [ shebang_local
            , "exec " ++ program ++ " xmppgit"
            ]
        modifyFileMode wrapper $ addModes executeModes

    {- Use GIT_ANNEX_TMP_DIR if set, since that may be a better temp
     - dir (ie, not on a crippled filesystem where we can't make
     - the wrapper executable). -}
    gettmpdir = do
        v <- liftIO $ getEnv "GIT_ANNEX_TMP_DIR"
        case v of
            Nothing -> do
                tmp <- liftAnnex $ fromRepo gitAnnexTmpDir
                return $ tmp </> "xmppgit"
            Just d -> return $ d </> "xmppgit"

{- Name of an environment variable. -}
type EnvVar = String

{- All variables used to talk to git-annex xmppgit share this prefix. -}
envVar :: String -> EnvVar
envVar s = "GIT_ANNEX_XMPPGIT_" ++ s

relayIn :: EnvVar
relayIn = envVar "IN"

relayOut :: EnvVar
relayOut = envVar "OUT"

relayControl :: EnvVar
relayControl = envVar "CONTROL"

{- Opens the file descriptor whose number is stored in the environment
 - variable as a Handle.
 -
 - Calls error when the variable is unset, or when readish cannot get a
 - value out of its contents. -}
relayHandle :: EnvVar -> IO Handle
relayHandle var = do
    v <- getEnv var
    case readish =<< v of
        Nothing -> error $ var ++ " not set"
        Just n -> fdToHandle $ Fd n

{- Called by git-annex xmppgit.
 -
 - git-push is talking to us on stdin
 - we're talking to git-push on stdout
 - git-receive-pack is talking to us on relayIn (via XMPP)
 - we're talking to git-receive-pack on relayOut (via XMPP)
 - git-receive-pack's exit code will be passed to us on relayControl
 -
 - Exits with the code read from relayControl, or with ExitFailure 1 if
 - the line read from it cannot be interpreted by readish.
 -}
xmppGitRelay :: IO ()
xmppGitRelay = do
    flip relay stdout =<< relayHandle relayIn
    relay stdin =<< relayHandle relayOut
    -- Blocks until a line arrives on the control handle.
    code <- hGetLine =<< relayHandle relayControl
    exitWith $ fromMaybe (ExitFailure 1) $ readish code
  where
    {- Is it possible to set up pipes and not need to copy the data
     - ourselves? See splice(2) -}
    relay fromh toh = void $ forkIO $ forever $ do
        b <- B.hGetSome fromh chunkSize
        -- An empty read is EOF: close both ends and stop this thread.
        when (B.null b) $ do
            hClose fromh
            hClose toh
            killThread =<< myThreadId
        writeChunk toh b

{- Relays git receive-pack stdin and stdout via XMPP, as well as propagating
 - its exit status to XMPP.
-}
xmppReceivePack :: ClientID -> Assistant Bool
xmppReceivePack cid = do
    repodir <- liftAnnex $ fromRepo repoPath
    let p = (proc "git" ["receive-pack", repodir])
            { std_in = CreatePipe
            , std_out = CreatePipe
            , std_err = Inherit
            }
    (Just inh, Just outh, _, pid) <- liftIO $ createProcess p
    -- Incoming XMPP data is fed to receive-pack in a separate thread,
    -- while this thread relays its output until EOF.
    readertid <- forkIO <~> relayfromxmpp inh
    relaytoxmpp 0 outh
    code <- liftIO $ waitForProcess pid
    void $ sendNetMessage $ Pushing cid $ ReceivePackDone code
    liftIO $ do
        killThread readertid
        hClose inh
        hClose outh
        return $ code == ExitSuccess
  where
    {- Sends receive-pack's output as ReceivePackOutput messages, in
     - pieces of at most chunkSize. The first message gets sequence
     - number 1. -}
    relaytoxmpp seqnum outh = do
        b <- liftIO $ B.hGetSome outh chunkSize
        -- empty is EOF, so exit
        unless (B.null b) $ do
            let seqnum' = succ seqnum
            sendNetMessage $ Pushing cid $ ReceivePackOutput seqnum' b
            relaytoxmpp seqnum' outh

    {- Writes SendPackOutput received for the ReceivePack side to
     - receive-pack's stdin; other messages are ignored. -}
    relayfromxmpp inh = withPushMessagesInSequence cid ReceivePack handle
      where
        handle (Just (Pushing _ (SendPackOutput _ b))) =
            liftIO $ writeChunk inh b
        handle (Just _) = noop
        handle Nothing = do
            debug ["timeout waiting for git send-pack output via XMPP"]
            -- closing the handle will make git receive-pack exit
            liftIO $ do
                hClose inh
                killThread =<< myThreadId

{- Finds the syncable git remotes that correspond to the client.
 -
 - A remote is selected when its repo is a url whose location equals the
 - xmpp:: location of the client's base JID, and either the remote's uuid
 - is theiruuid, or theiruuid is present in the uuidMap.
 -
 - Returns the empty list when the ClientID does not parse as a JID. -}
xmppRemotes :: ClientID -> UUID -> Assistant [Remote]
xmppRemotes cid theiruuid = case baseJID <$> parseJID cid of
    Nothing -> return []
    Just jid -> do
        let loc = gitXMPPLocation jid
        um <- liftAnnex uuidMap
        filter (matching loc . Remote.repo) . filter (knownuuid um) . syncGitRemotes
            <$> getDaemonStatus
  where
    matching loc r = repoIsUrl r && repoLocation r == loc
    knownuuid um r = Remote.uuid r == theiruuid || M.member theiruuid um

{- Returns the ClientID that it pushed to.
-}
runPush :: (Remote -> Assistant ()) -> NetMessage -> Assistant (Maybe ClientID)
-- A PushRequest asks us to push to the client. Nothing is done, and
-- Nothing returned, when there is no current branch.
runPush checkcloudrepos (Pushing cid (PushRequest theiruuid)) =
    go =<< liftAnnex (inRepo Git.Branch.current)
  where
    go Nothing = return Nothing
    go (Just branch) = do
        rs <- xmppRemotes cid theiruuid
        -- The git-annex branch is committed and the sync branch updated
        -- before the remotes are checked, so this happens even when
        -- there turns out to be no matching remote.
        liftAnnex $ Annex.Branch.commit "update"
        (g, u) <- liftAnnex $ (,)
            <$> gitRepo
            <*> getUUID
        liftIO $ Command.Sync.updateBranch (Command.Sync.syncBranch branch) g
        selfjid <- ((T.unpack <$>) . xmppClientID) <$> getDaemonStatus
        if null rs
            then return Nothing
            else do
                forM_ rs $ \r -> do
                    void $ alertWhile (syncAlert [r]) $
                        xmppPush cid
                            (taggedPush u selfjid branch r)
                    checkcloudrepos r
                return $ Just cid
-- A StartingPush announces that the client is pushing to us, so run
-- git receive-pack for it, provided it matches a known remote.
runPush checkcloudrepos (Pushing cid (StartingPush theiruuid)) = do
    rs <- xmppRemotes cid theiruuid
    if null rs
        then return Nothing
        else do
            void $ alertWhile (syncAlert rs) $
                xmppReceivePack cid
            mapM_ checkcloudrepos rs
            return $ Just cid
-- Any other message is not a push to run.
runPush _ _ = return Nothing

{- Check if any of the shas that can be pushed are ones we do not
 - have.
 -
 - (Older clients send no shas, so when there are none, always
 - request a push.)
 -
 - Nothing is done unless xmppRemotes finds a remote for the client.
 - Messages other than CanPush are ignored.
 -}
handlePushNotice :: NetMessage -> Assistant ()
handlePushNotice (Pushing cid (CanPush theiruuid shas)) =
    unlessM (null <$> xmppRemotes cid theiruuid) $
        if null shas
            then go
            else ifM (haveall shas)
                ( debug ["ignoring CanPush with known shas"]
                , go
                )
  where
    -- Ask the client to push to us.
    go = do
        u <- liftAnnex getUUID
        sendNetMessage $ Pushing cid (PushRequest u)
    haveall l = liftAnnex $ not <$> anyM donthave l
    donthave sha = isNothing <$> catObjectDetails sha
handlePushNotice _ = noop

{- Writes a chunk to the handle, and flushes it right away, so that the
 - data is not held back in the handle's buffer. -}
writeChunk :: Handle -> B.ByteString -> IO ()
writeChunk h b = do
    B.hPut h b
    hFlush h

{- Gets NetMessages for a PushSide, ensures they are in order,
 - and runs an action to handle each in turn. The action will be passed
 - Nothing on timeout.
 -
 - Does not currently reorder messages, but does ensure that any
 - duplicate messages, or messages not in the sequence, are discarded.
-}
withPushMessagesInSequence :: ClientID -> PushSide -> (Maybe NetMessage -> Assistant ()) -> Assistant ()
withPushMessagesInSequence cid side a = loop 0
  where
    -- seqnum is the sequence number of the last message that was
    -- accepted; it is 0 until the first sequenced message arrives.
    loop seqnum = do
        -- m is Nothing when no message arrived within xmppTimeout.
        m <- timeout xmppTimeout <~> waitInbox cid side
        let go s = a m >> loop s
        let next = seqnum + 1
        case extractSequence =<< m of
            Just seqnum'
                -- The expected message: handle it and advance.
                | seqnum' == next -> go next
                -- Sequence number 0 is handled without advancing.
                | seqnum' == 0 -> go seqnum
                | seqnum' == seqnum -> do
                    debug ["ignoring duplicate sequence number", show seqnum]
                    loop seqnum
                | otherwise -> do
                    debug ["ignoring out of order sequence number", show seqnum', "expected", show next]
                    loop seqnum
            -- A timeout, or a message that carries no sequence number,
            -- is always passed to the action.
            Nothing -> go seqnum

{- The sequence number carried by a ReceivePackOutput or SendPackOutput
 - message. All other messages have none. -}
extractSequence :: NetMessage -> Maybe Int
extractSequence (Pushing _ (ReceivePackOutput seqnum _)) = Just seqnum
extractSequence (Pushing _ (SendPackOutput seqnum _)) = Just seqnum
extractSequence _ = Nothing