module Neovim.RPC.SocketReader (
runSocketReader,
) where
import Neovim.Classes
import Neovim.Context hiding (ask, asks)
import Neovim.Plugin.IPC
import Neovim.Plugin.IPC.Internal
import Neovim.RPC.Common
import Neovim.RPC.FunctionCall
import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (void)
import Control.Monad.Reader (MonadReader, ask, asks)
import Control.Monad.Trans.Resource
import Data.Conduit as C
import Data.Conduit.Binary
import Data.Conduit.Cereal
import Data.Foldable (forM_)
import qualified Data.Map as Map
import Data.MessagePack
import Data.Monoid
import qualified Data.Serialize (get)
import Data.Text (Text, unpack)
import System.IO (IOMode (ReadMode))
import System.Log.Logger
import Prelude
-- | Name of the logger under which this module emits its log messages.
logger :: String
logger = "Socket Reader"
-- | Open a handle for the given socket type in 'ReadMode' and process
-- everything that arrives on it.
--
-- The raw bytes read from the handle are decoded with 'Data.Serialize.get'
-- and every decoded value is passed to 'messageHandlerSink'. The whole
-- pipeline is executed by 'runSocketHandler' with the supplied environment.
runSocketReader :: SocketType -> ConfigWrapper RPCConfig -> IO ()
runSocketReader socketType env =
    createHandle ReadMode socketType >>= runSocketHandler env . pipeline
  where
    -- Source of bytes, fused with the decoder, connected to the sink.
    pipeline h =
        (sourceHandle h $= conduitGet Data.Serialize.get)
            $$ messageHandlerSink
-- | Monad in which the socket-reading conduit runs: a 'ResourceT' layered
-- over a 'Neovim' context that is parameterised by 'RPCConfig' and @()@.
--
-- The derived 'MonadReader' instance gives access to the
-- @'ConfigWrapper' 'RPCConfig'@ environment.
newtype SocketHandler a = SocketHandler (ResourceT (Neovim RPCConfig ()) a)
    deriving
        ( Functor
        , Applicative
        , Monad
        , MonadIO
        , MonadReader (ConfigWrapper RPCConfig)
        , MonadThrow
        )
-- | Execute a 'SocketHandler' action in 'IO'.
--
-- The wrapped action is unwrapped with 'runResourceT', followed by 'quit',
-- and run through 'runNeovim' with the given environment and @()@ as the
-- state argument. Whatever 'runNeovim' returns is discarded.
runSocketHandler :: ConfigWrapper RPCConfig -> SocketHandler a -> IO ()
runSocketHandler r (SocketHandler a) = void (runNeovim r () session)
  where
    -- Run the handler to completion, then issue 'quit'.
    session = runResourceT a >> quit
-- | Sink that consumes every decoded message and dispatches it by shape.
--
-- * A four element array starting with two integers is passed to
--   'handleResponseOrRequest'.
-- * A three element array starting with an integer is passed to
--   'handleNotification'.
-- * Anything else is reported on the error log and otherwise ignored.
messageHandlerSink :: Sink Object SocketHandler ()
messageHandlerSink = awaitForever dispatch
  where
    dispatch :: Object -> Sink Object SocketHandler ()
    dispatch rpc = do
        liftIO . debugM logger $ "Received: " <> show rpc
        case rpc of
            ObjectArray [ObjectInt msgType, ObjectInt msgId, third, fourth] ->
                handleResponseOrRequest msgType msgId third fourth
            ObjectArray [ObjectInt msgType, method, params] ->
                handleNotification msgType method params
            _ ->
                liftIO . errorM logger $
                    "Unhandled rpc message: " <> show rpc
-- | Dispatch a four element message according to its message type.
--
-- Message type @1@ is forwarded to 'handleResponse' and message type @0@ to
-- 'handleRequestOrNotification' (with the message id wrapped in 'Just').
-- Any other message type is reported on the error log and dropped.
handleResponseOrRequest :: Int64 -> Int64 -> Object -> Object
                        -> Sink a SocketHandler ()
handleResponseOrRequest msgType i third fourth = case msgType of
    1 -> handleResponse i third fourth
    0 -> handleRequestOrNotification (Just i) third fourth
    _ -> liftIO . errorM logger $ "Invalid message type: " <> show msgType
-- | Deliver a response carrying message id @i@ to the recipient registered
-- for that id.
--
-- The recipient is taken from the @recipients@ map of the 'RPCConfig'. If an
-- entry exists, it is removed from the map and its 'TMVar' is filled with
-- @'Right' result@ when the error object @e@ is 'ObjectNil', or with
-- @'Left' e@ otherwise. If no entry exists, a warning is logged.
handleResponse :: Int64 -> Object -> Object -> Sink a SocketHandler ()
handleResponse i e result = do
    answerMap <- asks (recipients . customConfig)
    -- Look the recipient up and unregister it within a single transaction.
    -- Other threads modify this map concurrently; doing the lookup and the
    -- delete separately could remove an entry that was registered under the
    -- same id in between, without ever answering it.
    mReply <- liftIO . atomically $ do
        mRecipient <- Map.lookup i <$> readTVar answerMap
        forM_ mRecipient $ \_ -> modifyTVar' answerMap (Map.delete i)
        return (snd <$> mRecipient)
    case mReply of
        Nothing -> liftIO $ warningM logger
            "Received response but could not find a matching recipient."
        Just reply -> atomically' . putTMVar reply $ case e of
            ObjectNil -> Right result
            _ -> Left e
-- | Dispatch a request (@'Just' id@) or a notification ('Nothing') to the
-- function registered under the given method name.
--
-- The method object is converted with 'fromObject'; a conversion failure is
-- logged as an error. Otherwise the lookup and the invocation run on a
-- thread created with 'forkIO':
--
-- * No function registered: the fact is logged at debug level and, for
--   requests, a 'Response' carrying the error message is put on the event
--   queue.
-- * 'Stateless' function: it is run with 'runNeovim' and, for requests, the
--   outcome is put on the event queue as a 'Response'.
-- * 'Stateful' function: the call is forwarded to the queue stored in the
--   constructor as a 'Request' or 'Notification'. For requests, an entry is
--   added to the @recipients@ map beforehand.
--
-- If the parameters are not an 'ObjectArray', an error is logged and the
-- message is dropped.
handleRequestOrNotification :: Maybe Int64 -> Object -> Object -> Sink a SocketHandler ()
handleRequestOrNotification mi method (ObjectArray params) = case fromObject method of
    Left e -> liftIO . errorM logger $ show e
    Right m -> void . liftIO . forkIO . handle m =<< ask
  where
    lookupFunction :: Text -> RPCConfig -> STM (Maybe FunctionType)
    lookupFunction m rpc = Map.lookup m <$> readTMVar (functions rpc)

    handle m rpc = atomically (lookupFunction m (customConfig rpc)) >>= \case
        Nothing -> do
            let errM = "No provider for: " <> unpack m
            debugM logger errM
            -- Only requests carry an id that can be answered.
            forM_ mi $ \i -> atomically' . writeTQueue (_eventQueue rpc)
                . SomeMessage $ Response i (toObject errM) ObjectNil
        Just (Stateless f) -> do
            liftIO . debugM logger $ "Executing stateless function with ID: " <> show mi
            -- Run with a unit config and unit state; keep only the first
            -- component of a successful result.
            res <- fmap fst <$> runNeovim (rpc { customConfig = () }) () (f $ parseParams params)
            forM_ mi $ \i -> atomically' . writeTQueue (_eventQueue rpc)
                . SomeMessage . uncurry (Response i) $ responseResult res
        Just (Stateful c) -> do
            liftIO . debugM logger $ "Executing stateful function with ID: " <> show mi
            case mi of
                Just i -> do
                    -- The timestamp and the reply variable are only needed
                    -- for requests, so they are created in this branch.
                    now <- liftIO getCurrentTime
                    reply <- liftIO newEmptyTMVarIO
                    let q = (recipients . customConfig) rpc
                    -- Strict update so that no chain of unevaluated inserts
                    -- accumulates inside the TVar.
                    atomically' . modifyTVar' q $ Map.insert i (now, reply)
                    atomically' . writeTQueue c . SomeMessage $ Request m i (parseParams params)
                Nothing ->
                    atomically' . writeTQueue c . SomeMessage $ Notification m (parseParams params)

    -- Build the (error, result) pair of a response; the bang patterns force
    -- the payload before it is converted.
    responseResult (Left !e) = (toObject e, ObjectNil)
    responseResult (Right !res) = (ObjectNil, toObject res)
handleRequestOrNotification _ _ params = liftIO . errorM logger $
    "Parmaeters in request are not in an object array: " <> show params
-- | Normalise a parameter list: a list consisting of exactly one
-- 'ObjectArray' is replaced by the elements of that array, any other list
-- is returned unchanged.
parseParams :: [Object] -> [Object]
parseParams [ObjectArray fArgs] = fArgs
parseParams args = args
-- | Handle a three element message.
--
-- When the message type is @2@ the message is logged at debug level and
-- forwarded to 'handleRequestOrNotification' without a message id. Any
-- other message type is reported on the error log together with the method
-- and parameters that were received.
handleNotification :: Int64 -> Object -> Object -> Sink a SocketHandler ()
handleNotification msgType method params
    | msgType == 2 = do
        liftIO . debugM logger $ "Received notification: " <> show method <> show params
        handleRequestOrNotification Nothing method params
    | otherwise = liftIO . errorM logger $ concat
        [ "Message is not a noticiation. Received msgType: "
        , show msgType
        , " The expected value was 2. The following arguments were given: "
        , show method
        , " and "
        , show params
        ]