{-|
Module      : Neptune.Channel
Description : Neptune Client
Copyright   : (c) Jiasen Wu, 2020
License     : BSD-3-Clause
-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RecordWildCards           #-}
{-# LANGUAGE ScopedTypeVariables       #-}
module Neptune.Channel where

import           Control.Concurrent.Event  as E (isSet, set)
import           Control.Lens
import           Control.Retry             (recoverAll, retryPolicyDefault)
import           Data.Typeable             (Proxy (..), cast)
import           RIO                       hiding (Lens', (.~), (^.), (^..),
                                            (^?))
import qualified RIO.HashMap               as M

import qualified Neptune.Backend.API       as NBAPI
import           Neptune.Backend.Core      (configLogContext,
                                            configLogExecWithContext)
import           Neptune.Backend.Logging   (_log, levelError)
import           Neptune.Backend.MimeTypes
import           Neptune.Backend.Model     hiding (Experiment)
import           Neptune.Backend.ModelLens
import           Neptune.Session

-- | A data channel paired with the data points destined for it. The element
-- type @a@ is hidden existentially (only the 'NeptDataType' instance is
-- retained), so channels of different element types can be collected in a
-- single list.
data DataChannelWithData = forall a. NeptDataType a => DataChannelWithData (DataChannel a, [DataPoint a])

-- | Background thread for transmission.
--
-- Repeatedly drains the experiment's outbound queue and posts the data
-- points to the backend, grouped by channel name. While the stop flag is
-- unset, each iteration takes at most 100 points; once the stop flag is
-- observed, whatever is left in the queue is flushed in one batch,
-- '_exp_transmitter_flag' is set and the loop ends.
transmitter :: HasCallStack
            => NeptuneSession
            -> Experiment
            -> IO ()
transmitter session@NeptuneSession{..} Experiment{..} = go
    where
        go = do
            stop_flag <- E.isSet _exp_stop_flag
            -- NOTE(review): 'readTChanAtMost' blocks until at least one item
            -- is queued. If the stop flag is raised while this thread is
            -- blocked here, the loop only notices it after another data
            -- point arrives -- confirm the caller accounts for that.
            dat <- atomically $
                    if stop_flag
                      then readTChanFull _exp_outbound_q
                      else readTChanAtMost 100 _exp_outbound_q
            -- The final flush may find the queue empty; do not post (and
            -- possibly retry) an empty batch in that case.
            unless (null dat) $
                -- Retry sending the whole batch up to 6 times.
                -- Discard and log if it fails continuously.
                handle onErr $
                    recoverAll retryPolicyDefault $ \_ ->
                        send dat
            if stop_flag
               then E.set _exp_transmitter_flag
               else go

        -- Resolve the channel for every group of points, keep the points
        -- whose type matches the channel, and post everything in one request.
        send dat = do
            chn_with_dat <- fmap catMaybes $ forM (groupByChannel dat) $ \(chn_name, pts) ->
                case pts of
                  -- Groups are never empty by construction; matching
                  -- explicitly keeps this total instead of relying on 'head'.
                  [] -> return Nothing
                  -- If the channel doesn't exist, then we create one with
                  -- the channel type taken from the first element.
                  DataPointAny d0 : _ -> do
                      chn <- getOrCreateChannel
                                 (proxy d0)
                                 _exp_user_channels
                                 chn_name
                                 (createChannel session _exp_experiment_id chn_name)
                      case chn of
                        DataChannelAny typed -> do
                            let (errs, grouped) = gatherDataPoints (proxy typed) pts
                            mapM_ (logE _neptune_config) errs
                            return $ Just (DataChannelWithData (typed, grouped))

            sendChannel session _exp_experiment_id chn_with_dat

        -- Group data points by channel name, preserving the queue order
        -- within each channel. Points are prepended while folding and each
        -- group is reversed once at the end, which avoids repeatedly
        -- appending to the tail of a growing list.
        groupByChannel :: [DataPointAny] -> [(Text, [DataPointAny])]
        groupByChannel pts = M.toList . fmap reverse $ foldl' prepend M.empty pts
            where
                prepend acc pt = M.insertWith (++) (pt ^. dpt_name_A) [pt] acc

        -- Called once the retries are exhausted: log and drop the batch.
        onErr exc = logE _neptune_config $ "Failed to send data points.\n" <> tshow (exc :: SomeException)

        -- Recover a 'Proxy' for the type parameter of any unary type constructor.
        proxy :: f a -> Proxy a
        proxy _ = Proxy

-- | Create a neptune data channel.
--
-- Issues the backend's create-channel request for the given experiment,
-- using the channel type associated with @t@, and wraps the id of the
-- returned channel in a 'DataChannel'.
createChannel :: forall t. (NeptDataType t, HasCallStack)
              => NeptuneSession -> ExperimentId -> Text -> IO (DataChannel t)
createChannel NeptuneSession{_neptune_dispatch = dispatch} exp_id chn_name =
    toChannel <$> dispatch request
    where
        -- the create-channel API call
        request = NBAPI.createChannel
            (ContentType MimeJSON)
            (Accept MimeJSON)
            params
            exp_id

        params = mkChannelParams chn_name (neptChannelType (Proxy :: Proxy t))

        toChannel :: ChannelDTO -> DataChannel t
        toChannel dto = DataChannel (dto ^. channelDTOIdL)

-- | Get a neptune data channel. If the data channel doesn't exist yet,
-- a data channel will be created and added to the hashmap of current
-- user channels.
--
-- The lookup and the insertion are separate transactions because the
-- creator runs in 'IO' in between. If a channel with the same name is
-- registered concurrently, the entry written here replaces it.
getOrCreateChannel :: forall t. NeptDataType t
                   => Proxy t -- ^ dummy data type
                   -> ChannelHashMap -- ^ current user channels
                   -> Text -- ^ channel name
                   -> IO (DataChannel t) -- ^ creator
                   -> IO DataChannelAny
getOrCreateChannel _ user_channels chn_name creator = do
    known <- readTVarIO user_channels
    case M.lookup chn_name known of
      Just chn -> return chn
      Nothing  -> do
        chn_new <- DataChannelAny <$> creator
        -- Add to `user_channels`. This must be an insert: updating through
        -- `ix chn_name` only touches keys that are already present, so it
        -- would silently leave the map unchanged for a new channel.
        atomically $ do
            mapping <- readTVar user_channels
            writeTVar user_channels $! M.insert chn_name chn_new mapping
        return chn_new

-- | Send a batch of data in their respective channel.
--
-- Posts all channel values for the experiment in a single request and
-- logs, at error level, every per-value error returned by the backend.
sendChannel :: HasCallStack => NeptuneSession -> ExperimentId -> [DataChannelWithData] -> IO ()
sendChannel NeptuneSession{_neptune_dispatch = dispatch, _neptune_config = config} exp_id chn'value =
    -- TODO proper logging
    dispatch request >>= mapM_ (logE config . describe)
    where
        request = NBAPI.postChannelValues
            (ContentType MimeJSON)
            (Accept MimeJSON)
            (ChannelsValues (map toChannelsValues chn'value))
            exp_id

        -- Render one backend error as (channel id, x, error code, error message).
        describe err = tshow
            ( err ^. batchChannelValueErrorDTOChannelIdL
            , err ^. batchChannelValueErrorDTOXL
            , err ^. batchChannelValueErrorDTOErrorL . errorCodeL
            , err ^. batchChannelValueErrorDTOErrorL . errorMessageL
            )

        -- Convert a channel and its typed points into the request payload.
        toChannelsValues :: DataChannelWithData -> InputChannelValues
        toChannelsValues (DataChannelWithData (DataChannel chn_id, points)) =
            mkInputChannelValues chn_id (map toNeptPoint points)

-- | Read at most 'n' items from the queue, with blocking read at the
-- first item.
--
-- The first read retries the transaction until the queue is non-empty;
-- the remaining reads are non-blocking and stop as soon as the queue
-- runs dry. At least one item is always returned, even for @n <= 1@.
readTChanAtMost :: Int -> TChan a -> STM [a]
readTChanAtMost n chan = do
    -- wait until there is something to take
    headItem <- readTChan chan
    (headItem :) <$> drain (n - 1)
    where
        -- take up to 'budget' further items without blocking
        drain budget
            | budget <= 0 = return []
            | otherwise   =
                tryReadTChan chan
                    >>= maybe (return []) (\item -> (item :) <$> drain (budget - 1))

-- | Read all items from the queue.
--
-- Never blocks: an empty queue yields an empty list. Items are returned
-- in the order they are read from the channel.
readTChanFull :: TChan a -> STM [a]
readTChanFull chan = tryReadTChan chan >>= maybe (return []) collect
    where
        collect item = (item :) <$> readTChanFull chan

-- | Split untyped data points into those that can be cast to
-- @'DataPoint' a@ and error messages for those that cannot. The proxy
-- argument only selects the target type @a@.
gatherDataPoints :: forall a. NeptDataType a
                 => Proxy a
                 -> [DataPointAny]
                 -> ([Text], [DataPoint a])
gatherDataPoints _ = partitionEithers . map classify
    where
        -- 'Right' when the wrapped point has the requested type, otherwise
        -- a 'Left' message naming the offending point.
        classify (DataPointAny pt) =
            maybe (Left (_dpt_name pt <> " is not compatible for the channel"))
                  Right
                  (cast pt)

-- | Log a message at error level under the "Neptune.Channel" source, using
-- the log executor and log context stored in the backend configuration.
--
-- No explicit signature is given because the configuration type is not
-- imported by name in this module; the type is inferred from the call.
logE config msg =
    configLogExecWithContext config context (_log "Neptune.Channel" levelError msg)
    where
        context = configLogContext config