aboutsummaryrefslogtreecommitdiffhomepage
path: root/builder/jobs.go
blob: 116887461e540d0a79c7a47336130a92c8d7574d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package builder

// This file implements a job runner for the compiler, which runs jobs in
// parallel while taking care of dependencies.

import (
	"container/heap"
	"errors"
	"fmt"
	"runtime"
	"sort"
	"strings"
	"time"
)

const (
	// jobRunnerDebug turns on progress output of the job runner (job start and
	// finish, per-job and total timings). Flip it to true when investigating
	// concurrency or performance problems in the build.
	jobRunnerDebug = false
)

// compileJob is one unit of work for the job runner, comparable to a single
// Makefile target. Jobs form a dependency graph: a job only starts once every
// job in its dependencies slice has finished, which lets independent compiler
// tasks run in parallel while dependent ones are serialized.
type compileJob struct {
	// description is a human-readable name for the job. It is only used for
	// diagnostics (debug logging and error messages).
	description string

	// dependencies lists the jobs that must finish before this one may start.
	dependencies []*compileJob

	// result is the output produced by the job (a path).
	result string

	// run performs the actual work. It may be nil, in which case running the
	// job does nothing.
	run func(*compileJob) error

	// err is the error returned by run. It is only meaningful once the job
	// has finished.
	err error

	// duration is how long running the job took. It is only set after the
	// job has finished.
	duration time.Duration
}

// dummyCompileJob builds a job that has no work to do and no dependencies,
// but already carries the given result. It fits places where a *compileJob
// producing an output is required even though nothing needs to run, such as
// when the output was loaded from a cache.
func dummyCompileJob(result string) *compileJob {
	job := new(compileJob)
	job.description = "<dummy>"
	job.result = result
	return job
}

// runJobs runs the indicated job and all its dependencies. For every job, all
// the dependencies are run first. It returns the error of the first job that
// fails; jobs that are already running at that point are waited for, but no
// new jobs are started.
//
// Jobs are ordered by a depth-first walk over the dependencies slices. Whenever
// several jobs are ready to run, the one that comes first in that order is
// started first. Therefore, if some jobs are preferred to run before others,
// they should be ordered as such in the job dependencies.
//
// sema bounds the number of jobs that run at the same time: a token is sent on
// it before a job is started and received from it once the job has finished.
// If sema is nil, a semaphore with runtime.NumCPU() slots is used. A semaphore
// without capacity is rejected, since no job could ever be started.
//
// If the jobs depend on each other in a cycle, the jobs that can run are still
// run, after which an errDependencyCycle describing the stuck jobs is returned.
func runJobs(job *compileJob, sema chan struct{}) error {
	if sema == nil {
		// Have a default, if the semaphore isn't set. This is useful for tests.
		sema = make(chan struct{}, runtime.NumCPU())
	}
	if cap(sema) == 0 {
		return errors.New("cannot run 0 jobs at a time")
	}

	// Create a slice of jobs to run, where all dependencies are run in order.
	// A job is marked as added *before* its dependencies are visited. In an
	// acyclic graph this does not change the resulting order (a job cannot be
	// reached again through its own dependencies). In a cyclic graph it stops
	// the recursion, so the cycle is reported below as errDependencyCycle
	// instead of overflowing the stack.
	var jobs []*compileJob
	addedJobs := map[*compileJob]struct{}{}
	var addJobs func(*compileJob)
	addJobs = func(job *compileJob) {
		if _, ok := addedJobs[job]; ok {
			return
		}
		addedJobs[job] = struct{}{}
		for _, dep := range job.dependencies {
			addJobs(dep)
		}
		jobs = append(jobs, job)
	}
	addJobs(job)

	// waiting holds, for every job that cannot start yet, the set of
	// dependencies it is still waiting on. dependents is the reverse edge
	// list: for every job, the jobs that depend on it.
	waiting := make(map[*compileJob]map[*compileJob]struct{}, len(jobs))
	dependents := make(map[*compileJob][]*compileJob, len(jobs))
	jobIndex := make(map[*compileJob]int, len(jobs))
	var ready intHeap // indices into jobs of the jobs that may start now
	for i, job := range jobs {
		jobIndex[job] = i
		if len(job.dependencies) == 0 {
			// This job is ready to run.
			heap.Push(&ready, i)
			continue
		}

		// Construct a map for dependencies which the job is currently waiting on.
		waitDeps := make(map[*compileJob]struct{}, len(job.dependencies))
		waiting[job] = waitDeps

		// Add the job to the dependents list of each dependency. A dependency
		// listed more than once is recorded only once: otherwise the job would
		// be marked ready (and run) once for every duplicate entry.
		for _, dep := range job.dependencies {
			if _, dup := waitDeps[dep]; dup {
				continue
			}
			dependents[dep] = append(dependents[dep], job)
			waitDeps[dep] = struct{}{}
		}
	}

	// Create a channel to accept notifications of completion.
	doneChan := make(chan *compileJob)

	// Send each ready job to a worker, taking care of job dependencies.
	numRunningJobs := 0
	var totalTime time.Duration
	start := time.Now()
	for len(ready.IntSlice) > 0 || numRunningJobs != 0 {
		var completed *compileJob
		if len(ready.IntSlice) > 0 {
			select {
			case sema <- struct{}{}:
				// Start the ready job that comes first in the jobs slice.
				job := jobs[heap.Pop(&ready).(int)]
				if jobRunnerDebug {
					fmt.Println("## start:   ", job.description)
				}
				go runJob(job, doneChan)
				numRunningJobs++
				continue

			case completed = <-doneChan:
				// A job completed.
			}
		} else {
			// Wait for a job to complete.
			completed = <-doneChan
		}
		numRunningJobs--
		<-sema
		if jobRunnerDebug {
			fmt.Println("## finished:", completed.description, "(time "+completed.duration.String()+")")
		}
		if completed.err != nil {
			// Wait for any current jobs to finish. Each of them holds a
			// semaphore token, which must be given back so that a semaphore
			// shared with other callers does not permanently lose capacity.
			for numRunningJobs != 0 {
				<-doneChan
				numRunningJobs--
				<-sema
			}

			// The build failed.
			return completed.err
		}

		// Update total run time.
		totalTime += completed.duration

		// Update dependent jobs.
		for _, j := range dependents[completed] {
			wait := waiting[j]
			delete(wait, completed)
			if len(wait) == 0 {
				// This job is now ready to run. heap.Push (rather than
				// intHeap.Push) keeps the heap ordered so that heap.Pop
				// returns the lowest index.
				heap.Push(&ready, jobIndex[j])
				delete(waiting, j)
			}
		}
	}
	if len(waiting) != 0 {
		// There is a dependency cycle preventing some jobs from running.
		return errDependencyCycle{waiting}
	}

	// Some statistics, if debugging.
	if jobRunnerDebug {
		// Total duration of running all jobs.
		duration := time.Since(start)
		fmt.Println("## total:   ", duration)

		// The individual time of each job combined. On a multicore system, this
		// should be lower than the total above.
		fmt.Println("## job sum: ", totalTime)
	}

	return nil
}

// errDependencyCycle reports jobs that could never be started. waiting maps
// each such job to the set of dependencies it was still waiting for.
type errDependencyCycle struct {
	waiting map[*compileJob]map[*compileJob]struct{}
}

// Error describes every stuck job on its own line, together with the
// descriptions of the dependencies it still waits for. Lines, and the
// dependency names within each line, are sorted so the message is
// deterministic regardless of map iteration order.
func (err errDependencyCycle) Error() string {
	lines := make([]string, 0, len(err.waiting))
	for stuck, pending := range err.waiting {
		var names []string
		for dep := range pending {
			names = append(names, dep.description)
		}
		sort.Strings(names)

		joined := strings.Join(names, ", ")
		line := fmt.Sprintf("\t%s is waiting for [%s]", stuck.description, joined)
		lines = append(lines, line)
	}
	sort.Strings(lines)
	return "deadlock:\n" + strings.Join(lines, "\n")
}

// intHeap is a min-heap of ints meant to be driven by container/heap. The
// ordering methods (Len, Less, Swap) come from the embedded sort.IntSlice;
// Push and Pop below only grow and shrink the underlying slice.
type intHeap struct {
	sort.IntSlice
}

// Push appends x, which must be an int, to the end of the slice.
func (h *intHeap) Push(x interface{}) {
	v := x.(int)
	h.IntSlice = append(h.IntSlice, v)
}

// Pop removes the last element of the slice and returns it.
func (h *intHeap) Pop() interface{} {
	old := h.IntSlice
	last := len(old) - 1
	v := old[last]
	h.IntSlice = old[:last]
	return v
}

// runJob executes a single compile job and then reports it on doneChan.
//
// If job.run is set, it is called with the job itself. A non-nil error it
// returns is stored in job.err; a nil result leaves job.err untouched. The
// elapsed time is stored in job.duration before the job is sent on doneChan.
func runJob(job *compileJob, doneChan chan *compileJob) {
	began := time.Now()
	if fn := job.run; fn != nil {
		if err := fn(job); err != nil {
			job.err = err
		}
	}
	job.duration = time.Since(began)
	doneChan <- job
}