{-# LANGUAGE DataKinds              #-}
{-# LANGUAGE FlexibleContexts       #-}
{-# LANGUAGE FlexibleInstances      #-}
{-# LANGUAGE GADTs                  #-}
{-# LANGUAGE MultiParamTypeClasses  #-}
{-# LANGUAGE RankNTypes             #-}
{-# LANGUAGE RecordWildCards        #-}
{-# LANGUAGE ScopedTypeVariables    #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# LANGUAGE TypeOperators          #-}
{-# LANGUAGE UndecidableInstances   #-}

-- |
-- Module      :  Network.Ethereum.Contract.Event.SingleFilter
-- Copyright   :  FOAM team <http://foam.space> 2018
-- License     :  Apache-2.0
--
-- Maintainer  :  mail@akru.me
-- Stability   :  experimental
-- Portability :  unportable
--
-- Contract event filters.
--

module Network.Ethereum.Contract.Event.SingleFilter
    (
      event
    , eventMany
    , eventNoFilter
    , eventManyNoFilter
    ) where

import           Control.Concurrent                     (threadDelay)
import           Control.Monad                          (forM, void, when)
import           Control.Monad.IO.Class                 (MonadIO (..))
import           Control.Monad.Trans.Class              (lift)
import           Control.Monad.Trans.Reader             (ReaderT (..))
import           Data.Machine                           (MachineT, asParts,
                                                         autoM, await,
                                                         construct, final,
                                                         repeatedly, runT,
                                                         unfoldPlan, (~>))
import           Data.Machine.Plan                      (PlanT, stop, yield)
import           Data.Maybe                             (catMaybes, listToMaybe)

import           Data.Solidity.Event                    (DecodeEvent (..))
import qualified Network.Ethereum.Api.Eth               as Eth
import           Network.Ethereum.Api.Types             (Change (..),
                                                         DefaultBlock (..),
                                                         Filter (..), Quantity)
import           Network.Ethereum.Contract.Event.Common
import           Network.JsonRpc.TinyClient             (JsonRpc (..))

-- | Run one block at a time.
--
-- This is 'eventMany' with a window size of @0@, so every replayed
-- filter produced by 'filterStream' spans a single block.
event :: (DecodeEvent i ni e, JsonRpc m)
       => Filter e
       -> (e -> ReaderT Change m EventAction)
       -> m ()
event fltr = eventMany fltr 0

-- | 'eventMany' takes a filter, a window size, and a handler.
--
-- It first replays existing logs: the handler is run over the output of
-- 'playOldLogs' using 'reduceEventStream'. If the handler never returned
-- 'TerminateEvent' and the filter's @toBlock@ is not yet reached, it then
-- installs a filter on the node ('Eth.newFilter') and transitions to
-- polling it with 'pollFilter'.
--
eventMany :: (DecodeEvent i ni e, JsonRpc m)
           => Filter e
           -> Integer
           -> (e -> ReaderT Change m EventAction)
           -> m ()
eventMany fltr window handler = do
    -- Turn the filter's fromBlock into a concrete block number.
    start <- mkBlockNumber $ filterFromBlock fltr
    let initState = FilterStreamState { fssCurrentBlock = start
                                      , fssInitialFilter = fltr
                                      , fssWindowSize = window
                                      }
    mLastProcessedFilterState <- reduceEventStream (playOldLogs initState) handler
    case mLastProcessedFilterState of
      -- Nothing was reported for the replayed range: poll from the start.
      Nothing -> startPolling fltr {filterFromBlock = BlockWithNumber start}
      Just (act, lastBlock) -> do
        end <- mkBlockNumber . filterToBlock $ fltr
        -- Keep going only if the handler did not ask to stop and there are
        -- blocks left; resume right after the last processed block.
        when (act /= TerminateEvent && lastBlock < end) $
          let pollingFromBlock = lastBlock + 1
          in startPolling fltr {filterFromBlock = BlockWithNumber pollingFromBlock}
  where
    -- Install the filter on the node and feed its changes to the handler
    -- until 'pollFilter' halts or the handler returns 'TerminateEvent'.
    startPolling fltr' = do
      filterId <- Eth.newFilter fltr'
      let pollTo = filterToBlock fltr'
      void $ reduceEventStream (pollFilter filterId pollTo) handler

-- | Effectively a mapM_ over the machine using the given handler.
--
-- Each batch of changes emitted by the machine is passed to the handler,
-- one change at a time, with the raw 'Change' available through the
-- 'ReaderT' environment. The stream is cut right after the first
-- 'TerminateEvent'.
--
-- Returns the last @(action, block number)@ pair that went through the
-- pipeline, or 'Nothing' if no pair was produced at all.
--
-- Note: the handler is applied to /every/ change of a batch before the
-- 'TerminateEvent' check happens, and changes for which
-- 'changeBlockNumber' is 'Nothing' are handled but do not show up in the
-- result.
reduceEventStream :: Monad m
                  => MachineT m k [FilterChange a]
                  -> (a -> ReaderT Change m EventAction)
                  -> m (Maybe (EventAction, Quantity))
reduceEventStream filterChanges handler = fmap listToMaybe . runT $
       filterChanges
    ~> autoM (processChanges handler)
    ~> asParts
    ~> runWhile (\(act, _) -> act /= TerminateEvent)
    -- 'final' emits only the last element, so 'runT' yields at most one.
    ~> final
  where
    -- Pass elements through while the predicate holds; the first element
    -- that fails it is still emitted, then the machine stops.
    runWhile p = repeatedly $ do
      v <- await
      if p v
        then yield v
        else yield v >> stop
    -- Run the handler on every change of a batch and pair each resulting
    -- action with the block number of the change, when it has one.
    processChanges :: Monad m
                   => (a -> ReaderT Change m EventAction)
                   -> [FilterChange a]
                   -> m [(EventAction, Quantity)]
    processChanges handler' changes = fmap catMaybes $
        forM changes $ \FilterChange{..} -> do
            act <- flip runReaderT filterChangeRawChange $
                handler' filterChangeEvent
            return ((,) act <$> changeBlockNumber filterChangeRawChange)

-- | 'playOldLogs' streams the 'filterStream' built from the given state,
-- calls eth_getLogs ('Eth.getLogs') on each of these 'Filter' objects and
-- decodes the returned changes with 'mkFilterChanges'.
playOldLogs :: (DecodeEvent i ni e, JsonRpc m)
            => FilterStreamState e
            -> MachineT m k [FilterChange e]
playOldLogs s = filterStream s
          ~> autoM Eth.getLogs
          ~> autoM (liftIO . mkFilterChanges)

-- | Polls a filter from the given filterId until the target toBlock is reached.
--
-- On every iteration the current block number is fetched. Once it is past
-- the target block the filter is uninstalled and the machine stops;
-- otherwise the machine sleeps for one second, fetches the filter changes,
-- decodes them with 'mkFilterChanges' and yields them as one batch.
--
-- NOTE(review): the stop condition compares 'DefaultBlock' values, so for
-- a non-numeric target it depends on the 'Ord' instance of 'DefaultBlock'
-- (not visible here) -- confirm against its definition.
pollFilter :: forall i ni e k m . (DecodeEvent i ni e, JsonRpc m)
           => Quantity
           -> DefaultBlock
           -> MachineT m k [FilterChange e]
pollFilter i = construct . pollPlan i
  where
    pollPlan :: Quantity -> DefaultBlock -> PlanT k [FilterChange e] m ()
    pollPlan fid end = do
      bn <- lift Eth.blockNumber
      if BlockWithNumber bn > end
        then do
          -- Clean up the filter on the node; its result is ignored.
          _ <- lift $ Eth.uninstallFilter fid
          stop
        else do
          -- 'threadDelay' takes microseconds: wait one second per poll.
          liftIO $ threadDelay 1000000
          changes <- lift $ Eth.getFilterChanges fid >>= liftIO . mkFilterChanges
          yield changes
          pollPlan fid end


-- | 'filterStream' is a machine which represents taking an initial filter
-- over a range of blocks b1, ... bn (where bn is possibly `Latest` or `Pending`,
-- but b1 is an actual block number), and making a stream of filter objects
-- which cover this filter in consecutive windows: each filter spans from the
-- current block up to at most `windowSize` blocks further, and the next one
-- starts right after it. The machine halts whenever the `fromBlock` of a
-- spanning filter exceeds the block number that 'mkBlockNumber' resolves the
-- initial filter's `toBlock` to (presumably the chain head when `toBlock` is
-- `Latest` or `Pending` -- see 'mkBlockNumber').
filterStream :: JsonRpc m
             => FilterStreamState e
             -> MachineT m k (Filter e)
filterStream initState = unfoldPlan initState filterPlan
  where
    filterPlan :: JsonRpc m => FilterStreamState e -> PlanT k (Filter e) m (FilterStreamState e)
    filterPlan st@FilterStreamState{..} = do
      -- The end is re-resolved on every step.
      end <- lift . mkBlockNumber $ filterToBlock fssInitialFilter
      if fssCurrentBlock > end
        then stop
        else do
          -- Clamp the window so that it never runs past the end block.
          let to' = min end $ fssCurrentBlock + fromInteger fssWindowSize
              filter' = fssInitialFilter { filterFromBlock = BlockWithNumber fssCurrentBlock
                                         , filterToBlock = BlockWithNumber to'
                                         }
          yield filter'
          filterPlan $ st { fssCurrentBlock = to' + 1 }

--------------------------------------------------------------------------------

-- | Variant of 'eventManyNoFilter' with a window size of @0@.
eventNoFilter :: (DecodeEvent i ni e, JsonRpc m)
              => Filter e
              -> (e -> ReaderT Change m EventAction)
              -> m ()
eventNoFilter fltr = eventManyNoFilter fltr 0

-- | 'eventManyNoFilter' takes a filter, a window size, and a handler.
--
-- It replays existing logs by running the handler over 'playOldLogs' with
-- 'reduceEventStream'. If the handler never returned 'TerminateEvent' and
-- the filter's @toBlock@ is not yet reached, it keeps following new logs
-- through 'playNewLogs', which queries 'Eth.getLogs' instead of installing
-- a filter on the node.
eventManyNoFilter :: (DecodeEvent i ni e, JsonRpc m)
                  => Filter e
                  -> Integer
                  -> (e -> ReaderT Change m EventAction)
                  -> m ()
eventManyNoFilter fltr window handler = do
    -- Turn the filter's fromBlock into a concrete block number.
    start <- mkBlockNumber $ filterFromBlock fltr
    let initState = FilterStreamState { fssCurrentBlock = start
                                      , fssInitialFilter = fltr
                                      , fssWindowSize = window
                                      }
    mLastProcessedFilterState <- reduceEventStream (playOldLogs initState) handler
    case mLastProcessedFilterState of
      -- Nothing was reported for the replayed range: follow from the start.
      Nothing -> pollFrom start
      Just (act, lastBlock) -> do
        end <- mkBlockNumber . filterToBlock $ fltr
        -- Keep going only if the handler did not ask to stop and there are
        -- blocks left; resume right after the last processed block.
        when (act /= TerminateEvent && lastBlock < end) $
          pollFrom (lastBlock + 1)
  where
    -- Follow new logs starting at the given block, discarding the summary
    -- returned by 'reduceEventStream'.
    pollFrom block =
        void $ reduceEventStream (playNewLogs pollingFilterState) handler
      where
        pollingFilterState = FilterStreamState { fssCurrentBlock = block
                                               , fssInitialFilter = fltr
                                               , fssWindowSize = 1
                                               }

-- | 'playNewLogs' streams the 'newFilterStream' built from the given state,
-- calls eth_getLogs ('Eth.getLogs') on each of these 'Filter' objects and
-- decodes the returned changes with 'mkFilterChanges'.
playNewLogs :: (DecodeEvent i ni e, JsonRpc m)
            => FilterStreamState e
            -> MachineT m k [FilterChange e]
playNewLogs s =
     newFilterStream s
  ~> autoM Eth.getLogs
  ~> autoM (liftIO . mkFilterChanges)

-- | 'newFilterStream' is a machine which emits filters that follow the
-- chain as it grows. On every step it obtains a block number from
-- 'pollTillBlockProgress' (presumably waiting until the chain has moved
-- past the current block -- confirm against its definition) and yields
-- the initial filter restricted to the range from the current block to
-- that block; the next step starts right after it. The machine halts once
-- the current block is greater than the initial filter's `toBlock`.
--
-- The window size stored in the state is not consulted here.
newFilterStream :: JsonRpc m
                => FilterStreamState e
                -> MachineT m k (Filter e)
newFilterStream initialState = unfoldPlan initialState filterPlan
  where
    filterPlan :: JsonRpc m => FilterStreamState e -> PlanT k (Filter e) m (FilterStreamState e)
    filterPlan s@FilterStreamState{..} =
      -- This compares 'DefaultBlock' values directly, without resolving
      -- the toBlock to a number first.
      if BlockWithNumber fssCurrentBlock > filterToBlock fssInitialFilter
        then stop
        else do
          newestBlockNumber <- lift . pollTillBlockProgress $ fssCurrentBlock
          let filter' = fssInitialFilter { filterFromBlock = BlockWithNumber fssCurrentBlock
                                         , filterToBlock = BlockWithNumber newestBlockNumber
                                         }
          yield filter'
          filterPlan $ s { fssCurrentBlock = newestBlockNumber + 1 }