{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}

-- | Generic async worker to automate LND gRPC subscriptions.
-- It watches any amount of subscriptions of the same type,
-- and re-subscribes if a subscription was not terminated properly.
-- The only way to terminate a subscription is to apply the 'unWatch' or
-- 'unWatchUnit' function.
module LndClient.Watcher
  ( Watcher,
    spawnLink,
    spawnLinkUnit,
    watch,
    watchUnit,
    unWatch,
    unWatchUnit,
    dupLndTChan,
    delete,
  )
where

import qualified Data.Map as Map
import LndClient.Import hiding (newSev, spawnLink)

--
-- TODO : maybe pass OnSub | OnExit callbacks?
--

-- | Public handle of a running watcher.
--
-- @a@ is the subscription key (the argument a subscription is started
-- with), @b@ is the payload delivered for that subscription.
data Watcher a b = Watcher
  { -- | Broadcast channel that 'watch' and 'unWatch' write commands to.
    watcherCmdChan :: TChan (Cmd a),
    -- | Broadcast channel of @(key, payload)@ pairs; duplicated for
    -- readers by 'dupLndTChan'.
    watcherLndChan :: TChan (a, b),
    -- | The watcher loop thread; cancelled by 'delete'.
    watcherProc :: Async ()
  }

--
-- TODO : introduce UnWatchAll
-- and use it in withEnv etc
--

-- | Commands accepted by the watcher loop.
data Cmd a
  = -- | Start a subscription for the given key (ignored when a task for
    -- that key is already registered).
    Watch a
  | -- | Cancel and forget the subscription task for the given key.
    UnWatch a

-- | Everything the watcher loop can wake up on.
--
-- Note that 'loop' instantiates the second type parameter with the whole
-- @(key, payload)@ pair read from the LND channel.
data Event a b
  = -- | A command was read from the command channel.
    EventCmd (Cmd a)
  | -- | A value was read from the LND channel.
    EventLnd b
  | -- | A subscription task finished with the given key and result.
    EventTask (a, Either LndError ())

-- | Internal state threaded through the watcher loop.
data WatcherState a b m = WatcherState
  { -- | Reading end (a duplicate) of the command broadcast channel.
    watcherStateCmdChan :: TChan (Cmd a),
    -- | Reading end (a duplicate) of the LND broadcast channel.
    watcherStateLndChan :: TChan (a, b),
    -- | Runs the subscription for a key.
    watcherStateSub :: a -> m (Either LndError ()),
    -- | User callback, invoked with the key and either a payload or the
    -- error a subscription task finished with.
    watcherStateHandler :: a -> Either LndError b -> m (),
    -- | Currently registered subscription tasks, indexed by key.
    watcherStateTasks :: Map a (Async (a, Either LndError ())),
    -- | Environment used to derive log severities (see 'newSev').
    watcherStateLndEnv :: LndEnv
  }

-- | Derive the log severity for the watcher from its 'LndEnv'.
newSev :: WatcherState a b m -> Severity -> Severity
newSev w s =
  newSeverity (watcherStateLndEnv w) s Nothing Nothing

-- | Spawn a watcher whose subscription accepts an argument,
-- for example @subscribeInvoicesChan@.
--
-- The watcher thread is linked to the calling thread. The handler
-- receives the fully initialised 'Watcher', so it may issue 'watch' or
-- 'unWatch' commands itself.
spawnLink ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  LndEnv ->
  (Maybe (TChan (a, b)) -> LndEnv -> a -> m (Either LndError ())) ->
  (Watcher a b -> a -> Either LndError b -> m ()) ->
  m (Watcher a b)
spawnLink env sub handler = do
  w <- withRunInIO $ \run -> do
    (writeCmdChan, writeLndChan, readCmdChan, readLndChan) <-
      atomically $ do
        cmdW <- newBroadcastTChan
        lndW <- newBroadcastTChan
        cmdR <- dupTChan cmdW
        lndR <- dupTChan lndW
        pure (cmdW, lndW, cmdR, lndR)
    -- The watcher record needs the 'Async' of the loop thread, while the
    -- loop thread needs the watcher record (for the handler). Building
    -- the record through a function avoids ever constructing a watcher
    -- with an undefined 'watcherProc' field.
    let mkWatcher p =
          Watcher
            { watcherCmdChan = writeCmdChan,
              watcherLndChan = writeLndChan,
              watcherProc = p
            }
    varProc <- newEmptyMVar
    proc <- async . run $ do
      -- Block until the parent has published our own 'Async' handle.
      self <- takeMVar varProc
      loop $
        WatcherState
          { watcherStateCmdChan = readCmdChan,
            watcherStateLndChan = readLndChan,
            watcherStateSub = sub (Just writeLndChan) env,
            watcherStateHandler = handler $ mkWatcher self,
            watcherStateTasks = mempty,
            watcherStateLndEnv = env
          }
    liftIO $ putMVar varProc proc
    pure $ mkWatcher proc
  let proc = watcherProc w
  liftIO $ link proc
  $(logTM) (newSeverity env InfoS Nothing Nothing) $
    logStr $
      ("Watcher spawned as " :: Text)
        <> show (asyncThreadId proc)
  pure w

-- | Spawn a watcher whose subscription doesn't accept an argument,
-- for example @subscribeChannelEventsChan@.
--
-- This is 'spawnLink' specialised to the unit key.
spawnLinkUnit ::
  (MonadUnliftIO m, KatipContext m) =>
  LndEnv ->
  (Maybe (TChan ((), b)) -> LndEnv -> m (Either LndError ())) ->
  (Watcher () b -> Either LndError b -> m ()) ->
  m (Watcher () b)
spawnLinkUnit env0 sub handler =
  spawnLink
    env0
    (\mChan env1 _ -> sub mChan env1)
    (\chan _ x -> handler chan x)

-- | Ask the watcher to start a subscription for the given key.
-- Only enqueues the command; the watcher loop processes it later.
watch :: (MonadUnliftIO m) => Watcher a b -> a -> m ()
watch w = atomically . writeTChan (watcherCmdChan w) . Watch

-- | 'watch' for watchers with the unit key.
watchUnit :: (MonadUnliftIO m) => Watcher () b -> m ()
watchUnit w = watch w ()

-- | Ask the watcher to terminate the subscription for the given key.
-- Only enqueues the command; the watcher loop processes it later.
unWatch :: (MonadUnliftIO m) => Watcher a b -> a -> m ()
unWatch w = atomically . writeTChan (watcherCmdChan w) . UnWatch

-- | 'unWatch' for watchers with the unit key.
unWatchUnit :: (MonadUnliftIO m) => Watcher () b -> m ()
unWatchUnit w = unWatch w ()

-- | Duplicate the watcher's LND broadcast channel to obtain a readable
-- channel of @(key, payload)@ pairs.
dupLndTChan :: (MonadIO m) => Watcher a b -> m (TChan (a, b))
dupLndTChan = atomically . dupTChan . watcherLndChan

--
-- TODO : atomically cancel all linked processes
--

-- | Cancel the watcher thread.
--
-- Only the watcher thread itself is cancelled here; subscription tasks
-- it has started are not cancelled explicitly (see the TODO above).
delete :: (MonadUnliftIO m) => Watcher a b -> m ()
delete (Watcher _ _ proc) = liftIO $ cancel proc

-- | One iteration of the watcher: wait for the next event (command, LND
-- payload or finished task) and dispatch it. Each @apply*@ function
-- calls 'loop' again with the updated state, so this never returns
-- normally.
loop ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  WatcherState a b m ->
  m ()
loop w = do
  -- Here is the trick. An async watcher task can already be
  -- terminated, and the runtime detects that there are no references
  -- to watcherStateLndChan left.
  -- This may cause a `BlockedIndefinitelyOnSTM`
  -- async exception, because all alternative <|> expressions are
  -- evaluated independently. In this case we need to retry the
  -- alternative computation, but without reading from
  -- watcherStateLndChan.
  $(logTM) (newSev w InfoS) "Watcher - cmd <|> lnd <|> task"
  me <- maybeDeadlock . atomically $ cmd <|> lnd <|> task
  event <- case me of
    Nothing -> do
      $(logTM) (newSev w InfoS) "Watcher - cmd <|> task"
      atomically $ cmd <|> task
    Just x -> return x
  $(logTM) (newSev w InfoS) "Watcher - applying event"
  case event of
    EventCmd x -> applyCmd w x
    EventLnd x -> applyLnd w (second Right x)
    EventTask x -> applyTask w x
  where
    cmd = EventCmd <$> readTChan (watcherStateCmdChan w)
    lnd = EventLnd <$> readTChan (watcherStateLndChan w)
    task = EventTask . snd <$> waitAnySTM (Map.elems $ watcherStateTasks w)

-- | Handle a command and continue the loop.
--
-- * 'Watch': if no task is registered for the key, start the
--   subscription in a new linked async task and register it.
-- * 'UnWatch': if a task is registered for the key, cancel it and
--   remove it from the state.
applyCmd ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  WatcherState a b m ->
  Cmd a ->
  m ()
applyCmd w = \case
  Watch x -> do
    $(logTM) (newSev w InfoS) "Watcher - applying Cmd Watch"
    if Map.member x ts
      then loop w
      else do
        t <- withRunInIO $ \run -> do
          -- The task result is tagged with its key so that 'applyTask'
          -- can find the finished task in the map.
          newTask <- async . run $ (x,) <$> watcherStateSub w x
          link newTask
          return newTask
        loop w {watcherStateTasks = Map.insert x t ts}
  UnWatch x -> do
    $(logTM) (newSev w InfoS) "Watcher - applying Cmd UnWatch"
    case Map.lookup x ts of
      Nothing -> loop w
      Just t -> do
        liftIO $ cancel t
        loop w {watcherStateTasks = Map.delete x ts}
  where
    ts = watcherStateTasks w

-- | Pass a value received from the LND channel to the user handler and
-- continue the loop with unchanged state.
applyLnd ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  WatcherState a b m ->
  (a, Either LndError b) ->
  m ()
applyLnd w (x0, x1) = do
  $(logTM) (newSev w InfoS) "Watcher - applying Lnd"
  watcherStateHandler w x0 x1
  loop w

-- | Handle a finished subscription task and continue the loop.
--
-- If a task is still registered for the key, an error result is reported
-- to the user handler and the task is removed from the state. This
-- function does not start a new subscription by itself.
applyTask ::
  (Ord a, MonadUnliftIO m, KatipContext m) =>
  WatcherState a b m ->
  (a, Either LndError ()) ->
  m ()
applyTask w0 (x, res) = do
  $(logTM) (newSev w0 InfoS) "Watcher - applying Task"
  case Map.lookup x ts of
    Nothing -> loop w0
    Just t -> do
      case res of
        Left e -> watcherStateHandler w0 x $ Left e
        -- NOTE(review): the task has presumably finished already, since
        -- this event originates from waiting on the registered tasks.
        Right () -> liftIO $ cancel t
      loop w1
  where
    ts = watcherStateTasks w0
    w1 = w0 {watcherStateTasks = Map.delete x ts}