/* Copyright 2016, Ableton AG, Berlin. All rights reserved. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . * * If you would like to incorporate Link into a proprietary software application, * please contact . */ #pragma once #include #include #include #include #include #include #include #include namespace ableton { namespace discovery { // An exception thrown when sending a udp message fails. Stores the // interface through which the sending failed. struct UdpSendException : std::runtime_error { UdpSendException(const std::runtime_error& e, asio::ip::address ifAddr) : std::runtime_error(e.what()) , interfaceAddr(std::move(ifAddr)) { } asio::ip::address interfaceAddr; }; // Throws UdpSendException template void sendUdpMessage(Interface& iface, NodeId from, const uint8_t ttl, const v1::MessageType messageType, const Payload& payload, const asio::ip::udp::endpoint& to) { using namespace std; v1::MessageBuffer buffer; const auto messageBegin = begin(buffer); const auto messageEnd = v1::detail::encodeMessage(std::move(from), ttl, messageType, payload, messageBegin); const auto numBytes = static_cast(distance(messageBegin, messageEnd)); try { iface.send(buffer.data(), numBytes, to); } catch (const std::runtime_error& err) { throw UdpSendException{err, iface.endpoint().address()}; } } // UdpMessenger uses a "shared_ptr pImpl" pattern to make it movable // and to support safe async handler callbacks when receiving messages // on the given interface. template class UdpMessenger { public: using NodeState = NodeStateT; using NodeId = typename NodeState::IdType; using Timer = typename util::Injected::type::Timer; using TimerError = typename Timer::ErrorCode; using TimePoint = typename Timer::TimePoint; UdpMessenger(util::Injected iface, NodeState state, util::Injected io, const uint8_t ttl, const uint8_t ttlRatio) : mpImpl(std::make_shared( std::move(iface), std::move(state), std::move(io), ttl, ttlRatio)) { // We need to always listen for incoming traffic in order to // respond to peer state broadcasts mpImpl->listen(MulticastTag{}); mpImpl->listen(UnicastTag{}); mpImpl->broadcastState(); } UdpMessenger(const UdpMessenger&) = delete; UdpMessenger& operator=(const UdpMessenger&) = delete; UdpMessenger(UdpMessenger&& rhs) : mpImpl(std::move(rhs.mpImpl)) { } ~UdpMessenger() { if (mpImpl != nullptr) { try { mpImpl->sendByeBye(); } catch (const UdpSendException& err) { debug(mpImpl->mIo->log()) << "Failed to send bye bye message: " << err.what(); } } } void updateState(NodeState state) { mpImpl->updateState(std::move(state)); } // Broadcast the current state of the system to all peers. May throw // std::runtime_error if assembling a broadcast message fails or if // there is an error at the transport layer. Throws on failure. void broadcastState() { mpImpl->broadcastState(); } // Asynchronous receive function for incoming messages from peers. Will // return immediately and the handler will be invoked when a message // is received. Handler must have operator() overloads for PeerState and // ByeBye messages. template void receive(Handler handler) { mpImpl->setReceiveHandler(std::move(handler)); } private: struct Impl : std::enable_shared_from_this { Impl(util::Injected iface, NodeState state, util::Injected io, const uint8_t ttl, const uint8_t ttlRatio) : mIo(std::move(io)) , mInterface(std::move(iface)) , mState(std::move(state)) , mTimer(mIo->makeTimer()) , mLastBroadcastTime{} , mTtl(ttl) , mTtlRatio(ttlRatio) , mPeerStateHandler([](PeerState) {}) , mByeByeHandler([](ByeBye) {}) { } template void setReceiveHandler(Handler handler) { mPeerStateHandler = [handler]( PeerState state) { handler(std::move(state)); }; mByeByeHandler = [handler](ByeBye byeBye) { handler(std::move(byeBye)); }; } void sendByeBye() { sendUdpMessage( *mInterface, mState.ident(), 0, v1::kByeBye, makePayload(), multicastEndpoint()); } void updateState(NodeState state) { mState = std::move(state); } void broadcastState() { using namespace std::chrono; const auto minBroadcastPeriod = milliseconds{50}; const auto nominalBroadcastPeriod = milliseconds(mTtl * 1000 / mTtlRatio); const auto timeSinceLastBroadcast = duration_cast(mTimer.now() - mLastBroadcastTime); // The rate is limited to maxBroadcastRate to prevent flooding the network. const auto delay = minBroadcastPeriod - timeSinceLastBroadcast; // Schedule the next broadcast before we actually send the // message so that if sending throws an exception we are still // scheduled to try again. We want to keep trying at our // interval as long as this instance is alive. mTimer.expires_from_now(delay > milliseconds{0} ? delay : nominalBroadcastPeriod); mTimer.async_wait([this](const TimerError e) { if (!e) { broadcastState(); } }); // If we're not delaying, broadcast now if (delay < milliseconds{1}) { debug(mIo->log()) << "Broadcasting state"; sendPeerState(v1::kAlive, multicastEndpoint()); } } void sendPeerState( const v1::MessageType messageType, const asio::ip::udp::endpoint& to) { sendUdpMessage( *mInterface, mState.ident(), mTtl, messageType, toPayload(mState), to); mLastBroadcastTime = mTimer.now(); } void sendResponse(const asio::ip::udp::endpoint& to) { sendPeerState(v1::kResponse, to); } template void listen(Tag tag) { mInterface->receive(util::makeAsyncSafe(this->shared_from_this()), tag); } template void operator()(Tag tag, const asio::ip::udp::endpoint& from, const It messageBegin, const It messageEnd) { auto result = v1::parseMessageHeader(messageBegin, messageEnd); const auto& header = result.first; // Ignore messages from self and other groups if (header.ident != mState.ident() && header.groupId == 0) { debug(mIo->log()) << "Received message type " << static_cast(header.messageType) << " from peer " << header.ident; switch (header.messageType) { case v1::kAlive: sendResponse(from); receivePeerState(std::move(result.first), result.second, messageEnd); break; case v1::kResponse: receivePeerState(std::move(result.first), result.second, messageEnd); break; case v1::kByeBye: receiveByeBye(std::move(result.first.ident)); break; default: info(mIo->log()) << "Unknown message received of type: " << header.messageType; } } listen(tag); } template void receivePeerState( v1::MessageHeader header, It payloadBegin, It payloadEnd) { try { auto state = NodeState::fromPayload( std::move(header.ident), std::move(payloadBegin), std::move(payloadEnd)); // Handlers must only be called once auto handler = std::move(mPeerStateHandler); mPeerStateHandler = [](PeerState) {}; handler(PeerState{std::move(state), header.ttl}); } catch (const std::runtime_error& err) { info(mIo->log()) << "Ignoring peer state message: " << err.what(); } } void receiveByeBye(NodeId nodeId) { // Handlers must only be called once auto byeByeHandler = std::move(mByeByeHandler); mByeByeHandler = [](ByeBye) {}; byeByeHandler(ByeBye{std::move(nodeId)}); } util::Injected mIo; util::Injected mInterface; NodeState mState; Timer mTimer; TimePoint mLastBroadcastTime; uint8_t mTtl; uint8_t mTtlRatio; std::function)> mPeerStateHandler; std::function)> mByeByeHandler; }; std::shared_ptr mpImpl; }; // Factory function template UdpMessenger makeUdpMessenger( util::Injected iface, NodeState state, util::Injected io, const uint8_t ttl, const uint8_t ttlRatio) { return UdpMessenger{ std::move(iface), std::move(state), std::move(io), ttl, ttlRatio}; } } // namespace discovery } // namespace ableton