Commit e5043a75 authored by Michael Vrána's avatar Michael Vrána

Minor refactoring and fixes

parent ecb5d9cb
Pipeline #80503 passed with stages
in 9 minutes and 41 seconds
export interface EvaluateRequest {
algorithms: {
[id: string]: {
name: string
}
}
inputs: {
strings?: {
[id: string]: {
value: string
}
}
ints?: {
[id: string]: {
value: number // must be integer
}
}
doubles?: {
[id: string]: {
value: number // must be double
}
}
bools?: {
[id: string]: {
value: boolean
}
}
}
outputs: string[] // array of IDs
pipes: {
from: string // ID of the source node
to: string // ID of the destination node
paramIndex: number // index of the parameter in the destination node
}[]
}
\ No newline at end of file
export interface EvaluateResponse {
error?: string
outputs?: {
[id: string]: string
[outputId: string]: string
}
}
\ No newline at end of file
......@@ -54,10 +54,9 @@ public:
stream << textMessage->getText();
stream >> json;
std::string errs;
int taskId = json["taskId"].asInt();
std::cout << getTimestamp() << '\t' << "Evaluating task " << json["taskId"].asInt() << std::endl;
std::cout << getTimestamp() << '\t' << "Evaluating task " << taskId << std::endl;
if (debug) std::cout << json << std::endl;
auto future = std::async(std::launch::async, [& json] {
......@@ -67,9 +66,7 @@ public:
});
if (future.wait_for(timeoutDuration) == std::future_status::timeout) {
std::string errorMessage = getTimestamp() + "\tEvaluation timed out";
replyError(*responseSession, *responseProducer, taskId, errorMessage.c_str());
replyError(*responseSession, taskId, "Evaluation timed out");
message->acknowledge();
std::exit(EVALUATION_TIMEOUT);
......@@ -80,13 +77,12 @@ public:
try {
outputMap = future.get();
} catch (std::exception & e) {
replyError(*responseSession, *responseProducer, taskId, e.what());
replyError(*responseSession, taskId, e.what());
message->acknowledge();
return;
}
std::cout << getTimestamp() << '\t' << "Evaluated task " << taskId << std::endl;
replyOutputs(*responseSession, *responseProducer, taskId, *outputMap);
replyOutputs(*responseSession, taskId, *outputMap);
message->acknowledge();
}
......@@ -99,7 +95,8 @@ private:
std::unique_ptr<cms::MessageConsumer> requestConsumer;
std::unique_ptr<cms::MessageProducer> responseProducer;
void replyOutputs(cms::Session & session, cms::MessageProducer & responseProducer, int taskId, const std::map<std::string, std::string> & outputMap) {
void replyOutputs(cms::Session & session, int taskId, const std::map<std::string, std::string> & outputMap) {
std::cout << getTimestamp() << '\t' << "Evaluated task " << taskId << std::endl;
Json::Value outputsJson(Json::objectValue);
for (const auto &outputEntry : outputMap)
......@@ -108,26 +105,26 @@ private:
Json::Value responseJson;
responseJson["outputs"] = outputsJson;
reply(session, responseProducer, taskId, responseJson);
reply(session, taskId, responseJson);
}
void replyError(cms::Session & session, cms::MessageProducer & responseProducer, int taskId, const char * errorMessage) {
void replyError(cms::Session & session, int taskId, const char * errorMessage) {
std::cout << getTimestamp() << '\t' << "Error: " << errorMessage << std::endl;
Json::Value errorJson;
errorJson["error"] = errorMessage;
reply(session, responseProducer, taskId, errorJson);
reply(session, taskId, errorJson);
};
void reply(cms::Session & session, cms::MessageProducer & responseProducer, int taskId, Json::Value & json) {
void reply(cms::Session & session, int taskId, Json::Value & json) {
json["taskId"] = taskId;
auto output = Json::writeString(Json::StreamWriterBuilder(), json);
responseProducer.send(session.createTextMessage(output));
responseProducer->send(session.createTextMessage(output));
std::cout << getTimestamp() << '\t' << "Replied to task " << taskId << std::endl;
if (debug) std::cout << json << std::endl;
}
std::string getTimestamp() const {
static std::string getTimestamp() {
std::string str;
std::stringstream stream;
......
......@@ -54,10 +54,6 @@ public:
return degree;
}
int decrementDegree() {
return --degree;
}
const std::set<Edge> & getEdges() const {
return edges;
}
......
......@@ -9,9 +9,10 @@
#include <stdexcept>
#include "AbstractNode.hpp"
class TopSortException : public std::logic_error {
class TopSortException : public std::logic_error
{
public:
explicit TopSortException(const std::string & msg) : std::logic_error(msg) {}
explicit TopSortException(const std::string &msg) : std::logic_error(msg) {}
};
/**
......@@ -23,23 +24,23 @@ public:
* @return True if the node has path to output node, false if not
*/
bool traverseNodes(
std::shared_ptr<AbstractNode> & node,
std::set<Edge> & visitedEdges,
std::set<std::shared_ptr<AbstractNode>> & validNodes,
std::forward_list<std::shared_ptr<AbstractNode>> & topSort) {
// already has been visited and is valid
if (validNodes.find(node) != validNodes.end()) return true;
std::shared_ptr<AbstractNode> & node,
std::set<Edge> & visitedEdges,
std::set<std::shared_ptr<AbstractNode>> & validNodes,
std::forward_list<std::shared_ptr<AbstractNode>> & topSort) {
if (validNodes.find(node) != validNodes.end())
// already has been visited in previous traversals and is valid
return true;
// output can't have any child nodes
bool res = node->isOutput();
// list of edges to remove is needed because you can't delete edge during iteration of edges
std::forward_list<Edge> edgesToRemove;
for (const auto & edge : node->getEdges()) {
auto child = edge.getNode();
// has been visited in current traversal, this means a cycle exists
if (visitedEdges.find(edge) != visitedEdges.end()) throw TopSortException("Cycle detected");
if (visitedEdges.find(edge) != visitedEdges.end())
// has been visited in current traversal, this means a cycle exists
throw TopSortException("Cycle detected");
visitedEdges.insert(edge);
if (traverseNodes(child, visitedEdges, validNodes, topSort)) {
......@@ -47,15 +48,15 @@ bool traverseNodes(
res = true;
continue;
}
// child node is invalid, remove the edge to the child
edgesToRemove.push_front(edge);
}
// remove all edges to invalid nodes
for (auto & edge : edgesToRemove)
node->removeEdge(edge);
if (res) {
// node is valid
validNodes.insert(node);
topSort.push_front(node);
}
......@@ -64,23 +65,24 @@ bool traverseNodes(
}
/**
* Deletes all nodes that don't have a path to output node
* @param nodes Map of nodes to filter
* @return Map of filtered nodes
* Topologically sorts nodes and also filters out nodes which are not part of a path between input and output node
* @param nodes Map of nodes to sort
* @return Topologically sorted nodes
*/
std::shared_ptr<std::forward_list<std::shared_ptr<AbstractNode>>> topSort(std::map<std::string, std::shared_ptr<AbstractNode>> & nodes) {
auto validNodes = std::set<std::shared_ptr<AbstractNode>>();
auto topSort = std::make_shared<std::forward_list<std::shared_ptr<AbstractNode>>>();
std::shared_ptr<std::forward_list<std::shared_ptr<AbstractNode>>> topSort(const std::map<std::string, std::shared_ptr<AbstractNode>> & nodes) {
std::set<std::shared_ptr<AbstractNode>> validNodes;
auto topSortedNodes = std::make_shared<std::forward_list<std::shared_ptr<AbstractNode>>>();
for (const auto & nodeIt : nodes) {
for (const auto &nodeIt : nodes) {
auto node = nodeIt.second;
if (!node->isInput()) continue;
if (!node->isInput())
continue;
std::set<Edge> visitedEdges;
traverseNodes(node, visitedEdges, validNodes, *topSort);
traverseNodes(node, visitedEdges, validNodes, *topSortedNodes);
}
return topSort;
return topSortedNodes;
}
#endif //ALIB_WEB_WORKER_FILTERINVALIDNODES_HPP
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment