module Network.Hadoop.Hdfs
( Hdfs(..)
, hdfsProtocol
, getConnection
, runHdfs
, runHdfs'
, hdfsInvoke
, CreateParent
, Recursive
, Overwrite
, getListing
, getListing'
, getFileInfo
, getContentSummary
, mkdirs
, delete
, rename
, setPermissions
) where
import Control.Applicative (Applicative(..), (<$>))
import Control.Exception (throw)
import Control.Monad (ap)
import Control.Monad.Catch (MonadMask(..), MonadThrow(..), MonadCatch(..))
import Control.Monad.IO.Class (MonadIO(..))
import Data.ByteString (ByteString)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
import Data.Word (Word32)
import qualified Data.Hadoop.Protobuf.ClientNameNode as P
import qualified Data.Hadoop.Protobuf.Hdfs as P
import Data.ProtocolBuffers
import Data.ProtocolBuffers.Orphans ()
import Data.Hadoop.Configuration
import Data.Hadoop.Types
import Network.Hadoop.Rpc
import qualified Network.Hadoop.Socket as S
-- | An HDFS action: a function that, given an established RPC 'Connection',
-- produces its result in 'IO'. Use 'unHdfs' to run an action against a
-- specific connection.
newtype Hdfs a = Hdfs { unHdfs :: Connection -> IO a }
-- | Transforms the result of the wrapped 'IO' action; the connection is
-- handed to the underlying action unchanged.
instance Functor Hdfs where
    fmap f (Hdfs run) = Hdfs (fmap f . run)
-- | Canonical 'Applicative' instance: 'pure' carries the real implementation
-- (an action that ignores the connection and yields the value), and '<*>'
-- is derived from the 'Monad' instance via 'ap'.
instance Applicative Hdfs where
    pure x = Hdfs $ \_ -> pure x
    (<*>)  = ap

-- | Sequencing threads the same 'Connection' through both the first action
-- and the continuation.
instance Monad Hdfs where
    -- Defined in terms of 'pure' (not the other way round) so the instance
    -- is canonical and does not trip -Wnoncanonical-monad-instances.
    return  = pure
    m >>= k = Hdfs $ \c -> unHdfs m c >>= \x -> unHdfs (k x) c
-- | Lifts a plain 'IO' action into 'Hdfs'; the connection is not consulted.
instance MonadIO Hdfs where
    liftIO action = Hdfs (\_ -> action)
-- | Throws the exception from the underlying 'IO' monad, ignoring the
-- connection.
instance MonadThrow Hdfs where
    throwM e = Hdfs $ \_ -> throwM e
-- | Runs the protected action in 'IO' and, if it throws, runs the handler
-- against the very same connection.
instance MonadCatch Hdfs where
    catch (Hdfs run) handler =
        Hdfs $ \conn -> catch (run conn) (\e -> unHdfs (handler e) conn)
-- | Masking is delegated to 'IO': the body runs inside the 'IO'-level mask,
-- and the @restore@ function it receives is lifted so that it applies to the
-- 'IO' action underlying an 'Hdfs' computation.
--
-- NOTE(review): newer versions of the @exceptions@ package also require a
-- @generalBracket@ method on 'MonadMask' -- confirm against the version this
-- project is built with.
instance MonadMask Hdfs where
    mask body =
        Hdfs $ \conn -> mask $ \restore -> unHdfs (body $ liftRestore restore) conn
      where
        -- Apply the IO-level restore to the action once it has its connection.
        liftRestore restore action = Hdfs (restore . unHdfs action)

    uninterruptibleMask body =
        Hdfs $ \conn ->
            uninterruptibleMask $ \restore ->
                unHdfs (body $ liftRestore restore) conn
      where
        -- Apply the IO-level restore to the action once it has its connection.
        liftRestore restore action = Hdfs (restore . unHdfs action)
-- | Flag passed by 'mkdirs' as the @mdCreateParent@ field of the request.
type CreateParent = Bool
-- | Flag passed by 'delete' as the @dlRecursive@ field of the request.
type Recursive = Bool
-- | Flag passed by 'rename' as the @mvOverwrite@ field of the request.
type Overwrite = Bool
-- | Returns the 'Connection' that the current 'Hdfs' action is running
-- against.
getConnection :: Hdfs Connection
getConnection = Hdfs (\conn -> return conn)
-- | The RPC 'Protocol' used for HDFS client calls; 'runHdfs'' passes it to
-- the connection handshake.
hdfsProtocol :: Protocol
hdfsProtocol = Protocol className versionNumber
  where
    -- Fully-qualified name of the Java client protocol interface.
    className     = "org.apache.hadoop.hdfs.protocol.ClientProtocol"
    -- NOTE(review): presumably the protocol version -- confirm against the
    -- definition of 'Protocol'.
    versionNumber = 1
-- | Runs an 'Hdfs' action using the configuration obtained from
-- 'getHadoopConfig'. See 'runHdfs'' to supply a configuration explicitly.
runHdfs :: Hdfs a -> IO a
runHdfs hdfs = getHadoopConfig >>= \config -> runHdfs' config hdfs
-- | Runs an 'Hdfs' action against the first name node listed in the given
-- configuration.
--
-- A TCP session is opened via 'S.runTcp' (using the configured proxy
-- setting), the RPC connection is initialised for 'hdfsProtocol', and the
-- action is then run against that connection.
--
-- Throws 'ConfigError' if the configuration lists no name nodes. The check
-- is performed eagerly in 'IO', before any connection attempt, rather than
-- being hidden in a lazily evaluated pure value.
runHdfs' :: HadoopConfig -> Hdfs a -> IO a
runHdfs' config@HadoopConfig{..} hdfs =
    case hcNameNodes of
        []           -> throwM (ConfigError "Could not find name nodes in Hadoop configuration")
        (nameNode:_) -> S.runTcp hcProxy nameNode session
  where
    -- Handshake on the freshly opened socket, then run the user's action.
    session socket = do
        conn <- initConnectionV7 config hdfsProtocol socket
        unHdfs hdfs conn
-- | Invokes the named RPC method on the current connection, encoding the
-- argument as the request and decoding the reply as the result.
hdfsInvoke :: (Encode a, Decode b) => Text -> a -> Hdfs b
hdfsInvoke method arg = do
    conn <- getConnection
    liftIO (invoke conn method arg)
-- | Lists the entries under a path, fetching every page of results.
--
-- Listings arrive in pages: each response carries a partial listing plus a
-- count of remaining entries. Further pages are requested, starting after
-- the last path of the previous page, until the server reports that no
-- entries remain.
--
-- Returns 'Nothing' when the first request yields no listing at all. If a
-- later request yields no listing, it is treated as an empty final page.
getListing :: HdfsPath -> Hdfs (Maybe (V.Vector FileStatus))
getListing path = do
    mFirst <- getPartialListing path ""
    case mFirst of
        Nothing    -> return Nothing
        Just first -> Just . V.concat . reverse <$> collect [] first
  where
    -- Accumulates pages newest-first (O(1) per page) and is reversed once by
    -- the caller, instead of appending to the end of the list on every page.
    collect :: [V.Vector FileStatus] -> P.DirectoryListing -> Hdfs [V.Vector FileStatus]
    collect acc dirList
        -- Only continue when there is a last entry to resume from. An empty
        -- page that still claims remaining entries would otherwise restart
        -- the listing from the beginning and never terminate.
        | hasRemainingEntries dirList && not (V.null page) = do
            let startAfter = fsPath (V.last page)
            next <- fromMaybe emptyListing <$> getPartialListing path startAfter
            collect pages next
        | otherwise = return pages
      where
        page  = partialListing dirList
        pages = page : acc

    partialListing :: P.DirectoryListing -> V.Vector FileStatus
    partialListing = V.map fromProtoFileStatus
                   . V.fromList
                   . getField
                   . P.dlPartialListing

    hasRemainingEntries :: P.DirectoryListing -> Bool
    hasRemainingEntries = (/= 0) . getField . P.dlRemaingEntries

    -- Stand-in for a missing follow-up page: no entries, nothing remaining.
    emptyListing :: P.DirectoryListing
    emptyListing = P.DirectoryListing (putField []) (putField 0)
-- | Like 'getListing', but yields an empty vector instead of 'Nothing'.
getListing' :: HdfsPath -> Hdfs (V.Vector FileStatus)
getListing' path = fmap (fromMaybe V.empty) (getListing path)
-- | Issues a single @getListing@ call for the path, asking for entries after
-- the given start name, and returns the directory listing field of the
-- response (which may be absent). Location information is not requested.
getPartialListing :: HdfsPath -> ByteString -> Hdfs (Maybe P.DirectoryListing)
getPartialListing path startAfter = do
    response <- hdfsInvoke "getListing" request
    return (getField (P.lsDirList response))
  where
    request = P.GetListingRequest
        { P.lsSrc          = putField (T.decodeUtf8 path)
        , P.lsStartAfter   = putField startAfter
        , P.lsNeedLocation = putField False
        }
-- | Issues a @getFileInfo@ call for the path and converts the file status in
-- the response, if one is present, to a 'FileStatus'.
getFileInfo :: HdfsPath -> Hdfs (Maybe FileStatus)
getFileInfo path = do
    response <- hdfsInvoke "getFileInfo" request
    return (fmap fromProtoFileStatus (getField (P.fiFileStatus response)))
  where
    request = P.GetFileInfoRequest
        { P.fiSrc = putField (T.decodeUtf8 path)
        }
-- | Issues a @getContentSummary@ call for the path and converts the summary
-- in the response to a 'ContentSummary'.
getContentSummary :: HdfsPath -> Hdfs ContentSummary
getContentSummary path = do
    response <- hdfsInvoke "getContentSummary" request
    return (fromProtoContentSummary (getField (P.csSummary response)))
  where
    request = P.GetContentSummaryRequest
        { P.csPath = putField (T.decodeUtf8 path)
        }
-- | Issues a @mkdirs@ call for the path and returns the boolean result field
-- of the response. The request always carries the permission value @0o755@
-- in its masked-permission field.
mkdirs :: CreateParent -> HdfsPath -> Hdfs Bool
mkdirs createParent path = do
    response <- hdfsInvoke "mkdirs" request
    return (getField (P.mdResult response))
  where
    request = P.MkdirsRequest
        { P.mdSrc          = putField (T.decodeUtf8 path)
        , P.mdMasked       = putField (P.FilePermission (putField 0o755))
        , P.mdCreateParent = putField createParent
        }
-- | Issues a @delete@ call for the path, forwarding the recursive flag, and
-- returns the boolean result field of the response.
delete :: Recursive -> HdfsPath -> Hdfs Bool
delete recursive path = do
    response <- hdfsInvoke "delete" request
    return (getField (P.dlResult response))
  where
    request = P.DeleteRequest
        { P.dlSrc       = putField (T.decodeUtf8 path)
        , P.dlRecursive = putField recursive
        }
-- | Issues a @rename2@ call moving the source path to the destination path,
-- forwarding the overwrite flag. The response carries nothing of interest
-- and is discarded.
rename :: Overwrite -> HdfsPath -> HdfsPath -> Hdfs ()
rename overwrite src dst = do
    -- The annotation pins the response type so the reply can be decoded.
    _ <- (hdfsInvoke "rename2" request :: Hdfs P.Rename2Response)
    return ()
  where
    request = P.Rename2Request
        { P.mvSrc       = putField (T.decodeUtf8 src)
        , P.mvDst       = putField (T.decodeUtf8 dst)
        , P.mvOverwrite = putField overwrite
        }
-- | Issues a @setPermission@ call that applies the given permission value to
-- the path. The response is discarded.
setPermissions :: Word32 -> HdfsPath -> Hdfs ()
setPermissions mode path = do
    -- The annotation pins the response type so the reply can be decoded.
    _ <- (hdfsInvoke "setPermission" request :: Hdfs P.SetPermissionResponse)
    return ()
  where
    request = P.SetPermissionRequest
        { P.chmodPath = putField (T.decodeUtf8 path)
        , P.chmodMode = putField (P.FilePermission (putField mode))
        }
-- | Converts a protobuf content summary into the library's
-- 'ContentSummary' by unwrapping each field.
fromProtoContentSummary :: P.ContentSummary -> ContentSummary
fromProtoContentSummary summary = ContentSummary
    { csLength         = getField (P.csLength summary)
    , csFileCount      = getField (P.csFileCount summary)
    , csDirectoryCount = getField (P.csDirectoryCount summary)
    , csQuota          = getField (P.csQuota summary)
    , csSpaceConsumed  = getField (P.csSpaceConsumed summary)
    , csSpaceQuota     = getField (P.csSpaceQuota summary)
    }
-- | Converts a protobuf file status into the library's 'FileStatus'.
--
-- The block replication and block size fields fall back to @0@ when they
-- are absent from the message.
fromProtoFileStatus :: P.FileStatus -> FileStatus
fromProtoFileStatus status = FileStatus
    { fsFileType         = fromProtoFileType (getField (P.fsFileType status))
    , fsPath             = getField (P.fsPath status)
    , fsLength           = getField (P.fsLength status)
    , fsPermission       = fromIntegral (getField (P.fpPerm (getField (P.fsPermission status))))
    , fsOwner            = getField (P.fsOwner status)
    , fsGroup            = getField (P.fsGroup status)
    , fsModificationTime = getField (P.fsModificationTime status)
    , fsAccessTime       = getField (P.fsAccessTime status)
    , fsSymLink          = getField (P.fsSymLink status)
    , fsBlockReplication = fromIntegral (fromMaybe 0 (getField (P.fsBlockReplication status)))
    , fsBlockSize        = fromMaybe 0 (getField (P.fsBlockSize status))
    }
-- | Maps the protobuf file type enumeration onto the library's 'FileType'.
fromProtoFileType :: P.FileType -> FileType
fromProtoFileType P.IS_DIR     = Dir
fromProtoFileType P.IS_FILE    = File
fromProtoFileType P.IS_SYMLINK = SymLink