{-# LANGUAGE MultiParamTypeClasses, TypeSynonymInstances, FlexibleInstances #-}


{-
--------------------------------------------------------------------------------
--
-- Copyright (C) 2008 Martin Sulzmann, Edmund Lam. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above
      copyright notice, this list of conditions and the following
      disclaimer in the documentation and/or other materials provided
      with the distribution.

    * Neither the name of Isaac Jones nor the names of other
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-}


module Actor.ActorLinearSearch where

 
{-

An implementation of actors using channels for the (external) mailbox
and a linear search algorithm for implementing multi-set message matching.

NOTE: We can only support memo for receive bodies returning () "unit".
      For any other return type, some types would be too polymorphic.

-}

import IO
import Monad
import Data.IORef
import Control.Concurrent
import Control.Concurrent.Chan

import Actor.Timeout
import Actor.ActorBase
import Actor.ActorCompiler
import Actor.SList
import Actor.QList


----------------------------------------
-- primitives for hashing messages

-- | Index identifying one of an actor's hash tables.
type HashIdx = Int

-- | Hashing strategy supplied when an actor is created.
data HashOp msg = HashOp
  { numberOfTables :: HashIdx
    -- ^ how many hash tables the actor allocates
  , hashMsg        :: msg -> HashIdx
    -- ^ maps a message to the index of the hash table it is stored in
  }

-- we use QLists to represent each hash table

-----------------------------------
-- a concrete actor instance


-- linear store representation
-- | State of one actor using the linear store.
data Act msg = Act
  { external_mailbox         :: Chan msg
    -- ^ channel other threads write to; read when the store is exhausted
  , internal_mailbox         :: SList (InternalMsg msg)
    -- ^ store of tagged messages already taken from the external mailbox
  , internal_mailbox_barrier :: Iterator (InternalMsg msg)
    -- ^ iterator over the internal store; advanced to obtain the next
    --   message and re-assigned to a fresh iterator on reset
  , memoTable                :: IORef [(Int,[CompClause (Act msg) (Location msg) ()])]
    -- ^ association list of memoised compiled clauses, keyed by an 'Int'
  , hashTable                :: [QList (InternalMsg msg)]
    -- ^ one queue per hash table
  , hashOp                   :: HashOp msg
    -- ^ hashing strategy the actor was created with
  , thread_id                :: MVar ThreadId
    -- ^ filled with the actor's thread id once the actor is running
  }

	
{-
The mailbox consists of two parts:

   The internal mailbox is represented as a singly-linked list.
   The barrier (an iterator) separates active from inactive messages.
   A message is inactive if it couldn't contribute in firing a receive clause.
   We'll maintain a store of inactive messages because in combination with the
   latest active message we may actually fire a receive clause.

   getMessage retrieves the next active message by pushing up the barrier.
   If the barrier has reached the end of the list, we read from the external mailbox.

   nextMsg scans the store (ie inactive messages)

   deleteMsg logically deletes a message. That is, we turn on a flag saying that the
   message has been logically deleted.

   The actual physical delete happens during getMessage. 

   resetMB re-activates all store messages: we assign the barrier to a new iterator

-}

-- abbreviations

-- | Handle for addressing an actor from outside: its external mailbox
-- channel paired with the 'MVar' that holds the actor's 'ThreadId'.
type PID msg = (Chan msg,MVar ThreadId)

-- | Mutable reference to a list item carrying an 'InternalMsg'.
type Location msg = IORef (Item (InternalMsg msg))

-- actor interface

-- | Project an actor onto the handle other threads use to reach it:
-- its external mailbox together with its thread-id variable.
actorToPID :: Act msg -> PID msg
actorToPID act = (mailbox, tidVar)
  where
    mailbox = external_mailbox act
    tidVar  = thread_id act

-- | Kill the thread of the addressed actor.
-- 'readMVar' blocks until the thread-id variable has been filled.
kill :: PID msg -> IO ()
kill (_, tidVar) = readMVar tidVar >>= killThread


-- | Deliver a message by writing it to the addressed actor's external
-- mailbox channel.
send :: PID msg -> msg -> IO ()
send (mailbox, _) payload = mailbox `writeChan` payload
 

-- | Allocate the state of a new actor without starting it.
--
-- The returned actor has an empty external mailbox, an empty internal
-- store with a barrier iterator on it, an empty memo table, and
-- @numberOfTables hop@ fresh hash tables.
createActor :: HashOp msg -> IO (Act msg)
createActor hop = do
    mailbox  <- newChan                    -- empty external mailbox
    store    <- newSList                   -- empty internal store
    frontier <- newIterator store          -- initial barrier
    memo     <- newIORef []
    tables   <- sequence (replicate (numberOfTables hop) newQList)
    -- The thread id is only known once runActor has forked the actor;
    -- an initially empty MVar prevents reads of an uninitialised value.
    tidVar   <- newEmptyMVar
    return (Act { thread_id                = tidVar
                , memoTable                = memo
                , hashOp                   = hop
                , hashTable                = tables
                , internal_mailbox_barrier = frontier
                , internal_mailbox         = store
                , external_mailbox         = mailbox
                })

-- | Start an actor: fork a thread that first records its own thread id
-- in the actor's 'thread_id' variable and then runs the compiled actor
-- body on the actor.  Returns as soon as the thread has been forked.
runActor :: Act msg -> (Act msg -> IO ()) -> IO ()
runActor (actor@(Act {thread_id = tid})) compiled_actor =
    forkIO body >> return ()
  where
    -- publish the thread id, then hand control to the actor body
    body = myThreadId >>= putMVar tid >> compiled_actor actor

{-
newActor :: (Act msg -> IO ()) -> IO (Act msg)
we don't support this functionality anymore
must use createActor and runActor instead
-}



-- | Pick the hash table (queue) that belongs to a hash index.
--
-- Hash indices are 1-based: index @i@ selects element @i - 1@ of the
-- actor's 'hashTable' list.  An index outside the available tables is
-- reported with a descriptive error instead of the bare list-indexing
-- failure of '!!'.
selectHashTable :: Act msg -> HashIdx -> QList (InternalMsg msg)
selectHashTable act idx
  | idx < 1   = outOfRange
  | otherwise = case drop (idx - 1) (hashTable act) of
                  (qlist : _) -> qlist
                  []          -> outOfRange
  where
    outOfRange = error ("selectHashTable: hash index " ++ show idx
                        ++ " is outside 1.." ++ show (length (hashTable act)))

instance (EMatch msg, Eq msg, Show msg) => Actor (Act msg) 
                                       msg (Location msg)
                                       HashIdx (QIterator (InternalMsg msg)) where
-- get new active message:
-- (1) a message may be available in the internal store: advance the
--     barrier and, if it yields a node, return that node
-- (2) otherwise read from the external mailbox; this blocks when 'time'
--     is Nothing and is subject to 'timeout' when 'time' is Just t
--     (the unit of t is whatever 'timeout' expects)
-- Returns Nothing only when the time-out applied.  A message read from
-- the external mailbox is tagged, appended to the store, and enqueued in
-- the hash table selected by the actor's hash function.
--getMessage :: Act msg -> Maybe t -> IO (Maybe (Location msg))
  getMessage (act@(Act { external_mailbox = emb
                       , internal_mailbox = imb
                       , internal_mailbox_barrier = barrier })) time =
    do curNode <- iterateSList barrier
       case curNode of
         Just curPtr -> return (Just curPtr)
         Nothing ->     -- we must have hit the tail
           do check <- readExternal time
              case check of
                Nothing -> return Nothing     -- time-out applied
                Just untagged_new_msg ->
                  do -- add newly received message to store
                     new_tag <- newTag
                     let new_msg = InternalMsg {message = untagged_new_msg, msg_tag = new_tag}
                     curPtr <- addToTail imb new_msg
                     iterateSList barrier -- bump up the barrier
                     -- add to appropriate hash table
                     let hashIdx = hashMsg (hashOp act) untagged_new_msg
                     enQueue (selectHashTable act hashIdx) curPtr
                     return (Just curPtr)
    where
      -- read from external mailbox, possibly apply time-out
      readExternal Nothing  = fmap Just (readChan emb)
      readExternal (Just t) = timeout t (readChan emb)

-- hash index of a message, as computed by the actor's hash function
--getIndex :: Act msg -> msg  -> IO HashIdx
  getIndex act m = return (hashMsg (hashOp act) m)

-- we perform a systematic search of the hash table (a queue), 
-- we use an iterator, a pointer to a pointer, via which we scan
-- the queue from 'oldest' (head) to 'newest' (tail)
--initSearch :: Act msg -> HashIdx -> IO (QIterator (InternalMsg msg))
  initSearch act idx = newQIterator (selectHashTable act idx)

-- we use the iterator to scan through the queue 
--nextMsg :: Act msg -> (QIterator (InternalMsg msg)) -> IO (Maybe (Location msg))
  nextMsg _ curIterator = iterateQList curIterator

-- we only logically delete a message, the actual physical delete (in the store and hash table)
-- happens when applying getMessage (iterateSList) and nextMsg (iterateQList)
--deleteMsg :: Act msg -> (Location msg) -> IO ()
  deleteMsg _ ptr = 
    do node <- readIORef ptr
       case node of
         Null -> error "deleteMsg: something went wrong"
         Node {val = x, next = n} -> writeIORef ptr (Node {val = x, deleted = True, next = n})

-- we reset the barrier to the head/beginning of the store
-- NOTE: no need to "reset" hashtables, already stored messages remain
-- where they are, same applies to hashed messages
--resetMB :: Act msg -> IO ()
  resetMB (Act {internal_mailbox = imb, 
              internal_mailbox_barrier = barrier}) =  
     do restartIt <- newIterator imb
        assignIterator barrier restartIt

-- read the message stored at a location; a location holding Null is
-- reported immediately with a descriptive error (consistent with
-- deleteMsg) rather than through a deferred record-selector failure
--extractMsg :: Location msg -> IO (InternalMsg msg)
  extractMsg ptr =
    do node <- readIORef ptr
       case node of
         Null -> error "extractMsg: location holds no message (Null item)"
         _    -> return (val node)

-- look up the memoised code stored under a key
  codeLookup (Act {memoTable = memTble}) line_no =
      fmap (lookup line_no) (readIORef memTble)

-- prepend a (key, code) entry to the memo table; since codeLookup uses
-- 'lookup', the most recently added entry for a key is the one found
  memoCode (Act {memoTable = memTble}) entry =
      modifyIORef memTble (entry :)