{-# LANGUAGE ApplicativeDo #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
module Control.Monad.Schedule.Class where


-- base
import Control.Arrow
import Control.Concurrent
import Control.Monad (void)
import Control.Monad.IO.Class
import Data.Either
import Data.Foldable (fold, forM_)
import Data.Function
import Data.Functor.Identity
import Data.Kind (Type)
import Data.List.NonEmpty hiding (length)
import Data.Maybe (fromJust)
import Data.Void
import Prelude hiding (map, zip)
import Unsafe.Coerce (unsafeCoerce)

import qualified Data.List.NonEmpty as NonEmpty

-- transformers
import Control.Monad.Trans.Accum
import Control.Monad.Trans.Class
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Except
import Control.Monad.Trans.Identity
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Reader
import qualified Control.Monad.Trans.Writer.CPS as CPSWriter
import qualified Control.Monad.Trans.Writer.Lazy as LazyWriter
import qualified Control.Monad.Trans.Writer.Strict as StrictWriter

{- | 'Monad's in which actions can be scheduled concurrently.

@'schedule' actions@ is expected to run @actions@ concurrently,
whatever that means for a particular monad @m@.
'schedule' does not return before at least one value has finished,
and the returned values @'NonEmpty' a@ are all those that finish first.
The actions @[m a]@ (possibly empty) are the remaining, still running ones.
Executing any of them is expected to be blocking,
and awaits the return of the corresponding action.

A lawful instance is considered to satisfy these conditions:

  * The set of returned values is invariant under scheduling.
    In other words, @sequence@ will result in the same set of values as @scheduleAndFinish@.
'schedule' thus can be thought of as a concurrency-utilizing version of 'sequence'.
-}
class MonadSchedule m where
  -- | Run the actions concurrently,
  --   and return the result of the first finishers,
  --   together with completions for the unfinished actions.
  schedule :: NonEmpty (m a) -> m (NonEmpty a, [m a])

-- | Keeps 'schedule'ing actions until all are finished.
--   Returns the same set of values as 'sequence',
--   but utilises concurrency and may thus change the order of the values.
scheduleAndFinish :: (Monad m, MonadSchedule m) => NonEmpty (m a) -> m (NonEmpty a)
scheduleAndFinish :: forall (m :: * -> *) a.
(Monad m, MonadSchedule m) =>
NonEmpty (m a) -> m (NonEmpty a)
scheduleAndFinish NonEmpty (m a)
actions = do
  (NonEmpty a
finishedFirst, [m a]
running) <- forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule NonEmpty (m a)
actions
  case [m a]
running of
    [] -> forall (m :: * -> *) a. Monad m => a -> m a
return NonEmpty a
finishedFirst
    (m a
a : [m a]
as) -> do
      NonEmpty a
finishedLater <- forall (m :: * -> *) a.
(Monad m, MonadSchedule m) =>
NonEmpty (m a) -> m (NonEmpty a)
scheduleAndFinish forall a b. (a -> b) -> a -> b
$ m a
a forall a. a -> [a] -> NonEmpty a
:| [m a]
as
      return $ NonEmpty a
finishedFirst forall a. Semigroup a => a -> a -> a
<> NonEmpty a
finishedLater

-- | Uses 'scheduleAndFinish' to execute all actions concurrently,
--   then orders them again.
--   Thus it behaves semantically like 'sequence',
--   but leverages concurrency.
sequenceScheduling :: (Monad m, MonadSchedule m) => NonEmpty (m a) -> m (NonEmpty a)
sequenceScheduling :: forall (m :: * -> *) a.
(Monad m, MonadSchedule m) =>
NonEmpty (m a) -> m (NonEmpty a)
sequenceScheduling
  =   forall a b. NonEmpty a -> NonEmpty b -> NonEmpty (a, b)
zip [Integer
1..]
  forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall a b. (a -> b) -> NonEmpty a -> NonEmpty b
map forall (m :: * -> *) a b. Functor m => (a, m b) -> m (a, b)
strength
  forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) a.
(Monad m, MonadSchedule m) =>
NonEmpty (m a) -> m (NonEmpty a)
scheduleAndFinish
  forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall o a. Ord o => (a -> o) -> NonEmpty a -> NonEmpty a
sortWith forall a b. (a, b) -> a
fst forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall a b. (a -> b) -> NonEmpty a -> NonEmpty b
map forall a b. (a, b) -> b
snd)
  where
    strength :: Functor m => (a, m b) -> m (a, b)
    strength :: forall (m :: * -> *) a b. Functor m => (a, m b) -> m (a, b)
strength (a
a, m b
mb) = (a
a, ) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m b
mb

-- | When there are no effects, return all values immediately
instance MonadSchedule Identity where
  schedule :: forall a.
NonEmpty (Identity a) -> Identity (NonEmpty a, [Identity a])
schedule NonEmpty (Identity a)
as = ( , []) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence NonEmpty (Identity a)
as

{- |
Fork all actions concurrently in separate threads and wait for the first one to complete.

Many monadic actions complete at nondeterministic times
(such as event listeners),
and it is thus impossible to schedule them deterministically
with most other actions.
Using concurrency, they can still be scheduled with all other actions in 'IO',
by running them in separate GHC threads.
-}
instance MonadSchedule IO where
  schedule :: forall a. NonEmpty (IO a) -> IO (NonEmpty a, [IO a])
schedule NonEmpty (IO a)
as = do
    MVar a
var <- forall a. IO (MVar a)
newEmptyMVar
    forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ NonEmpty (IO a)
as forall a b. (a -> b) -> a -> b
$ \IO a
action -> IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO ()
putMVar MVar a
var forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO a
action
    a
a <- forall a. MVar a -> IO a
takeMVar MVar a
var
    [a]
as' <- forall a. MVar a -> IO [a]
drain MVar a
var
    let remaining :: [IO a]
remaining = forall a. Int -> a -> [a]
replicate (forall (t :: * -> *) a. Foldable t => t a -> Int
length NonEmpty (IO a)
as forall a. Num a => a -> a -> a
- Int
1 forall a. Num a => a -> a -> a
- forall (t :: * -> *) a. Foldable t => t a -> Int
length [a]
as') forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar MVar a
var
    return (a
a forall a. a -> [a] -> NonEmpty a
:| [a]
as', [IO a]
remaining)
      where
        drain :: MVar a -> IO [a]
        drain :: forall a. MVar a -> IO [a]
drain MVar a
var = do
          Maybe a
aMaybe <- forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar a
var
          case Maybe a
aMaybe of
            Just a
a -> do
              [a]
as' <- forall a. MVar a -> IO [a]
drain MVar a
var
              return $ a
a forall a. a -> [a] -> [a]
: [a]
as'
            Maybe a
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return []

-- TODO Needs dependency
-- instance MonadSchedule STM where

-- | Pass through the scheduling functionality of the underlying monad
instance (Functor m, MonadSchedule m) => MonadSchedule (IdentityT m) where
  schedule :: forall a.
NonEmpty (IdentityT m a)
-> IdentityT m (NonEmpty a, [IdentityT m a])
schedule
    =   forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall {k} (f :: k -> *) (a :: k). IdentityT f a -> f a
runIdentityT
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall {k} (f :: k -> *) (a :: k). f a -> IdentityT f a
IdentityT))
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall {k} (f :: k -> *) (a :: k). f a -> IdentityT f a
IdentityT

-- | Write in the order of scheduling:
--   The first actions to return write first.
instance (Monoid w, Functor m, MonadSchedule m) => MonadSchedule (LazyWriter.WriterT w m) where
  schedule :: forall a.
NonEmpty (WriterT w m a)
-> WriterT w m (NonEmpty a, [WriterT w m a])
schedule = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall w (m :: * -> *) a. WriterT w m a -> m (a, w)
LazyWriter.runWriterT
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (b, d) (c, d)
first (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> b
snd forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold)) forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall a c. ((a, w), c) -> ((a, c), w)
assoc forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (b, d) (c, d)
first (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall w (m :: * -> *) a. m (a, w) -> WriterT w m a
LazyWriter.WriterT))
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall w (m :: * -> *) a. m (a, w) -> WriterT w m a
LazyWriter.WriterT
    where
      assoc :: ((a, w), c) -> ((a, c), w)
      assoc :: forall a c. ((a, w), c) -> ((a, c), w)
assoc ((a
a, w
w), c
c) = ((a
a, c
c), w
w)

-- | Write in the order of scheduling:
--   The first actions to return write first.
instance (Monoid w, Functor m, MonadSchedule m) => MonadSchedule (StrictWriter.WriterT w m) where
  schedule :: forall a.
NonEmpty (WriterT w m a)
-> WriterT w m (NonEmpty a, [WriterT w m a])
schedule = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall w (m :: * -> *) a. WriterT w m a -> m (a, w)
StrictWriter.runWriterT
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (b, d) (c, d)
first (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> b
snd forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold)) forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall a c. ((a, w), c) -> ((a, c), w)
assoc forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (b, d) (c, d)
first (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall w (m :: * -> *) a. m (a, w) -> WriterT w m a
StrictWriter.WriterT))
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall w (m :: * -> *) a. m (a, w) -> WriterT w m a
StrictWriter.WriterT
    where
      assoc :: ((a, w), c) -> ((a, c), w)
      assoc :: forall a c. ((a, w), c) -> ((a, c), w)
assoc ((a
a, w
w), c
c) = ((a
a, c
c), w
w)

-- | Write in the order of scheduling:
--   The first actions to return write first.
instance (Monoid w, Functor m, MonadSchedule m) => MonadSchedule (CPSWriter.WriterT w m) where
  schedule :: forall a.
NonEmpty (WriterT w m a)
-> WriterT w m (NonEmpty a, [WriterT w m a])
schedule = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall w (m :: * -> *) a. Monoid w => WriterT w m a -> m (a, w)
CPSWriter.runWriterT
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (b, d) (c, d)
first (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall (a :: * -> * -> *) b c c'.
Arrow a =>
a b c -> a b c' -> a b (c, c')
&&& (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> b
snd forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold)) forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall a c. ((a, w), c) -> ((a, c), w)
assoc forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (b, d) (c, d)
first (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall (m :: * -> *) w a.
(Functor m, Monoid w) =>
m (a, w) -> WriterT w m a
CPSWriter.writerT))
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) w a.
(Functor m, Monoid w) =>
m (a, w) -> WriterT w m a
CPSWriter.writerT
    where
      assoc :: ((a, w), c) -> ((a, c), w)
      assoc :: forall a c. ((a, w), c) -> ((a, c), w)
assoc ((a
a, w
w), c
c) = ((a
a, c
c), w
w)

-- | Broadcast the same environment to all actions.
--   The continuations keep this initial environment.
instance (Monad m, MonadSchedule m) => MonadSchedule (ReaderT r m) where
  schedule :: forall a.
NonEmpty (ReaderT r m a)
-> ReaderT r m (NonEmpty a, [ReaderT r m a])
schedule NonEmpty (ReaderT r m a)
actions = forall r (m :: * -> *) a. (r -> m a) -> ReaderT r m a
ReaderT forall a b. (a -> b) -> a -> b
$ \r
r
    -> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
`runReaderT` r
r) NonEmpty (ReaderT r m a)
actions
    forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule
    forall a b. a -> (a -> b) -> b
& forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift)

-- | Combination of 'WriterT' and 'ReaderT'.
--   Pass the same initial environment to all actions
--   and write to the log in the order of scheduling in @m@.
instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (AccumT w m) where
  schedule :: forall a.
NonEmpty (AccumT w m a) -> AccumT w m (NonEmpty a, [AccumT w m a])
schedule NonEmpty (AccumT w m a)
actions = forall w (m :: * -> *) a. (w -> m (a, w)) -> AccumT w m a
AccumT forall a b. (a -> b) -> a -> b
$ \w
w
    -> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall w (m :: * -> *) a. AccumT w m a -> w -> m (a, w)
`runAccumT` w
w) NonEmpty (AccumT w m a)
actions
    forall a b. a -> (a -> b) -> b
& forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule
    forall a b. a -> (a -> b) -> b
& forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a.
Monoid w =>
(NonEmpty (a, w), [m (a, w)]) -> ((NonEmpty a, [AccumT w m a]), w)
collectWritesAndWrap
    where
      collectWritesAndWrap ::
        Monoid w =>
        (NonEmpty (a, w), [m (a, w)]) ->
        ((NonEmpty a, [AccumT w m a]), w)
      collectWritesAndWrap :: forall a.
Monoid w =>
(NonEmpty (a, w), [m (a, w)]) -> ((NonEmpty a, [AccumT w m a]), w)
collectWritesAndWrap (NonEmpty (a, w)
finished, [m (a, w)]
running) =
        let (NonEmpty a
as, NonEmpty w
logs) = forall (f :: * -> *) a b. Functor f => f (a, b) -> (f a, f b)
NonEmpty.unzip NonEmpty (a, w)
finished
        in ((NonEmpty a
as, forall w (m :: * -> *) a. (w -> m (a, w)) -> AccumT w m a
AccumT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> b -> a
const forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [m (a, w)]
running), forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
fold NonEmpty w
logs)

-- | Schedule all actions according to @m@ and in case of exceptions
--   throw the first exception of the immediately returning actions.
instance (Monad m, MonadSchedule m) => MonadSchedule (ExceptT e m) where
  schedule :: forall a.
NonEmpty (ExceptT e m a)
-> ExceptT e m (NonEmpty a, [ExceptT e m a])
schedule
    =   forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((forall (t :: * -> *) (f :: * -> *) a.
(Traversable t, Applicative f) =>
t (f a) -> f (t a)
sequenceA forall (a :: * -> * -> *) b c b' c'.
Arrow a =>
a b c -> a b' c' -> a (b, b') (c, c')
*** forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT) forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall a b. (Either e a, b) -> Either e (a, b)
extrudeEither)
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall e (m :: * -> *) a. m (Either e a) -> ExceptT e m a
ExceptT
    where
      extrudeEither :: (Either e a, b) -> Either e (a, b)
      extrudeEither :: forall a b. (Either e a, b) -> Either e (a, b)
extrudeEither (Either e a
ea, b
b) = (, b
b) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Either e a
ea

instance (Monad m, MonadSchedule m) => MonadSchedule (MaybeT m) where
  schedule :: forall a.
NonEmpty (MaybeT m a) -> MaybeT m (NonEmpty a, [MaybeT m a])
schedule
    =   forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (m :: * -> *) e a.
Functor m =>
e -> MaybeT m a -> ExceptT e m a
maybeToExceptT ())
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (m :: * -> *) e a. Functor m => ExceptT e m a -> MaybeT m a
exceptToMaybeT
    forall {k} (cat :: k -> k -> *) (a :: k) (b :: k) (c :: k).
Category cat =>
cat a b -> cat b c -> cat a c
>>> forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall (m :: * -> *) e a. Functor m => ExceptT e m a -> MaybeT m a
exceptToMaybeT)

-- instance (Monad m, MonadSchedule m) => MonadSchedule (ContT r m) where
--   schedule actions = ContT $ \scheduler
--     -> fmap (runContT >>> _) actions
--     & schedule
--     & _

-- | Runs two values in a 'MonadSchedule' concurrently
--   and returns the first one that yields a value
--   and a continuation for the other value.
race
  :: (Monad m, MonadSchedule m)
  => m a -> m b
  -> m (Either (a, m b) (m a, b))
race :: forall (m :: * -> *) a b.
(Monad m, MonadSchedule m) =>
m a -> m b -> m (Either (a, m b) (m a, b))
race m a
aM m b
bM = forall (m :: * -> *) a b.
Monad m =>
(NonEmpty (Either a b), [m (Either a b)])
-> Either (a, m b) (m a, b)
recoverResult forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a.
MonadSchedule m =>
NonEmpty (m a) -> m (NonEmpty a, [m a])
schedule ((forall a b. a -> Either a b
Left forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m a
aM) forall a. a -> [a] -> NonEmpty a
:| [forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m b
bM])
  where
    recoverResult :: Monad m => (NonEmpty (Either a b), [m (Either a b)]) -> Either (a, m b) (m a, b)
    recoverResult :: forall (m :: * -> *) a b.
Monad m =>
(NonEmpty (Either a b), [m (Either a b)])
-> Either (a, m b) (m a, b)
recoverResult (Left a
a :| [], [Item [m (Either a b)]
bM']) = forall a b. a -> Either a b
Left (a
a, forall b a. b -> Either a b -> b
fromRight forall {a}. a
e forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Item [m (Either a b)]
bM')
    recoverResult (Right b
b :| [], [Item [m (Either a b)]
aM']) = forall a b. b -> Either a b
Right (forall a b. a -> Either a b -> a
fromLeft forall {a}. a
e forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Item [m (Either a b)]
aM', b
b)
    recoverResult (Left a
a :| [Right b
b], []) = forall a b. a -> Either a b
Left (a
a, forall (m :: * -> *) a. Monad m => a -> m a
return b
b)
    recoverResult (Right b
b :| [Left a
a], []) = forall a b. b -> Either a b
Right (forall (m :: * -> *) a. Monad m => a -> m a
return a
a, b
b)
    recoverResult (NonEmpty (Either a b), [m (Either a b)])
_ = forall {a}. a
e
    e :: a
e = forall a. HasCallStack => [Char] -> a
error [Char]
"race: Internal error"

-- FIXME I should only need Selective
-- | Runs both schedules concurrently and returns their results at the end.
async
  :: (Monad m, MonadSchedule m)
  => m  a -> m b
  -> m (a,     b)
async :: forall (m :: * -> *) a b.
(Monad m, MonadSchedule m) =>
m a -> m b -> m (a, b)
async m a
aSched m b
bSched = do
  Either (a, m b) (m a, b)
ab <- forall (m :: * -> *) a b.
(Monad m, MonadSchedule m) =>
m a -> m b -> m (Either (a, m b) (m a, b))
race m a
aSched m b
bSched
  case Either (a, m b) (m a, b)
ab of
    Left  (a
a, m b
bCont) -> do
      b
b <- m b
bCont
      return (a
a, b
b)
    Right (m a
aCont, b
b) -> do
      a
a <- m a
aCont
      return (a
a, b
b)