You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

229 lines
6.0 KiB

package builder
// This file implements a job runner for the compiler, which runs jobs in
// parallel while taking care of dependencies.
import (
"container/heap"
"errors"
"fmt"
"runtime"
"sort"
"strings"
"time"
)
// Set to true to enable logging in the job runner. This may help to debug
// concurrency or performance issues.
const jobRunnerDebug = false
type jobState uint8
const (
jobStateQueued jobState = iota // not yet running
jobStateRunning // running
jobStateFinished // finished running
)
// compileJob is a single compiler job, comparable to a single Makefile target.
// It is used to orchestrate various compiler tasks that can be run in parallel
// but that have dependencies and thus have limitations in how they can be run.
type compileJob struct {
description string // description, only used for logging
dependencies []*compileJob
result string // result (path)
run func(*compileJob) (err error)
err error // error if finished
duration time.Duration // how long it took to run this job (only set after finishing)
}
// dummyCompileJob returns a new *compileJob that produces an output without
// doing anything. This can be useful where a *compileJob producing an output is
// expected but nothing needs to be done, for example for a load from a cache.
func dummyCompileJob(result string) *compileJob {
return &compileJob{
description: "<dummy>",
result: result,
}
}
// runJobs runs the indicated job and all its dependencies. For every job, all
// the dependencies are run first. It returns the error of the first job that
// fails.
// It runs all jobs in the order of the dependencies slice, depth-first.
// Therefore, if some jobs are preferred to run before others, they should be
// ordered as such in the job dependencies.
func runJobs(job *compileJob, sema chan struct{}) error {
if sema == nil {
// Have a default, if the semaphore isn't set. This is useful for
// tests.
sema = make(chan struct{}, runtime.NumCPU())
}
if cap(sema) == 0 {
return errors.New("cannot 0 jobs at a time")
}
// Create a slice of jobs to run, where all dependencies are run in order.
jobs := []*compileJob{}
addedJobs := map[*compileJob]struct{}{}
var addJobs func(*compileJob)
addJobs = func(job *compileJob) {
if _, ok := addedJobs[job]; ok {
return
}
for _, dep := range job.dependencies {
addJobs(dep)
}
jobs = append(jobs, job)
addedJobs[job] = struct{}{}
}
addJobs(job)
waiting := make(map[*compileJob]map[*compileJob]struct{}, len(jobs))
dependents := make(map[*compileJob][]*compileJob, len(jobs))
jidx := make(map[*compileJob]int)
var ready intHeap
for i, job := range jobs {
jidx[job] = i
if len(job.dependencies) == 0 {
// This job is ready to run.
ready.Push(i)
continue
}
// Construct a map for dependencies which the job is currently waiting on.
waitDeps := make(map[*compileJob]struct{})
waiting[job] = waitDeps
// Add the job to the dependents list of each dependency.
for _, dep := range job.dependencies {
dependents[dep] = append(dependents[dep], job)
waitDeps[dep] = struct{}{}
}
}
// Create a channel to accept notifications of completion.
doneChan := make(chan *compileJob)
// Send each job in the jobs slice to a worker, taking care of job
// dependencies.
numRunningJobs := 0
var totalTime time.Duration
start := time.Now()
for len(ready.IntSlice) > 0 || numRunningJobs != 0 {
var completed *compileJob
if len(ready.IntSlice) > 0 {
select {
case sema <- struct{}{}:
// Start a job.
job := jobs[heap.Pop(&ready).(int)]
if jobRunnerDebug {
fmt.Println("## start: ", job.description)
}
go runJob(job, doneChan)
numRunningJobs++
continue
case completed = <-doneChan:
// A job completed.
}
} else {
// Wait for a job to complete.
completed = <-doneChan
}
numRunningJobs--
<-sema
if jobRunnerDebug {
fmt.Println("## finished:", job.description, "(time "+job.duration.String()+")")
}
if completed.err != nil {
// Wait for any current jobs to finish.
for numRunningJobs != 0 {
<-doneChan
numRunningJobs--
}
// The build failed.
return completed.err
}
// Update total run time.
totalTime += completed.duration
// Update dependent jobs.
for _, j := range dependents[completed] {
wait := waiting[j]
delete(wait, completed)
if len(wait) == 0 {
// This job is now ready to run.
ready.Push(jidx[j])
delete(waiting, j)
}
}
}
if len(waiting) != 0 {
// There is a dependency cycle preventing some jobs from running.
return errDependencyCycle{waiting}
}
// Some statistics, if debugging.
if jobRunnerDebug {
// Total duration of running all jobs.
duration := time.Since(start)
fmt.Println("## total: ", duration)
// The individual time of each job combined. On a multicore system, this
// should be lower than the total above.
fmt.Println("## job sum: ", totalTime)
}
return nil
}
type errDependencyCycle struct {
waiting map[*compileJob]map[*compileJob]struct{}
}
func (err errDependencyCycle) Error() string {
waits := make([]string, 0, len(err.waiting))
for j, wait := range err.waiting {
deps := make([]string, 0, len(wait))
for dep := range wait {
deps = append(deps, dep.description)
}
sort.Strings(deps)
waits = append(waits, fmt.Sprintf("\t%s is waiting for [%s]",
j.description, strings.Join(deps, ", "),
))
}
sort.Strings(waits)
return "deadlock:\n" + strings.Join(waits, "\n")
}
type intHeap struct {
sort.IntSlice
}
func (h *intHeap) Push(x interface{}) {
h.IntSlice = append(h.IntSlice, x.(int))
}
func (h *intHeap) Pop() interface{} {
x := h.IntSlice[len(h.IntSlice)-1]
h.IntSlice = h.IntSlice[:len(h.IntSlice)-1]
return x
}
// runJob runs a compile job and notifies doneChan of completion.
func runJob(job *compileJob, doneChan chan *compileJob) {
start := time.Now()
if job.run != nil {
err := job.run(job)
if err != nil {
job.err = err
}
}
job.duration = time.Since(start)
doneChan <- job
}