-- ----------------------------------------------------------------------------

{- |
  Module     : Holumbus.Distribution.DStreamPort
  Copyright  : Copyright (C) 2009 Stefan Schmidt
  License    : MIT

  Maintainer : Stefan Schmidt (stefanschmidt@web.de)
  Stability  : experimental
  Portability: portable
  Version    : 0.1

  This module offers distributed streams and ports.

  Because a DChan allows external read access, the idea came up to
  split a DChan into two parts: a stream and a port. A stream only allows
  you to read data from it. The read-access is limited to the local process which
  created the stream. To send data to a stream, you need a port. This can be used
  on foreign nodes to send data to your local stream.
-}

-- ----------------------------------------------------------------------------

module Holumbus.Distribution.DStreamPort
(
  -- * datatypes
    DStream
  , DPort
  , StreamPortMessage(..)

  -- * creating and closing a stream
  , newDStream
  , closeDStream

  -- * operations on a stream
  , isEmptyDStream
  , receive
  , receiveMsg
  , tryReceive
  , tryReceiveMsg
  , tryWaitReceive
  , tryWaitReceiveMsg
  , withStream

  -- * creating a port
  , newDPortFromStream
  , newDPort

  -- * operations on a port
  , send
  , sendWithGeneric
  , sendWithMaybeGeneric
)
where

import           Control.Exception (bracket)

import           Data.Binary
--import           Holumbus.Common.MRBinary
import qualified Data.ByteString.Lazy as B
import           System.Log.Logger

import           Holumbus.Distribution.DChan

-- | Name of the logger this module writes its debug output to
--   (it is the first argument of the 'debugM' calls in 'withStream').
localLogger :: String
localLogger = "Holumbus.Distribution.DStreamPort"

-- ----------------------------------------------------------------------------
-- | Message Datatype.
--   Besides the payload, every message carries an optional block of generic
--   data, which is useful for debugging or for transporting e.g. a reply port.
--
--   Both fields are strict. The type deliberately has no datatype context:
--   the 'Binary' constraint is demanded by the functions and instances that
--   actually serialize a message, not by the type itself.
data StreamPortMessage a = StreamPortMessage {
    spm_Data           :: ! a                    -- ^ the data
  , spm_Generic        :: ! (Maybe B.ByteString) -- ^ some generic data -- could be another port
  }

-- Wire format: the payload is written first, followed by the optional
-- generic data; decoding reads the two parts back in the same order.
instance (Binary a) => Binary (StreamPortMessage a) where
  put (StreamPortMessage payload generic)
    = do
      put payload
      put generic
  get
    = do
      payload <- get
      generic <- get
      return (StreamPortMessage payload generic)

-- ----------------------------------------------------------------------------
-- | The reading side of a channel: a wrapper around a 'DChan' that
--   transports 'StreamPortMessage's.
newtype DStream a = DStream (DChan (StreamPortMessage a))

-- | Creates a new local stream.
--   The string is passed on to 'newDChan' unchanged (presumably the name of
--   the stream resource -- confirm against "Holumbus.Distribution.DChan").
newDStream :: (Binary a) => String -> IO (DStream a)
newDStream name = fmap DStream (newDChan name)

-- | Closes a stream by closing the channel it wraps.
closeDStream :: DStream a -> IO ()
closeDStream stream
  = case stream of
      DStream chan -> closeDChan chan

-- | Tests whether a stream has no more data to read.
--   The answer is whatever 'isEmptyDChan' reports for the wrapped channel.
isEmptyDStream :: DStream a -> IO Bool
isEmptyDStream stream = isEmptyDChan chan
  where
    DStream chan = stream

-- | Drops the message envelope of an optional message and keeps only its
--   payload ('spm_Data'); 'Nothing' stays 'Nothing'.
getMaybeMsgData :: (Binary a) => Maybe (StreamPortMessage a) -> Maybe a
getMaybeMsgData = fmap spm_Data

-- | Reads the next message from a stream and returns only its data packet.
--   If the stream is empty, this function blocks until a new message arrives.
receive :: (Binary a) => DStream a -> IO a
receive stream
  = do
    msg <- receiveMsg stream
    return (spm_Data msg)

-- | Reads the next complete message (data packet + message header) from a
--   stream. If the stream is empty, this function blocks until a new message
--   arrives.
receiveMsg :: (Binary a) => DStream a -> IO (StreamPortMessage a)
receiveMsg stream
  = case stream of
      DStream chan -> readDChan chan

-- | Non-blocking variant of 'receive'.
--   Returns the data packet of the next message, or 'Nothing' immediately
--   if the stream is empty.
tryReceive :: (Binary a) => DStream a -> IO (Maybe a)
tryReceive stream = fmap getMaybeMsgData (tryReceiveMsg stream)

-- | Non-blocking variant of 'receiveMsg'.
--   Returns the next complete message (data packet + message header), or
--   'Nothing' immediately if the stream is empty.
tryReceiveMsg :: (Binary a) => DStream a -> IO (Maybe (StreamPortMessage a))
tryReceiveMsg stream = tryReadDChan chan
  where
    DStream chan = stream

-- | Like 'tryReceive', but with a waiting time.
--   If the stream is empty, this function waits for new messages until the
--   time is up; if no message has arrived by then, it returns 'Nothing'.
--   The waiting time is passed on to 'tryWaitReceiveMsg' unchanged.
tryWaitReceive :: (Binary a) => DStream a -> Int -> IO (Maybe a)
tryWaitReceive stream waitTime
  = do
    maybeMsg <- tryWaitReceiveMsg stream waitTime
    return (getMaybeMsgData maybeMsg)

-- | Like 'tryReceiveMsg', but with a waiting time.
--   If the stream is empty, this function waits for new messages until the
--   time is up; if no message has arrived by then, it returns 'Nothing'.
--   The waiting time is handed to 'tryWaitReadDChan' as is, so its unit is
--   whatever that function expects.
tryWaitReceiveMsg :: (Binary a) => DStream a -> Int -> IO (Maybe (StreamPortMessage a))
tryWaitReceiveMsg stream waitTime
  = case stream of
      DStream chan -> tryWaitReadDChan chan waitTime

-- | Encapsulates a stream.
--   A new stream is created (with the empty string as its name argument),
--   then the user action is run on it, and after that the stream is closed.
--
--   The stream is closed even if the user action throws an exception; the
--   exception is re-thrown to the caller afterwards. The result of the user
--   action is returned.
withStream :: (Binary a) => (DStream a -> IO b) -> IO b
withStream f
  = do
    debugM localLogger "withStream: creating new stream"
    -- bracket guarantees that closeDStream runs on every exit path
    bracket (newDStream "") closeDStream runAction
  where
    runAction s
      = do
        debugM localLogger "withStream: new stream created"
        f s

-- ----------------------------------------------------------------------------
-- | The writing side of a channel: a wrapper around a 'DChan' that
--   transports 'StreamPortMessage's. Data is sent to a stream through a port.
newtype DPort a = DPort (DChan (StreamPortMessage a))

-- A port is serialized exactly like the channel it wraps; decoding reads
-- the channel back and wraps it again.
instance (Binary a) => Binary (DPort a) where
  put (DPort chan) = put chan
  get
    = do
      chan <- get
      return (DPort chan)
-- | Creates a new port which is bound to the given stream.
--   The port wraps the very same channel as the stream.
newDPortFromStream :: DStream a -> IO (DPort a)
newDPortFromStream stream
  = case stream of
      DStream chan -> return (DPort chan)

-- | Creates a new port from a stream name and its socket id.
--   The first parameter is the name of the resource, the second one the
--   name of the node; both are passed on to 'newRemoteDChan'.
newDPort :: String -> String -> IO (DPort a)
newDPort resource node = fmap DPort (newRemoteDChan resource node)
-- | Sends data to the stream of the port, without any generic data.
--   The data is sent via the network if the stream is located on an
--   external processor.
send :: (Binary a) => DPort a -> a -> IO ()
send port payload = sendWithMaybeGeneric port payload noGeneric
  where
    noGeneric = Nothing

-- | Like 'send', but additionally attaches some generic data to the message
--   (e.g. a port for reply messages).
sendWithGeneric :: (Binary a) => DPort a -> a -> B.ByteString -> IO ()
sendWithGeneric port payload generic
  = sendWithMaybeGeneric port payload $ Just generic

-- | Like 'sendWithGeneric', but the generic data is optional.
--   Payload and generic data are packed into a 'StreamPortMessage', which is
--   then written to the channel of the port.
sendWithMaybeGeneric :: (Binary a) => DPort a -> a -> Maybe B.ByteString -> IO ()
sendWithMaybeGeneric port payload generic
  = case port of
      DPort chan -> writeDChan chan message
  where
    message = StreamPortMessage payload generic