-- | Conduit-based plumbing for JSON-RPC: per-connection session state,
-- conduits that encode outgoing and decode incoming messages, and helpers
-- that run them over arbitrary byte streams or over TCP.
module Network.JsonRpc.Conduit
(
AppConduits
, IncomingMsg(..)
, runConduits
, tcpClient
, tcpServer
, query
, Session(..)
, SentRequests
, initSession
, encodeConduit
, msgConduit
, decodeConduit
) where
import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.DeepSeq
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.State
import Data.Aeson
import Data.Aeson.Types (parseEither)
import Data.Attoparsec.ByteString
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as L8
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as M
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.Network
import Data.Conduit.TMChan
import Data.Maybe
import Data.Text (Text)
import Network.JsonRpc.Data
-- | The two conduit ends handed to application code: a source of decoded
-- incoming items and a sink that accepts outgoing messages.
--
-- @qo@, @no@ and @ro@ parameterise the outgoing 'Message'; @qi@, @ni@ and
-- @ri@ parameterise the incoming one; @m@ is the monad both ends run in.
type AppConduits qo no ro qi ni ri m =
    ( Source m (IncomingMsg qo qi ni ri)
    , Sink (Message qo no ro) m ()
    )
-- | Requests that have been sent, keyed by their request id.
type SentRequests qo = HashMap Id (Request qo)
-- | One item produced by the decoding conduit.
data IncomingMsg qo qi ni ri
    -- | A message that was decoded successfully.
    = IncomingMsg
        { incomingMsg :: !(Message qi ni ri)
          -- ^ The decoded message.
        , matchingReq :: !(Maybe (Request qo))
          -- ^ The previously sent request this message answers, when the
          -- decoder found one for it.
        }
    -- | Input that could not be turned into a message.
    | IncomingError
        { incomingError :: !ErrorObj
          -- ^ Error object describing the failure.
        }
    deriving (Eq, Show)
-- | Deep evaluation forces whatever payload the constructor carries.
instance (NFData qo, NFData qi, NFData ni, NFData ri)
    => NFData (IncomingMsg qo qi ni ri)
  where
    rnf incoming = case incoming of
        IncomingMsg message request -> rnf message `seq` rnf request
        IncomingError err           -> rnf err
-- | Mutable state shared between the outgoing and incoming conduits of a
-- single connection.
data Session qo = Session
    { lastId       :: TVar Id
      -- ^ Id most recently assigned to an outgoing request that arrived
      -- without one.
    , sentRequests :: TVar (SentRequests qo)
      -- ^ Requests recorded when sent; an entry is removed when a reply
      -- carrying its id is decoded.
    , isLast       :: TQueue Bool
      -- ^ Signals from the outgoing side to the decoder: 'False' for every
      -- request recorded, 'True' once the outgoing stream has ended.
    }
-- | Create a fresh session: the id counter starts at @IdInt 0@, no requests
-- are recorded and the signalling queue is empty.
initSession :: STM (Session qo)
initSession = do
    idVar     <- newTVar (IdInt 0)
    sentVar   <- newTVar M.empty
    lastQueue <- newTQueue
    return (Session idVar sentVar lastQueue)
-- | Serialise every value passing through to JSON, as a strict 'ByteString'.
encodeConduit :: (ToJSON a, Monad m) => Conduit a m ByteString
encodeConduit = CL.map serialise
  where
    serialise value = L8.toStrict (encode value)
-- | Pass outgoing messages through, recording every request in the session.
--
-- A request whose id is 'IdNull' is given the successor of the session's
-- last id before being recorded; a request that already has an id is
-- recorded under that id unchanged. Other messages are forwarded untouched.
--
-- When the flag @c@ is 'True', progress is also reported on the session's
-- 'isLast' queue: 'False' for every request recorded and a single 'True'
-- once upstream is exhausted.
msgConduit :: MonadIO m
           => Bool
           -> Session qo
           -> Conduit (Message qo no ro) m (Message qo no ro)
msgConduit c qs = awaitForever forward >> signalEnd
  where
    -- Requests get bookkeeping; everything else goes straight through.
    forward (MsgRequest rq) = do
        recorded <- liftIO . atomically $ addId rq
        yield (MsgRequest recorded)
    forward other = yield other

    -- Tell the decoder that no further requests will be sent.
    signalEnd = when c . liftIO . atomically $ writeTQueue (isLast qs) True

    -- Allocate an id if the request has none, then record the request.
    addId rq = case getReqId rq of
        IdNull -> do
            fresh <- succ <$> readTVar (lastId qs)
            writeTVar (lastId qs) fresh
            track (rq { getReqId = fresh })
        _ -> track rq

    -- Store the request under its id and report it on the queue.
    track rq = do
        modifyTVar (sentRequests qs) (M.insert (getReqId rq) rq)
        when c $ writeTQueue (isLast qs) False
        return rq
-- | Incrementally parse a stream of bytes into JSON-RPC messages.
--
-- Chunks are fed to an attoparsec JSON parser whose pending continuation is
-- kept in the 'StateT' layer, so one value may span several chunks and one
-- chunk may hold several values. Every parsed value is classified against
-- the requests recorded in the session and yielded as an 'IncomingMsg'.
--
-- Parameters:
--
-- * @ver@: protocol version used when building parse and invalid-message
--   errors.
-- * @c@: closing mode. When 'True' the conduit reads the session's 'isLast'
--   queue before awaiting the first chunk and again after every message
--   that was matched to a sent request, and stops once it reads 'True'.
-- * @qs@: the session whose sent-request map is consulted and updated.
--
-- A parse failure yields one 'IncomingError' and ends the conduit.
decodeConduit
    :: (FromRequest qi, FromNotif ni, FromResponse ri, MonadIO m)
    => Ver
    -> Bool
    -> Session qo
    -> Conduit ByteString m (IncomingMsg qo qi ni ri)
decodeConduit ver c qs = evalStateT (next True) Nothing
  where
    -- Optionally consult the queue before carrying on reading input.
    next checkLast
        | c && checkLast = do
            finished <- liftIO . atomically $ readTQueue (isLast qs)
            unless finished loop
        | otherwise = loop

    loop = lift await >>= maybe flush process

    process = runParser >=> handle

    -- End of input: signal it to a pending parser by feeding the empty
    -- string. The continuation is taken out of the state (not merely read):
    -- if it stayed there and the parser completed a value, the following
    -- end-of-input would re-feed the same continuation and yield the same
    -- result again, without ever terminating.
    flush = getPartialParser >>= maybe (return ()) (\k -> handle (k B.empty))

    -- Resume a pending parse with the new chunk, or start a fresh one.
    runParser chunk = do
        pending <- getPartialParser
        return $ maybe (parse json' chunk) ($ chunk) pending

    -- Fetch the pending continuation and clear it from the state.
    getPartialParser = get <* put Nothing

    handle (Fail {}) = lift (yield . IncomingError $ errorParse ver Null)
    handle (Partial k) = put (Just k) >> loop
    handle (Done rest v) = do
        msg <- liftIO . atomically $ decodeSTM v
        lift $ yield msg
        if B.null rest
            then next (matched msg)
            -- Leftover bytes start the next value immediately.
            else process rest
      where
        matched (IncomingMsg _ (Just _)) = True
        matched _ = False

    -- Classify a parsed JSON value against the current sent-request map.
    decodeSTM v = do
        h <- readTVar (sentRequests qs)
        case parseEither (topParse h) v of
            Left _ -> return . IncomingError $ errorParse ver Null
            Right (Left e) -> return $ IncomingError e
            Right (Right m@(MsgResponse rs)) ->
                IncomingMsg m <$> requestSTM h (getResId rs)
            Right (Right m@(MsgError er)) -> case getErrId er of
                IdNull -> return $ IncomingMsg m Nothing
                i -> IncomingMsg m <$> requestSTM h i
            Right (Right m) -> return $ IncomingMsg m Nothing

    -- Remove the request stored under the given id and return it. The
    -- lookup is total: an id that is not in the map gives 'Nothing' instead
    -- of a bottom hidden inside a 'Just'.
    requestSTM h i = do
        writeTVar (sentRequests qs) (M.delete i h)
        return (M.lookup i h)

    -- Try request, notification and response in that order; anything else
    -- is reported as an invalid message.
    topParse h v = parseReq v
               <|> parseNot v
               <|> parseRes h v
               <|> return (Left $ errorInvalid ver v)

    parseRes h = withObject "response" $ \o -> do
        r <- o .:? "result" .!= Null
        e <- o .:? "error" .!= Null
        -- Neither a result nor an error: not a response at all.
        when (r == Null && e == Null) mzero
        i <- o .:? "id" .!= IdNull
        j <- o .:? "jsonrpc"
        let ver' = if j == Just ("2.0" :: Text) then V2 else V1
        case i of
            IdNull -> Right . MsgError <$> parseJSON (Object o)
            _ -> case M.lookup i h of
                -- A reply to a request that was never recorded.
                Nothing -> return . Left $ errorId ver' i
                Just rq ->
                    Right . either MsgError MsgResponse
                        <$> parseResponse rq (Object o)

    parseReq v = fmap MsgRequest <$> parseRequest v

    parseNot v = fmap MsgNotif <$> parseNotif v
-- | Send a list of requests and gather everything that comes back.
--
-- Each element of the list is turned into a request with 'buildRequest' and
-- pushed into the sink, while a linked background thread accumulates every
-- item the source produces. The call returns once the source has ended.
query :: (ToJSON qo, ToRequest qo, FromResponse ri)
      => Ver
      -> [qo]
      -> AppConduits qo () () () () ri IO
      -> IO [IncomingMsg qo () () ri]
query ver qs (src, snk) = withAsync collect $ \collector -> do
    link collector
    CL.sourceList requests $$ snk
    wait collector
  where
    collect  = src $$ CL.consume
    requests = map (MsgRequest . buildRequest ver) qs
-- | Run a JSON-RPC session over a raw byte sink and source.
--
-- Two bounded channels (capacity 128) connect the application to a linked
-- background thread that talks to the wire: outgoing messages are read from
-- one channel, passed through 'msgConduit' and 'encodeConduit', and written
-- to @rpcSnk@; bytes from @rpcSrc@ go through 'decodeConduit' into the other
-- channel. The callback @f@ receives the application-facing ends, and its
-- result is returned after the background thread has finished.
--
-- The flag @d@ is handed to both 'msgConduit' and 'decodeConduit'.
--
-- NOTE(review): the final wait returns only after both directions have
-- ended; presumably the outgoing side ends when the sink given to @f@ has
-- been run to completion -- confirm against 'sinkTBMChan'.
runConduits :: ( FromRequest qi, FromNotif ni, FromResponse ri
               , ToJSON qo, ToJSON no, ToJSON ro )
            => Ver
            -> Bool
            -> Sink ByteString IO ()
            -> Source IO ByteString
            -> (AppConduits qo no ro qi ni ri IO -> IO a)
            -> IO a
runConduits ver d rpcSnk rpcSrc f = do
    (reqChan, msgChan) <- atomically $ do
        outgoing <- newTBMChan 128
        incoming <- newTBMChan 128
        return (outgoing, incoming)
    withAsync (rpcThread reqChan msgChan) $ \rpc -> do
        link rpc
        result <- f (sourceTBMChan msgChan, sinkTBMChan reqChan True)
        _ <- wait rpc
        return result
  where
    -- Wire-facing side: encode outgoing messages on a linked thread while
    -- decoding incoming bytes on this one.
    rpcThread reqChan msgChan = do
        session <- atomically initSession
        let outbound = sourceTBMChan reqChan
                    $= msgConduit d session
                    $= encodeConduit
                    $$ rpcSnk
        withAsync outbound $ \out -> do
            link out
            _ <- rpcSrc $= decodeConduit ver d session $$ sinkTBMChan msgChan True
            wait out
-- | Run a JSON-RPC session as a TCP client.
--
-- Every encoded outgoing message has a newline appended, and incoming
-- chunks are split on newlines before being decoded. The flag @d@ is passed
-- on to 'runConduits'.
tcpClient :: ( FromRequest qi, FromNotif ni, FromResponse ri
             , ToJSON qo, ToJSON no, ToJSON ro )
          => Ver
          -> Bool
          -> ClientSettings
          -> (AppConduits qo no ro qi ni ri IO -> IO a)
          -> IO a
tcpClient ver d cs f = runTCPClient cs run
  where
    run ad = runConduits ver d
                 (appendNewline =$ appSink ad)
                 (appSource ad $= splitLines)
                 f
    appendNewline = CL.map (`B8.snoc` '\n')
    splitLines    = CL.mapFoldable B8.lines
-- | Run a JSON-RPC TCP server, invoking the callback for each connection.
--
-- Every encoded outgoing message has a newline appended; 'runConduits' is
-- always called with its flag set to 'False'.
--
-- NOTE(review): unlike 'tcpClient', incoming bytes are passed to the decoder
-- without being split on newlines first -- confirm this asymmetry is
-- intended.
tcpServer :: ( FromRequest qi, FromNotif ni, FromResponse ri
             , ToJSON qo, ToJSON no, ToJSON ro )
          => Ver
          -> ServerSettings
          -> (AppConduits qo no ro qi ni ri IO -> IO ())
          -> IO ()
tcpServer ver ss f = runTCPServer ss serve
  where
    serve cl = runConduits ver False
                   (appendNewline =$ appSink cl)
                   (appSource cl)
                   f
    appendNewline = CL.map (`B8.snoc` '\n')