Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue #21 #28

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions examples/chat-pubsub-regex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2012-2023 University of California, Los Angeles
*
* This file is part of ndn-svs, synchronization library for distributed realtime
* applications for NDN.
*
* ndn-svs library is free software: you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation, in version 2.1 of the License.
*
* ndn-svs library is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*/

#include <ctime>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include <ndn-svs/svspubsub.hpp>

using namespace ndn::svs;

struct Options
{
std::string prefix;
std::string m_id;
};

class Program
{
public:
Program(const Options& options)
: m_options(options)
{
// Use HMAC signing for Sync Interests
// Note: this is not generally recommended, but is used here for simplicity
SecurityOptions secOpts(m_keyChain);
secOpts.interestSigner->signingInfo.setSigningHmacKey("dGhpcyBpcyBhIHNlY3JldCBtZXNzYWdl");

// Sign data packets using SHA256 (for simplicity)
secOpts.dataSigner->signingInfo.setSha256Signing();

// Do not fetch publications older than 10 seconds
SVSPubSubOptions opts;
opts.useTimestamp = true;
opts.maxPubAge = ndn::time::seconds(10);

// Create the Pub/Sub instance
m_svsps = std::make_shared<SVSPubSub>(
ndn::Name(m_options.prefix),
ndn::Name(m_options.m_id),
face,
std::bind(&Program::onMissingData, this, _1),
opts,
secOpts);

std::cout << "SVS client starting: " << m_options.m_id << std::endl;

// Subscribe to all data packets with prefix /chat (the "topic")
m_svsps->subscribeWithRegex(ndn::Regex("^<chat>"), [] (const auto& subData)
{
std::string content(reinterpret_cast<const char*>(subData.data.data()), subData.data.size());
std::cout << subData.producerPrefix << " [" << subData.seqNo << "] : " <<
subData.name << " : ";
if (content.length() > 200) {
std::cout << "[LONG] " << content.length() << " bytes"
<< " [" << std::hash<std::string>{}(content) << "]";
} else {
std::cout << content;
}
std::cout << std::endl;
});
}

void
run()
{
// Begin processing face events in a separate thread.
std::thread svsThread([this] { face.processEvents(); });

// Announce our presence.
// Note that the SVS-PS instance is thread-safe.
publishMsg("User " + m_options.m_id + " has joined the groupchat");

// Read from stdin and publish messages.
std::string userInput;
while (true) {
std::getline(std::cin, userInput);
publishMsg(userInput);
}

// Wait for the SVS-PS thread to finish.
svsThread.join();
}

protected:
/**
* Callback on receving a new State Vector from another node.
* This will be called regardless of whether the missing data contains any topics
* or producers that we are subscribed to.
*/
void
onMissingData(const std::vector<MissingDataInfo>&)
{
// Ignore any other missing data for this example
}

/**
* Publish a string message to the group
*/
void
publishMsg(const std::string& msg)
{
// Message to send
std::string content = msg;

// If the message starts with "SEND " generate a new message
// with random content with length after send
if (msg.length() > 5 && msg.substr(0, 5) == "SEND ") {
auto len = std::stoi(msg.substr(5));

content = std::string(len, 'a');
std::srand(std::time(nullptr));
for (auto& c : content)
c = std::rand() % 26 + 'a';

std::cout << "> Sending random message with hash [" << std::hash<std::string>{}(content) << "]" << std::endl;
}

// Note that unlike SVSync, names can be arbitrary,
// and need not be prefixed with the producer prefix.
ndn::Name name("chat"); // topic of publication
name.append(m_options.m_id); // who sent this
name.appendTimestamp(); // and when

m_svsps->publish(name, ndn::make_span(reinterpret_cast<const uint8_t*>(content.data()), content.size()));
}

private:
const Options m_options;
ndn::Face face;
std::shared_ptr<SVSPubSub> m_svsps;
ndn::KeyChain m_keyChain;
};

int
main(int argc, char** argv)
{
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <prefix>" << std::endl;
return 1;
}

Options opt;
opt.prefix = "/ndn/svs";
opt.m_id = argv[1];

Program program(opt);
program.run();

return 0;
}
72 changes: 68 additions & 4 deletions ndn-svs/svspubsub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include <chrono>



namespace ndn::svs {

SVSPubSub::SVSPubSub(const Name& syncPrefix,
Expand Down Expand Up @@ -90,6 +92,23 @@ SVSPubSub::publish(const Name& name, span<const uint8_t> value,
}
}


SeqNo
SVSPubSub::publish(const Name& name,
const Name& nodePrefix, time::milliseconds freshnessPeriod,
std::vector<Block> mappingBlocks)
{
// Segment the data if larger than MAX_DATA_SIZE
NodeID nid = nodePrefix == EMPTY_NAME ? m_dataPrefix : nodePrefix;
SeqNo seqNo = m_svsync.getCore().getSeqNo(nid) + 1;

// Insert mapping and manually update the sequence number
insertMapping(nid, seqNo, name, mappingBlocks);
m_svsync.getCore().updateSeqNo(seqNo, nid);

return seqNo;
}

SeqNo
SVSPubSub::publishPacket(const Data& data, const Name& nodePrefix,
std::vector<Block> mappingBlocks)
Expand Down Expand Up @@ -139,6 +158,15 @@ SVSPubSub::subscribe(const Name& prefix, const SubscriptionCallback& callback, b
return handle;
}

uint32_t
SVSPubSub::subscribeWithRegex(const Regex &regex, const SubscriptionCallback &callback,bool autofetch, bool packets)
{
uint32_t handle = ++m_subscriptionCount;
Subscription sub = { handle, ndn::Name(), callback, packets, false, autofetch, make_shared<Regex>(regex)};
m_regexSubscriptions.push_back(sub);
return handle;
}

uint32_t
SVSPubSub::subscribeToProducer(const Name& nodePrefix, const SubscriptionCallback& callback,
bool prefetch, bool packets)
Expand Down Expand Up @@ -190,8 +218,8 @@ SVSPubSub::updateCallbackInternal(const std::vector<MissingDataInfo>& info)
}
}

// Fetch all mappings if we have prefix subscription(s)
if (!m_prefixSubscriptions.empty())
// Fetch all mappings if we have prefix subscription(s) or regex subscription(s)
if (!m_prefixSubscriptions.empty() or !m_regexSubscriptions.empty())
{
MissingDataInfo remainingInfo = stream;

Expand Down Expand Up @@ -277,8 +305,44 @@ SVSPubSub::processMapping(const NodeID& nodeId, SeqNo seqNo)
{
if (sub.prefix.isPrefixOf(mapping.first))
{
m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub);
queued = true;
if (sub.autofetch)
{
m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub);
queued = true;
}
else
{
SubscriptionData subData = {
mapping.first,
ndn::span<const uint8_t>{},
nodeId,
seqNo,
ndn::Data()
};
sub.callback(subData);
}
}
}
for (const auto& sub : m_regexSubscriptions)
{
if (sub.regex->match(mapping.first))
{
if (sub.autofetch)
{
m_fetchMap[std::pair(nodeId, seqNo)].push_back(sub);
queued = true;
}
else
{
SubscriptionData subData = {
mapping.first,
ndn::span<const uint8_t>{},
nodeId,
seqNo,
ndn::Data()
};
sub.callback(subData);
}
}
}

Expand Down
31 changes: 31 additions & 0 deletions ndn-svs/svspubsub.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "svsync.hpp"

#include <ndn-cxx/security/validator-null.hpp>
#include <ndn-cxx/util/regex.hpp>

namespace ndn::svs {

Expand Down Expand Up @@ -116,6 +117,20 @@ class SVSPubSub : noncopyable
time::milliseconds freshnessPeriod = FRESH_FOREVER,
std::vector<Block> mappingBlocks = {});

/**
* @brief Publish data names only on the pub/sub group.
*
* @param name name for the publication
* @param nodePrefix Name to publish the data under
* @param freshnessPeriod freshness period for the data
* @param mappingBlocks Additional blocks to be published with the mapping (use sparingly)
*/
SeqNo
publish(const Name& name,
const Name& nodePrefix = EMPTY_NAME,
time::milliseconds freshnessPeriod = FRESH_FOREVER,
std::vector<Block> mappingBlocks = {});

/**
* @brief Subscribe to a application name prefix.
*
Expand All @@ -128,6 +143,18 @@ class SVSPubSub : noncopyable
uint32_t
subscribe(const Name& prefix, const SubscriptionCallback& callback, bool packets = false);

/**
* @brief Subscribe with a regex to name.
*
* @param regex regex of the application data
* @param callback Callback when new data is received
* @param packets Subscribe to the raw Data packets instead of BLOBs
*
* @returns Handle to the subscription
*/
uint32_t
subscribeWithRegex(const Regex& regex, const SubscriptionCallback& callback, bool autofetch = true, bool packets = false);

/**
* @brief Subscribe to a data producer
*
Expand Down Expand Up @@ -181,6 +208,9 @@ class SVSPubSub : noncopyable
SubscriptionCallback callback;
bool isPacketSubscription;
bool prefetch;
bool autofetch = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wastes space in the struct, please move the bool next to the others above to minimize padding. Even better, the bools and the uint32_t should be grouped together.

std::shared_ptr<Regex> regex = make_shared<Regex>("^<>+$");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why a shared_ptr?


};

void
Expand Down Expand Up @@ -241,6 +271,7 @@ class SVSPubSub : noncopyable
uint32_t m_subscriptionCount;
std::vector<Subscription> m_producerSubscriptions;
std::vector<Subscription> m_prefixSubscriptions;
std::vector<Subscription> m_regexSubscriptions;

// Queue of publications to fetch
std::map<std::pair<Name, SeqNo>, std::vector<Subscription>> m_fetchMap;
Expand Down