{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A small monad for issuing HDFS name-node RPCs over a single
-- 'Connection', together with wrappers for the individual calls
-- (@getListing@, @getFileInfo@, @getContentSummary@, @mkdirs@, @delete@,
-- @rename2@ and @setPermission@).
module Network.Hadoop.Hdfs
    ( Hdfs(..)
    , hdfsProtocol
    , getConnection
    , runHdfs
    , runHdfs'
    , hdfsInvoke

    , CreateParent
    , Recursive
    , Overwrite

    , getListing
    , getListing'
    , getListingRecursive
    , getFileInfo
    , getContentSummary
    , mkdirs
    , delete
    , rename
    , setPermissions
    ) where

import           Control.Applicative (Applicative(..), (<$>))
import           Control.Concurrent.STM
import           Control.Exception (SomeException(..), throw)
import           Control.Monad (ap, when)
import           Control.Monad.Catch (MonadMask(..), MonadThrow(..), MonadCatch(..))
import           Control.Monad.IO.Class (MonadIO(..))
import           Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import           Data.Maybe (fromMaybe)
import           Data.Monoid ((<>))
import           Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
import           Data.Word (Word32)

import qualified Data.Hadoop.Protobuf.ClientNameNode as P
import qualified Data.Hadoop.Protobuf.Hdfs as P
import           Data.ProtocolBuffers
import           Data.ProtocolBuffers.Orphans ()

import           Data.Hadoop.Configuration
import           Data.Hadoop.HdfsPath
import           Data.Hadoop.Types
import           Network.Hadoop.Rpc
import qualified Network.Hadoop.Socket as S

------------------------------------------------------------------------

-- | An action that runs in 'IO' with read-only access to a 'Connection'.
newtype Hdfs a = Hdfs { unHdfs :: Connection -> IO a }

instance Functor Hdfs where
    fmap f m = Hdfs $ \c -> fmap f (unHdfs m c)

instance Applicative Hdfs where
    -- 'pure' carries the real definition and 'return' defers to it, which
    -- is the canonical arrangement for Applicative/Monad instances.
    pure x = Hdfs $ \_ -> return x
    (<*>)  = ap

instance Monad Hdfs where
    return  = pure
    m >>= k = Hdfs $ \c -> unHdfs m c >>= \x -> unHdfs (k x) c

instance MonadIO Hdfs where
    liftIO = Hdfs . const

instance MonadThrow Hdfs where
    throwM = liftIO . throwM

instance MonadCatch Hdfs where
    -- The handler runs against the same connection as the guarded action.
    catch m k = Hdfs $ \c -> unHdfs m c `catch` \e -> unHdfs (k e) c

instance MonadMask Hdfs where
    mask a = Hdfs $ \e -> mask $ \u -> unHdfs (a $ q u) e
      where
        q u (Hdfs b) = Hdfs (u . b)

    uninterruptibleMask a = Hdfs $ \e -> uninterruptibleMask $ \u -> unHdfs (a $ q u) e
      where
        q u (Hdfs b) = Hdfs (u . b)

------------------------------------------------------------------------

-- | Flag passed to 'mkdirs' as the request's create-parent field.
type CreateParent = Bool

-- | Flag passed to 'delete' as the request's recursive field.
type Recursive = Bool

-- | Flag passed to 'rename' as the request's overwrite field.
type Overwrite = Bool

------------------------------------------------------------------------

-- | The 'Connection' the current 'Hdfs' action is running against.
getConnection :: Hdfs Connection
getConnection = Hdfs return

-- | Protocol descriptor handed to the RPC layer when connecting.
hdfsProtocol :: Protocol
hdfsProtocol = Protocol "org.apache.hadoop.hdfs.protocol.ClientProtocol" 1

-- | Run an 'Hdfs' action using the configuration returned by
-- 'getHadoopConfig'.
runHdfs :: Hdfs a -> IO a
runHdfs hdfs = do
    config <- getHadoopConfig
    runHdfs' config hdfs

-- | Run an 'Hdfs' action against the first name node listed in the
-- supplied configuration.
--
-- If the configuration lists no name nodes, a 'ConfigError' is thrown.
-- Because the error lives in a pure binding, it is raised when the
-- name node value is first demanded rather than on entry.
runHdfs' :: HadoopConfig -> Hdfs a -> IO a
runHdfs' config@HadoopConfig{..} hdfs = S.runTcp hcProxy nameNode session
  where
    session socket = do
        conn <- initConnectionV7 config hdfsProtocol socket
        unHdfs hdfs conn

    nameNode = case hcNameNodes of
        []    -> throw (ConfigError "Could not find name nodes in Hadoop configuration")
        (x:_) -> x

-- | Invoke the named remote method with the given request message on the
-- current connection and decode the response.
hdfsInvoke :: (Encode a, Decode b) => Text -> a -> Hdfs b
hdfsInvoke method arg = Hdfs $ \c -> invoke c method arg

------------------------------------------------------------------------

-- | Start a recursive listing of the given path and return the queue on
-- which the results are delivered.
--
-- Each @Just (path, result)@ item carries the listing (or the failure)
-- for one request. A single 'Nothing' is written once the count of
-- outstanding requests drops to zero. The queue is created with a
-- capacity of 10.
getListingRecursive :: HdfsPath -> Hdfs (TBQueue (Maybe (HdfsPath, Either SomeException (V.Vector FileStatus))))
getListingRecursive initialPath = do
    conn        <- getConnection
    queue       <- liftIO (newTBQueueIO 10)
    outstanding <- liftIO (newTVarIO 0)
    liftIO (getListingRecursive' conn outstanding queue initialPath)
    return queue

-- | Worker for 'getListingRecursive'.
--
-- @outstanding@ counts requests that have been issued but whose results
-- have not yet been enqueued. It is incremented before each request is
-- sent and decremented in the same transaction that enqueues the result.
getListingRecursive' :: Connection
                     -> TVar Int
                     -> TBQueue (Maybe (HdfsPath, Either SomeException (V.Vector FileStatus)))
                     -> HdfsPath
                     -> IO ()
getListingRecursive' conn outstanding queue rootPath = getInitial rootPath
  where
    -- First page of a directory: start after the empty name.
    getInitial path = getPartial path B.empty

    getPartial path startAfter = do
        atomically $ modifyTVar' outstanding succ
        getPartialAsync conn path startAfter (onPartial path)

    onPartial path (Left err) = enqueueResult path (Left err)
    onPartial path (Right PartialListing{..}) = do
        -- Follow-up requests are issued (and counted) before this result
        -- is enqueued, so the counter cannot reach zero while work that
        -- this result gave rise to is still pending.
        when (lsRemaining /= 0) $ getPartial path (lastFileName lsFiles)
        V.mapM_ (getInitial . childOf path) (dirs lsFiles)
        enqueueResult path (Right lsFiles)

    enqueueResult path result = atomically $ do
        writeTBQueue queue $ Just (path, result)
        modifyTVar' outstanding pred
        n <- readTVar outstanding
        when (n == 0) (writeTBQueue queue Nothing)

    dirs :: V.Vector FileStatus -> V.Vector HdfsPath
    dirs = V.map fsPath . V.filter ((Dir ==) . fsFileType)

    -- Path of an entry named @child@ inside directory @parent@.
    --
    -- NOTE(review): the original expression here was @path x@, which
    -- applies a path as if it were a function and cannot type-check;
    -- a path-join operator appears to have been lost. If
    -- "Data.Hadoop.HdfsPath" exports one, prefer it over this helper.
    -- Assumes listing entries carry names relative to the listed
    -- directory -- TODO confirm.
    childOf :: HdfsPath -> HdfsPath -> HdfsPath
    childOf parent child
        | "/" `B.isSuffixOf` parent = parent <> child
        | otherwise                 = parent <> "/" <> child

-- | Issue an asynchronous @getListing@ request for @path@, beginning
-- after @startAfter@, and pass the decoded outcome to the continuation.
--
-- A response without a directory listing is reported to the
-- continuation as a 'RemoteError'.
getPartialAsync :: Connection
                -> HdfsPath
                -> HdfsPath
                -> (Either SomeException PartialListing -> IO ())
                -> IO ()
getPartialAsync c path startAfter k = invokeAsync c "getListing" request k'
  where
    k' :: Either SomeException P.GetListingResponse -> IO ()
    k' (Left err)    = k (Left err)
    k' (Right proto) = k (fromProto proto)

    fromProto :: P.GetListingResponse -> Either SomeException PartialListing
    fromProto dl = case getField (P.lsDirList dl) of
        Nothing -> Left notExist
        Just x  -> Right (fromProtoDirectoryListing x)

    notExist :: SomeException
    notExist = SomeException $ RemoteError ("Directory does not exist: " <> T.decodeUtf8 path) T.empty

    request = P.GetListingRequest
        { P.lsSrc          = putField (T.decodeUtf8 path)
        , P.lsStartAfter   = putField startAfter
        , P.lsNeedLocation = putField False
        }

------------------------------------------------------------------------

-- | Fetch the complete listing of a path, requesting further pages until
-- the server reports no remaining entries.
--
-- Returns 'Nothing' when the first response contains no directory
-- listing. If a later page comes back without a listing, paging stops
-- and the entries gathered so far are returned.
getListing :: HdfsPath -> Hdfs (Maybe (V.Vector FileStatus))
getListing path = do
    mDirList <- getPartialListing path ""
    case mDirList of
        Nothing                    -> return Nothing
        Just (PartialListing 0 fs) -> return (Just fs)
        Just (PartialListing _ fs) -> Just <$> loop [fs] (lastFileName fs)
  where
    -- Pages are accumulated newest-first and reversed once at the end,
    -- so adding a page is constant-time.
    loop :: [V.Vector FileStatus] -> ByteString -> Hdfs (V.Vector FileStatus)
    loop acc startAfter = do
        PartialListing{..} <- fromMaybe (PartialListing 0 V.empty)
                          <$> getPartialListing path startAfter
        let acc' = lsFiles : acc
        if lsRemaining == 0
            then return (V.concat (reverse acc'))
            else loop acc' (lastFileName lsFiles)

-- | Like 'getListing', but yields an empty vector instead of 'Nothing'.
getListing' :: HdfsPath -> Hdfs (V.Vector FileStatus)
getListing' path = fromMaybe V.empty <$> getListing path

-- | Path of the last entry in a listing, or the empty string for an
-- empty listing. Used as the start-after cursor for the next page.
lastFileName :: V.Vector FileStatus -> ByteString
lastFileName v
    | V.null v  = ""
    | otherwise = fsPath (V.last v)

------------------------------------------------------------------------

-- | Request a single page of a listing, beginning after @startAfter@.
-- 'Nothing' means the response carried no directory listing.
getPartialListing :: HdfsPath -> HdfsPath -> Hdfs (Maybe PartialListing)
getPartialListing path startAfter =
    fmap fromProtoDirectoryListing . getField . P.lsDirList <$>
        hdfsInvoke "getListing" P.GetListingRequest
            { P.lsSrc          = putField (T.decodeUtf8 path)
            , P.lsStartAfter   = putField startAfter
            , P.lsNeedLocation = putField False
            }

-- | Look up the status of a path. 'Nothing' means the response carried
-- no file status.
getFileInfo :: HdfsPath -> Hdfs (Maybe FileStatus)
getFileInfo path =
    fmap fromProtoFileStatus . getField . P.fiFileStatus <$>
        hdfsInvoke "getFileInfo" P.GetFileInfoRequest
            { P.fiSrc = putField (T.decodeUtf8 path)
            }

-- | Fetch the content summary for a path.
getContentSummary :: HdfsPath -> Hdfs ContentSummary
getContentSummary path =
    fromProtoContentSummary . getField . P.csSummary <$>
        hdfsInvoke "getContentSummary" P.GetContentSummaryRequest
            { P.csPath = putField (T.decodeUtf8 path)
            }

-- | Send a @mkdirs@ request for the path with permission mask @0o755@,
-- returning the result flag from the response.
mkdirs :: CreateParent -> HdfsPath -> Hdfs Bool
mkdirs createParent path =
    getField . P.mdResult <$>
        hdfsInvoke "mkdirs" P.MkdirsRequest
            { P.mdSrc          = putField (T.decodeUtf8 path)
            , P.mdMasked       = putField (P.FilePermission (putField 0o755))
            , P.mdCreateParent = putField createParent
            }

-- | Send a @delete@ request for the path, returning the result flag from
-- the response.
delete :: Recursive -> HdfsPath -> Hdfs Bool
delete recursive path =
    getField . P.dlResult <$>
        hdfsInvoke "delete" P.DeleteRequest
            { P.dlSrc       = putField (T.decodeUtf8 path)
            , P.dlRecursive = putField recursive
            }

-- | Send a @rename2@ request from @src@ to @dst@. The response body is
-- discarded.
rename :: Overwrite -> HdfsPath -> HdfsPath -> Hdfs ()
rename overwrite src dst =
    ignore <$>
        hdfsInvoke "rename2" P.Rename2Request
            { P.mvSrc       = putField (T.decodeUtf8 src)
            , P.mvDst       = putField (T.decodeUtf8 dst)
            , P.mvOverwrite = putField overwrite
            }
  where
    -- The signature fixes the response type that 'hdfsInvoke' decodes.
    ignore :: P.Rename2Response -> ()
    ignore = const ()

-- | Send a @setPermission@ request applying @mode@ to the path. The
-- response body is discarded.
setPermissions :: Word32 -> HdfsPath -> Hdfs ()
setPermissions mode path =
    ignore <$>
        hdfsInvoke "setPermission" P.SetPermissionRequest
            { P.chmodPath = putField (T.decodeUtf8 path)
            , P.chmodMode = putField $ P.FilePermission { fpPerm = putField mode }
            }
  where
    -- The signature fixes the response type that 'hdfsInvoke' decodes.
    ignore :: P.SetPermissionResponse -> ()
    ignore = const ()

------------------------------------------------------------------------
-- Awesome Protobuf Mapping (╯°□°)╯︵ ┻━┻

-- | Convert a protobuf content summary into the library's own record.
fromProtoContentSummary :: P.ContentSummary -> ContentSummary
fromProtoContentSummary p = ContentSummary
    { csLength         = getField $ P.csLength p
    , csFileCount      = getField $ P.csFileCount p
    , csDirectoryCount = getField $ P.csDirectoryCount p
    , csQuota          = getField $ P.csQuota p
    , csSpaceConsumed  = getField $ P.csSpaceConsumed p
    , csSpaceQuota     = getField $ P.csSpaceQuota p
    }

-- | Convert a protobuf directory listing into a 'PartialListing'.
fromProtoDirectoryListing :: P.DirectoryListing -> PartialListing
fromProtoDirectoryListing p = PartialListing
    { lsRemaining = fromIntegral . getField $ P.dlRemaingEntries p
    , lsFiles     = V.map fromProtoFileStatus . V.fromList . getField $ P.dlPartialListing p
    }

-- | Convert a protobuf file status into a 'FileStatus'. A missing block
-- replication or block size is mapped to 0.
fromProtoFileStatus :: P.FileStatus -> FileStatus
fromProtoFileStatus p = FileStatus
    { fsFileType         = fromProtoFileType . getField $ P.fsFileType p
    , fsPath             = getField $ P.fsPath p
    , fsLength           = getField $ P.fsLength p
    , fsPermission       = fromIntegral . getField . P.fpPerm . getField $ P.fsPermission p
    , fsOwner            = getField $ P.fsOwner p
    , fsGroup            = getField $ P.fsGroup p
    , fsModificationTime = getField $ P.fsModificationTime p
    , fsAccessTime       = getField $ P.fsAccessTime p
    , fsSymLink          = getField $ P.fsSymLink p
    , fsBlockReplication = fromIntegral . fromMaybe 0 . getField $ P.fsBlockReplication p
    , fsBlockSize        = fromMaybe 0 . getField $ P.fsBlockSize p
    }

-- | Convert a protobuf file type into the library's 'FileType'.
fromProtoFileType :: P.FileType -> FileType
fromProtoFileType p = case p of
    P.IS_DIR     -> Dir
    P.IS_FILE    -> File
    P.IS_SYMLINK -> SymLink