{-# LANGUAGE OverloadedStrings, DoAndIfThenElse, FlexibleContexts, CPP #-}

-- | Description : Low-level ZeroMQ communication wrapper.
--
-- The "ZeroMQ" module abstracts away the low-level 0MQ based interface with IPython, replacing it
-- instead with a Haskell Channel based interface. The `serveProfile` function takes an IPython
-- profile specification and returns the channel interface to use.
module IHaskell.IPython.ZeroMQ (
    ZeroMQInterface(..),
    ZeroMQStdin(..),
    serveProfile,
    serveStdin,
    ZeroMQEphemeralPorts,
    withEphemeralPorts,
    ) where

import           Control.Concurrent
import           Control.Exception
import           Control.Monad
import qualified Crypto.Hash as Hash
import           Crypto.Hash.Algorithms (SHA256)
import qualified Crypto.MAC.HMAC as HMAC
import           Data.Aeson
import qualified Data.ByteArray.Encoding as Encoding
import           Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char
import qualified Data.ByteString.Lazy as LBS
import           Data.Char
import           Data.Monoid ((<>))
import qualified Data.Text.Encoding as Text
import           System.ZMQ4 as ZMQ4
import           Text.Read (readMaybe)
import           Text.Parsec (runParserT, manyTill, anyToken, (<|>), eof, tokenPrim, incSourceColumn)

import           IHaskell.IPython.Message.Parser
import           IHaskell.IPython.Types

-- | The channel interface to the ZeroMQ sockets. All communication is done via Messages, which are
-- encoded and decoded into a lower level form before being transmitted to IPython. These channels
-- should functionally serve as high-level sockets which speak Messages instead of ByteStrings.
data ZeroMQInterface =
       Channels
         {
         -- | A channel populated with requests from the frontend.
         shellRequestChannel :: Chan Message
         -- | Writing to this channel causes a reply to be sent to the frontend.
         , shellReplyChannel :: Chan Message
         -- | This channel is a duplicate of the shell request channel, though using a different backend
         -- socket.
         , controlRequestChannel :: Chan Message
         -- | This channel is a duplicate of the shell reply channel, though using a different backend
         -- socket.
         , controlReplyChannel :: Chan Message
         -- | Writing to this channel sends an iopub message to the frontend.
         , iopubChannel :: Chan Message
         -- | Key used to sign messages.
         , hmacKey :: ByteString
         }

-- | The channel interface to the stdin socket, as produced by 'serveStdin'.
data ZeroMQStdin =
       StdinChannel
         {
         -- | Messages written to this channel are sent out over the stdin socket.
         stdinRequestChannel :: Chan Message
         -- | Messages received on the stdin socket are written to this channel.
         , stdinReplyChannel :: Chan Message
         }

-- | Create new channels for a ZeroMQInterface.
--
-- The control request/reply channels are created with 'dupChan' from the corresponding shell
-- channels; the iopub channel is independent. The supplied key is stored as 'hmacKey'.
newZeroMQInterface :: ByteString -- ^ Key used to sign outgoing messages.
                   -> IO ZeroMQInterface
newZeroMQInterface key = do
  shellReqChan <- newChan
  shellRepChan <- newChan
  controlReqChan <- dupChan shellReqChan
  controlRepChan <- dupChan shellRepChan
  iopubChan <- newChan
  -- Force the record to WHNF before handing it back.
  return $! Channels
    { shellRequestChannel = shellReqChan
    , shellReplyChannel = shellRepChan
    , controlRequestChannel = controlReqChan
    , controlReplyChannel = controlRepChan
    , iopubChannel = iopubChan
    , hmacKey = key
    }

-- | Start responding on all ZeroMQ channels used to communicate with IPython via the provided
-- profile. Return a set of channels which can be used to communicate with IPython in a more
-- structured manner.
--
-- The heartbeat, control, shell and iopub sockets are each served by their own loop; this function
-- itself returns as soon as the serving threads have been forked.
serveProfile :: Profile            -- ^ The profile specifying which ports and transport mechanisms to use.
             -> Bool               -- ^ Print debug output
             -> IO ZeroMQInterface -- ^ The Message-channel based interface to the sockets.
serveProfile profile debug = do
  channels <- newZeroMQInterface (signatureKey profile)

  -- Create the context in a separate thread that never finishes. If withContext or withSocket
  -- complete, the context or socket become invalid.
  _ <- forkIO $ withContext $ \ctxt -> do
    -- Serve on all sockets.
    _ <- forkIO $ serveSocket ctxt Rep (ip profile) (hbPort profile) $ heartbeat channels
    _ <- forkIO $ serveSocket ctxt Router (ip profile) (controlPort profile) $ control debug channels
    _ <- forkIO $ serveSocket ctxt Router (ip profile) (shellPort profile) $ shell debug channels

    -- The ctxt is reference counted in this thread only. Thus, the last serveSocket cannot be
    -- asynchronous, because otherwise ctxt would be garbage collectable - since it would only be
    -- used in other threads. Thus, keep the last serveSocket in this thread.
    serveSocket ctxt Pub (ip profile) (iopubPort profile) $ iopub debug channels

  return channels

-- | Describes ports used when creating an ephemeral ZeroMQ session. Used to generate the ipython
-- JSON config file.
data ZeroMQEphemeralPorts =
       ZeroMQEphemeralPorts
         { ephHbPort :: !Port              -- ^ Port the heartbeat socket is bound to.
         , ephControlPort :: !Port         -- ^ Port the control socket is bound to.
         , ephShellPort :: !Port           -- ^ Port the shell socket is bound to.
         , ephIOPubPort :: !Port           -- ^ Port the iopub socket is bound to.
         , ephSignatureKey :: !ByteString  -- ^ Key used to sign messages.
         }

-- | Serialise the ephemeral session description as a JSON object with the keys @ip@, @transport@,
-- @control_port@, @hb_port@, @shell_port@, @iopub_port@ and @key@. The IP is always the loopback
-- address and the transport is always TCP; the key is decoded as UTF-8 text.
instance ToJSON ZeroMQEphemeralPorts where
  toJSON ports =
    object
      [ "ip" .= ("127.0.0.1" :: String)
      , "transport" .= TCP
      , "control_port" .= ephControlPort ports
      , "hb_port" .= ephHbPort ports
      , "shell_port" .= ephShellPort ports
      , "iopub_port" .= ephIOPubPort ports
      , "key" .= Text.decodeUtf8 (ephSignatureKey ports)
      ]

-- | Extract the port number from the end of an endpoint string such as @tcp://127.0.0.1:5555@.
--
-- Takes the longest run of decimal digits at the end of the string and reads it as an 'Int'.
-- Returns 'Nothing' when the string does not end in a digit (including the empty string).
parsePort :: String -> Maybe Int
parsePort s = readMaybe trailingDigits
  where
    -- 'isDigit' (ASCII 0-9) rather than 'isNumber': the latter also accepts numeric characters
    -- that 'readMaybe' cannot interpret as part of an 'Int'.
    trailingDigits = reverse (takeWhile isDigit (reverse s))

-- | Bind a socket to an ephemeral TCP port on the loopback interface and return the port chosen.
--
-- The port is recovered by parsing the socket's last endpoint string with 'parsePort'; if that
-- fails, the action fails with an internal-error message.
bindLocalEphemeralPort :: Socket a -> IO Int
bindLocalEphemeralPort sock = do
  -- The "*" port asks ZeroMQ to pick a free port.
  bind sock "tcp://127.0.0.1:*"
  endpointString <- lastEndpoint sock
  case parsePort endpointString of
    Nothing ->
      fail "internalError: IHaskell.IPython.ZeroMQ.bindLocalEphemeralPort encountered a port index that could not be interpreted as an int."
    Just endpointIndex ->
      return endpointIndex

-- | Run session for communicating with an IPython instance on ephemerally allocated ZMQ4 sockets.
-- The sockets will be closed when the callback returns.
--
-- Four sockets (heartbeat, control, shell, iopub) are created, each bound to an ephemeral local
-- port, and a listener thread is forked for each before the callback is invoked.
withEphemeralPorts :: ByteString -- ^ HMAC signing key
                   -> Bool -- ^ Print debug output
                   -> (ZeroMQEphemeralPorts
                    -> ZeroMQInterface
                    -> IO a) -- ^ Callback that takes the interface to the sockets.
                   -> IO a
withEphemeralPorts key debug callback = do
  channels <- newZeroMQInterface key
  -- Create the ZMQ4 context
  withContext $ \ctxt ->
    -- Create the sockets to communicate with.
    withSocket ctxt Rep $ \heartbeatSocket ->
      withSocket ctxt Router $ \controlportSocket ->
        withSocket ctxt Router $ \shellportSocket ->
          withSocket ctxt Pub $ \iopubSocket -> do
            -- Bind each socket to a local port, getting the port chosen.
            hbPt <- bindLocalEphemeralPort heartbeatSocket
            controlPt <- bindLocalEphemeralPort controlportSocket
            shellPt <- bindLocalEphemeralPort shellportSocket
            iopubPt <- bindLocalEphemeralPort iopubSocket
            -- Create object to store ephemeral ports
            let ports = ZeroMQEphemeralPorts hbPt controlPt shellPt iopubPt key
            -- Launch actions to communicate between channels and sockets.
            _ <- forkIO $ forever $ heartbeat channels heartbeatSocket
            _ <- forkIO $ forever $ control debug channels controlportSocket
            _ <- forkIO $ forever $ shell debug channels shellportSocket
            -- checkedIOpub loops on its own, so it is not wrapped in 'forever'.
            _ <- forkIO $ checkedIOpub debug channels iopubSocket
            -- Run callback function; provide it with both ports and channels.
            callback ports channels

-- | Start serving the stdin socket described by the profile and return the channels used to talk
-- to it.
--
-- Each iteration of the serving loop takes one message from the request channel, sends it on the
-- socket, then receives one message from the socket and writes it to the reply channel. Debug
-- output is disabled for this socket.
serveStdin :: Profile -> IO ZeroMQStdin
serveStdin profile = do
  reqChannel <- newChan
  repChannel <- newChan

  -- Create the context in a separate thread that never finishes. If withContext or withSocket
  -- complete, the context or socket become invalid.
  _ <- forkIO $ withContext $ \ctxt ->
    -- Serve on all sockets.
    serveSocket ctxt Router (ip profile) (stdinPort profile) $ \sock -> do
      -- Read the request from the interface channel and send it.
      readChan reqChannel >>= sendMessage False (signatureKey profile) sock

      -- Receive a response and write it to the interface channel.
      receiveMessage False sock >>= writeChan repChannel

  return $ StdinChannel reqChannel repChannel

-- | Serve on a given socket. Create a socket of the given type in the given context, bind it to
-- @tcp:\/\/ipAddress:port@, and then loop the provided action forever; the action should listen on
-- the socket and respond to any events.
--
-- This does not fork: the loop runs in the calling thread, so callers that need it to run in the
-- background must fork it themselves.
serveSocket :: SocketType a => Context -> a -> IP -> Port -> (Socket a -> IO b) -> IO ()
serveSocket ctxt socketType ipAddress port action = void $
  withSocket ctxt socketType $ \sock -> do
    bind sock $ "tcp://" ++ ipAddress ++ ":" ++ show port
    forever $ action sock

-- | Listener on the heartbeat port. Echoes back any data it was sent.
--
-- Handles a single request/reply exchange; callers loop it. The interface argument is unused.
heartbeat :: ZeroMQInterface -> Socket Rep -> IO ()
heartbeat _ sock = do
  -- Read some data.
  request <- receive sock

  -- Send it back.
  send sock [] request

-- | Listener on the shell port. Reads one message and writes it to the shell request channel, then
-- reads a response from the shell reply channel of the interface and sends it back to the
-- frontend, signed with the interface's 'hmacKey'.
--
-- Handles a single request/reply exchange; callers loop it.
shell :: Bool -> ZeroMQInterface -> Socket Router -> IO ()
shell debug channels sock = do
  -- Receive a message and write it to the interface channel.
  receiveMessage debug sock >>= writeChan requestChannel

  -- Read the reply from the interface channel and send it.
  readChan replyChannel >>= sendMessage debug (hmacKey channels) sock

  where
    requestChannel = shellRequestChannel channels
    replyChannel = shellReplyChannel channels

-- | Listener on the control port. Reads one message and writes it to the control request channel,
-- then reads a response from the control reply channel of the interface and sends it back to the
-- frontend, signed with the interface's 'hmacKey'.
--
-- Handles a single request/reply exchange; callers loop it.
control :: Bool -> ZeroMQInterface -> Socket Router -> IO ()
control debug channels sock = do
  -- Receive a message and write it to the interface channel.
  receiveMessage debug sock >>= writeChan requestChannel

  -- Read the reply from the interface channel and send it.
  readChan replyChannel >>= sendMessage debug (hmacKey channels) sock

  where
    requestChannel = controlRequestChannel channels
    replyChannel = controlReplyChannel channels

-- | Send messages via the iopub channel. This reads one message from the ZeroMQ iopub interface
-- channel and then writes it to the socket, signed with the interface's 'hmacKey'.
--
-- Handles a single message; callers loop it.
iopub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
iopub debug channels sock =
  readChan (iopubChannel channels) >>= sendMessage debug (hmacKey channels) sock

-- | Attempt to send a message along the socket, returning true if successful.
--
-- A 'ZMQError' whose errno is 38 is swallowed and reported as 'False'; any other 'ZMQError' is
-- rethrown. The first (String) argument is ignored.
trySendMessage :: Sender a => String -> Bool -> ByteString -> Socket a -> Message -> IO Bool
trySendMessage _ debug hmackey sock msg = do
  let zmqErrorHandler :: ZMQError -> IO Bool
      zmqErrorHandler e
        -- Ignore errors if we cannot send. We may want to forward this to the thread that tried put the
        -- message in the Chan initially.
        -- NOTE(review): 38 is presumably the "not a socket" errno on the platform this was written
        -- for; errno values are platform-specific, so confirm this matches on all targets.
        | errno e == 38 = return False
        | otherwise = throwIO e
  (sendMessage debug hmackey sock msg >> return True) `catch` zmqErrorHandler

-- | Send messages via the iopub channel. This reads messages from the ZeroMQ iopub interface
-- channel and then writes the messages to the socket. This is a checked implementation which will
-- stop if the socket is closed.
--
-- Loops by calling itself for as long as 'trySendMessage' reports success, and returns once it
-- reports failure.
checkedIOpub :: Bool -> ZeroMQInterface -> Socket Pub -> IO ()
checkedIOpub debug channels sock = do
  msg <- readChan (iopubChannel channels)
  cont <- trySendMessage "io" debug (hmacKey channels) sock msg
  when cont $
    checkedIOpub debug channels sock

-- | Receive and parse a message from a socket.
--
-- The multipart message is split into: identifiers (everything before the @\<IDS|MSG\>@
-- delimiter), signature, header, parent header, metadata, content, and any remaining frames as
-- buffers. The signature frame is read and discarded here; it is not verified. If a required
-- frame is missing the action fails with a "Malformed Wire Protocol message" error. When debug
-- output is enabled, the raw header and content frames are printed.
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message
receiveMessage debug sock = do
  blobs <- receiveMulti sock
  runParserT parseBlobs () "" blobs >>= \r -> case r of
    Left parseerr -> fail $ "Malformed Wire Protocol message: " <> show parseerr
    Right (idents, headerData, parentHeader, metaData, content, buffers) -> do
      when debug $ do
        putStr "Header: "
        Char.putStrLn headerData
        putStr "Content: "
        Char.putStrLn content
      return $ parseMessage idents headerData parentHeader metaData content buffers
  where
    -- Parser over the list of frames; each token is one frame.
    parseBlobs = do
      idents       <- manyTill anyToken (satisfy (=="<IDS|MSG>"))
      _            <- anyToken <|> fail "No signature"
      headerData   <- anyToken <|> fail "No headerData"
      parentHeader <- anyToken <|> fail "No parentHeader"
      metaData     <- anyToken <|> fail "No metaData"
      content      <- anyToken <|> fail "No contents"
      buffers      <- manyTill anyToken eof
      pure (idents, headerData, parentHeader, metaData, content, buffers)
    -- Accept a single frame matching the predicate, advancing the source column by one.
    satisfy f = tokenPrim Char.unpack (\pos _ _ -> incSourceColumn pos 1)
                                      (\c -> if f c then Just c else Nothing)

-- | Encode a message in the IPython ZeroMQ communication protocol and send it through the provided
-- socket. Sign it using HMAC with SHA-256 using the provided key.
--
-- Frames are sent in this order: identifiers, the @\<IDS|MSG\>@ delimiter, signature, header,
-- parent header, metadata, content, then any buffers from the header. Every frame except the last
-- is sent with 'SendMore'. A 'SendNothing' message sends nothing at all. When debug output is
-- enabled, the message and its encoded content are printed before sending.
sendMessage :: Sender a => Bool -> ByteString -> Socket a -> Message -> IO ()
sendMessage _ _ _ SendNothing = return ()
sendMessage debug hmackey sock msg = do
  when debug $ do
    putStr "Message: "
    print msg
    putStr "Sent: "
    print content

  -- Send all pieces of the message.
  mapM_ sendPiece idents
  sendPiece "<IDS|MSG>"
  sendPiece signature
  sendPiece headStr
  sendPiece parentHeaderStr
  sendPiece metadata

  -- If there are no mhBuffers, then conclude transmission with content.
  case mhBuffers hdr of
    [] -> sendLast content
    _  -> sendPiece content

  sendBuffers $ mhBuffers hdr

  where
    -- Send the buffers, marking only the final one as the end of the multipart message.
    sendBuffers :: [ByteString] -> IO ()
    sendBuffers [] = pure ()
    sendBuffers [b] = sendLast b
    sendBuffers (b:bs) = sendPiece b >> sendBuffers bs

    sendPiece :: ByteString -> IO ()
    sendPiece = send sock [SendMore]
    sendLast :: ByteString -> IO ()
    sendLast = send sock []

    -- Encode to a strict bytestring.
    encodeStrict :: ToJSON a => a -> ByteString
    encodeStrict = LBS.toStrict . encode

    -- Signature for the message using HMAC SHA-256.
    signature :: ByteString
    signature = hmac $ headStr <> parentHeaderStr <> metadata <> content

    -- Compute the HMAC SHA-256 signature of a bytestring message, as a base-16 string.
    hmac :: ByteString -> ByteString
    hmac = (Encoding.convertToBase Encoding.Base16 :: Hash.Digest SHA256 -> ByteString)
      . HMAC.hmacGetDigest
      . HMAC.hmac hmackey

    -- Pieces of the message.
    hdr = header msg
    -- A missing parent header is encoded as an empty JSON object.
    parentHeaderStr = maybe "{}" encodeStrict $ mhParentHeader hdr
    idents = mhIdentifiers hdr
    metadata = let Metadata mdobject = mhMetadata hdr in encodeStrict mdobject
    content = encodeStrict msg
    headStr = encodeStrict hdr