-- ----------------------------------------------------------------------------

{- |
  Module     : Holumbus.Distribution.DChan
  Copyright  : Copyright (C) 2009 Stefan Schmidt
  License    : MIT

  Maintainer : Stefan Schmidt (stefanschmidt@web.de)
  Stability  : experimental
  Portability: portable
  Version    : 0.1
  
  This module offers a distributed channel datatype (DChan).

  It is similar to Control.Concurrent.Chan, except that you can use it
  between multiple processes on different computers. You can access a
  DChan (reading and writing) from your local process as well as from
  another one.
-}

-- ----------------------------------------------------------------------------

module Holumbus.Distribution.DChan 
(
  -- * datatypes
    DChan
  
  -- * creating and closing channels
  , newDChan
  , newRemoteDChan
  , closeDChan
  
  -- * operations on a channel
  , writeDChan
  , readDChan
  , tryReadDChan
  , tryWaitReadDChan
  , isEmptyDChan
)
where

import           Control.Concurrent.Chan
import           Data.Binary
import qualified Data.ByteString.Lazy as B
import           System.IO
import           System.Log.Logger
import           System.Timeout

import           Holumbus.Distribution.DNode.Base

localLogger :: String
localLogger = "Holumbus.Distribution.DChan"


dChanType :: DResourceType
dChanType = mkDResourceType "DCHAN"

mkDChanEntry :: (Binary a) => DChanReference a -> DResourceEntry
mkDChanEntry d = DResourceEntry {
    dre_Dispatcher   = dispatchDChanRequest d 
  }


data DChanRequestMessage
  = DCMReqRead
  | DCMReqWrite B.ByteString
  | DCMReqIsEmpty
  deriving (Show)

instance Binary DChanRequestMessage where
  put(DCMReqRead)    = putWord8 1
  put(DCMReqWrite a) = putWord8 2 >> put a
  put(DCMReqIsEmpty) = putWord8 3
  get
    = do
      t <- getWord8
      case t of
        1 -> return (DCMReqRead)
        2 -> get >>= \a -> return (DCMReqWrite a)
        3 -> return (DCMReqIsEmpty)
        _ -> error "DChanRequestMessage: wrong encoding"

  
data DChanResponseMessage
  = DCMRspRead B.ByteString
  | DCMRspWrite
  | DCMRspIsEmpty Bool
  deriving (Show)


instance Binary DChanResponseMessage where
  put(DCMRspRead a)    = putWord8 1 >> put a
  put(DCMRspWrite)     = putWord8 2
  put(DCMRspIsEmpty b) = putWord8 3 >> put b
  get
    = do
      t <- getWord8
      case t of
        1 -> get >>= \a -> return (DCMRspRead a)
        2 -> return (DCMRspWrite)
        3 -> get >>= \b -> return (DCMRspIsEmpty b)
        _ -> error "DChanResponseMessage: wrong encoding"


dispatchDChanRequest :: (Binary a) => DChanReference a -> DNodeId -> Handle -> IO () 
dispatchDChanRequest dch _ hdl
  = do
    debugM localLogger "dispatcher: getting message from handle"
    raw <- getByteStringMessage hdl
    let msg = (decode raw)
    -- debugM localLogger $ "dispatcher: Message: " ++ show msg
    case msg of
      (DCMReqRead)    -> handleRead dch hdl
      (DCMReqWrite d) -> handleWrite dch (decode d) hdl
      (DCMReqIsEmpty) -> handleIsEmpty dch hdl

    
-- | The DChan datatype.
--   Notice that this datatype implements the Data.Binary typeclass.
--   That means that you can pass a DChan, so that another computer
--   can access the channel.
data DChan a
  = DChanLocal DResourceAddress (Chan a)
  | DChanRemote DResourceAddress

instance Binary (DChan a) where
  put(DChanLocal a _) = put a
  put(DChanRemote a)  = put a
  get = get >>= \a -> return (DChanRemote a)


data DChanReference a = DChanReference DResourceAddress (Chan a) 


-- | Creates a new DChan on the local computer. The first parameter
--   is the name of the Channel which could be used in other processes to
--   access this stream. If you leave it empty, a random Id will be created.
newDChan :: (Binary a) => String -> IO (DChan a)
newDChan s
  = do
    dra <- genLocalResourceAddress dChanType s
    c <- newChan
    let dch = (DChanLocal dra c)
        dcr = (DChanReference dra c)
        dce = (mkDChanEntry dcr)
    addLocalResource dra dce
    return dch
    
-- TODO merge this with newDChan?
-- | Creates a reference to a DChan which was created in a different
--   process.
--   The first parameter is the name of the resource and the second one
--   the name of the node.
newRemoteDChan :: String -> String -> IO (DChan a)
newRemoteDChan r n
  = do
    return $ DChanRemote dra
    where
    dra = mkDResourceAddress dChanType r n


-- | Closes a DChan object, could not be used anymore after this call.
closeDChan :: (DChan a) -> IO ()
closeDChan (DChanLocal dra _)
  = do
    delLocalResource dra
closeDChan (DChanRemote dra)
  = do
    delForeignResource dra



requestRead :: (Binary a) => Handle -> IO a
requestRead hdl
  = do
    putByteStringMessage (encode $ DCMReqRead) hdl
    raw <- getByteStringMessage hdl
    let rsp = (decode raw)
    case rsp of
      (DCMRspRead d)  -> return $ decode d
      _ -> error "DChan - requestRead: invalid response"
    

handleRead :: (Binary a) => DChanReference a -> Handle -> IO ()
handleRead (DChanReference _ ch) hdl
  = do
    a <- readChan ch
    putByteStringMessage (encode $ DCMRspRead $ encode a) hdl


requestWrite :: (Binary a) => a -> Handle -> IO ()
requestWrite a hdl
  = do 
    putByteStringMessage (encode $ DCMReqWrite $ encode a) hdl
    raw <- getByteStringMessage hdl
    let rsp = (decode raw)
    case rsp of
      (DCMRspWrite) -> return ()
      _ -> error "DChan - requestWrite: invalid response"
    

handleWrite :: DChanReference a -> a -> Handle -> IO ()
handleWrite (DChanReference _ ch) a hdl
  = do
    writeChan ch a
    putByteStringMessage (encode $ DCMRspWrite) hdl


requestIsEmpty :: Handle -> IO Bool
requestIsEmpty hdl
  = do
    putByteStringMessage (encode $ DCMReqIsEmpty) hdl
    raw <- getByteStringMessage hdl
    let rsp = (decode raw)
    case rsp of
      (DCMRspIsEmpty b) -> return b
      _ -> error "DChan - requestIsEmpty: invalid response"
    

handleIsEmpty :: DChanReference a -> Handle -> IO ()
handleIsEmpty (DChanReference _ ch) hdl
  = do
    b <- isEmptyChan ch
    putByteStringMessage (encode $ DCMRspIsEmpty b) hdl 


-- | Writes data to a DChan.
writeDChan :: (Binary a) => DChan a -> a -> IO ()
writeDChan (DChanLocal _ c) v
  = do
    writeChan c v
writeDChan (DChanRemote a) v
  = do
    unsafeAccessForeignResource a (requestWrite v)


-- | Reads data from a DChan, blocks if DChan is empty.
readDChan :: (Binary a) => DChan a -> IO a
readDChan (DChanLocal _ c)
  = do
    readChan c
readDChan (DChanRemote a)
  = do
    unsafeAccessForeignResource a requestRead


-- | Tries to read data from a DChan, if the DChan is empty,
--   the function return with Nothing.
tryReadDChan :: (Binary a) => DChan a -> IO (Maybe a)
tryReadDChan dc
  = do
    empty <- isEmptyDChan dc
    if (not empty) 
      then do timeout 1000 (readDChan dc)
      else do return Nothing


-- | Reads data from a DChan. If the channel is empty, it waits
--   for a given time (in microseconds) an returns immediately
--   when new data arrives, otherwise it return Nothing.
tryWaitReadDChan :: (Binary a) => DChan a -> Int -> IO (Maybe a) 
tryWaitReadDChan dc t = timeout t (readDChan dc)
    

-- | Tests, if a DChan is empty.
isEmptyDChan :: DChan a -> IO Bool
isEmptyDChan (DChanLocal _ c)
  = do
    isEmptyChan c
isEmptyDChan (DChanRemote a)
  = do
    unsafeAccessForeignResource a requestIsEmpty