{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE TypeApplications #-}

-- | Module: Control.Distributed.MPI.Serialize
-- Description: Simplified MPI bindings with automatic serialization
--              based on Data.Serialize
-- Copyright: (C) 2020 Erik Schnetter
-- License: Apache-2.0
-- Maintainer: Erik Schnetter <schnetter@gmail.com>
-- Stability: experimental
-- Portability: Requires an externally installed MPI library

module Control.Distributed.MPI.Serialize
  ( -- * Types, and associated functions and constants
    MPIException(..)

    -- ** Communicators
  , Comm(..)
  , commSelf
  , commWorld

    -- ** Message sizes
  , Count(..)
  , fromCount
  , toCount

    -- ** Process ranks
  , Rank(..)
  , anySource
  , commRank
  , commSize
  , fromRank
  , rootRank
  , toRank

    -- ** Message status
  , Status(..)

    -- ** Message tags
  , Tag(..)
  , anyTag
  , fromTag
  , toTag
  , unitTag

    -- ** Communication requests
  , Request

    -- * Functions

    -- ** Initialization and shutdown
  , abort
  , mainMPI

    -- ** Point-to-point (blocking)
  , recv
  , recv_
  , send
  , sendrecv
  , sendrecv_

    -- ** Point-to-point (non-blocking)
  , irecv
  , isend
  , test
  , test_
  , wait
  , wait_

    -- ** Collective (blocking)
  , barrier
  , bcast
  , bcastRecv
  , bcastSend
  , bcastSend_

    -- ** Collective (non-blocking)
  , ibarrier
  , ibcast
  , ibcastRecv
  , ibcastSend
  , ibcastSend_
  ) where

import Prelude hiding (init)

import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.Loops
import qualified Data.ByteString as B
import qualified Data.ByteString.Unsafe as B
import Data.Maybe
import qualified Data.Serialize as Serialize
import Data.Typeable
import Foreign
import Foreign.C.Types

import qualified Control.Distributed.MPI as MPI
import Control.Distributed.MPI
  ( Comm(..)
  , commSelf
  , commWorld
  , Count(..)
  , fromCount
  , toCount
  , Rank(..)
  , anySource
  , commRank
  , commSize
  , fromRank
  , rootRank
  , toRank
  , Tag(..)
  , anyTag
  , fromTag
  , toTag
  , unitTag
  , abort
  , barrier
  )



-- Serialization, based on Data.Serialize (from the cereal package)
-- | Constraint alias for types that can be sent and received by this
-- module; it is a synonym for 'Serialize.Serialize'.
type CanSerialize a = Serialize.Serialize a
-- | Encode an object into a strict 'B.ByteString' using
-- 'Serialize.encode'. The result is wrapped in 'IO' so that it
-- mirrors 'deserialize'.
serialize :: CanSerialize a => a -> IO B.ByteString
serialize obj = return (Serialize.encode obj)
-- | Decode an object from a strict 'B.ByteString' using
-- 'Serialize.decode'. Throws an 'MPIException' carrying the decoder's
-- error message if the buffer cannot be decoded.
deserialize :: CanSerialize a => B.ByteString -> IO a
deserialize = either decodeFailed return . Serialize.decode
  where
    decodeFailed reason =
      throwIO (MPIException ("Data.Serialize.decode failed: " ++ reason))



-- | Poll a computation until it yields a value. The first argument is
-- run repeatedly; each time it returns 'Nothing' the second argument
-- is run once before polling again. As soon as the first argument
-- returns @'Just' x@, the result is @x@.
whileNothing :: Monad m => m (Maybe a) -> m () -> m a
whileNothing cond loop = retry
  where
    -- One polling round: either finish with the value or run the
    -- loop body and poll again.
    retry = cond >>= maybe (loop >> retry) return



-- | Exception type indicating an error in a call to MPI. The payload
-- is a human-readable description of what went wrong.
newtype MPIException = MPIException String
  deriving (Eq, Ord, Read, Show, Typeable)

instance Exception MPIException

-- | Check a runtime condition. Does nothing when the condition holds;
-- otherwise throws an 'MPIException' carrying the given message.
--
-- The exception is raised with 'throwIO' (rather than the pure
-- 'throw') so that it is sequenced with the surrounding 'IO' actions.
mpiAssert :: Bool    -- ^ Condition that is expected to hold
          -> String  -- ^ Error message used if the condition is violated
          -> IO ()
mpiAssert cond msg = unless cond $ throwIO (MPIException msg)



-- | Result of 'initMPI', telling 'finalizeMPI' whether it is
-- responsible for shutting MPI down again.
data DidInit
  = DidInit     -- ^ 'initMPI' initialized MPI itself
  | DidNotInit  -- ^ MPI was already initialized before 'initMPI' ran

-- | Initialize MPI unless it is already initialized.
--
-- If MPI is already initialized, nothing is done and 'DidNotInit' is
-- returned. Otherwise MPI is initialized requesting
-- @MPI.ThreadMultiple@ thread support, and 'DidInit' is returned.
-- Throws an 'MPIException' (via 'mpiAssert') if the library provides
-- less thread support than requested.
initMPI :: IO DidInit
initMPI = MPI.initialized >>= startUnlessRunning
  where
    startUnlessRunning True = return DidNotInit
    startUnlessRunning False = do
      provided <- MPI.initThread MPI.ThreadMultiple
      mpiAssert (provided >= MPI.ThreadMultiple) $
        "MPI.init: Insufficient thread support: requiring "
          ++ show MPI.ThreadMultiple
          ++ ", but MPI library provided only "
          ++ show provided
      return DidInit

-- | Counterpart of 'initMPI'. MPI is finalized only if 'initMPI'
-- reported that it performed the initialization ('DidInit') and MPI
-- has not been finalized in the meantime; in every other case this is
-- a no-op.
finalizeMPI :: DidInit -> IO ()
finalizeMPI DidNotInit = return ()
finalizeMPI DidInit = do
  alreadyFinalized <- MPI.finalized
  unless alreadyFinalized MPI.finalize

-- | Convenience function to initialize and finalize MPI around an
-- action. MPI is initialized with @ThreadMultiple@ thread support
-- (unless it is already initialized), and 'bracket' ensures that the
-- matching finalization step runs even if the action throws.
mainMPI :: IO () -- ^ action to run with MPI, typically the whole program
        -> IO ()
mainMPI action = bracket initMPI finalizeMPI (const action)



-- | A communication request, usually created by a non-blocking
-- communication function.
--
-- The wrapped 'MVar' is created empty by the non-blocking functions
-- in this module and is filled with the message status and the result
-- by the background thread they fork. 'test' and 'wait' obtain the
-- result from it.
newtype Request a = Request (MVar (Status, a))

-- | The status of a finished communication, indicating rank and tag
-- of the other communication end point.
data Status = Status
  { msgRank :: !Rank  -- ^ Rank of the other end point
  , msgTag :: !Tag    -- ^ Tag of the message
  }
  deriving (Eq, Ord, Read, Show)



-- | Receive an object.
--
-- The function first polls with @MPI.iprobe@ until a matching message
-- is available, which yields the actual source, tag, and size in
-- bytes. It then receives exactly that message into a freshly
-- allocated buffer and deserializes it. All waiting is done by
-- polling and calling 'yield' in between.
--
-- Throws an 'MPIException' if the received bytes cannot be decoded.
recv :: CanSerialize a
     => Rank                    -- ^ Source rank
     -> Tag                     -- ^ Source tag
     -> Comm                    -- ^ Communicator
     -> IO (Status, a)          -- ^ Message status and received object
recv recvrank recvtag comm = do
  probed <- whileNothing (MPI.iprobe recvrank recvtag comm) yield
  source <- MPI.getSource probed
  tag <- MPI.getTag probed
  nbytes <- MPI.fromCount <$> MPI.getCount probed MPI.datatypeByte
  -- The ByteString takes ownership of the malloc'ed memory and frees
  -- it once it is garbage collected.
  buffer <- mallocBytes nbytes >>= \ptr ->
    B.unsafePackMallocCStringLen (ptr, nbytes)
  -- Receive from the concrete source and tag reported by the probe.
  req <- MPI.irecv buffer source tag comm
  whileM_ (not <$> MPI.test_ req) yield
  recvobj <- deserialize buffer
  return (Status source tag, recvobj)

-- | Receive an object without returning a status. This is 'recv' with
-- the message status discarded.
recv_ :: CanSerialize a
      => Rank                   -- ^ Source rank
      -> Tag                    -- ^ Source tag
      -> Comm                   -- ^ Communicator
      -> IO a                   -- ^ Received object
recv_ recvrank recvtag comm = do
  (_, recvobj) <- recv recvrank recvtag comm
  return recvobj

-- | Send an object.
--
-- The object is serialized, sent with a non-blocking MPI send, and
-- the function then polls (calling 'yield' in between) until the send
-- has completed.
send :: CanSerialize a
     => a                     -- ^ Object to send
     -> Rank                  -- ^ Destination rank
     -> Tag                   -- ^ Message tag
     -> Comm                  -- ^ Communicator
     -> IO ()
send sendobj sendrank sendtag comm =
  serialize sendobj >>= \sendbuf ->
    -- The pointer handed out by 'unsafeUseAsCStringLen' is not used;
    -- the wrapper only serves to keep 'sendbuf' alive until the send
    -- has finished, so that it is not freed too early.
    B.unsafeUseAsCStringLen sendbuf $ const $ do
      req <- MPI.isend sendbuf sendrank sendtag comm
      whileM_ (not <$> MPI.test_ req) yield

-- | Send and receive objects simultaneously.
--
-- The receive is started first (in the background, via 'irecv'), then
-- the object is sent, and finally the function waits for the receive
-- to finish.
sendrecv :: (CanSerialize a, CanSerialize b)
         => a                   -- ^ Object to send
         -> Rank                -- ^ Destination rank
         -> Tag                 -- ^ Send message tag
         -> Rank                -- ^ Source rank
         -> Tag                 -- ^ Receive message tag
         -> Comm                -- ^ Communicator
         -> IO (Status, b)      -- ^ Message status and received object
sendrecv sendobj sendrank sendtag recvrank recvtag comm =
  irecv recvrank recvtag comm >>= \pending ->
    send sendobj sendrank sendtag comm >> wait pending

-- | Send and receive objects simultaneously, without returning a
-- status for the received message. This is 'sendrecv' with the
-- message status discarded.
sendrecv_ :: (CanSerialize a, CanSerialize b)
          => a                  -- ^ Object to send
          -> Rank               -- ^ Destination rank
          -> Tag                -- ^ Send message tag
          -> Rank               -- ^ Source rank
          -> Tag                -- ^ Receive message tag
          -> Comm               -- ^ Communicator
          -> IO b               -- ^ Received object
sendrecv_ sendobj sendrank sendtag recvrank recvtag comm = do
  (_, recvobj) <- sendrecv sendobj sendrank sendtag recvrank recvtag comm
  return recvobj

-- | Begin to receive an object. Call 'test' or 'wait' to finish the
-- communication, and to obtain the received object.
--
-- The actual receive ('recv') runs in a newly forked thread, which
-- stores its result in the returned request.
irecv :: CanSerialize a
      => Rank                   -- ^ Source rank
      -> Tag                    -- ^ Source tag
      -> Comm                   -- ^ Communicator
      -> IO (Request a)         -- ^ Communication request
irecv recvrank recvtag comm = do
  result <- newEmptyMVar
  void $ forkIO (recv recvrank recvtag comm >>= putMVar result)
  return (Request result)

-- | Begin to send an object. Call 'test' or 'wait' to finish the
-- communication.
--
-- The actual send ('send') runs in a newly forked thread. Once it has
-- finished, the request is completed with a status that holds the
-- destination rank and the message tag.
isend :: CanSerialize a
      => a                     -- ^ Object to send
      -> Rank                  -- ^ Destination rank
      -> Tag                   -- ^ Message tag
      -> Comm                  -- ^ Communicator
      -> IO (Request ())       -- ^ Communication request
isend sendobj sendrank sendtag comm = do
  result <- newEmptyMVar
  let complete = putMVar result (Status sendrank sendtag, ())
  void $ forkIO (send sendobj sendrank sendtag comm >> complete)
  return (Request result)

-- | Check whether a communication has finished, and return the
-- communication result if so.
--
-- The check is non-blocking and non-destructive: the result stays
-- stored in the request, so calling 'test' or 'wait' again on a
-- finished request keeps returning the same result instead of
-- reporting 'Nothing' or blocking forever.
test :: Request a               -- ^ Communication request
     -> IO (Maybe (Status, a))  -- ^ 'Just' communication result, if
                                -- communication has finished, else 'Nothing'
test (Request result) = tryReadMVar result

-- | Check whether a communication has finished, and return the
-- communication result if so, without returning a message status.
-- This is 'test' with the message status discarded.
test_ :: Request a       -- ^ Communication request
      -> IO (Maybe a)    -- ^ 'Just' communication result, if
                         -- communication has finished, else 'Nothing'
test_ req = fmap (fmap snd) (test req)

-- | Wait for a communication to finish and return the communication
-- result.
--
-- Blocks until the background thread of the request has stored its
-- result. The result is read without removing it from the request, so
-- several threads may wait on the same request, and a later 'wait' or
-- 'test' on a finished request returns immediately instead of
-- blocking forever.
wait :: Request a               -- ^ Communication request
     -> IO (Status, a)          -- ^ Message status and communication result
wait (Request result) = readMVar result

-- | Wait for a communication to finish and return the communication
-- result, without returning a message status. This is 'wait' with the
-- message status discarded.
wait_ :: Request a              -- ^ Communication request
      -> IO a                   -- ^ Communication result
wait_ req = do
  (_, outcome) <- wait req
  return outcome



-- | Broadcast a message from one process (the "root") to all other
-- processes in the communicator. The send object must be present
-- ('Just') on the root, and is ignored on all non-root processes.
--
-- On the root this dispatches to 'bcastSend', on all other processes
-- to 'bcastRecv'. Throws an 'MPIException' if the root passes
-- 'Nothing'.
bcast :: CanSerialize a
      => Maybe a                -- ^ Object to broadcast (required on the root)
      -> Rank                   -- ^ Root rank
      -> Comm                   -- ^ Communicator
      -> IO a                   -- ^ Broadcast object
bcast sendobj root comm = do
  rank <- MPI.commRank comm
  if rank /= root
    then bcastRecv root comm
    else case sendobj of
      -- Pattern matching replaces the partial 'fromJust'.
      Just obj -> bcastSend obj root comm
      Nothing ->
        throwIO (MPIException "bcast: expected send object on root")

-- | Broadcast a message from one process (the "root") to all other
-- processes in the communicator. Call this function on all non-root
-- processes. Call 'bcastSend' instead on the root process.
--
-- Two broadcasts take place: the first transmits the length of the
-- serialized object (as a single 'CLong'), the second transmits the
-- serialized bytes into a buffer of that length. Both are
-- non-blocking MPI broadcasts that are polled to completion, calling
-- 'yield' in between.
--
-- Throws an 'MPIException' if called on the root, or if the received
-- bytes cannot be decoded.
bcastRecv :: CanSerialize a
          => Rank               -- ^ Root rank
          -> Comm               -- ^ Communicator
          -> IO a               -- ^ Broadcast object
bcastRecv root comm = do
  rank <- MPI.commRank comm
  mpiAssert (rank /= root) "bcastRecv: expected rank /= root"
  -- Step 1: learn how many bytes the root is going to broadcast.
  lenbuf <- mallocForeignPtr @CLong
  MPI.ibcast (lenbuf, 1 :: Int) root comm >>= awaitRequest
  nbytes <- fromIntegral <$> withForeignPtr lenbuf peek
  -- Step 2: receive the payload. The ByteString takes ownership of
  -- the malloc'ed memory.
  ptr <- mallocBytes nbytes
  recvbuf <- B.unsafePackMallocCStringLen (ptr, nbytes)
  MPI.ibcast recvbuf root comm >>= awaitRequest
  deserialize recvbuf
  where
    -- Poll an MPI request until it has completed.
    awaitRequest req = whileM_ (not <$> MPI.test_ req) yield

-- | Broadcast a message from one process (the "root") to all other
-- processes in the communicator. Call this function on the root
-- process. Call 'bcastRecv' instead on all non-root processes.
--
-- Two broadcasts take place: the first transmits the length of the
-- serialized object (as a single 'CLong'), the second transmits the
-- serialized bytes. Both are non-blocking MPI broadcasts that are
-- polled to completion, calling 'yield' in between.
--
-- Throws an 'MPIException' if called on a process other than the
-- root.
bcastSend_ :: CanSerialize a
           => a                 -- ^ Object to broadcast
           -> Rank              -- ^ Root rank (must be the calling process)
           -> Comm              -- ^ Communicator
           -> IO ()
bcastSend_ sendobj root comm = do
  rank <- MPI.commRank comm
  mpiAssert (rank == root) "bcastSend: expected rank == root"
  sendbuf <- serialize sendobj
  -- Step 1: announce the payload size.
  lenbuf <- mallocForeignPtr @CLong
  withForeignPtr lenbuf (`poke` fromIntegral (B.length sendbuf))
  MPI.ibcast (lenbuf, 1 :: Int) root comm >>= awaitRequest
  -- Step 2: broadcast the payload itself.
  MPI.ibcast sendbuf root comm >>= awaitRequest
  where
    -- Poll an MPI request until it has completed.
    awaitRequest req = whileM_ (not <$> MPI.test_ req) yield

-- | Broadcast a message from one process (the "root") to all other
-- processes in the communicator. Call this function on the root
-- process. Call 'bcastRecv' instead on all non-root processes.
--
-- This is 'bcastSend_' that additionally returns the object that was
-- sent, so that root and non-root processes end up with the same
-- value.
bcastSend :: CanSerialize a
          => a                  -- ^ Object to broadcast
          -> Rank               -- ^ Root rank (must be the calling process)
          -> Comm               -- ^ Communicator
          -> IO a               -- ^ The object that was sent
bcastSend sendobj root comm = sendobj <$ bcastSend_ sendobj root comm

-- | Begin a barrier. Call 'test' or 'wait' to finish the
-- communication.
--
-- A newly forked thread starts a non-blocking MPI barrier and polls
-- it to completion, calling 'yield' in between. The request is then
-- completed with a placeholder status made of 'MPI.anySource' and
-- 'MPI.anyTag'.
ibarrier :: Comm                -- ^ Communicator
         -> IO (Request ())     -- ^ Communication request
ibarrier comm = do
  result <- newEmptyMVar
  void $ forkIO (runBarrier result)
  return (Request result)
  where
    runBarrier done = do
      req <- MPI.ibarrier comm
      whileM_ (not <$> MPI.test_ req) yield
      putMVar done (Status MPI.anySource MPI.anyTag, ())

-- | Begin to broadcast a message from one process (the "root") to all
-- other processes in the communicator. Call 'test' or 'wait' to
-- finish the communication and to obtain the broadcast object. The
-- send object must be present ('Just') on the root, and is ignored on
-- all non-root processes.
--
-- On the root this dispatches to 'ibcastSend', on all other processes
-- to 'ibcastRecv'. Throws an 'MPIException' if the root passes
-- 'Nothing'.
ibcast :: CanSerialize a
       => Maybe a               -- ^ Object to broadcast (required on the root)
       -> Rank                  -- ^ Root rank
       -> Comm                  -- ^ Communicator
       -> IO (Request a)        -- ^ Communication request
ibcast sendobj root comm = do
  rank <- MPI.commRank comm
  if rank /= root
    then ibcastRecv root comm
    else case sendobj of
      -- Pattern matching replaces the partial 'fromJust'.
      Just obj -> ibcastSend obj root comm
      Nothing ->
        throwIO (MPIException "ibcast: expected send object on root")

-- | Begin to receive a broadcast on a non-root process. Call 'test'
-- or 'wait' to finish the communication and to obtain the broadcast
-- object.
--
-- The actual receive ('bcastRecv') runs in a newly forked thread. The
-- request is completed with a status made of the root rank and
-- 'MPI.anyTag'.
ibcastRecv :: CanSerialize a
           => Rank              -- ^ Root rank
           -> Comm              -- ^ Communicator
           -> IO (Request a)    -- ^ Communication request
ibcastRecv root comm = do
  result <- newEmptyMVar
  let deliver recvobj = putMVar result (Status root MPI.anyTag, recvobj)
  void $ forkIO (bcastRecv root comm >>= deliver)
  return (Request result)

-- | Begin to send a broadcast from the root process. Call 'test' or
-- 'wait' to finish the communication; the result is the object that
-- was sent.
--
-- The actual send ('bcastSend_') runs in a newly forked thread. The
-- request is completed with a status made of the root rank and
-- 'MPI.anyTag'.
ibcastSend :: CanSerialize a
           => a                 -- ^ Object to broadcast
           -> Rank              -- ^ Root rank (must be the calling process)
           -> Comm              -- ^ Communicator
           -> IO (Request a)    -- ^ Communication request
ibcastSend sendobj root comm = do
  result <- newEmptyMVar
  let complete = putMVar result (Status root MPI.anyTag, sendobj)
  void $ forkIO (bcastSend_ sendobj root comm >> complete)
  return (Request result)

-- | Begin to send a broadcast from the root process, without
-- returning the sent object. Call 'test' or 'wait' to finish the
-- communication.
--
-- The actual send ('bcastSend_') runs in a newly forked thread. The
-- request is completed with a status made of the root rank and
-- 'MPI.anyTag'.
ibcastSend_ :: CanSerialize a
            => a                -- ^ Object to broadcast
            -> Rank             -- ^ Root rank (must be the calling process)
            -> Comm             -- ^ Communicator
            -> IO (Request ())  -- ^ Communication request
ibcastSend_ sendobj root comm = do
  result <- newEmptyMVar
  let complete = putMVar result (Status root MPI.anyTag, ())
  void $ forkIO (bcastSend_ sendobj root comm >> complete)
  return (Request result)