From 8051a2d8c199b2be6c3297fce16f937a061ddb1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radovan=20=C4=8Cerven=C3=BD?= <radovan.cerveny@gmail.com> Date: Wed, 17 Feb 2016 07:38:56 +0100 Subject: [PATCH] working pipeline execution and measurement gathering --- .../provisioner/MeasurementProvisioner.cpp | 97 ++++++++++++++++--- 1 file changed, 81 insertions(+), 16 deletions(-) diff --git a/ameasure2/src/provisioner/MeasurementProvisioner.cpp b/ameasure2/src/provisioner/MeasurementProvisioner.cpp index 1f59621509..2c3f6c8bb4 100644 --- a/ameasure2/src/provisioner/MeasurementProvisioner.cpp +++ b/ameasure2/src/provisioner/MeasurementProvisioner.cpp @@ -4,6 +4,7 @@ #include "MeasurementProvisioner.hpp" #include "exception/AlibException.h" +#include "sax/SaxParseInterface.h" #include <unistd.h> #include <err.h> #include <wordexp.h> @@ -61,47 +62,111 @@ void MeasurementProvisioner::runProvision ( const MeasurementProvision & mp, Mea runPipeline ( expandedInputDataFile, mp.pipeCommands ); } -void MeasurementProvisioner::runPipeline ( const std::string & inputDataFile, const std::vector < std::string > & pipeCommands ) { +void MeasurementProvisioner::runPipeline ( const std::string & inputDataFile, const std::vector < std::string > & pipelineCommands ) { - std::string executionPipeline; + std::vector < std::string > pipeline; - for ( std::vector < std::string >::const_iterator iter = pipeCommands.cbegin ( ); iter != pipeCommands.cend ( ); ++iter ) { - if ( iter != pipeCommands.cbegin ( ) ) - executionPipeline += " | "; + std::vector < std::deque < sax::Token > > measurementsOutputTokens; - executionPipeline += * iter + " -m"; + std::vector < std::string >::const_iterator iter = pipelineCommands.cbegin ( ); - if ( iter == pipeCommands.cbegin ( ) ) - executionPipeline += " -i " + inputDataFile; - } + pipeline.emplace_back ( * ( iter++ ) + " -m -i " + inputDataFile ); + + for ( ; iter != pipelineCommands.cend ( ); ++iter ) + pipeline.emplace_back ( * iter + " -m" ); - char tmpfileTemplate[] = "XXXXXX"; - int tmpfileFd = mkstemp ( tmpfileTemplate ); + int measurementsTmpfileFd = open ( "/tmp", O_RDWR | O_TMPFILE, S_IRUSR | S_IWUSR ); - if ( tmpfileFd == -1 ) + if ( measurementsTmpfileFd == -1 ) throw exception::AlibException ( "Cannot make tmp file" ); - int measurementsFd = tmpfileFd; + int measurementsFd = measurementsTmpfileFd; + // reserve fd 5 for measurements if ( measurementsFd != 5 ) measurementsFd = dup2 ( measurementsFd, 5 ); if ( measurementsFd == -1 ) throw exception::AlibException ( "Cannot open measurements file descriptor" ); - // we dont want to see system call output, yet we want to be able to output afterwards + int inputTmpfileFd = open ( "/tmp", O_RDWR | O_TMPFILE, S_IRUSR | S_IWUSR ); + int outputTmpfileFd = open ( "/tmp", O_RDWR | O_TMPFILE, S_IRUSR | S_IWUSR ); + + if ( ( inputTmpfileFd == -1 ) || ( outputTmpfileFd == -1 ) ) + throw exception::AlibException ( "Cannot make io tmp file" ); + + // store handles to stdin, stdout, stderr, we will need to restore them later + int curStdinFd = dup ( 0 ); int curStdoutFd = dup ( 1 ); int curStderrFd = dup ( 2 ); + close ( 0 ); close ( 1 ); close ( 2 ); - int status = system ( executionPipeline.c_str ( ) ); + for ( iter = pipeline.cbegin ( ); iter != pipeline.cend ( ); ++iter ) { + if ( iter == pipeline.cbegin ( ) ) { + dup ( inputTmpfileFd ); + dup ( outputTmpfileFd ); + } else { + // swap stdin and stdout for next command + close ( 0 ); + close ( 1 ); + dup ( outputTmpfileFd ); + dup ( inputTmpfileFd ); + + // rewind previous stdout to the beginning, so the next command can read the whole file as input + lseek ( 0, SEEK_SET, 0 ); + + // destroy the contents of previous stdin and rewind to the beginning + ftruncate ( 1, 0 ); + lseek ( 1, SEEK_SET, 0 ); + } + + // leave the execution to the system shell + int status = system ( iter->c_str ( ) ); + + if ( status != 0 ) + throw exception::AlibException ( "Command exited with error" ); + + // since we are at the end of file after the output, we need to rewind + lseek ( measurementsFd, SEEK_SET, 0 ); + std::ifdstream ifd ( measurementsFd ); + + std::deque < sax::Token > tokens; + sax::SaxParseInterface::parseStream ( ifd, tokens ); + measurementsOutputTokens.emplace_back ( std::move ( tokens ) ); + + // destroy the contents and rewind for another commands measurements + ftruncate ( measurementsFd, 0 ); + lseek ( measurementsFd, SEEK_SET, 0 ); + } + + // close tmp files, which results in their deletion + if ( measurementsFd != measurementsTmpfileFd ) + close ( measurementsTmpfileFd ); + + close ( measurementsFd ); + close ( inputTmpfileFd ); + close ( outputTmpfileFd ); + // restore our main stdin, stdout, stderr + close ( 0 ); + close ( 1 ); + close ( 2 ); + + dup ( curStdinFd ); dup ( curStdoutFd ); dup ( curStderrFd ); - std::cout << executionPipeline << ' ' << status << ' ' << tmpfileFd << ' ' << tmpfileTemplate << std::endl; + std::cout << measurementsOutputTokens.size ( ) << std::endl; + + for ( auto & tokens : measurementsOutputTokens ) { + for ( auto & token : tokens ) + std::cout << token << std::endl; + + std::cout << std::endl; + } } } -- GitLab