[libcalamares] Rip out the guts of job-queue-running

- compute weights and accumulations beforehand
- mutex-lock structures so you can enqueue while running jobs
- simplify progress reporting calculations
- doesn't actually run any jobs
main
Adriaan de Groot 5 years ago
parent 08ea51a344
commit 941b5af3a2

@ -28,11 +28,34 @@
#include "Job.h" #include "Job.h"
#include "utils/Logger.h" #include "utils/Logger.h"
#include <QMutex>
#include <QMutexLocker>
#include <QThread> #include <QThread>
namespace Calamares namespace Calamares
{ {
struct WeightedJob
{
/** @brief Cumulative weight **before** this job starts
*
* This is calculated as jobs come in.
*/
double cumulative = 0.0;
/** @brief Weight of the job within the module's jobs
*
* When a list of jobs is added from a particular module,
* the jobs are weighted relative to that module's overall weight
* **and** the other jobs in the list, so that each job
* gets its share:
* ( job-weight / total-job-weight ) * module-weight
*/
double weight = 0.0;
job_ptr job;
};
using WeightedJobList = QList< WeightedJob >;
class JobThread : public QThread class JobThread : public QThread
{ {
public: public:
@ -45,106 +68,99 @@ public:
virtual ~JobThread() override; virtual ~JobThread() override;
void setJobs( JobList&& jobs ) void finalize()
{
Q_ASSERT( m_runningJobs->isEmpty() );
QMutexLocker qlock( &m_enqueMutex );
QMutexLocker rlock( &m_runMutex );
std::swap( m_runningJobs, m_queuedJobs );
m_overallQueueWeight
= m_runningJobs->isEmpty() ? 0.0 : ( m_runningJobs->last().cumulative + m_runningJobs->last().weight );
if ( m_overallQueueWeight < 1 )
{
m_overallQueueWeight = 1.0;
}
}
void enqueue( int moduleWeight, const JobList& jobs )
{ {
m_jobs = jobs; QMutexLocker qlock( &m_enqueMutex );
double cumulative
= m_queuedJobs->isEmpty() ? 0.0 : ( m_queuedJobs->last().cumulative + m_queuedJobs->last().weight );
qreal totalJobsWeight = 0.0; double totalJobWeight = std::accumulate( jobs.cbegin(), jobs.cend(), 0.0, []( double total, const job_ptr& j ) {
for ( auto job : m_jobs ) return total + j->getJobWeight();
} );
if ( totalJobWeight < 1 )
{ {
totalJobsWeight += job->getJobWeight(); totalJobWeight = 1.0;
} }
for ( auto job : m_jobs )
for ( const auto& j : jobs )
{ {
qreal jobWeight = qreal( job->getJobWeight() / totalJobsWeight ); double jobContribution = ( j->getJobWeight() / totalJobWeight ) * moduleWeight;
m_jobWeights.append( jobWeight ); m_queuedJobs->append( WeightedJob { cumulative, jobContribution, j } );
cumulative += jobContribution;
} }
} }
void run() override void run() override
{ {
bool anyFailed = false; QMutexLocker rlock( &m_runMutex );
QString message; bool failureEncountered = false;
QString details;
m_jobIndex = 0; m_jobIndex = 0;
for ( auto job : m_jobs ) for ( const auto& jobitem : *m_runningJobs )
{ {
if ( anyFailed && !job->isEmergency() ) if ( failureEncountered && !jobitem.job->isEmergency() )
{ {
cDebug() << "Skipping non-emergency job" << job->prettyName(); cDebug() << "Skipping non-emergency job" << jobitem.job->prettyName();
++m_jobIndex;
continue;
} }
else
emitProgress();
cDebug() << "Starting" << ( anyFailed ? "EMERGENCY JOB" : "job" ) << job->prettyName() << "(there are"
<< ( m_jobs.count() - m_jobIndex ) << "left)";
connect( job.data(), &Job::progress, this, &JobThread::emitProgress );
JobResult result = job->exec();
if ( !anyFailed && !result )
{ {
anyFailed = true; jobProgress( 0.0 ); // 0% for *this job*
message = result.message(); cDebug() << "Starting" << ( failureEncountered ? "EMERGENCY JOB" : "job" ) << jobitem.job->prettyName()
details = result.details(); << '(' << ( m_jobIndex + 1 ) << '/' << m_runningJobs->count() << ')';
jobProgress( 1.0 ); // 100% for *this job*
} }
emitProgress( 1.0 ); m_jobIndex++;
++m_jobIndex;
} }
if ( anyFailed )
{
emitFailed( message, details );
}
else
{
emitProgress();
}
emitFinished();
} }
private: void jobProgress( double percentage ) const
JobList m_jobs;
QList< qreal > m_jobWeights;
JobQueue* m_queue;
int m_jobIndex;
void emitProgress( qreal jobPercent = 0 )
{ {
// Make sure jobPercent is reasonable, in case a job messed up its percentage = qBound( 0.0, percentage, 1.0 );
// percentage computations.
jobPercent = qBound( qreal( 0 ), jobPercent, qreal( 1 ) );
int jobCount = m_jobs.size(); QString message;
QString message = m_jobIndex < jobCount ? m_jobs.at( m_jobIndex )->prettyStatusMessage() : tr( "Done" ); double progress = 0.0;
if ( m_jobIndex < m_runningJobs->count() )
{
qreal percent = 1.0; // Pretend we're done, since the if will reset it const auto& jobitem = m_runningJobs->at( m_jobIndex );
if ( m_jobIndex < jobCount ) progress = ( jobitem.cumulative + jobitem.weight * percentage ) / m_overallQueueWeight;
message = jobitem.job->prettyStatusMessage();
}
else
{ {
qreal cumulativeProgress = 0.0; progress = 1.0;
for ( auto jobWeight : m_jobWeights.mid( 0, m_jobIndex ) ) message = tr( "Done" );
{
cumulativeProgress += jobWeight;
}
percent = cumulativeProgress + ( ( m_jobWeights.at( m_jobIndex ) ) * jobPercent );
Logger::CDebug( Logger::LOGVERBOSE )
<< "[JOBQUEUE]: Progress for Job[" << m_jobIndex << "]: " << ( jobPercent * 100 ) << "% completed";
Logger::CDebug( Logger::LOGVERBOSE )
<< "[JOBQUEUE]: Progress Overall: " << ( cumulativeProgress * 100 ) << "% (accumulated) + "
<< ( ( ( m_jobWeights.at( m_jobIndex ) ) * jobPercent ) * 100 )
<< "% (this job) = " << ( percent * 100 ) << "% (total)";
} }
QMetaObject::invokeMethod( QMetaObject::invokeMethod(
m_queue, "progress", Qt::QueuedConnection, Q_ARG( qreal, percent ), Q_ARG( QString, message ) ); m_queue, "progress", Qt::QueuedConnection, Q_ARG( double, progress ), Q_ARG( QString, message ) );
} }
void emitFailed( const QString& message, const QString& details )
{
QMetaObject::invokeMethod(
m_queue, "failed", Qt::QueuedConnection, Q_ARG( QString, message ), Q_ARG( QString, details ) );
}
void emitFinished() { QMetaObject::invokeMethod( m_queue, "finish", Qt::QueuedConnection ); } private:
QMutex m_runMutex;
QMutex m_enqueMutex;
std::unique_ptr< WeightedJobList > m_runningJobs = std::make_unique< WeightedJobList >();
std::unique_ptr< WeightedJobList > m_queuedJobs = std::make_unique< WeightedJobList >();
JobQueue* m_queue;
int m_jobIndex = 0; ///< Index into m_runningJobs
double m_overallQueueWeight = 0.0; ///< cumulation when **all** the jobs are done
}; };
JobThread::~JobThread() {} JobThread::~JobThread() {}
@ -152,7 +168,6 @@ JobThread::~JobThread() {}
JobQueue* JobQueue::s_instance = nullptr; JobQueue* JobQueue::s_instance = nullptr;
JobQueue* JobQueue*
JobQueue::instance() JobQueue::instance()
{ {
@ -160,13 +175,6 @@ JobQueue::instance()
} }
GlobalStorage*
JobQueue::globalStorage() const
{
return m_storage;
}
JobQueue::JobQueue( QObject* parent ) JobQueue::JobQueue( QObject* parent )
: QObject( parent ) : QObject( parent )
, m_thread( new JobThread( this ) ) , m_thread( new JobThread( this ) )
@ -197,8 +205,7 @@ void
JobQueue::start() JobQueue::start()
{ {
Q_ASSERT( !m_thread->isRunning() ); Q_ASSERT( !m_thread->isRunning() );
m_thread->setJobs( std::move( m_jobs ) ); m_thread->finalize();
m_jobs.clear();
m_finished = false; m_finished = false;
m_thread->start(); m_thread->start();
} }
@ -208,8 +215,8 @@ void
JobQueue::enqueue( int moduleWeight, const JobList& jobs ) JobQueue::enqueue( int moduleWeight, const JobList& jobs )
{ {
Q_ASSERT( !m_thread->isRunning() ); Q_ASSERT( !m_thread->isRunning() );
m_jobs.append( jobs ); m_thread->enqueue( moduleWeight, jobs );
emit queueChanged( m_jobs ); emit queueChanged( jobs ); // FIXME: bogus
} }
void void
@ -219,4 +226,10 @@ JobQueue::finish()
emit finished(); emit finished();
} }
GlobalStorage*
JobQueue::globalStorage() const
{
return m_storage;
}
} // namespace Calamares } // namespace Calamares

@ -66,7 +66,6 @@ signals:
private: private:
static JobQueue* s_instance; static JobQueue* s_instance;
JobList m_jobs;
JobThread* m_thread; JobThread* m_thread;
GlobalStorage* m_storage; GlobalStorage* m_storage;
bool m_finished = true; ///< Initially, not running bool m_finished = true; ///< Initially, not running

Loading…
Cancel
Save