{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RecordWildCards #-}
module Database.Bolt.Connection.Pipe where
import qualified Database.Bolt.Connection.Connection as C (close, connect, recv, send, sendMany)
import Database.Bolt.Connection.Instances
import Database.Bolt.Connection.Type
import Database.Bolt.Value.Helpers
import Database.Bolt.Value.Instances
import Database.Bolt.Value.Type (BoltValue (pack, unpackT),
FromStructure (fromStructure),
ToStructure (toStructure), unpackAction)
import Control.Exception (throwIO)
import Control.Monad (forM_, unless, void, when)
import Control.Monad.Except (ExceptT, MonadError (..), runExceptT)
import Control.Monad.Trans (MonadIO (..))
import Data.Binary (decode)
import Data.Binary.Put (runPut)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B (concat, length)
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.Lazy.Internal as BSL
import Data.Int (Int64)
import Data.Word (Word16)
import GHC.Stack (HasCallStack)
-- | Constraint alias for monads that can run 'IO' actions ('MonadIO') and
-- raise a 'BoltError' through 'MonadError'.
type MonadPipe m = (MonadIO m, MonadError BoltError m)
-- | Opens a connection described by the given 'BoltCfg', performs the
-- 'handshake' over it and returns the resulting 'Pipe'.
--
-- A 'BoltError' raised along the way is rethrown as an exception in 'IO'
-- (see 'makeIO').
connect :: MonadIO m => HasCallStack => BoltCfg -> m Pipe
connect = makeIO connect'
  where
    connect' :: MonadPipe m => BoltCfg -> m Pipe
    connect' bcfg = do
      conn <- C.connect (secure bcfg) (host bcfg) (fromIntegral $ port bcfg) (socketTimeout bcfg)
      let pipe = Pipe conn (maxChunkSize bcfg) (version bcfg)
      -- A handshake that fails with a 'BoltError' would otherwise leave the
      -- freshly opened connection dangling: close it, then propagate the
      -- original error. (Only 'BoltError's raised via 'MonadError' are
      -- intercepted here, not exceptions thrown directly in 'IO'.)
      handshake pipe bcfg `catchError` \err -> C.close conn >> throwError err
      pure pipe
-- | Shuts a 'Pipe' down.
--
-- When 'isNewVersion' holds for the pipe's protocol version, a
-- 'RequestGoodbye' is flushed first; afterwards the underlying connection is
-- closed.
close :: MonadIO m => HasCallStack => Pipe -> m ()
close pipe = do
  when newProtocol $ makeIO (\p -> flush p RequestGoodbye) pipe
  C.close (connection pipe)
  where
    newProtocol = isNewVersion (pipe_version pipe)
-- | Sends a 'RequestReset' over the 'Pipe' and reads the reply.
--
-- If the reply satisfies 'isFailure', 'ResetFailed' is raised (and, through
-- 'makeIO', thrown as an exception in 'IO').
reset :: MonadIO m => HasCallStack => Pipe -> m ()
reset = makeIO resetSession
  where
    resetSession :: MonadPipe m => Pipe -> m ()
    resetSession pipe =
      flush pipe RequestReset >> fetch pipe >>= \reply ->
        when (isFailure reply) (throwError ResetFailed)
-- | Runs an 'ExceptT'-based pipe operation in the underlying monad.
--
-- A successful result is returned as is; a 'BoltError' in the 'Left' branch
-- is thrown with 'throwIO'.
makeIO :: MonadIO m => HasCallStack => (a -> ExceptT BoltError m b) -> a -> m b
makeIO action arg = runExceptT (action arg) >>= either (liftIO . throwIO) pure
-- | Recovers the 'Pipe' after an error: uses 'reset' when 'isNewVersion'
-- holds for the pipe's protocol version, and 'ackFailure' (run through
-- 'makeIO') otherwise.
processError :: MonadIO m => HasCallStack => Pipe -> m ()
processError pipe
  | isNewVersion (pipe_version pipe) = reset pipe
  | otherwise = makeIO ackFailure pipe
-- | Sends 'RequestAckFailure' and reads the reply, discarding it.
ackFailure :: MonadPipe m => HasCallStack => Pipe -> m ()
ackFailure pipe = do
  flush pipe RequestAckFailure
  _ <- fetch pipe
  pure ()
-- | Sends 'RequestDiscardAll' and reads the reply, discarding it.
discardAll :: MonadPipe m => HasCallStack => Pipe -> m ()
discardAll pipe = do
  flush pipe RequestDiscardAll
  () <$ fetch pipe
-- | Serialises a 'Request' and writes it to the pipe's connection.
--
-- The serialised payload is cut into pieces (see 'chunkSizeFor' and
-- 'split'); each piece is sent prefixed with its length encoded as a
-- 'Word16', and the message is finished with an encoded zero 'Word16'.
flush :: MonadPipe m => HasCallStack => Pipe -> Request -> m ()
flush pipe request = do
  mapM_ (C.sendMany conn . frame) pieces
  C.send conn endMarker
  where
    conn = connection pipe
    payload = runPut . pack $ toStructure request
    pieces = split (chunkSizeFor (mcs pipe) payload) payload
    endMarker = encodeStrict (0 :: Word16)

    -- Pair a piece with its encoded length header.
    frame :: ByteString -> [ByteString]
    frame piece = [encodeStrict (fromIntegral (B.length piece) :: Word16), piece]
-- | Reads one complete message from the pipe's connection and decodes it
-- into a 'Response'.
--
-- Raises 'WrongMessageFormat' when the collected bytes cannot be unpacked
-- or converted from a structure.
fetch :: MonadPipe m => HasCallStack => Pipe -> m Response
fetch pipe = do
  message <- readMessage
  either (throwError . WrongMessageFormat) pure (unpackAction unpackT message >>= fromStructure)
  where
    conn = connection pipe

    -- Read a 2-byte size header followed by a chunk of that size, and
    -- repeat until an empty chunk is received; the chunks are concatenated.
    readMessage :: MonadPipe m => m BSL.ByteString
    readMessage = do
      size <- decode <$> recvChunk conn 2
      chunk <- recvChunk conn size
      if BSL.null chunk
        then pure BSL.empty
        else do
          rest <- readMessage
          pure $! chunk <> rest
-- | Performs the initial exchange on a freshly created 'Pipe'.
--
-- Sends the configured magic value and the version proposal, reads a 4-byte
-- version answer and raises 'UnsupportedServerVersion' unless it equals the
-- configured version. Then flushes the init request built by 'createInit'
-- and raises 'AuthentificationFailed' unless the reply satisfies
-- 'isSuccess'.
handshake :: MonadPipe m => HasCallStack => Pipe -> BoltCfg -> m ()
handshake pipe bcfg = do
  C.send conn (encodeStrict (magic bcfg))
  C.send conn (boltVersionProposal bcfg)
  agreed <- decode <$> recvChunk conn 4
  unless (agreed == version bcfg) $ throwError UnsupportedServerVersion
  flush pipe (createInit bcfg)
  reply <- fetch pipe
  unless (isSuccess reply) $ throwError AuthentificationFailed
  where
    conn = connection pipe
-- | Builds the version proposal sent during 'handshake': the configured
-- version followed by three zero entries, each encoded with 'encodeStrict'
-- and concatenated.
boltVersionProposal :: BoltCfg -> ByteString
boltVersionProposal bcfg = B.concat (map encodeStrict proposals)
  where
    proposals = [version bcfg, 0, 0, 0]
-- | Reads exactly @size@ bytes from the connection, issuing as many
-- 'C.recv' calls as necessary, and returns them as a lazy 'BSL.ByteString'.
--
-- Raises 'CannotReadChunk' when 'C.recv' yields 'Nothing' or an empty
-- chunk.
recvChunk :: MonadPipe m => HasCallStack => ConnectionWithTimeout -> Word16 -> m BSL.ByteString
recvChunk conn size = helper (fromIntegral size)
  where
    helper :: MonadPipe m => Int -> m BSL.ByteString
    helper sz
      -- @<=@ rather than @==@: should a read ever deliver more than was
      -- asked for, the remaining count goes negative and must still stop.
      | sz <= 0 = pure BSL.empty
      | otherwise = do
          mbChunk <- C.recv conn sz
          case mbChunk of
            -- An empty chunk makes no progress; recursing on it would loop
            -- forever, so it is reported as a read failure instead.
            -- NOTE(review): whether 'C.recv' can return @Just@ an empty
            -- chunk is not visible from this module — confirm.
            Just chunk
              | B.length chunk > 0 -> BSL.chunk chunk <$> helper (sz - B.length chunk)
            _ -> throwError CannotReadChunk
-- | Computes the piece size used to cut a payload.
--
-- The payload length is divided by @maxSize@ (plus one) to obtain a piece
-- count, and the result is the length divided by that count (plus one).
-- A @maxSize@ of 0 makes the inner division fail with a divide-by-zero.
chunkSizeFor :: Word16 -> BSL.ByteString -> Int64
chunkSizeFor maxSize bs = len `div` pieceCount + 1
  where
    len = BSL.length bs
    pieceCount = len `div` fromIntegral maxSize + 1
-- | Cuts a lazy 'BSL.ByteString' into strict pieces of at most @size@ bytes
-- each, in order. An empty input yields an empty list.
--
-- A non-positive @size@ is treated as 1: 'BSL.splitAt' consumes nothing for
-- such a size, which would otherwise produce an endless list of empty
-- pieces for any non-empty input.
split :: Int64 -> BSL.ByteString -> [ByteString]
split size = go
  where
    -- Guarantee progress on every step.
    step = max 1 size

    go bs
      | BSL.null bs = []
      | otherwise =
          let (piece, rest) = BSL.splitAt step bs
          in BSL.toStrict piece : go rest