Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
MeasurementProvisioner.cpp 9.07 KiB
/*
 * Author: Radovan Cerveny
 */

#include "MeasurementProvisioner.hpp"
#include "exception/CommonException.h"
#include "sax/SaxParseInterface.h"
#include <wait.h>
#include <iostream>

#include "MeasurementProvisionerUtils.hpp"

using namespace std;

namespace measurements {

/**
 * Runs every pipeline of the configuration against every input substitution map.
 *
 * Progress is reported on clog. The returned structure holds one MPRInputResult per
 * substitution map, each listing the variable -> alias pairs of that input and one
 * pipeline result per pipeline.
 *
 * @param cfg configuration handed to prepareEnvironment, MPInputData and runPipeline
 * @return results of all inputs, in the order of mpiData.getSubstitutionMaps ( )
 */
MeasurementProvisionerResults MeasurementProvisioner::runConfiguration ( const MeasurementProvisionerConfiguration & cfg ) {

	 // the working directory has to be set before the input data is prepared
	prepareEnvironment ( cfg );

	clog << "Preparing data ..." << flush;

	MPInputData inputData ( cfg );

	clog << " done!" << endl;

	MeasurementProvisionerResults allResults;

	const int totalInputs = inputData.getSubstitutionMaps ( ).size ( );
	int inputNumber = 0;

	for ( const MPSubstitutionMap & substitutionMap : inputData.getSubstitutionMaps ( ) ) {
		++inputNumber;

		MPRInputResult inputResult;

		 // record which alias each substitution variable stands for in this input
		for ( const auto & substitution : substitutionMap )
			inputResult.inputs.push_back ( { substitution.first, inputData.getMPIDHandle ( substitution.second ).alias } );

		 // blank lines separate the log blocks of consecutive inputs
		if ( inputNumber > 1 )
			clog << endl << endl;

		clog << "+" << endl;
		clog << "| Input [" << inputNumber << "/" << totalInputs << "]: " << endl;

		for ( const auto & substitution : substitutionMap )
			clog << "| \t$" << substitution.first << " -> " << inputData.getMPIDHandle ( substitution.second ).alias << endl;

		clog << "| Pipelines:" << endl;

		const int totalPipelines = inputData.getSubstitutionPipelines ( ).size ( );
		int pipelineNumber = 0;

		for ( const MPPipeline & pipeline : inputData.getSubstitutionPipelines ( ) ) {
			++pipelineNumber;

			if ( pipelineNumber > 1 )
				clog << "|" << endl;

			 // readable form of the pipeline: command aliases joined by " | "
			string pipelineDescription;
			const char * separator = "";

			for ( auto commandIter = pipeline.cbegin ( ); commandIter != pipeline.cend ( ); ++commandIter ) {
				pipelineDescription += separator;
				pipelineDescription += commandIter->getAlias ( );
				separator = " | ";
			}

			clog << "| \t[" << pipelineNumber << "/" << totalPipelines << "]: " << pipelineDescription << endl;
			inputResult.pipelineResults.push_back ( runPipeline ( pipeline, substitutionMap, cfg ) );
		}

		clog << "+" << endl;
		allResults.inputResults.push_back ( std::move ( inputResult ) );
	}

	return allResults;
}

void MeasurementProvisioner::prepareEnvironment ( const MeasurementProvisionerConfiguration & cfg ) {

	const string & workingDirectory = cfg.environment.workingDirectory;

	if ( workingDirectory.size ( ) == 0 ) return;

	list < string > workingDirectoryExpansion = MPUtils::shellExpand ( workingDirectory );

	if ( workingDirectoryExpansion.size ( ) != 1 )
		throw::exception::CommonException ( "MeasurementProvisioner: binaries directory: \"" + workingDirectory + "\" expansion failed" );

	if ( chdir ( workingDirectoryExpansion.begin ( )->c_str ( ) ) != 0 )
		throw::exception::CommonException ( "MeasurementProvisioner: chdir to binaries directory: \"" + workingDirectory + "\" failed" );
}

/**
 * Runs one pipeline for one input substitution, repeated cfg.environment.pipelineIterations
 * times, and aggregates the measurements of every command over all iterations.
 *
 * Each command is substituted with substitutionMap and executed through system ( ). The first
 * failing command stops all remaining work; its exit code, raw command and captured stderr are
 * stored in pipelineStatus and no command results are reported.
 *
 * @param pipeline commands to run, in order
 * @param substitutionMap substitutions applied to every command
 * @param cfg configuration providing environment.pipelineIterations
 * @return pipeline status plus, on success, one aggregated result per pipeline command
 */
MPRPipelineResult MeasurementProvisioner::runPipeline ( const MPPipeline & pipeline, const MPSubstitutionMap & substitutionMap, const MeasurementProvisionerConfiguration & cfg ) {

	 // setup environment for pipeline running
	PipelineRunnerEnvironment pre;

	 // fds 0-2 are handed over to the commands, so progress is logged to the saved original stderr
	ofdstream ofdlog ( pre.stderrFd );

	 // measurements of all iterations, one slot per pipeline command, indexed by the command position;
	 // indexing by position keeps results attached to the right command even when some commands are not measured
	vector < vector < MeasurementResults > > commandMeasurementResults ( pipeline.size ( ) );

	MPRPipelineResult pipelineFinalResults = MPRPipelineResult ( );

	 // we repeat the pipeline measurement several times, then aggregate the results
	for ( int iteration = 0; iteration < cfg.environment.pipelineIterations; ++iteration ) {

		ofdlog << "| \titeration: \t" << iteration + 1 << endl;

		 // run one full pipeline
		pre.commandFdInit ( );

		size_t commandIndex = 0;

		for ( MPPipeline::const_iterator mppcIter = pipeline.cbegin ( ); mppcIter != pipeline.cend ( ); ++mppcIter, ++commandIndex ) {
			if ( mppcIter != pipeline.cbegin ( ) )
				pre.commandFdSwap ( );

			string commandToRun = mppcIter->substitute ( substitutionMap );

			 // execute by the system shell!
			int status = system ( commandToRun.c_str ( ) );

			int exitCode = WEXITSTATUS ( status );

			 // a zero exit status is only meaningful for a normal exit; a shell killed by a signal must not pass as success
			if ( exitCode == 0 && ! WIFEXITED ( status ) )
				exitCode = WIFSIGNALED ( status ) ? 128 + WTERMSIG ( status ) : -1;

			 // if there was an error during execution, we halt the pipeline
			if ( exitCode != 0 ) {
				pipelineFinalResults.pipelineStatus.exitCode = exitCode;
				pipelineFinalResults.pipelineStatus.errorOrigin = mppcIter->getRawCommand ( );
				pipelineFinalResults.pipelineStatus.errorValue	= pre.retrievePipelineError ( );
				break;
			}

			if ( mppcIter->getMeasure ( ) )
				commandMeasurementResults[commandIndex].push_back ( pre.retrieveMeasurementResults ( ) );
		}

		 // reset file descriptors for next iteration
		pre.commandFdEnd ( );

		 // if there was an error, we halt the execution of all iterations and report the error
		if ( pipelineFinalResults.pipelineStatus.exitCode != 0 )
			break;
	}

	 // if everything went smoothly, we aggregate the results
	if ( pipelineFinalResults.pipelineStatus.exitCode == 0 ) {
		vector < vector < MeasurementResults > >::iterator resultsIter = commandMeasurementResults.begin ( );
		MPPipeline::const_iterator mppcIter = pipeline.cbegin ( );

		 // one entry per command; a command that was not measured contributes an empty set to aggregate
		for ( ; mppcIter != pipeline.cend ( ); ++mppcIter, ++resultsIter )
			pipelineFinalResults.commandResults.push_back ( { mppcIter->getAlias ( ), MeasurementResults::aggregate ( * resultsIter ) } );

		ofdlog << "| \tpipeline: \tOK" << endl;
	} else {
		ofdlog << "| \tpipeline: \tERROR" << endl;
	}

	return pipelineFinalResults;
}

/**
 * Prepares the descriptors used while a pipeline runs.
 *
 * Opens the measurements tmp file and makes it available on fd 5, opens tmp files for the
 * commands' stdin, stdout and stderr, saves duplicates of the current fds 0-2 and finally
 * closes fds 0-2 so that they can be handed over to the commands.
 *
 * Everything acquired so far is released before an exception leaves the constructor, because
 * the destructor does not run for a partially constructed object.
 *
 * @throws ::exception::CommonException when a tmp file cannot be opened or a descriptor cannot be duplicated
 */
MeasurementProvisioner::PipelineRunnerEnvironment::PipelineRunnerEnvironment ( ) {

	try {
		measurementsTmpfile = MPUtils::openShmFile ( );
	} catch ( ::exception::CommonException & ) {
		throw::exception::CommonException ( "MeasurementProvisioner: Cannot make measurements tmp file" );
	}

	 // releases the measurements file and its fd 5 duplicate when a later step fails
	const auto releaseMeasurements = [ this ] ( ) {
		if ( measurementsFd != -1 && measurementsFd != measurementsTmpfile.fd )
			close ( measurementsFd );

		MPUtils::closeShmFile ( measurementsTmpfile );
	};

	measurementsFd = measurementsTmpfile.fd;

	 // reserve fd 5 for measurements
	if ( measurementsFd != 5 )
		measurementsFd = dup2 ( measurementsFd, 5 );

	if ( measurementsFd == -1 ) {
		releaseMeasurements ( );
		throw::exception::CommonException ( "MeasurementProvisioner: Cannot open measurements file descriptor" );
	}

	try {
		inputTmpfile  = MPUtils::openShmFile ( );
		outputTmpfile = MPUtils::openShmFile ( );
		errorTmpfile  = MPUtils::openShmFile ( );
	} catch ( ::exception::CommonException & ) {
		 // NOTE(review): io tmp files opened before the failing one are not released here, it is not known which of them succeeded
		releaseMeasurements ( );
		throw::exception::CommonException ( "MeasurementProvisioner: Cannot make io tmp files" );
	}

	 // store handles to stdin, stdout, stderr, we will need to restore them later
	stdinFd	 = dup ( 0 );
	stdoutFd = dup ( 1 );
	stderrFd = dup ( 2 );

	 // without all three saved handles the standard streams could not be restored, so fail before closing them
	if ( stdinFd == -1 || stdoutFd == -1 || stderrFd == -1 ) {
		if ( stdinFd != -1 )
			close ( stdinFd );

		if ( stdoutFd != -1 )
			close ( stdoutFd );

		if ( stderrFd != -1 )
			close ( stderrFd );

		MPUtils::closeShmFile ( inputTmpfile );
		MPUtils::closeShmFile ( outputTmpfile );
		MPUtils::closeShmFile ( errorTmpfile );
		releaseMeasurements ( );

		throw::exception::CommonException ( "MeasurementProvisioner: dup failed" );
	}

	close ( 0 );
	close ( 1 );
	close ( 2 );
}

/**
 * Closes the tmp files and puts the saved stdin, stdout and stderr back on fds 0-2.
 *
 * The saved handles are restored with dup2 onto explicit targets, so a missing saved handle
 * cannot shift the remaining streams onto wrong descriptor numbers. Never throws.
 */
MeasurementProvisioner::PipelineRunnerEnvironment::~PipelineRunnerEnvironment ( ) {
	 // close tmp files, which results in their deletion
	MPUtils::closeShmFile ( measurementsTmpfile );
	MPUtils::closeShmFile ( inputTmpfile );
	MPUtils::closeShmFile ( outputTmpfile );
	MPUtils::closeShmFile ( errorTmpfile );

	 // the measurements descriptor is a separate duplicate unless the tmp file was opened directly on it
	if ( measurementsFd != measurementsTmpfile.fd )
		close ( measurementsFd );

	 // puts savedFd back on targetFd; when that is not possible targetFd is left closed
	const auto restoreFd = [ ] ( int savedFd, int targetFd ) {
		if ( savedFd == -1 || dup2 ( savedFd, targetFd ) == -1 )
			close ( targetFd );

		if ( savedFd != -1 )
			close ( savedFd );
	};

	 // restore previous stdin, stdout, stderr
	restoreFd ( stdinFd, 0 );
	restoreFd ( stdoutFd, 1 );
	restoreFd ( stderrFd, 2 );
}

void MeasurementProvisioner::PipelineRunnerEnvironment::commandFdInit ( ) {
	 // sets the right file descriptors for child processes - commands
	dup ( inputTmpfile.fd );
	dup ( outputTmpfile.fd );
	dup ( errorTmpfile.fd );
}

/**
 * Prepares fds 0 and 1 for the next command of the pipeline: the file the previous command
 * wrote to becomes stdin (rewound), the file it read from becomes stdout (emptied).
 *
 * The descriptors themselves are exchanged, so the two files keep alternating for any number
 * of commands. Always installing the same tmp file as stdin only works for the first swap.
 *
 * @throws ::exception::CommonException when a descriptor cannot be duplicated or the new stdout cannot be truncated
 */
void MeasurementProvisioner::PipelineRunnerEnvironment::commandFdSwap ( ) {
	 // park the previous stdin on a spare descriptor while stdout takes its place
	const int previousStdin = dup ( 0 );

	if ( previousStdin == -1 )
		throw::exception::CommonException ( "MeasurementProvisioner: dup failed" );

	 // swap stdin and stdout for next command
	const bool swapped = dup2 ( 1, 0 ) != -1 && dup2 ( previousStdin, 1 ) != -1;

	 // the spare descriptor must not be inherited by the command
	close ( previousStdin );

	if ( ! swapped )
		throw::exception::CommonException ( "MeasurementProvisioner: dup failed" );

	 // rewind previous stdout to the beginning, so the next command can read the whole file as input
	lseek ( 0, 0, SEEK_SET );

	 // destroy the contents of previous stdin and rewind to the beginning
	lseek ( 1, 0, SEEK_SET );

	if ( ftruncate ( 1, 0 ) == -1 )
		throw::exception::CommonException ( "MeasurementProvisioner: ftruncate failed" );
}

/**
 * Closes fds 0, 1 and 2 after a pipeline run, so that the next iteration can set them up again.
 */
void MeasurementProvisioner::PipelineRunnerEnvironment::commandFdEnd ( ) {
	 // stdin, stdout and stderr of the commands, in ascending order
	for ( int commandFd = 0; commandFd <= 2; ++commandFd )
		close ( commandFd );
}

/**
 * Reads everything currently stored behind fd 2 and then empties that file.
 *
 * @return the text read from fd 2, from the beginning of the file
 */
string MeasurementProvisioner::PipelineRunnerEnvironment::retrievePipelineError ( ) {
	const int errorFd = 2;

	 // the commands left the offset at the end of their output, go back to the start
	lseek ( errorFd, 0, SEEK_SET );
	ifdstream errorStream ( errorFd );

	 // slurp the whole content in one go
	stringstream collected;
	collected << errorStream.rdbuf ( );

	 // leave an empty, rewound file behind
	lseek ( errorFd, 0, SEEK_SET );
	ftruncate ( errorFd, 0 );

	return collected.str ( );
}

/**
 * Reads the measurements file from its beginning, empties it for the next command and parses
 * what was read.
 *
 * The content is tokenized with sax::SaxParseInterface::parseStream and the tokens are turned
 * into results by MeasurementResultsXml::parse.
 *
 * @return measurement results parsed from the current content of the measurements file
 */
MeasurementResults MeasurementProvisioner::PipelineRunnerEnvironment::retrieveMeasurementResults ( ) {
	const int fd = measurementsFd;

	 // the writer left the offset at the end of the file, start reading from the beginning
	lseek ( fd, 0, SEEK_SET );

	ifdstream measurementsStream ( fd );

	deque < sax::Token > parsedTokens;
	sax::SaxParseInterface::parseStream ( measurementsStream, parsedTokens );

	 // empty and rewind the file before parsing, so it is ready for the measurements of the next command
	lseek ( fd, 0, SEEK_SET );
	ftruncate ( fd, 0 );

	return measurements::MeasurementResultsXml::parse ( parsedTokens );
}

}