{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
module LndClient.Watcher
( Watcher,
spawnLink,
spawnLinkUnit,
watch,
watchUnit,
unWatch,
unWatchUnit,
dupLndTChan,
delete,
)
where
import qualified Data.Map as Map
import LndClient.Import hiding (newSev, spawnLink)
-- | Handle to a running watcher.
--
-- @a@ is the key type passed to 'watch' and 'unWatch'; @b@ is the payload
-- that travels, paired with its key, on the event channel.
data Watcher a b = Watcher
  { -- | Channel that 'watch' and 'unWatch' write their commands to.
    watcherCmdChan :: TChan (Cmd a),
    -- | Channel of keyed events; 'dupLndTChan' hands out duplicates of it.
    watcherLndChan :: TChan (a, b),
    -- | Background thread of the watcher; 'delete' cancels it.
    watcherProc :: Async ()
  }
-- | Command delivered to the watcher loop through the command channel.
data Cmd a
  = -- | Start a subscription task for the key, unless one already exists.
    Watch a
  | -- | Cancel and forget the subscription task of the key, if any.
    UnWatch a
-- | Everything the watcher loop can wake up on.
data Event a b
  = -- | A command read from the command channel.
    EventCmd (Cmd a)
  | -- | A value read from the event channel. The loop reads whole
    -- @(key, payload)@ pairs, so there @b@ is instantiated to a tuple.
    EventLnd b
  | -- | The result returned by a finished subscription task, tagged with
    -- the key the task was started for.
    EventTask (a, Either LndError ())
-- | State threaded through the recursive watcher loop.
data WatcherState a b m = WatcherState
  { -- | Channel the loop reads 'Cmd' values from.
    watcherStateCmdChan :: TChan (Cmd a),
    -- | Channel the loop reads keyed events from.
    watcherStateLndChan :: TChan (a, b),
    -- | Subscription action; run in its own task for every watched key.
    watcherStateSub :: a -> m (Either LndError ()),
    -- | Callback invoked with every event read from the event channel
    -- (as 'Right') and with the error of a failed task (as 'Left').
    watcherStateHandler :: a -> Either LndError b -> m (),
    -- | Subscription tasks currently tracked, indexed by key.
    watcherStateTasks :: Map a (Async (a, Either LndError ())),
    -- | Environment handed to 'newSeverity' when the loop logs.
    watcherStateLndEnv :: LndEnv
  }
-- | Pass the requested severity through 'newSeverity' together with the
-- 'LndEnv' stored in the state; both optional trailing arguments of
-- 'newSeverity' are left as 'Nothing'.
newSev :: WatcherState a b m -> Severity -> Severity
newSev st sev = newSeverity env sev Nothing Nothing
  where
    env = watcherStateLndEnv st
-- | Start a watcher thread, link it to the calling thread and return its
-- handle.
--
-- Arguments:
--
-- * the 'LndEnv' used for logging and passed on to the subscription action;
-- * the subscription action, which receives 'Just' the write end of the
--   event channel, the environment and the key to subscribe to;
-- * the handler, which receives the watcher handle, the key and either an
--   event payload or the error a subscription task ended with.
--
-- The command and event channels are broadcast channels; the loop reads
-- from duplicates created before any write can happen.
spawnLink ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  LndEnv ->
  (Maybe (TChan (a, b)) -> LndEnv -> a -> m (Either LndError ())) ->
  (Watcher a b -> a -> Either LndError b -> m ()) ->
  m (Watcher a b)
spawnLink env sub handler = do
  w <- withRunInIO $ \run -> do
    (writeCmdChan, writeLndChan, readCmdChan, readLndChan) <-
      atomically $ do
        writeCmdChan <- newBroadcastTChan
        writeLndChan <- newBroadcastTChan
        readCmdChan <- dupTChan writeCmdChan
        readLndChan <- dupTChan writeLndChan
        pure (writeCmdChan, writeLndChan, readCmdChan, readLndChan)
    -- The handle can only be completed once the thread exists, so it is
    -- kept as a function of that thread instead of a record holding a
    -- bottom placeholder that has to be patched afterwards.
    let mkWatcher self =
          Watcher
            { watcherCmdChan = writeCmdChan,
              watcherLndChan = writeLndChan,
              watcherProc = self
            }
    -- The loop needs its own 'Async' to build the handle given to the
    -- handler; it is delivered through this MVar right after spawning.
    -- NOTE(review): nothing here masks asynchronous exceptions between
    -- 'async' and 'putMVar' - confirm that window is acceptable.
    varProc <- newEmptyMVar
    proc <-
      async . run $ do
        self <- takeMVar varProc
        loop $
          WatcherState
            { watcherStateCmdChan = readCmdChan,
              watcherStateLndChan = readLndChan,
              watcherStateSub = sub (Just writeLndChan) env,
              watcherStateHandler = handler (mkWatcher self),
              watcherStateTasks = mempty,
              watcherStateLndEnv = env
            }
    liftIO $ putMVar varProc proc
    pure $ mkWatcher proc
  let proc = watcherProc w
  -- An exception that kills the watcher is re-thrown in the caller.
  liftIO $ link proc
  $(logTM) (newSeverity env InfoS Nothing Nothing)
    $ logStr
    $ ("Watcher spawned as " :: Text)
      <> show (asyncThreadId proc)
  pure w
-- | Variant of 'spawnLink' for subscriptions that have no key.
--
-- The key type is fixed to @()@, and both callbacks are adapted so that
-- they never see the key argument.
spawnLinkUnit ::
  (MonadUnliftIO m, KatipContext m) =>
  LndEnv ->
  (Maybe (TChan ((), b)) -> LndEnv -> m (Either LndError ())) ->
  (Watcher () b -> Either LndError b -> m ()) ->
  m (Watcher () b)
spawnLinkUnit env0 sub handler = spawnLink env0 keyedSub keyedHandler
  where
    -- Drop the unit key before calling the caller-supplied subscription.
    keyedSub mChan env1 _ = sub mChan env1
    -- Drop the unit key before calling the caller-supplied handler.
    keyedHandler watcher _ result = handler watcher result
-- | Ask the watcher to start watching the given key by writing a 'Watch'
-- command to its command channel.
watch :: (MonadUnliftIO m) => Watcher a b -> a -> m ()
watch w key = atomically $ writeTChan (watcherCmdChan w) (Watch key)
-- | 'watch' for watchers whose key type is @()@.
watchUnit :: (MonadUnliftIO m) => Watcher () b -> m ()
watchUnit = flip watch ()
-- | Ask the watcher to stop watching the given key by writing an 'UnWatch'
-- command to its command channel.
unWatch :: (MonadUnliftIO m) => Watcher a b -> a -> m ()
unWatch w key = atomically $ writeTChan (watcherCmdChan w) (UnWatch key)
-- | 'unWatch' for watchers whose key type is @()@.
unWatchUnit :: (MonadUnliftIO m) => Watcher () b -> m ()
unWatchUnit = flip unWatch ()
-- | Duplicate the watcher's event channel with 'dupTChan' and return the
-- duplicate, giving the caller its own read end for keyed events.
dupLndTChan :: (MonadIO m) => Watcher a b -> m (TChan (a, b))
dupLndTChan w = atomically $ dupTChan (watcherLndChan w)
-- | Stop the watcher by cancelling its background thread.
--
-- NOTE(review): only the watcher thread itself is cancelled here; no code
-- visible in this module cancels the per-key tasks the loop has spawned.
-- Confirm they are cleaned up elsewhere.
delete :: (MonadUnliftIO m) => Watcher a b -> m ()
delete Watcher {watcherProc = proc} = liftIO (cancel proc)
-- | One iteration of the watcher: wait for the next event and dispatch it.
--
-- The first wait covers commands, channel events and finished tasks and is
-- wrapped in 'maybeDeadlock'. When that yields 'Nothing' the wait is
-- repeated without the event channel. The handlers called from here call
-- 'loop' again, which is what keeps the watcher running.
loop ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  WatcherState a b m ->
  m ()
loop st = do
  $(logTM) (newSev st InfoS) "Watcher - cmd <|> lnd <|> task"
  firstTry <-
    maybeDeadlock (atomically (cmdEvent <|> lndEvent <|> taskEvent))
  event <- maybe waitWithoutLnd pure firstTry
  $(logTM) (newSev st InfoS) "Watcher - applying event"
  case event of
    EventCmd c -> applyCmd st c
    EventLnd keyed -> applyLnd st (second Right keyed)
    EventTask finished -> applyTask st finished
  where
    -- Fallback wait that ignores the event channel.
    waitWithoutLnd = do
      $(logTM) (newSev st InfoS) "Watcher - cmd <|> task"
      atomically (cmdEvent <|> taskEvent)
    cmdEvent = fmap EventCmd (readTChan (watcherStateCmdChan st))
    lndEvent = fmap EventLnd (readTChan (watcherStateLndChan st))
    -- Only the result matters; the 'Async' returned next to it is dropped.
    taskEvent =
      fmap
        (EventTask . snd)
        (waitAnySTM (Map.elems (watcherStateTasks st)))
-- | Apply a command to the watcher state and continue with 'loop'.
--
-- 'Watch' starts a linked subscription task for the key unless one is
-- already tracked. 'UnWatch' cancels the tracked task of the key and
-- forgets it; an unknown key leaves the state untouched.
applyCmd ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  WatcherState a b m ->
  Cmd a ->
  m ()
applyCmd st command = case command of
  Watch key -> do
    $(logTM) (newSev st InfoS) "Watcher - applying Cmd Watch"
    if Map.member key tasks
      then loop st
      else do
        task <- withRunInIO $ \runInIO -> do
          -- The task returns its key next to the subscription result, so
          -- the loop can tell which subscription has finished.
          spawned <- async (runInIO ((key,) <$> watcherStateSub st key))
          link spawned
          pure spawned
        loop st {watcherStateTasks = Map.insert key task tasks}
  UnWatch key -> do
    $(logTM) (newSev st InfoS) "Watcher - applying Cmd UnWatch"
    maybe (loop st) (stop key) (Map.lookup key tasks)
  where
    tasks = watcherStateTasks st
    -- Cancel the task and continue without it in the state.
    stop key task = do
      liftIO (cancel task)
      loop st {watcherStateTasks = Map.delete key tasks}
-- | Forward a keyed event to the user handler, then continue with 'loop'
-- on the unchanged state.
applyLnd ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  WatcherState a b m ->
  (a, Either LndError b) ->
  m ()
applyLnd st (key, result) = do
  $(logTM) (newSev st InfoS) "Watcher - applying Lnd"
  notify key result
  loop st
  where
    notify = watcherStateHandler st
-- | Handle the result of a subscription task, then continue with 'loop'.
--
-- A result for a key that is not tracked is ignored. Otherwise a 'Left'
-- error is reported to the user handler, a @Right ()@ triggers 'cancel' on
-- the tracked task, and in both cases the key is removed from the state.
applyTask ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  WatcherState a b m ->
  (a, Either LndError ()) ->
  m ()
applyTask st (key, outcome) = do
  $(logTM) (newSev st InfoS) "Watcher - applying Task"
  case Map.lookup key tasks of
    Nothing -> loop st
    Just task -> do
      either reportFailure (\() -> liftIO (cancel task)) outcome
      loop st {watcherStateTasks = Map.delete key tasks}
  where
    tasks = watcherStateTasks st
    reportFailure err = watcherStateHandler st key (Left err)