Skip to content
Snippets Groups Projects
Commit 4c4d29af authored by Radovan Červený's avatar Radovan Červený
Browse files

working aggregation, aggregating only time for now

parent e3c3f7c8
No related branches found
No related tags found
1 merge request!15BP_cervera3 - automatic measurements, processing
Showing
with 84 additions and 21 deletions
...@@ -20,4 +20,16 @@ std::ostream & operator <<( std::ostream & os, const MeasurementFrame & frame ) ...@@ -20,4 +20,16 @@ std::ostream & operator <<( std::ostream & os, const MeasurementFrame & frame )
return os; return os;
} }
   
MeasurementFrame MeasurementFrame::aggregate ( const std::vector < MeasurementFrame > & framesToAggregate ) {
MeasurementFrame aggregatedFrame ( framesToAggregate[0].name, framesToAggregate[0].type, framesToAggregate[0].parent_idx );
aggregatedFrame.sub_idxs = framesToAggregate[0].sub_idxs;
aggregatedFrame.time = TimeDataFrame::aggregate ( framesToAggregate );
aggregatedFrame.memory = MemoryDataFrame::aggregate ( framesToAggregate );
aggregatedFrame.counter = CounterDataFrame::aggregate ( framesToAggregate );
return aggregatedFrame;
}
} }
...@@ -28,6 +28,8 @@ struct MeasurementFrame { ...@@ -28,6 +28,8 @@ struct MeasurementFrame {
CounterDataFrame counter; CounterDataFrame counter;
   
MeasurementFrame ( measurements::stealth_string, measurements::Type, unsigned ); MeasurementFrame ( measurements::stealth_string, measurements::Type, unsigned );
static MeasurementFrame aggregate ( const std::vector < MeasurementFrame > & );
}; };
   
std::ostream & operator <<( std::ostream &, const MeasurementFrame & ); std::ostream & operator <<( std::ostream &, const MeasurementFrame & );
......
...@@ -97,6 +97,23 @@ void MeasurementResults::print_as_tree ( std::ostream & os, unsigned idx, std::s ...@@ -97,6 +97,23 @@ void MeasurementResults::print_as_tree ( std::ostream & os, unsigned idx, std::s
} }
} }
   
MeasurementResults MeasurementResults::aggregate ( const std::vector < MeasurementResults > & resultsToAggregate ) {
MeasurementResults aggregatedResults;
size_t frameCount = resultsToAggregate[0].frames.size ( );
for ( size_t frameIdx = 0; frameIdx < frameCount; ++frameIdx ) {
std::vector < MeasurementFrame > framesToAggregate;
for ( const MeasurementResults & measurementResults : resultsToAggregate )
framesToAggregate.push_back ( measurementResults.frames[frameIdx] );
aggregatedResults.frames.push_back ( MeasurementFrame::aggregate ( framesToAggregate ) );
}
return aggregatedResults;
}
const int MeasurementXalloc::FORMAT = std::ios::xalloc ( ); const int MeasurementXalloc::FORMAT = std::ios::xalloc ( );
   
std::ostream & operator <<( std::ostream & os, const MeasurementResults & mr ) { std::ostream & operator <<( std::ostream & os, const MeasurementResults & mr ) {
......
...@@ -29,6 +29,8 @@ struct MeasurementResults { ...@@ -29,6 +29,8 @@ struct MeasurementResults {
void print_as_list ( std::ostream & ) const; void print_as_list ( std::ostream & ) const;
void print_as_tree ( std::ostream & ) const; void print_as_tree ( std::ostream & ) const;
   
static MeasurementResults aggregate ( const std::vector < MeasurementResults > & );
private: private:
void print_as_list ( std::ostream &, unsigned ) const; void print_as_list ( std::ostream &, unsigned ) const;
void print_as_tree ( std::ostream &, unsigned, std::string &, bool ) const; void print_as_tree ( std::ostream &, unsigned, std::string &, bool ) const;
......
...@@ -47,6 +47,11 @@ void CounterDataFrame::hint ( unsigned frame_idx, measurements::stealth_vector < ...@@ -47,6 +47,11 @@ void CounterDataFrame::hint ( unsigned frame_idx, measurements::stealth_vector <
} }
} }
   
CounterDataFrame CounterDataFrame::aggregate ( const std::vector < MeasurementFrame > & framesToAggregate ) {
// dummmy aggregation
return framesToAggregate[0].counter;
}
std::ostream & operator <<( std::ostream & os, const CounterDataFrame & cdf ) { std::ostream & operator <<( std::ostream & os, const CounterDataFrame & cdf ) {
   
os << "cnts: ("; os << "cnts: (";
......
...@@ -33,6 +33,8 @@ struct CounterDataFrame { ...@@ -33,6 +33,8 @@ struct CounterDataFrame {
static void init ( unsigned, measurements::stealth_vector < MeasurementFrame > & ); static void init ( unsigned, measurements::stealth_vector < MeasurementFrame > & );
static void update ( unsigned, measurements::stealth_vector < MeasurementFrame > & ); static void update ( unsigned, measurements::stealth_vector < MeasurementFrame > & );
static void hint ( unsigned, measurements::stealth_vector < MeasurementFrame > &, CounterHint ); static void hint ( unsigned, measurements::stealth_vector < MeasurementFrame > &, CounterHint );
static CounterDataFrame aggregate ( const std::vector < MeasurementFrame > & );
}; };
   
std::ostream & operator <<( std::ostream &, const CounterDataFrame & ); std::ostream & operator <<( std::ostream &, const CounterDataFrame & );
......
...@@ -64,6 +64,11 @@ void MemoryDataFrame::hint ( unsigned frame_idx, measurements::stealth_vector < ...@@ -64,6 +64,11 @@ void MemoryDataFrame::hint ( unsigned frame_idx, measurements::stealth_vector <
} }
} }
   
MemoryDataFrame MemoryDataFrame::aggregate ( const std::vector < MeasurementFrame > & framesToAggregate ) {
// dummmy aggregation
return framesToAggregate[0].memory;
}
std::ostream & operator <<( std::ostream & os, const MemoryDataFrame & mdf ) { std::ostream & operator <<( std::ostream & os, const MemoryDataFrame & mdf ) {
os << mdf.start_heap_usage << "B shu, " << mdf.end_heap_usage << "B ehu, " << mdf.high_watermark << "B hw, " << mdf.in_frame_high_watermark << "B if_hw"; os << mdf.start_heap_usage << "B shu, " << mdf.end_heap_usage << "B ehu, " << mdf.high_watermark << "B hw, " << mdf.in_frame_high_watermark << "B if_hw";
return os; return os;
......
...@@ -21,7 +21,7 @@ struct MemoryHint { ...@@ -21,7 +21,7 @@ struct MemoryHint {
NEW, DELETE NEW, DELETE
}; };
   
Type type; Type type;
value_type size; value_type size;
}; };
   
...@@ -39,6 +39,8 @@ struct MemoryDataFrame { ...@@ -39,6 +39,8 @@ struct MemoryDataFrame {
static void init ( unsigned, measurements::stealth_vector < MeasurementFrame > & ); static void init ( unsigned, measurements::stealth_vector < MeasurementFrame > & );
static void update ( unsigned, measurements::stealth_vector < MeasurementFrame > & ); static void update ( unsigned, measurements::stealth_vector < MeasurementFrame > & );
static void hint ( unsigned, measurements::stealth_vector < MeasurementFrame > &, MemoryHint ); static void hint ( unsigned, measurements::stealth_vector < MeasurementFrame > &, MemoryHint );
static MemoryDataFrame aggregate ( const std::vector < MeasurementFrame > & );
}; };
   
std::ostream & operator <<( std::ostream &, const MemoryDataFrame & ); std::ostream & operator <<( std::ostream &, const MemoryDataFrame & );
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
   
#include <sstream> #include <sstream>
#include "../MeasurementFrame.hpp" #include "../MeasurementFrame.hpp"
#include <iostream>
   
using namespace std::chrono; using namespace std::chrono;
   
...@@ -26,6 +27,20 @@ void TimeDataFrame::update ( unsigned frame_idx, measurements::stealth_vector < ...@@ -26,6 +27,20 @@ void TimeDataFrame::update ( unsigned frame_idx, measurements::stealth_vector <
parent_frame.time.in_frame_duration -= current_frame.time.duration; parent_frame.time.in_frame_duration -= current_frame.time.duration;
} }
   
TimeDataFrame TimeDataFrame::aggregate ( const std::vector < MeasurementFrame > & framesToAggregate ) {
TimeDataFrame aggregatedTimeDataFrame { };
for ( const MeasurementFrame & frame : framesToAggregate ) {
aggregatedTimeDataFrame.duration += frame.time.duration;
aggregatedTimeDataFrame.in_frame_duration += frame.time.in_frame_duration;
}
aggregatedTimeDataFrame.duration /= framesToAggregate.size ( );
aggregatedTimeDataFrame.in_frame_duration /= framesToAggregate.size ( );
return aggregatedTimeDataFrame;
}
std::ostream & operator <<( std::ostream & os, const TimeDataFrame & tdf ) { std::ostream & operator <<( std::ostream & os, const TimeDataFrame & tdf ) {
os << tdf.duration << " dur, " << tdf.in_frame_duration << " if_dur"; os << tdf.duration << " dur, " << tdf.in_frame_duration << " if_dur";
return os; return os;
......
...@@ -24,6 +24,8 @@ struct TimeDataFrame { ...@@ -24,6 +24,8 @@ struct TimeDataFrame {
   
static void init ( unsigned, measurements::stealth_vector < MeasurementFrame > & ); static void init ( unsigned, measurements::stealth_vector < MeasurementFrame > & );
static void update ( unsigned, measurements::stealth_vector < MeasurementFrame > & ); static void update ( unsigned, measurements::stealth_vector < MeasurementFrame > & );
static TimeDataFrame aggregate ( const std::vector < MeasurementFrame > & );
}; };
   
std::ostream & operator <<( std::ostream &, const std::chrono::microseconds & ); std::ostream & operator <<( std::ostream &, const std::chrono::microseconds & );
......
...@@ -96,7 +96,7 @@ MPRPipelineResults MeasurementProvisioner::runPipeline ( const MPPipeline & pipe ...@@ -96,7 +96,7 @@ MPRPipelineResults MeasurementProvisioner::runPipeline ( const MPPipeline & pipe
   
ofdstream ofdlog ( pre.stderrFd ); ofdstream ofdlog ( pre.stderrFd );
   
vector < vector < MeasurementResults > > pipelineMeasurementSubResults; vector < vector < MeasurementResults > > commandMeasurementSubResults;
   
MPRPipelineResults pipelineFinalResults { }; MPRPipelineResults pipelineFinalResults { };
   
...@@ -138,20 +138,24 @@ MPRPipelineResults MeasurementProvisioner::runPipeline ( const MPPipeline & pipe ...@@ -138,20 +138,24 @@ MPRPipelineResults MeasurementProvisioner::runPipeline ( const MPPipeline & pipe
if ( pipelineFinalResults.pipelineStatus.exitCode != 0 ) if ( pipelineFinalResults.pipelineStatus.exitCode != 0 )
break; break;
   
pipelineMeasurementSubResults.push_back ( std::move ( pipelineMeasurementResults ) ); commandMeasurementSubResults.push_back ( std::move ( pipelineMeasurementResults ) );
} }
   
// if everything went smoothly, we aggregate the results // if everything went smoothly, we aggregate the results
if ( pipelineFinalResults.pipelineStatus.exitCode == 0 ) { if ( pipelineFinalResults.pipelineStatus.exitCode == 0 ) {
// FIXME we need proper aggregation // FIXME we need proper aggregation
vector < MeasurementResults > aggregatedMeasurementResults = pipelineMeasurementSubResults[0];
   
vector < MeasurementResults >::iterator amrIter = aggregatedMeasurementResults.begin ( ); vector < vector < MeasurementResults > > transposedCommandMeasurementSubResults ( pipeline.size ( ) );
for ( vector < MeasurementResults > & commandResults : commandMeasurementSubResults )
for ( size_t i = 0; i < commandResults.size ( ); ++i )
transposedCommandMeasurementSubResults[i].push_back ( std::move ( commandResults[i] ) );
vector < vector < MeasurementResults > >::iterator tcmsIter = transposedCommandMeasurementSubResults.begin ( );
MPPipeline::const_iterator mppcIter = pipeline.cbegin ( ); MPPipeline::const_iterator mppcIter = pipeline.cbegin ( );
   
for ( ; mppcIter != pipeline.cend ( ); ++mppcIter, ++amrIter ) { for ( ; mppcIter != pipeline.cend ( ); ++mppcIter, ++tcmsIter )
pipelineFinalResults.commandResults.push_back ( { mppcIter->getRawCommand ( ), std::move ( * amrIter ) } ); pipelineFinalResults.commandResults.push_back ( { mppcIter->getRawCommand ( ), MeasurementResults::aggregate ( * tcmsIter ) } );
}
   
ofdlog << "| \tpipeline: \tOK" << endl; ofdlog << "| \tpipeline: \tOK" << endl;
} else { } else {
......
...@@ -21,32 +21,27 @@ class MeasurementProvisioner { ...@@ -21,32 +21,27 @@ class MeasurementProvisioner {
static void prepareEnvironment ( const MeasurementProvisionerConfiguration & ); static void prepareEnvironment ( const MeasurementProvisionerConfiguration & );
static MPRPipelineResults runPipeline ( const MPPipeline &, const MPSubstitutionMap &, const MeasurementProvisionerConfiguration & ); static MPRPipelineResults runPipeline ( const MPPipeline &, const MPSubstitutionMap &, const MeasurementProvisionerConfiguration & );
   
struct PipelineRunnerEnvironment { struct PipelineRunnerEnvironment {
MPUtils::ShmFileHandle measurementsTmpfile, inputTmpfile, outputTmpfile, errorTmpfile; MPUtils::ShmFileHandle measurementsTmpfile, inputTmpfile, outputTmpfile, errorTmpfile;
   
int measurementsFd, stdinFd, stdoutFd, stderrFd; int measurementsFd, stdinFd, stdoutFd, stderrFd;
   
public: public:
PipelineRunnerEnvironment ( );
~PipelineRunnerEnvironment ( );
   
PipelineRunnerEnvironment(); void commandFdInit ( );
~PipelineRunnerEnvironment(); void commandFdSwap ( );
void commandFdEnd ( );
void commandFdInit();
void commandFdSwap();
void commandFdEnd();
   
std::string retrievePipelineError(); std::string retrievePipelineError ( );
MeasurementResults retrieveMeasurementResults(); MeasurementResults retrieveMeasurementResults ( );
}; };
   
public: public:
static MeasurementProvisionerResults runConfiguration ( const MeasurementProvisionerConfiguration & ); static MeasurementProvisionerResults runConfiguration ( const MeasurementProvisionerConfiguration & );
}; };
   
} }
   
#endif /* MEASUREMENT_PROVISIONER_HPP_ */ #endif /* MEASUREMENT_PROVISIONER_HPP_ */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment