/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/
#pragma once
#include <ableton/discovery/Payload.hpp>
#include <ableton/link/PayloadEntries.hpp>
#include <ableton/link/PeerState.hpp>
#include <ableton/link/SessionId.hpp>
#include <ableton/link/v1/Messages.hpp>
#include <ableton/util/Injected.hpp>
#include <ableton/util/SafeAsyncHandler.hpp>
#include <chrono>
#include <cstddef>
#include <functional>
#include <iterator>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
namespace ableton
{
namespace link
{
template
struct Measurement
{
using Callback = std::function&)>;
using Micros = std::chrono::microseconds;
static const std::size_t kNumberDataPoints = 100;
static const std::size_t kNumberMeasurements = 5;
Measurement(const PeerState& state,
Callback callback,
asio::ip::address_v4 address,
Clock clock,
util::Injected io)
: mIo(std::move(io))
, mpImpl(std::make_shared(
std::move(state), std::move(callback), std::move(address), std::move(clock), mIo))
{
mpImpl->listen();
}
Measurement(const Measurement&) = delete;
Measurement& operator=(Measurement&) = delete;
Measurement(const Measurement&&) = delete;
Measurement& operator=(Measurement&&) = delete;
struct Impl : std::enable_shared_from_this
{
using Socket =
typename util::Injected::type::template Socket;
using Timer = typename util::Injected::type::Timer;
using Log = typename util::Injected::type::Log;
Impl(const PeerState& state,
Callback callback,
asio::ip::address_v4 address,
Clock clock,
util::Injected io)
: mSocket(io->template openUnicastSocket(address))
, mSessionId(state.nodeState.sessionId)
, mEndpoint(state.endpoint)
, mCallback(std::move(callback))
, mClock(std::move(clock))
, mTimer(io->makeTimer())
, mMeasurementsStarted(0)
, mLog(channel(io->log(), "Measurement on gateway@" + address.to_string()))
, mSuccess(false)
{
const auto ht = HostTime{mClock.micros()};
sendPing(mEndpoint, discovery::makePayload(ht));
resetTimer();
}
void resetTimer()
{
mTimer.cancel();
mTimer.expires_from_now(std::chrono::milliseconds(50));
mTimer.async_wait([this](const typename Timer::ErrorCode e) {
if (!e)
{
if (mMeasurementsStarted < kNumberMeasurements)
{
const auto ht = HostTime{mClock.micros()};
sendPing(mEndpoint, discovery::makePayload(ht));
++mMeasurementsStarted;
resetTimer();
}
else
{
fail();
}
}
});
}
void listen()
{
mSocket.receive(util::makeAsyncSafe(this->shared_from_this()));
}
// Operator to handle incoming messages on the interface
template
void operator()(
const asio::ip::udp::endpoint& from, const It messageBegin, const It messageEnd)
{
using namespace std;
const auto result = v1::parseMessageHeader(messageBegin, messageEnd);
const auto& header = result.first;
const auto payloadBegin = result.second;
if (header.messageType == v1::kPong)
{
debug(mLog) << "Received Pong message from " << from;
// parse for all entries
SessionId sessionId{};
std::chrono::microseconds ghostTime{0};
std::chrono::microseconds prevGHostTime{0};
std::chrono::microseconds prevHostTime{0};
try
{
discovery::parsePayload(
payloadBegin, messageEnd,
[&sessionId](const SessionMembership& sms) { sessionId = sms.sessionId; },
[&ghostTime](GHostTime gt) { ghostTime = std::move(gt.time); },
[&prevGHostTime](PrevGHostTime gt) { prevGHostTime = std::move(gt.time); },
[&prevHostTime](HostTime ht) { prevHostTime = std::move(ht.time); });
}
catch (const std::runtime_error& err)
{
warning(mLog) << "Failed parsing payload, caught exception: " << err.what();
listen();
return;
}
if (mSessionId == sessionId)
{
const auto hostTime = mClock.micros();
const auto payload =
discovery::makePayload(HostTime{hostTime}, PrevGHostTime{ghostTime});
sendPing(from, payload);
listen();
if (ghostTime != Micros{0} && prevHostTime != Micros{0})
{
mData.push_back(
static_cast(ghostTime.count())
- (static_cast((hostTime + prevHostTime).count()) * 0.5));
if (prevGHostTime != Micros{0})
{
mData.push_back(
(static_cast((ghostTime + prevGHostTime).count()) * 0.5)
- static_cast(prevHostTime.count()));
}
}
if (mData.size() > kNumberDataPoints)
{
finish();
}
else
{
resetTimer();
}
}
else
{
fail();
}
}
else
{
debug(mLog) << "Received invalid message from " << from;
listen();
}
}
template
void sendPing(asio::ip::udp::endpoint to, const Payload& payload)
{
v1::MessageBuffer buffer;
const auto msgBegin = std::begin(buffer);
const auto msgEnd = v1::pingMessage(payload, msgBegin);
const auto numBytes = static_cast(std::distance(msgBegin, msgEnd));
try
{
mSocket.send(buffer.data(), numBytes, to);
}
catch (const std::runtime_error& err)
{
info(mLog) << "Failed to send Ping to " << to.address().to_string() << ": "
<< err.what();
}
}
void finish()
{
mTimer.cancel();
mSuccess = true;
debug(mLog) << "Measuring " << mEndpoint << " done.";
mCallback(mData);
}
void fail()
{
mData.clear();
debug(mLog) << "Measuring " << mEndpoint << " failed.";
mCallback(mData);
}
Socket mSocket;
SessionId mSessionId;
asio::ip::udp::endpoint mEndpoint;
std::vector mData;
Callback mCallback;
Clock mClock;
Timer mTimer;
std::size_t mMeasurementsStarted;
Log mLog;
bool mSuccess;
};
util::Injected mIo;
std::shared_ptr mpImpl;
};
} // namespace link
} // namespace ableton