module Network.Transport (
Transport(..),
TransportException(..),
Dispatcher(..),
dispatcher,
Mailboxes,
pullMessage,
dispatchMessage,
withTransport,
withEndpoint,
withClient,
withServer,
Binding(..),
withBinding,
withBinding2,
withBinding3,
withBinding4,
Connection(..),
withConnection,
withConnection2,
withConnection3,
withConnection4,
withConnections,
withCompleteNetwork
) where
import Network.Endpoints
import Control.Concurrent.Async
import Control.Concurrent.Mailbox
import Control.Concurrent.STM
import Control.Exception
import qualified Data.Map as M
import qualified Data.Set as S
import Data.Typeable
data Transport = Transport {
bind :: Endpoint -> Name -> IO Binding,
dispatch :: Endpoint -> IO Dispatcher,
connect :: Endpoint -> Name -> IO Connection,
shutdown :: IO ()
}
data Dispatcher = Dispatcher {
stop :: IO ()
}
data TransportException =
NoDataRead |
DataUnderflow
deriving (Show,Typeable)
instance Exception TransportException
dispatcher :: Mailboxes -> Endpoint -> IO Dispatcher
dispatcher mailboxes endpoint = do
d <- async disp
return Dispatcher {
stop = cancel d
}
where
disp = do
atomically $ do
envelope <- readMailbox $ endpointOutbound endpoint
let name = messageDestination envelope
msg = envelopeMessage envelope
dispatchMessage mailboxes name msg
disp
type Mailboxes = TVar (M.Map Name (Mailbox Message))
pullMessage :: Mailboxes -> Name -> STM Message
pullMessage mailboxes destination = do
outbound <- readTVar mailboxes
case M.lookup destination outbound of
Nothing -> retry
Just mailbox -> readMailbox mailbox
dispatchMessage :: Mailboxes -> Name -> Message -> STM ()
dispatchMessage mailboxes name message = do
outbound <- readTVar mailboxes
mailbox <- case M.lookup name outbound of
Nothing -> do
mailbox <- newMailbox
modifyTVar mailboxes $ M.insert name mailbox
return mailbox
Just mailbox -> return mailbox
writeMailbox mailbox message
withTransport :: IO Transport -> (Transport -> IO a) -> IO a
withTransport factory fn = do
transport <- factory
finally (fn transport)
(shutdown transport)
withEndpoint :: Transport -> Endpoint -> IO a -> IO a
withEndpoint transport endpoint fn = do
d <- dispatch transport endpoint
finally fn
(stop d)
data Binding = Binding {
bindingName :: Name,
unbind :: IO ()
}
withBinding :: Transport -> Endpoint -> Name -> IO a -> IO a
withBinding transport endpoint name actor = do
atomically $ do
bindings <- readTVar $ boundEndpointNames endpoint
if S.member name bindings
then throw $ BindingExists name
else modifyTVar (boundEndpointNames endpoint) $ S.insert name
binding <- bind transport endpoint name
finally actor $ do
unbind binding
atomically $ modifyTVar (boundEndpointNames endpoint) $ S.delete name
withBinding2 :: Transport -> (Endpoint,Name) -> (Endpoint,Name) -> IO a -> IO a
withBinding2 transport (endpoint1,name1) (endpoint2,name2) fn =
withBinding transport endpoint1 name1 $
withBinding transport endpoint2 name2 fn
withBinding3 :: Transport -> (Endpoint,Name) -> (Endpoint,Name) -> (Endpoint,Name) -> IO a -> IO a
withBinding3 transport (endpoint1,name1) (endpoint2,name2) (endpoint3,name3) fn =
withBinding transport endpoint1 name1 $
withBinding transport endpoint2 name2 $
withBinding transport endpoint3 name3 fn
withBinding4 :: Transport -> (Endpoint,Name) -> (Endpoint,Name) -> (Endpoint,Name) -> (Endpoint,Name) -> IO a -> IO a
withBinding4 transport (endpoint1,name1) (endpoint2,name2) (endpoint3,name3) (endpoint4,name4) fn =
withBinding transport endpoint1 name1 $
withBinding transport endpoint2 name2 $
withBinding transport endpoint3 name3 $
withBinding transport endpoint4 name4 fn
data Connection = Connection {
disconnect :: IO ()
}
withConnection :: Transport -> Endpoint -> Name -> IO a -> IO a
withConnection transport endpoint name fn = do
connection <- connect transport endpoint name
finally fn $ disconnect connection
withConnection2 :: Transport -> Endpoint -> Name -> Name -> IO a -> IO a
withConnection2 transport endpoint name1 name2 = withConnections transport endpoint [name1,name2]
withConnection3 :: Transport -> Endpoint -> Name -> Name -> Name -> IO a -> IO a
withConnection3 transport endpoint name1 name2 name3 = withConnections transport endpoint [name1,name2,name3]
withConnection4 :: Transport -> Endpoint -> Name -> Name -> Name -> Name -> IO a -> IO a
withConnection4 transport endpoint name1 name2 name3 name4 = withConnections transport endpoint [name1,name2,name3,name4]
withConnections :: Transport -> Endpoint -> [Name] -> IO a -> IO a
withConnections _ _ [] fn = fn
withConnections transport endpoint (destination:destinations) fn =
withConnection transport endpoint destination $
withConnections transport endpoint destinations fn
withCompleteNetwork :: Transport -> [Name] -> Endpoint -> Name -> IO a -> IO a
withCompleteNetwork _ [] _ _ fn = fn
withCompleteNetwork transport(destination:destinations) endpoint origin fn =
if destination == origin
then withConnections transport endpoint destinations fn
else withCompleteNetwork transport destinations endpoint origin fn
withClient :: IO Transport -> Name -> (Endpoint -> IO a) -> IO a
withClient transportFactory name clientFn =
withTransport transportFactory $ \transport -> do
endpoint <- newEndpoint
withEndpoint transport endpoint $
withName endpoint name $
clientFn endpoint
withServer :: IO Transport -> Name -> (Transport -> Endpoint -> IO a) -> IO a
withServer transportFactory name serverFn =
withTransport transportFactory $ \transport -> do
endpoint <- newEndpoint
withEndpoint transport endpoint $
withBinding transport endpoint name $
serverFn transport endpoint