{-|
Module      : Control.Concurrent.Concurrential
Description : Description of concurrent computation with sequential components. 
Copyright   : (c) Alexander Vieth, 2015
Licence     : BSD3
Maintainer  : aovieth@gmail.com
Stability   : experimental
Portability : non-portable (GHC only)

The functions @sequentially@ and @concurrently@ inject @IO@ terms into the
@ConcurrentialAp@ applicative functor, whose applicative instance will exploit
as much concurrency as possible such that all @sequentially@ terms will be run
in the order in which they would have been run had they been typical IOs.

Terms of @ConcurrentialAp@ can be transformed into terms of @Concurrential@,
which is a monad. The order of sequential terms is respected even through
binds; a sequential term will not be evaluted until all binds appearing
syntactically earlier than it have been expanded.
-}

{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE AutoDeriveTypeable #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Control.Concurrent.Concurrential (

    Concurrential
  , ConcurrentialAp(ConcurrentialAp)

  , runConcurrential

  , sequentially
  , concurrently
  , concurrentially

  , wait

  ) where

import Control.Applicative
import Control.Monad
import Control.Concurrent.MVar
import Control.Concurrent.Async hiding (concurrently)

-- | An Async without a type parameter, which can be waited for.
data SomeAsync where
  SomeAsync :: Async a -> SomeAsync

-- | Wait for a SomeAsync to complete.
waitSomeAsync :: SomeAsync -> IO ()
waitSomeAsync (SomeAsync async) = wait async >> return ()

-- | Our own Identity functor, so that we don't have to depend upon some
--   other package.
newtype Identity a = Identity {
    runIdentity :: a
  } deriving (Functor)

instance Applicative Identity where
  pure = Identity
  f <*> x = Identity $ (runIdentity f) (runIdentity x)

instance Monad Identity where
  return = Identity
  x >>= k = Identity $ (runIdentity . k) (runIdentity x)

-- | Description of the way in which a monadic term's evaluation should be
--   carried out.
data Choice m t = Sequential (m t) | Concurrent (m t)

instance Functor m => Functor (Choice m) where
  fmap f choice = case choice of
      Sequential io -> Sequential $ fmap f io
      Concurrent io -> Concurrent $ fmap f io

-- | Description of computation which is composed of sequential and concurrent
--   parts.
data Concurrential t where
    SCAtom :: Choice IO t -> Concurrential t
    SCBind :: Concurrential s -> (s -> Concurrential t) -> Concurrential t
    SCAp :: Concurrential (r -> t) -> Concurrential r -> Concurrential t

instance Functor Concurrential where
  fmap f sc = case sc of
    SCAtom choice -> SCAtom $ fmap f choice
    SCBind sc k -> SCBind sc ((fmap . fmap) f k)
    SCAp sf sx -> SCAp ((fmap . fmap) f sf) sx

instance Applicative Concurrential where
  pure = SCAtom . Sequential . pure
  cf <*> cx = SCBind cf (\f -> SCBind cx (\x -> pure (f x)))

instance Monad Concurrential where
  return = pure
  (>>=) = SCBind

-- | Concurrential without a Monad instance, but an Applicative instance
--   which exploits concurrency.
newtype ConcurrentialAp t = ConcurrentialAp {
    unConcurrentialAp :: Concurrential t
  }

instance Functor ConcurrentialAp where
  fmap f sc = ConcurrentialAp $ fmap f (unConcurrentialAp sc)

instance Applicative ConcurrentialAp where
  pure = ConcurrentialAp . pure
  cf <*> cx = ConcurrentialAp $ SCAp (unConcurrentialAp cf) (unConcurrentialAp cx)

-- | Run a Concurrential term with a continuation. We choose CPS here because
--   it allows us to explot @withAsync@, giving us a guarantee that an
--   exception in a spawning thread will kill spawned threads.
runConcurrentialK
  :: Concurrential t
  -- ^ The computation to run.
  -> SomeAsync
  -- ^ The sequential part.
  -> ((SomeAsync, Async t) -> IO r)
  -- ^ The continuation; fst is sequential part, snd is value part.
  -> IO r
runConcurrentialK cc sequentialPart k = case cc of
    SCAtom choice -> case choice of
        -- The async created becomes the sequential part and the value
        -- part. So when another Sequential is encountered, its value part
        -- will have to wait for this computation to complete.
        Sequential em -> withAsync
                         (waitSomeAsync sequentialPart >> em)
                         (\async -> k (SomeAsync async, async))
        -- The async created is the value part, but the sequential part
        -- remains the same.
        Concurrent em -> withAsync
                         (em)
                         (\async -> k (sequentialPart, async))

    SCBind sc next ->
        runConcurrentialK sc sequentialPart $ \(sequentialPart, asyncS) -> do
        synchronizeSequentialPart <- newEmptyMVar
        let waitAndContinue = do
                s <- wait asyncS
                let synchronizeAndWait = \(sequentialPart, valuePart) -> do
                        putMVar synchronizeSequentialPart sequentialPart
                        wait valuePart
                let continue = \x ->
                        runConcurrentialK
                        (next x)
                        sequentialPart
                        synchronizeAndWait
                continue s
        -- This is a very sensitive part of the definition. We fire off a thread
        -- to wait for @asyncS@ and then continue through @next@, but we also
        -- create a thread which blocks until the aforementioned has determined
        -- what is the sequential part of the computation through @next@, as
        -- we need that in order to call the continuation @k@.
        -- We don't actually have to carry the sequential part through; we just
        -- need to create another SomeAsync which waits for that sequential
        -- part. To achieve this, we use an MVar.
        withAsync waitAndContinue $ \async ->
          withAsync (takeMVar synchronizeSequentialPart >>= waitSomeAsync) $ \sequentialPart ->
            k (SomeAsync sequentialPart, async)

    SCAp left right ->
        runConcurrentialK left sequentialPart $ \(sequentialPart, asyncF) ->
        runConcurrentialK right sequentialPart $ \(sequentialPart, asyncX) ->
        let waitAndApply = do
                f <- wait asyncF
                x <- wait asyncX
                return $ f x
        in  withAsync waitAndApply (\async -> k (sequentialPart, async))

-- | Run a Concurrential term, realizing the effects of the IO terms which
--   compose it.
runConcurrential
  :: Concurrential t
  -> (Async t -> IO r)
  -- ^ Similar contract to withAsync; the Async argument is useless outside of
  -- this function.
  -> IO r
runConcurrential cc k = do
    let action = \sequentialPart ->
            runConcurrentialK cc (SomeAsync sequentialPart) (k . snd)
    withAsync (return ()) action

-- | Create an effect which must be run sequentially.
--   If a @sequentially io@ appears in a @Concurrential t@ term then it will
--   always be run to completion before any later sequential part of the term
--   is run. Consider the following terms:
--
--   @
--     a = someConcurrential *> sequentially io *> someOtherConcurrential
--     b = someConcurrential *> concurrently io *> someOtherConcurrential
--     c = someConcurrential *> sequentially io *> concurrently otherIo
--   @
--
--   When running the term @a@, we are guaranteed that @io@ is completed before
--   any sequential part of @someOtherConcurrential@ is begun, but when running
--   the term @b@, this is not the case; @io@ may be interleaved with or even
--   run after any part of @someOtherConcurrential@. The term @c@ highlights an
--   important point: @concurrently otherIo@ may be run before, during or after
--   @sequentially io@! The ordering through applicative combinators is
--   guaranteed only among sequential terms.
sequentially :: IO t -> ConcurrentialAp t
sequentially = ConcurrentialAp . SCAtom . Sequential

-- | Create an effect which is run concurrently where possible, i.e. whenever it
--   combined applicatively with other terms. For instance:
--
--   @
--     a = concurrently io *> someConcurrential
--     b = concurrently io >> someConcurrential
--   @
--
--   When running the term @a@, the IO term @io@ will be run concurrently with
--   @someConcurrential@, but not so in @b@, because monadic composition has
--   been used.
concurrently :: IO t -> ConcurrentialAp t
concurrently = ConcurrentialAp . SCAtom . Concurrent

-- | Inject a ConcurrentialAp into Concurrential, losing the
--   concurrency-enabling Applicative instance but gaining a Monad instance.
concurrentially :: ConcurrentialAp t -> Concurrential t
concurrentially = unConcurrentialAp