{-# LANGUAGE ApplicativeDo #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
module Control.Monad.Schedule.Class where


-- base
import Control.Arrow
import Control.Concurrent
import Data.Either
import Data.Foldable (fold, forM_)
import Data.List.NonEmpty hiding (length)
import Data.Function
import Data.Kind (Type)
import Data.Void

-- transformers
import Control.Monad.Trans.Accum
import Control.Monad.Trans.Class
import Control.Monad.Trans.Writer
import Control.Monad.Trans.Reader
import qualified Data.List.NonEmpty as NonEmpty
import Control.Monad.Trans.Cont
import Control.Monad (void)
import Unsafe.Coerce (unsafeCoerce)
import Data.Functor.Identity
import Data.Maybe (fromJust)
import Prelude hiding (map, zip)
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Control.Monad.Trans.Maybe

{- | 'Monad's in which actions can be scheduled concurrently.

@'schedule' actions@ is expected to run @actions@ concurrently,
whatever that means for a particular monad @m@.
'schedule' does not return before at least one action has finished,
and the returned values @'NonEmpty' a@ are the results of all actions that finished first.
The actions @[m a]@ (possibly empty) are the remaining, still running ones.
Executing any of them is expected to be blocking,
and awaits the return of the corresponding action.

A lawful instance is considered to satisfy these conditions:

  * The set of returned values is invariant under scheduling.
    In other words, @sequence@ will result in the same set of values as @scheduleAndFinish@.
'schedule' thus can be thought of as a concurrency-utilizing version of 'sequence'.
-}
class MonadSchedule m where
  -- | Run the actions concurrently,
  --   and return the result of the first finishers,
  --   together with completions for the unfinished actions.
  --
  --   The 'NonEmpty' component holds the results that are already available,
  --   the list component (possibly empty) holds one completion
  --   for every action that is still running.
  schedule :: NonEmpty (m a) -> m (NonEmpty a, [m a])

-- | Keeps 'schedule'ing actions until all are finished.
--   Returns the same set of values as 'sequence',
--   but utilises concurrency and may thus change the order of the values:
--   Results are listed in the order in which 'schedule' reported them as finished.
scheduleAndFinish :: (Monad m, MonadSchedule m) => NonEmpty (m a) -> m (NonEmpty a)
scheduleAndFinish actions = do
  (finishedFirst, running) <- schedule actions
  case nonEmpty running of
    -- Nothing is left running, so the first finishers are the complete result.
    Nothing -> return finishedFirst
    -- Schedule the completions again and append their results to the early finishers.
    Just stillRunning -> (finishedFirst <>) <$> scheduleAndFinish stillRunning

-- | Uses 'scheduleAndFinish' to execute all actions concurrently,
--   then orders them again.
--   Thus it behaves semantically like 'sequence',
--   but leverages concurrency.
sequenceScheduling :: (Monad m, MonadSchedule m) => NonEmpty (m a) -> m (NonEmpty a)
sequenceScheduling
  =   zip [1..]                        -- tag every action with its original position
  >>> map strength                     -- move the tag into the result of the action
  >>> scheduleAndFinish                -- run everything, results arrive in order of completion
  >>> fmap (sortWith fst >>> map snd)  -- restore the original order and drop the tags
  where
    -- Pair the result of an action with a value that is already available.
    strength :: Functor m => (a, m b) -> m (a, b)
    strength (a, mb) = (a, ) <$> mb

{- |
Fork all actions concurrently in separate threads and wait for the first one to complete.

Many monadic actions complete at nondeterministic times
(such as event listeners),
and it is thus impossible to schedule them deterministically
with most other actions.
Using concurrency, they can still be scheduled with all other actions in 'IO',
by running them in separate GHC threads.

Every action is forked with 'forkIO' and writes its result into one shared 'MVar'.
Each returned completion takes one further result from that 'MVar',
so a completion yields whichever outstanding result arrives next,
not the result of one particular action.

Note that a forked action which throws an exception never writes its result,
so a completion waiting for it will not receive a value.
-}
instance MonadSchedule IO where
  schedule as = do
    var <- newEmptyMVar
    -- Every action runs in its own thread and reports back through the shared 'MVar'.
    forM_ as $ \action -> forkIO $ putMVar var =<< action
    -- Block until the first result is available ...
    a <- takeMVar var
    -- ... then collect, without blocking, all further results that are available right away.
    as' <- drain var
    -- One blocking take for every action that has not reported back yet.
    let remaining = replicate (length as - 1 - length as') $ takeMVar var
    return (a :| as', remaining)
    where
      -- Take values out of the 'MVar', without blocking, until it is found empty.
      drain :: MVar a -> IO [a]
      drain pending = do
        next <- tryTakeMVar pending
        case next of
          Nothing -> return []
          Just a -> (a :) <$> drain pending

-- TODO Needs dependency
-- instance MonadSchedule STM where

-- | Write in the order of scheduling:
--   The first actions to return write first.
--   The logs of all first finishers are combined into the log of the 'schedule' call,
--   the still running actions write when their completions are run.
instance (Monoid w, Functor m, MonadSchedule m) => MonadSchedule (WriterT w m) where
  schedule actions = WriterT $ collectWrites <$> schedule (runWriterT <$> actions)
    where
      -- Split the finished results from their logs, combine the logs,
      -- and wrap the still running actions in 'WriterT' again.
      -- The pattern is irrefutable so that the result pair is available
      -- without inspecting the result of the underlying 'schedule'.
      collectWrites :: (NonEmpty (a, w), [m (a, w)]) -> ((NonEmpty a, [WriterT w m a]), w)
      collectWrites ~(finished, running) =
        ( (fst <$> finished, WriterT <$> running)
        , foldMap snd finished
        )

-- | Broadcast the same environment to all actions.
--   The continuations keep this initial environment:
--   They are 'lift'ed from @m@ and thus ignore the environment they are later run in.
instance (Monad m, MonadSchedule m) => MonadSchedule (ReaderT r m) where
  schedule actions = ReaderT $ \r ->
    -- Supply the environment to every action, schedule in @m@,
    -- and wrap the still running actions in 'ReaderT' again.
    second (fmap lift) <$> schedule ((`runReaderT` r) <$> actions)

-- | Combination of 'WriterT' and 'ReaderT'.
--   Pass the same initial environment to all actions
--   and write to the log in the order of scheduling in @m@.
--   The continuations of the still running actions keep this initial environment.
instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (AccumT w m) where
  schedule actions = AccumT $ \w ->
    collectWritesAndWrap <$> schedule ((`runAccumT` w) <$> actions)
    where
      -- Split the finished results from their logs, combine the logs,
      -- and wrap the still running actions such that they ignore any later environment.
      collectWritesAndWrap ::
        Monoid w =>
        (NonEmpty (a, w), [m (a, w)]) ->
        ((NonEmpty a, [AccumT w m a]), w)
      collectWritesAndWrap (finished, running) =
        ( (fst <$> finished, AccumT . const <$> running)
        , foldMap snd finished
        )

-- | Schedule all actions according to @m@ and in case of exceptions
--   throw the first exception of the immediately returning actions.
--   When an exception is thrown, the completions of the still running actions are dropped.
instance (Monad m, MonadSchedule m) => MonadSchedule (ExceptT e m) where
  schedule actions = ExceptT $ collectResults <$> schedule (runExceptT <$> actions)
    where
      -- Succeed only if all first finishers succeeded,
      -- and wrap the still running actions in 'ExceptT' again.
      collectResults ::
        (NonEmpty (Either e a), [m (Either e a)]) ->
        Either e (NonEmpty a, [ExceptT e m a])
      collectResults (finished, running) = (, ExceptT <$> running) <$> sequenceA finished

-- | Schedule via the 'ExceptT' instance,
--   treating 'Nothing' as an exception that carries no information:
--   The actions are converted to @'ExceptT' ()@, scheduled there,
--   and the result as well as the continuations are converted back to 'MaybeT'.
instance (Monad m, MonadSchedule m) => MonadSchedule (MaybeT m) where
  schedule actions =
    second (fmap exceptToMaybeT)
      <$> exceptToMaybeT (schedule (maybeToExceptT () <$> actions))

-- instance (Monad m, MonadSchedule m) => MonadSchedule (ContT r m) where
--   schedule actions = ContT $ \scheduler
--     -> fmap (runContT >>> _) actions
--     & schedule
--     & _

-- | Runs two values in a 'MonadSchedule' concurrently
--   and returns the first one that yields a value
--   and a continuation for the other value.
--
--   If 'schedule' reports both actions as finished at once,
--   the result listed first decides between 'Left' and 'Right',
--   and the other result is returned as an already completed continuation.
race
  :: (Monad m, MonadSchedule m)
  => m a -> m b
  -> m (Either (a, m b) (m a, b))
race aM bM = recoverResult <$> schedule ((Left <$> aM) :| [Right <$> bM])
  where
    recoverResult :: Monad m => (NonEmpty (Either a b), [m (Either a b)]) -> Either (a, m b) (m a, b)
    -- Only one action has finished: The single completion belongs to the other action.
    recoverResult (Left a :| [], [bM']) = Left (a, fromRight impossible <$> bM')
    recoverResult (Right b :| [], [aM']) = Right (fromLeft impossible <$> aM', b)
    -- Both actions have finished: There is nothing left to wait for.
    recoverResult (Left a :| [Right b], []) = Left (a, return b)
    recoverResult (Right b :| [Left a], []) = Right (return a, b)
    -- Every other shape does not correspond to exactly one 'Left' and one 'Right' action.
    recoverResult _ = impossible

    -- Placeholder for results that are not expected to occur when scheduling two actions.
    impossible :: x
    impossible = error "race: Internal error"

-- FIXME I should only need Selective
-- | Runs both schedules concurrently and returns their results at the end.
--
--   Uses 'race' to obtain the first result,
--   then runs the continuation of the other action to obtain the second one.
async
  :: (Monad m, MonadSchedule m)
  => m  a -> m b
  -> m (a,     b)
async aSched bSched = do
  ab <- race aSched bSched
  case ab of
    -- The first action won: Wait for the second one.
    Left  (a, bCont) -> (a, ) <$> bCont
    -- The second action won: Wait for the first one.
    Right (aCont, b) -> (, b) <$> aCont