{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveGeneric #-} {-# OPTIONS_GHC -fcontext-stack=37 #-} -- | This module contains protocol buffers that are used to transfer data -- to and from the datanode, as well as between datanodes. module Data.Hadoop.Protobuf.DataTransfer where import Data.ByteString (ByteString) import Data.Int (Int32, Int64) import Data.ProtocolBuffers import Data.ProtocolBuffers.Orphans () import Data.Text (Text) import Data.Word (Word32, Word64) import GHC.Generics (Generic) import Data.Hadoop.Protobuf.Hdfs import Data.Hadoop.Protobuf.Security ------------------------------------------------------------------------ data DataTransferEncryptorStatus = DTES_SUCCESS | DTES_ERROR_UNKNOWN_KEY | DTES_ERROR deriving (Generic, Show, Eq) instance Enum DataTransferEncryptorStatus where toEnum n = case n of 0 -> DTES_SUCCESS 1 -> DTES_ERROR_UNKNOWN_KEY 2 -> DTES_ERROR _ -> error $ "DataTransferEncryptorStatus.toEnum: invalid value <" ++ show n ++ ">" fromEnum e = case e of DTES_SUCCESS -> 0 DTES_ERROR_UNKNOWN_KEY -> 1 DTES_ERROR -> 2 instance Encode DataTransferEncryptorStatus instance Decode DataTransferEncryptorStatus data DataTransferEncryptorMessage = DataTransferEncryptorMessage { dtesStatus :: Required 1 (Enumeration DataTransferEncryptorStatus) , dtesPayload :: Optional 2 (Value ByteString) , dtesData :: Optional 3 (Value Text) } deriving (Generic, Show) instance Encode DataTransferEncryptorMessage instance Decode DataTransferEncryptorMessage data BaseHeader = BaseHeader { bhBlock :: Required 1 (Message ExtendedBlock) , bhToken :: Optional 2 (Message Token) } deriving (Generic, Show) instance Encode BaseHeader instance Decode BaseHeader data ClientOperationHeader = ClientOperationHeader { cohBaseHeader :: Required 1 (Message BaseHeader) , cohClientName :: Required 2 (Value Text) } deriving (Generic, Show) instance Encode ClientOperationHeader instance Decode ClientOperationHeader data CachingStrategy = CachingStrategy { csDropBehind :: Optional 1 (Value Bool) , csReadahead :: Optional 2 (Value Int64) } deriving (Generic, Show) instance Encode CachingStrategy instance Decode CachingStrategy data OpReadBlock = OpReadBlock { orbHeader :: Required 1 (Message ClientOperationHeader) , orbOffset :: Required 2 (Value Word64) , orbLen :: Required 3 (Value Word64) , orbSendChecksums :: Optional 4 (Value Bool) , orbCachingStrategy :: Optional 5 (Message CachingStrategy) } deriving (Generic, Show) instance Encode OpReadBlock instance Decode OpReadBlock data Checksum = Checksum { csType :: Required 1 (Message ChecksumType) , csBytesPerChecksum :: Required 2 (Value Word32) } deriving (Generic, Show) instance Encode Checksum instance Decode Checksum data BlockConstructionStage = BCS_PIPELINE_SETUP_APPEND -- | pipeline set up for failed PIPELINE_SETUP_APPEND recovery | BCS_PIPELINE_SETUP_APPEND_RECOVERY -- | data streaming | BCS_DATA_STREAMING -- | pipeline setup for failed data streaming recovery | BCS_PIPELINE_SETUP_STREAMING_RECOVERY -- | close the block and pipeline | BCS_PIPELINE_CLOSE -- | Recover a failed PIPELINE_CLOSE | BCS_PIPELINE_CLOSE_RECOVERY -- | pipeline set up for block creation | BCS_PIPELINE_SETUP_CREATE -- | transfer RBW for adding datanodes | BCS_TRANSFER_RBW -- | transfer Finalized for adding datanodes | BCS_TRANSFER_FINALIZED deriving (Generic, Show, Eq) instance Enum BlockConstructionStage where toEnum n = case n of 0 -> BCS_PIPELINE_SETUP_APPEND 1 -> BCS_PIPELINE_SETUP_APPEND_RECOVERY 2 -> BCS_DATA_STREAMING 3 -> BCS_PIPELINE_SETUP_STREAMING_RECOVERY 4 -> BCS_PIPELINE_CLOSE 5 -> BCS_PIPELINE_CLOSE_RECOVERY 6 -> BCS_PIPELINE_SETUP_CREATE 7 -> BCS_TRANSFER_RBW 8 -> BCS_TRANSFER_FINALIZED _ -> error $ "BlockConstructionStage.toEnum: invalid enum value <" ++ show n ++ ">" fromEnum e = case e of BCS_PIPELINE_SETUP_APPEND -> 0 BCS_PIPELINE_SETUP_APPEND_RECOVERY -> 1 BCS_DATA_STREAMING -> 2 BCS_PIPELINE_SETUP_STREAMING_RECOVERY -> 3 BCS_PIPELINE_CLOSE -> 4 BCS_PIPELINE_CLOSE_RECOVERY -> 5 BCS_PIPELINE_SETUP_CREATE -> 6 BCS_TRANSFER_RBW -> 7 BCS_TRANSFER_FINALIZED -> 8 instance Encode BlockConstructionStage instance Decode BlockConstructionStage data OpWriteBlock = OpWriteBlock { owbHeader :: Required 1 (Message ClientOperationHeader) , owbTargets :: Repeated 2 (Message DataNodeInfo) , owbSource :: Optional 3 (Message DataNodeInfo) , owbStage :: Required 4 (Enumeration BlockConstructionStage) , owbPipelineSize :: Required 5 (Value Word32) , owbMinBytesRcvd :: Required 6 (Value Word64) , owbMaxBytesRcvd :: Required 7 (Value Word64) , owbLatestGenerationStamp :: Required 8 (Value Word64) -- The requested checksum mechanism for this block write. , owbRequestedChecksum :: Required 9 (Message Checksum) , owbCachingStrategy :: Optional 10 (Message CachingStrategy) } deriving (Generic, Show) instance Encode OpWriteBlock instance Decode OpWriteBlock data OpTransferBlock = OpTransferBlock { otbHeader :: Required 1 (Message ClientOperationHeader) , otbTargets :: Repeated 2 (Message DataNodeInfo) } deriving (Generic, Show) instance Encode OpTransferBlock instance Decode OpTransferBlock data OpReplaceBlock = OpReplaceBlock { orpHeader :: Required 1 (Message BaseHeader) , orpDelHint :: Required 2 (Value Text) , orpSource :: Required 3 (Message DataNodeInfo) } deriving (Generic, Show) instance Encode OpReplaceBlock instance Decode OpReplaceBlock data OpCopyBlock = OpCopyBlock { ocbHeader :: Required 1 (Message BaseHeader) } deriving (Generic, Show) instance Encode OpCopyBlock instance Decode OpCopyBlock data OpBlockChecksum = OpBlockChecksum { obcHeader :: Required 1 (Message BaseHeader) } deriving (Generic, Show) instance Encode OpBlockChecksum instance Decode OpBlockChecksum data OpRequestShortCircuitAccess = OpRequestShortCircuitAccess { orscHeader :: Required 1 (Message BaseHeader) -- | In order to get short-circuit access to block data, clients must -- set this to the highest version of the block data that they can -- understand. Currently 1 is the only version, but more versions -- may exist in the future if the on-disk format changes. , orscMaxVersion :: Required 2 (Value Word32) } deriving (Generic, Show) instance Encode OpRequestShortCircuitAccess instance Decode OpRequestShortCircuitAccess -- All fields must be fixed-length! data PacketHeader = PacketHeader { phOffsetInBlock :: Required 1 (Value (Fixed Int64)) , phSeqno :: Required 2 (Value (Fixed Int64)) , phLastPacketInBlock :: Required 3 (Value Bool) , phDataLen :: Required 4 (Value (Fixed Int32)) , phSyncBlock :: Optional 5 (Value Bool) -- ^ default = false } deriving (Generic, Show) instance Encode PacketHeader instance Decode PacketHeader data DataTransferStatus = DT_SUCCESS | DT_ERROR | DT_ERROR_CHECKSUM | DT_ERROR_INVALID | DT_ERROR_EXISTS | DT_ERROR_ACCESS_TOKEN | DT_CHECKSUM_OK | DT_ERROR_UNSUPPORTED deriving (Generic, Show, Eq) instance Enum DataTransferStatus where toEnum n = case n of 0 -> DT_SUCCESS 1 -> DT_ERROR 2 -> DT_ERROR_CHECKSUM 3 -> DT_ERROR_INVALID 4 -> DT_ERROR_EXISTS 5 -> DT_ERROR_ACCESS_TOKEN 6 -> DT_CHECKSUM_OK 7 -> DT_ERROR_UNSUPPORTED _ -> error $ "DataTransferStatus.toEnum: invalid enum value <" ++ show n ++ ">" fromEnum e = case e of DT_SUCCESS -> 0 DT_ERROR -> 1 DT_ERROR_CHECKSUM -> 2 DT_ERROR_INVALID -> 3 DT_ERROR_EXISTS -> 4 DT_ERROR_ACCESS_TOKEN -> 5 DT_CHECKSUM_OK -> 6 DT_ERROR_UNSUPPORTED -> 7 data PipelineAck = PipelineAck { psSeqno :: Required 1 (Value (Signed Int64)) -- TODO paStatus should be a repeated field but there is a bug in protobuf -- TODO with repeated enumeration's , paStatus :: Optional 2 (Enumeration DataTransferStatus) , paDownstreamAckTimeNanos :: Optional 3 (Value Word64) -- ^ default = 0 } deriving (Generic, Show) instance Encode PipelineAck instance Decode PipelineAck -- | Sent as part of the BlockOpResponse -- for READ_BLOCK and COPY_BLOCK operations. data ReadOpChecksumInfo = ReadOpChecksumInfo { rociChecksum :: Required 1 (Message Checksum) -- | The offset into the block at which the first packet -- will start. This is necessary since reads will align -- backwards to a checksum chunk boundary. , rociChunkOffset :: Required 2 (Value Word64) } deriving (Generic, Show) instance Encode ReadOpChecksumInfo instance Decode ReadOpChecksumInfo data BlockOpResponse = BlockOpResponse { borStatus :: Required 1 (Enumeration DataTransferStatus) , borFirstBadLink :: Optional 2 (Value String) , borChecksumResponse :: Optional 3 (Message OpBlockChecksumResponse) , borReadOpChecksumInfo :: Optional 4 (Message ReadOpChecksumInfo) -- | Explanatory text which may be useful to log on the client side , borData :: Optional 5 (Value Text) -- | If the server chooses to agree to the request of a client for -- short-circuit access, it will send a response data with the relevant -- file descriptors attached. -- -- In the body of the data, this version number will be set to the -- specific version number of the block data that the client is about to -- read. , borShortCircuitAccessVersion :: Optional 6 (Value Word32) } deriving (Generic, Show) instance Encode BlockOpResponse instance Decode BlockOpResponse -- | Message sent from the client to the DN after reading the entire -- read request. data ClientReadStatus = ClientReadStatus { crsStatus :: Required 1 (Enumeration DataTransferStatus) } deriving (Generic, Show) instance Encode ClientReadStatus instance Decode ClientReadStatus data DNTransferAck = DNTransferAck { dntaStatus :: Required 1 (Enumeration DataTransferStatus) } deriving (Generic, Show) instance Encode DNTransferAck instance Decode DNTransferAck data OpBlockChecksumResponse = OpBlockChecksumResponse { obcrBytesPerCrc :: Required 1 (Value Word32) , obcrCrcPerBlock :: Required 2 (Value Word64) , obcrMd5 :: Required 3 (Value ByteString) , obcrCrcType :: Optional 4 (Message ChecksumType) } deriving (Generic, Show) instance Encode OpBlockChecksumResponse instance Decode OpBlockChecksumResponse