From 941b5af3a224256fa7a7b5bb4f5b0dd64676d498 Mon Sep 17 00:00:00 2001 From: Adriaan de Groot Date: Wed, 19 Aug 2020 12:54:40 +0200 Subject: [PATCH] [libcalamares] Rip out the guts of job-queue-running - compute weights and accumulations beforehand - mutex-lock structures so you can enqueue while running jobs - simplify progress reporting calculations - doesn't actually run any jobs --- src/libcalamares/JobQueue.cpp | 179 ++++++++++++++++++---------------- src/libcalamares/JobQueue.h | 1 - 2 files changed, 96 insertions(+), 84 deletions(-) diff --git a/src/libcalamares/JobQueue.cpp b/src/libcalamares/JobQueue.cpp index b78ba32dc..491e08f7c 100644 --- a/src/libcalamares/JobQueue.cpp +++ b/src/libcalamares/JobQueue.cpp @@ -28,11 +28,34 @@ #include "Job.h" #include "utils/Logger.h" +#include +#include #include namespace Calamares { +struct WeightedJob +{ + /** @brief Cumulative weight **before** this job starts + * + * This is calculated as jobs come in. + */ + double cumulative = 0.0; + /** @brief Weight of the job within the module's jobs + * + * When a list of jobs is added from a particular module, + * the jobs are weighted relative to that module's overall weight + * **and** the other jobs in the list, so that each job + * gets its share: + * ( job-weight / total-job-weight ) * module-weight + */ + double weight = 0.0; + + job_ptr job; +}; +using WeightedJobList = QList< WeightedJob >; + class JobThread : public QThread { public: @@ -45,106 +68,99 @@ public: virtual ~JobThread() override; - void setJobs( JobList&& jobs ) + void finalize() + { + Q_ASSERT( m_runningJobs->isEmpty() ); + QMutexLocker qlock( &m_enqueMutex ); + QMutexLocker rlock( &m_runMutex ); + std::swap( m_runningJobs, m_queuedJobs ); + m_overallQueueWeight + = m_runningJobs->isEmpty() ? 0.0 : ( m_runningJobs->last().cumulative + m_runningJobs->last().weight ); + if ( m_overallQueueWeight < 1 ) + { + m_overallQueueWeight = 1.0; + } + } + + void enqueue( int moduleWeight, const JobList& jobs ) { - m_jobs = jobs; + QMutexLocker qlock( &m_enqueMutex ); + + double cumulative + = m_queuedJobs->isEmpty() ? 0.0 : ( m_queuedJobs->last().cumulative + m_queuedJobs->last().weight ); - qreal totalJobsWeight = 0.0; - for ( auto job : m_jobs ) + double totalJobWeight = std::accumulate( jobs.cbegin(), jobs.cend(), 0.0, []( double total, const job_ptr& j ) { + return total + j->getJobWeight(); + } ); + if ( totalJobWeight < 1 ) { - totalJobsWeight += job->getJobWeight(); + totalJobWeight = 1.0; } - for ( auto job : m_jobs ) + + for ( const auto& j : jobs ) { - qreal jobWeight = qreal( job->getJobWeight() / totalJobsWeight ); - m_jobWeights.append( jobWeight ); + double jobContribution = ( j->getJobWeight() / totalJobWeight ) * moduleWeight; + m_queuedJobs->append( WeightedJob { cumulative, jobContribution, j } ); + cumulative += jobContribution; } } void run() override { - bool anyFailed = false; - QString message; - QString details; + QMutexLocker rlock( &m_runMutex ); + bool failureEncountered = false; m_jobIndex = 0; - for ( auto job : m_jobs ) + for ( const auto& jobitem : *m_runningJobs ) { - if ( anyFailed && !job->isEmergency() ) + if ( failureEncountered && !jobitem.job->isEmergency() ) { - cDebug() << "Skipping non-emergency job" << job->prettyName(); - ++m_jobIndex; - continue; + cDebug() << "Skipping non-emergency job" << jobitem.job->prettyName(); } - - emitProgress(); - cDebug() << "Starting" << ( anyFailed ? "EMERGENCY JOB" : "job" ) << job->prettyName() << "(there are" - << ( m_jobs.count() - m_jobIndex ) << "left)"; - connect( job.data(), &Job::progress, this, &JobThread::emitProgress ); - JobResult result = job->exec(); - if ( !anyFailed && !result ) + else { - anyFailed = true; - message = result.message(); - details = result.details(); + jobProgress( 0.0 ); // 0% for *this job* + cDebug() << "Starting" << ( failureEncountered ? "EMERGENCY JOB" : "job" ) << jobitem.job->prettyName() + << '(' << ( m_jobIndex + 1 ) << '/' << m_runningJobs->count() << ')'; + jobProgress( 1.0 ); // 100% for *this job* } - emitProgress( 1.0 ); - ++m_jobIndex; + m_jobIndex++; } - if ( anyFailed ) - { - emitFailed( message, details ); - } - else - { - emitProgress(); - } - emitFinished(); } -private: - JobList m_jobs; - QList< qreal > m_jobWeights; - JobQueue* m_queue; - int m_jobIndex; - - void emitProgress( qreal jobPercent = 0 ) + void jobProgress( double percentage ) const { - // Make sure jobPercent is reasonable, in case a job messed up its - // percentage computations. - jobPercent = qBound( qreal( 0 ), jobPercent, qreal( 1 ) ); + percentage = qBound( 0.0, percentage, 1.0 ); - int jobCount = m_jobs.size(); - QString message = m_jobIndex < jobCount ? m_jobs.at( m_jobIndex )->prettyStatusMessage() : tr( "Done" ); + QString message; + double progress = 0.0; + if ( m_jobIndex < m_runningJobs->count() ) + { - qreal percent = 1.0; // Pretend we're done, since the if will reset it - if ( m_jobIndex < jobCount ) + const auto& jobitem = m_runningJobs->at( m_jobIndex ); + progress = ( jobitem.cumulative + jobitem.weight * percentage ) / m_overallQueueWeight; + message = jobitem.job->prettyStatusMessage(); + } + else { - qreal cumulativeProgress = 0.0; - for ( auto jobWeight : m_jobWeights.mid( 0, m_jobIndex ) ) - { - cumulativeProgress += jobWeight; - } - percent = cumulativeProgress + ( ( m_jobWeights.at( m_jobIndex ) ) * jobPercent ); - - Logger::CDebug( Logger::LOGVERBOSE ) - << "[JOBQUEUE]: Progress for Job[" << m_jobIndex << "]: " << ( jobPercent * 100 ) << "% completed"; - Logger::CDebug( Logger::LOGVERBOSE ) - << "[JOBQUEUE]: Progress Overall: " << ( cumulativeProgress * 100 ) << "% (accumulated) + " - << ( ( ( m_jobWeights.at( m_jobIndex ) ) * jobPercent ) * 100 ) - << "% (this job) = " << ( percent * 100 ) << "% (total)"; + progress = 1.0; + message = tr( "Done" ); } QMetaObject::invokeMethod( - m_queue, "progress", Qt::QueuedConnection, Q_ARG( qreal, percent ), Q_ARG( QString, message ) ); + m_queue, "progress", Qt::QueuedConnection, Q_ARG( double, progress ), Q_ARG( QString, message ) ); } - void emitFailed( const QString& message, const QString& details ) - { - QMetaObject::invokeMethod( - m_queue, "failed", Qt::QueuedConnection, Q_ARG( QString, message ), Q_ARG( QString, details ) ); - } - void emitFinished() { QMetaObject::invokeMethod( m_queue, "finish", Qt::QueuedConnection ); } +private: + QMutex m_runMutex; + QMutex m_enqueMutex; + + std::unique_ptr< WeightedJobList > m_runningJobs = std::make_unique< WeightedJobList >(); + std::unique_ptr< WeightedJobList > m_queuedJobs = std::make_unique< WeightedJobList >(); + + JobQueue* m_queue; + int m_jobIndex = 0; ///< Index into m_runningJobs + double m_overallQueueWeight = 0.0; ///< cumulation when **all** the jobs are done }; JobThread::~JobThread() {} @@ -152,7 +168,6 @@ JobThread::~JobThread() {} JobQueue* JobQueue::s_instance = nullptr; - JobQueue* JobQueue::instance() { @@ -160,13 +175,6 @@ JobQueue::instance() } -GlobalStorage* -JobQueue::globalStorage() const -{ - return m_storage; -} - - JobQueue::JobQueue( QObject* parent ) : QObject( parent ) , m_thread( new JobThread( this ) ) @@ -197,8 +205,7 @@ void JobQueue::start() { Q_ASSERT( !m_thread->isRunning() ); - m_thread->setJobs( std::move( m_jobs ) ); - m_jobs.clear(); + m_thread->finalize(); m_finished = false; m_thread->start(); } @@ -208,8 +215,8 @@ void JobQueue::enqueue( int moduleWeight, const JobList& jobs ) { Q_ASSERT( !m_thread->isRunning() ); - m_jobs.append( jobs ); - emit queueChanged( m_jobs ); + m_thread->enqueue( moduleWeight, jobs ); + emit queueChanged( jobs ); // FIXME: bogus } void @@ -219,4 +226,10 @@ JobQueue::finish() emit finished(); } +GlobalStorage* +JobQueue::globalStorage() const +{ + return m_storage; +} + } // namespace Calamares diff --git a/src/libcalamares/JobQueue.h b/src/libcalamares/JobQueue.h index 3847b4667..31b5407ef 100644 --- a/src/libcalamares/JobQueue.h +++ b/src/libcalamares/JobQueue.h @@ -66,7 +66,6 @@ signals: private: static JobQueue* s_instance; - JobList m_jobs; JobThread* m_thread; GlobalStorage* m_storage; bool m_finished = true; ///< Initially, not running