package builder

// This file implements a job runner for the compiler, which runs jobs in
// parallel while taking care of dependencies.

import (
	"container/heap"
	"errors"
	"fmt"
	"runtime"
	"sort"
	"strings"
	"time"
)

// Set to true to enable logging in the job runner. This may help to debug
// concurrency or performance issues.
const jobRunnerDebug = false

// compileJob is a single compiler job, comparable to a single Makefile target.
// It is used to orchestrate various compiler tasks that can be run in parallel
// but that have dependencies and thus have limitations in how they can be run.
type compileJob struct {
	description  string // description, only used for logging
	dependencies []*compileJob
	result       string // result (path)
	run          func(*compileJob) (err error)
	err          error         // error if finished
	duration     time.Duration // how long it took to run this job (only set after finishing)
}

// dummyCompileJob returns a new *compileJob that produces an output without
// doing anything. This can be useful where a *compileJob producing an output is
// expected but nothing needs to be done, for example for a load from a cache.
func dummyCompileJob(result string) *compileJob {
	return &compileJob{
		description: "<dummy>",
		result:      result,
	}
}

// runJobs runs the indicated job and all its dependencies. For every job, all
// the dependencies are run first. It returns the error of the first job that
// fails.
// It runs all jobs in the order of the dependencies slice, depth-first.
// Therefore, if some jobs are preferred to run before others, they should be
// ordered as such in the job dependencies.
//
// The sema channel is used as a counting semaphore that limits the number of
// concurrently running jobs to cap(sema). If it is nil, a semaphore with
// runtime.NumCPU() slots is created. Every slot acquired by this function is
// released again before it returns, also when a job fails.
//
// If the dependency graph contains a cycle, the jobs that can run are run and
// an errDependencyCycle listing the stuck jobs is returned.
func runJobs(job *compileJob, sema chan struct{}) error {
	if sema == nil {
		// Have a default, if the semaphore isn't set. This is useful for tests.
		sema = make(chan struct{}, runtime.NumCPU())
	}
	if cap(sema) == 0 {
		return errors.New("cannot run 0 jobs at a time")
	}

	// Create a slice of jobs to run, where all dependencies are run in order.
	var jobs []*compileJob
	addedJobs := map[*compileJob]struct{}{}
	var addJobs func(*compileJob)
	addJobs = func(job *compileJob) {
		if _, ok := addedJobs[job]; ok {
			return
		}
		// Mark the job before visiting its dependencies, so that a dependency
		// cycle terminates here instead of recursing forever. For acyclic
		// graphs this results in exactly the same (post-order) job order.
		addedJobs[job] = struct{}{}
		for _, dep := range job.dependencies {
			addJobs(dep)
		}
		jobs = append(jobs, job)
	}
	addJobs(job)

	// waiting maps each blocked job to the set of dependencies it still waits
	// on, dependents is the reverse mapping, and compileJobs maps a job to its
	// index in the jobs slice (which doubles as its scheduling priority).
	waiting := make(map[*compileJob]map[*compileJob]struct{}, len(jobs))
	dependents := make(map[*compileJob][]*compileJob, len(jobs))
	compileJobs := make(map[*compileJob]int, len(jobs))
	var ready intHeap
	for i, job := range jobs {
		compileJobs[job] = i
		if len(job.dependencies) == 0 {
			// This job is ready to run.
			heap.Push(&ready, i)
			continue
		}

		// Construct a map for dependencies which the job is currently waiting on.
		waitDeps := make(map[*compileJob]struct{}, len(job.dependencies))
		waiting[job] = waitDeps

		// Add the job to the dependents list of each dependency.
		for _, dep := range job.dependencies {
			if _, ok := waitDeps[dep]; ok {
				// The same dependency is listed more than once. Registering
				// the job again would schedule it twice once dep finishes.
				continue
			}
			waitDeps[dep] = struct{}{}
			dependents[dep] = append(dependents[dep], job)
		}
	}

	// Create a channel to accept notifications of completion.
	doneChan := make(chan *compileJob)

	// Send each job in the jobs slice to a worker, taking care of job dependencies.
	numRunningJobs := 0
	var totalTime time.Duration
	start := time.Now()
	for ready.Len() > 0 || numRunningJobs != 0 {
		var completed *compileJob
		if ready.Len() > 0 {
			select {
			case sema <- struct{}{}:
				// Start a job. The heap yields the ready job with the lowest
				// index, which is the one that comes first in dependency order.
				next := jobs[heap.Pop(&ready).(int)]
				if jobRunnerDebug {
					fmt.Println("## start: ", next.description)
				}
				go runJob(next, doneChan)
				numRunningJobs++
				continue

			case completed = <-doneChan:
				// A job completed.
			}
		} else {
			// Wait for a job to complete.
			completed = <-doneChan
		}
		numRunningJobs--
		<-sema
		if jobRunnerDebug {
			fmt.Println("## finished:", completed.description, "(time "+completed.duration.String()+")")
		}
		if completed.err != nil {
			// Wait for any current jobs to finish, and give back the
			// semaphore slot each of them holds so that a shared semaphore
			// is not left partially filled after a failed build.
			for numRunningJobs != 0 {
				<-doneChan
				<-sema
				numRunningJobs--
			}

			// The build failed.
			return completed.err
		}

		// Update total run time.
		totalTime += completed.duration

		// Update dependent jobs.
		for _, j := range dependents[completed] {
			wait := waiting[j]
			delete(wait, completed)
			if len(wait) == 0 {
				// This job is now ready to run.
				heap.Push(&ready, compileJobs[j])
				delete(waiting, j)
			}
		}
	}
	if len(waiting) != 0 {
		// There is a dependency cycle preventing some jobs from running.
		return errDependencyCycle{waiting}
	}

	// Some statistics, if debugging.
	if jobRunnerDebug {
		// Total duration of running all jobs.
		duration := time.Since(start)
		fmt.Println("## total: ", duration)

		// The individual time of each job combined. On a multicore system, this
		// should be lower than the total above.
		fmt.Println("## job sum: ", totalTime)
	}

	return nil
}

// errDependencyCycle is returned by runJobs when some jobs could never be
// started because the dependencies they wait on never completed. The waiting
// map holds, for every stuck job, the set of dependencies it is still waiting
// on.
type errDependencyCycle struct {
	waiting map[*compileJob]map[*compileJob]struct{}
}

// Error lists every stuck job together with the dependencies it is waiting
// for. Both the jobs and their dependencies are sorted by description so the
// message does not depend on map iteration order.
func (err errDependencyCycle) Error() string {
	waits := make([]string, 0, len(err.waiting))
	for j, wait := range err.waiting {
		deps := make([]string, 0, len(wait))
		for dep := range wait {
			deps = append(deps, dep.description)
		}
		sort.Strings(deps)

		waits = append(waits, fmt.Sprintf("\t%s is waiting for [%s]",
			j.description, strings.Join(deps, ", "),
		))
	}
	sort.Strings(waits)
	return "deadlock:\n" + strings.Join(waits, "\n")
}

// intHeap is a min-heap of ints for use with container/heap. Len, Less and
// Swap are provided by the embedded sort.IntSlice.
type intHeap struct {
	sort.IntSlice
}

// Push appends x to the underlying slice. It is part of heap.Interface and
// does not restore the heap invariant by itself: use heap.Push to add values.
func (h *intHeap) Push(x interface{}) {
	h.IntSlice = append(h.IntSlice, x.(int))
}

// Pop removes and returns the last element of the underlying slice. It is part
// of heap.Interface: use heap.Pop to remove the smallest value.
func (h *intHeap) Pop() interface{} {
	x := h.IntSlice[len(h.IntSlice)-1]
	h.IntSlice = h.IntSlice[:len(h.IntSlice)-1]
	return x
}

// runJob runs a compile job and notifies doneChan of completion. The error
// returned by the job (if any) and the time it took are stored in the job
// before it is sent on doneChan. A job without a run function completes
// immediately.
func runJob(job *compileJob, doneChan chan *compileJob) {
	start := time.Now()
	if job.run != nil {
		err := job.run(job)
		if err != nil {
			job.err = err
		}
	}
	job.duration = time.Since(start)
	doneChan <- job
}