-
Jan Trávníček authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
MeasurementProvisioner.cpp 9.07 KiB
/*
* Author: Radovan Cerveny
*/
#include "MeasurementProvisioner.hpp"
#include "exception/CommonException.h"
#include "sax/SaxParseInterface.h"
#include <wait.h>
#include <iostream>
#include "MeasurementProvisionerUtils.hpp"
using namespace std;
namespace measurements {
/**
 * Runs the whole measurement configuration.
 *
 * The environment is prepared first, then the input data is built from the
 * configuration. Every substitution map offered by the input data is combined
 * with every substitution pipeline and each combination is executed through
 * runPipeline ( ). Progress is reported on std::clog.
 *
 * @param cfg configuration to run
 * @return one input result per substitution map, each holding the used inputs
 *         and one pipeline result per pipeline
 */
MeasurementProvisionerResults MeasurementProvisioner::runConfiguration ( const MeasurementProvisionerConfiguration & cfg ) {
	prepareEnvironment ( cfg );

	std::clog << "Preparing data ..." << std::flush;
	MPInputData mpiData ( cfg );
	std::clog << " done!" << std::endl;

	MeasurementProvisionerResults allResults;

	const int totalInputs = mpiData.getSubstitutionMaps ( ).size ( );
	int inputIndex = 0;

	for ( const MPSubstitutionMap & substitutionMap : mpiData.getSubstitutionMaps ( ) ) {
		MPRInputResult inputResult;

		// remember which alias was substituted for which variable
		for ( const auto & variable : substitutionMap )
			inputResult.inputs.push_back ( { variable.first, mpiData.getMPIDHandle ( variable.second ).alias } );

		// blank lines separate the log blocks of consecutive inputs
		if ( inputIndex > 0 )
			std::clog << std::endl << std::endl;

		++inputIndex;

		std::clog << "+" << std::endl;
		std::clog << "| Input [" << inputIndex << "/" << totalInputs << "]: " << std::endl;

		for ( const auto & variable : substitutionMap )
			std::clog << "| \t$" << variable.first << " -> " << mpiData.getMPIDHandle ( variable.second ).alias << std::endl;

		std::clog << "| Pipelines:" << std::endl;

		const int totalPipelines = mpiData.getSubstitutionPipelines ( ).size ( );
		int pipelineIndex = 0;

		for ( const MPPipeline & pipeline : mpiData.getSubstitutionPipelines ( ) ) {
			if ( pipelineIndex > 0 )
				std::clog << "|" << std::endl;

			++pipelineIndex;

			// human readable form of the pipeline: command aliases joined by " | "
			std::string pipelineLabel;
			const char * separator = "";

			for ( MPPipeline::const_iterator command = pipeline.cbegin ( ); command != pipeline.cend ( ); ++command ) {
				pipelineLabel += separator;
				pipelineLabel += command->getAlias ( );
				separator = " | ";
			}

			std::clog << "| \t[" << pipelineIndex << "/" << totalPipelines << "]: " << pipelineLabel << std::endl;

			inputResult.pipelineResults.push_back ( runPipeline ( pipeline, substitutionMap, cfg ) );
		}

		std::clog << "+" << std::endl;

		allResults.inputResults.push_back ( std::move ( inputResult ) );
	}

	return allResults;
}
void MeasurementProvisioner::prepareEnvironment ( const MeasurementProvisionerConfiguration & cfg ) {
const string & workingDirectory = cfg.environment.workingDirectory;
if ( workingDirectory.size ( ) == 0 ) return;
list < string > workingDirectoryExpansion = MPUtils::shellExpand ( workingDirectory );
if ( workingDirectoryExpansion.size ( ) != 1 )
throw::exception::CommonException ( "MeasurementProvisioner: binaries directory: \"" + workingDirectory + "\" expansion failed" );
if ( chdir ( workingDirectoryExpansion.begin ( )->c_str ( ) ) != 0 )
throw::exception::CommonException ( "MeasurementProvisioner: chdir to binaries directory: \"" + workingDirectory + "\" failed" );
}
/**
 * Converts the status returned by system ( ) into a single exit code.
 *
 * WEXITSTATUS alone is not enough: for a process terminated by a signal the
 * exit status bits are zero, so a crashed command would look successful.
 *
 * @param status value returned by system ( )
 * @return the command's exit code when it exited normally, 128 + signal
 *         number when it was killed by a signal, 255 when the command could
 *         not be run or its state is otherwise unknown
 */
static int pipelineCommandExitCode ( int status ) {
	if ( status == -1 )
		return 255;

	if ( WIFEXITED ( status ) )
		return WEXITSTATUS ( status );

	if ( WIFSIGNALED ( status ) )
		return 128 + WTERMSIG ( status );

	return 255;
}

/**
 * Runs one pipeline repeatedly and aggregates the measurements of its commands.
 *
 * The pipeline is executed cfg.environment.pipelineIterations times. The
 * commands of one iteration are run one after another through the system
 * shell, with their standard descriptors redirected by the
 * PipelineRunnerEnvironment. The first failing command stops all iterations
 * and its exit code, raw command and stderr contents are reported in the
 * pipeline status.
 *
 * @param pipeline commands to execute, in order
 * @param substitutionMap values substituted into every command
 * @param cfg configuration providing the number of iterations
 * @return pipeline status and, on success, one aggregated result per command
 *         of the pipeline (aggregated over an empty set for commands that are
 *         not measured)
 */
MPRPipelineResult MeasurementProvisioner::runPipeline ( const MPPipeline & pipeline, const MPSubstitutionMap & substitutionMap, const MeasurementProvisionerConfiguration & cfg ) {
	// setup environment for pipeline running
	PipelineRunnerEnvironment pre;

	ofdstream ofdlog ( pre.stderrFd );

	// one entry per iteration, holding the results of the measured commands in pipeline order
	vector < vector < MeasurementResults > > commandMeasurementSubResults;

	MPRPipelineResult pipelineFinalResults = MPRPipelineResult ( );

	// we repeat the pipeline measurement several times, then aggregate the results
	for ( int iteration = 0; iteration < cfg.environment.pipelineIterations; ++iteration ) {
		ofdlog << "| \titeration: \t" << iteration + 1 << endl;

		vector < MeasurementResults > pipelineMeasurementResults;

		// run one full pipeline
		pre.commandFdInit ( );

		for ( MPPipeline::const_iterator mppcIter = pipeline.cbegin ( ); mppcIter != pipeline.cend ( ); ++mppcIter ) {
			if ( mppcIter != pipeline.cbegin ( ) )
				pre.commandFdSwap ( );

			string commandToRun = mppcIter->substitute ( substitutionMap );

			// execute by the system shell!
			const int exitCode = pipelineCommandExitCode ( system ( commandToRun.c_str ( ) ) );

			// if there was an error during execution (including death by a signal), we halt the pipeline
			if ( exitCode != 0 ) {
				pipelineFinalResults.pipelineStatus.exitCode = exitCode;
				pipelineFinalResults.pipelineStatus.errorOrigin = mppcIter->getRawCommand ( );
				pipelineFinalResults.pipelineStatus.errorValue = pre.retrievePipelineError ( );
				break;
			}

			if ( mppcIter->getMeasure ( ) )
				pipelineMeasurementResults.push_back ( pre.retrieveMeasurementResults ( ) );
		}

		// reset file descriptors for next command
		pre.commandFdEnd ( );

		// if there was an error, we halt the execution of all iterations and report the error
		if ( pipelineFinalResults.pipelineStatus.exitCode != 0 )
			break;

		commandMeasurementSubResults.push_back ( std::move ( pipelineMeasurementResults ) );
	}

	// if everything went smoothly, we aggregate the results
	if ( pipelineFinalResults.pipelineStatus.exitCode == 0 ) {
		// transposition: slot k collects the results of the k-th MEASURED command over all iterations
		vector < vector < MeasurementResults > > transposedCommandMeasurementSubResults ( pipeline.size ( ) );

		for ( vector < MeasurementResults > & commandResults : commandMeasurementSubResults )
			for ( size_t i = 0; i < commandResults.size ( ); ++i )
				transposedCommandMeasurementSubResults[i].push_back ( std::move ( commandResults[i] ) );

		// Only measured commands own a slot, so the slot index must advance only for them;
		// pairing slots with commands by plain position would attribute the measurements
		// to the wrong alias whenever an unmeasured command precedes a measured one.
		size_t measuredSlot = 0;

		for ( MPPipeline::const_iterator mppcIter = pipeline.cbegin ( ); mppcIter != pipeline.cend ( ); ++mppcIter ) {
			vector < MeasurementResults > noMeasurements;
			vector < MeasurementResults > & samples = mppcIter->getMeasure ( ) ? transposedCommandMeasurementSubResults[measuredSlot++] : noMeasurements;

			pipelineFinalResults.commandResults.push_back ( { mppcIter->getAlias ( ), MeasurementResults::aggregate ( samples ) } );
		}

		ofdlog << "| \tpipeline: \tOK" << endl;
	} else {
		ofdlog << "| \tpipeline: \tERROR" << endl;
	}

	return pipelineFinalResults;
}
/**
 * Prepares the descriptors used while commands of a pipeline are running.
 *
 * Opens the temporary file for measurements and makes it reachable through
 * descriptor 5, opens the temporary files later used as stdin, stdout and
 * stderr of the commands, saves duplicates of the current descriptors 0, 1
 * and 2 and finally closes descriptors 0, 1 and 2.
 *
 * @throws ::exception::CommonException when a temporary file cannot be made
 *         or the measurements descriptor cannot be set up
 */
MeasurementProvisioner::PipelineRunnerEnvironment::PipelineRunnerEnvironment ( ) {
	try {
		measurementsTmpfile = MPUtils::openShmFile ( );
	} catch ( ::exception::CommonException & ) {
		throw::exception::CommonException ( "MeasurementProvisioner: Cannot make measurements tmp file" );
	}

	// descriptor 5 is reserved for measurements; duplicate onto it unless the file already got it
	const int reservedMeasurementsFd = 5;

	measurementsFd = ( measurementsTmpfile.fd == reservedMeasurementsFd )
					 ? reservedMeasurementsFd
					 : dup2 ( measurementsTmpfile.fd, reservedMeasurementsFd );

	if ( measurementsFd == -1 )
		throw::exception::CommonException ( "MeasurementProvisioner: Cannot open measurements file descriptor" );

	try {
		inputTmpfile = MPUtils::openShmFile ( );
		outputTmpfile = MPUtils::openShmFile ( );
		errorTmpfile = MPUtils::openShmFile ( );
	} catch ( ::exception::CommonException & ) {
		throw::exception::CommonException ( "MeasurementProvisioner: Cannot make io tmp files" );
	}

	// keep duplicates of the standard descriptors so the destructor can bring them back
	stdinFd = dup ( 0 );
	stdoutFd = dup ( 1 );
	stderrFd = dup ( 2 );

	// free the standard descriptor numbers for the commands
	for ( int standardFd = 0; standardFd <= 2; ++standardFd )
		close ( standardFd );
}
/**
 * Releases the temporary files and restores the saved stdin, stdout and
 * stderr onto descriptors 0, 1 and 2.
 *
 * The restoration is best effort; nothing is thrown from the destructor.
 */
MeasurementProvisioner::PipelineRunnerEnvironment::~PipelineRunnerEnvironment ( ) {
	// close tmp files, which results in their deletion
	MPUtils::closeShmFile ( measurementsTmpfile );
	MPUtils::closeShmFile ( inputTmpfile );
	MPUtils::closeShmFile ( outputTmpfile );
	MPUtils::closeShmFile ( errorTmpfile );

	// the measurements descriptor may be a separate duplicate of the tmp file descriptor
	if ( measurementsFd != measurementsTmpfile.fd )
		close ( measurementsFd );

	// Restore previous stdin, stdout, stderr. dup2 names the target descriptor
	// explicitly, so each stream lands on its own number even when one of the
	// saved descriptors is invalid; restoring through close + dup would then
	// shift the remaining streams onto the wrong numbers. When a stream cannot
	// be restored its descriptor is left closed rather than pointing at a
	// temporary file.
	if ( dup2 ( stdinFd, 0 ) == -1 )
		close ( 0 );

	if ( dup2 ( stdoutFd, 1 ) == -1 )
		close ( 1 );

	if ( dup2 ( stderrFd, 2 ) == -1 )
		close ( 2 );

	// the saved duplicates are no longer needed
	close ( stdinFd );
	close ( stdoutFd );
	close ( stderrFd );
}
void MeasurementProvisioner::PipelineRunnerEnvironment::commandFdInit ( ) {
// sets the right file descriptors for child processes - commands
dup ( inputTmpfile.fd );
dup ( outputTmpfile.fd );
dup ( errorTmpfile.fd );
}
/**
 * Swaps stdin and stdout for the next command of the pipeline: the file the
 * previous command wrote to becomes the input, the file it read from is
 * emptied and becomes the output.
 *
 * The descriptors currently installed as 0 and 1 are exchanged, so the roles
 * alternate correctly for pipelines of any length. Always installing the same
 * pair of files would make the third command read the output of the first one.
 *
 * Expects descriptors 0 and 1 to be open.
 *
 * @throws ::exception::CommonException when a descriptor cannot be duplicated
 *         or the new output file cannot be emptied
 */
void MeasurementProvisioner::PipelineRunnerEnvironment::commandFdSwap ( ) {
	// keep both files reachable through spare descriptors while 0 and 1 are rewired
	const int previousStdin = dup ( 0 );
	const int previousStdout = dup ( 1 );

	const bool swapped = previousStdin != -1 && previousStdout != -1
						 && dup2 ( previousStdout, 0 ) != -1
						 && dup2 ( previousStdin, 1 ) != -1;

	// the spare descriptors must not leak into the commands
	if ( previousStdin != -1 )
		close ( previousStdin );

	if ( previousStdout != -1 )
		close ( previousStdout );

	if ( !swapped ) {
		throw::exception::CommonException ( "MeasurementProvisioner: dup failed" );
	}

	// rewind previous stdout to the beginning, so the next command can read the whole file as input
	lseek ( 0, 0, SEEK_SET );

	// destroy the contents of previous stdin and rewind to the beginning
	lseek ( 1, 0, SEEK_SET );

	if ( ftruncate ( 1, 0 ) == -1 ) {
		throw::exception::CommonException ( "MeasurementProvisioner: ftruncate failed" );
	}
}
/**
 * Closes descriptors 0, 1 and 2 once a pipeline run is over, so that the
 * next iteration can set them up again.
 */
void MeasurementProvisioner::PipelineRunnerEnvironment::commandFdEnd ( ) {
	for ( int standardFd = 0; standardFd <= 2; ++standardFd )
		close ( standardFd );
}
/**
 * Reads everything that was written to descriptor 2 and then empties the
 * file behind it.
 *
 * @return the text read from descriptor 2
 */
string MeasurementProvisioner::PipelineRunnerEnvironment::retrievePipelineError ( ) {
	// the error message starts at the beginning of the file
	lseek ( 2, 0, SEEK_SET );

	ifdstream capturedStderr ( 2 );

	// slurp the whole stream into memory
	stringstream errorText;
	errorText << capturedStderr.rdbuf ( );

	// rewind and drop the contents; the result of the truncation is not checked
	lseek ( 2, 0, SEEK_SET );
	ftruncate ( 2, 0 );

	return errorText.str ( );
}
/**
 * Reads the measurements available through the measurements descriptor,
 * empties the file for the next command and parses what was read.
 *
 * @return measurement results parsed from the tokens read from the
 *         measurements descriptor
 */
MeasurementResults MeasurementProvisioner::PipelineRunnerEnvironment::retrieveMeasurementResults ( ) {
	// the offset sits at the end of the file after the measurements, go back to the start
	lseek ( measurementsFd, 0, SEEK_SET );

	ifdstream measurementsStream ( measurementsFd );

	deque < sax::Token > xmlTokens;
	sax::SaxParseInterface::parseStream ( measurementsStream, xmlTokens );

	// rewind and drop the contents so the next command starts with an empty file
	lseek ( measurementsFd, 0, SEEK_SET );
	ftruncate ( measurementsFd, 0 );

	return measurements::MeasurementResultsXml::parse ( xmlTokens );
}
}