-
Notifications
You must be signed in to change notification settings - Fork 29
/
cache.go
141 lines (122 loc) · 3.86 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package starlight
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
"go.starlark.net/starlark"
)
// The following code is copied from the starlark-go repo,
// https://go.starlark.net/starlark and is Copyright 2017 the Bazel authors,
// with a BSD 3-clause license (see the LICENSE file in that repo).
// cache is a concurrency-safe, duplicate-suppressing,
// non-blocking cache of the doLoad function.
// See Section 9.7 of gopl.io for an explanation of this structure.
// It also features online deadlock (load cycle) detection.
type cache struct {
cacheMu sync.Mutex
cache map[string]*entry
globals starlark.StringDict
readFile func(s string) ([]byte, error)
}
type entry struct {
owner unsafe.Pointer // a *cycleChecker; see cycleCheck
globals starlark.StringDict
err error
ready chan struct{}
}
func (c *cache) Load(module string) (starlark.StringDict, error) {
return c.get(new(cycleChecker), module)
}
func (c *cache) remove(module string) {
c.cacheMu.Lock()
delete(c.cache, module)
c.cacheMu.Unlock()
}
func (c *cache) reset() {
c.cacheMu.Lock()
c.cache = make(map[string]*entry)
c.cacheMu.Unlock()
}
// get loads and returns an entry (if not already loaded).
func (c *cache) get(cc *cycleChecker, module string) (starlark.StringDict, error) {
c.cacheMu.Lock()
e := c.cache[module]
if e != nil {
c.cacheMu.Unlock()
// Some other goroutine is getting this module.
// Wait for it to become ready.
// Detect load cycles to avoid deadlocks.
if err := cycleCheck(e, cc); err != nil {
return nil, err
}
cc.setWaitsFor(e)
<-e.ready
cc.setWaitsFor(nil)
} else {
// First request for this module.
e = &entry{ready: make(chan struct{})}
c.cache[module] = e
c.cacheMu.Unlock()
e.setOwner(cc)
e.globals, e.err = c.doLoad(cc, module)
e.setOwner(nil)
// Broadcast that the entry is now ready.
close(e.ready)
}
return e.globals, e.err
}
func (c *cache) doLoad(cc *cycleChecker, module string) (starlark.StringDict, error) {
thread := &starlark.Thread{
Print: func(_ *starlark.Thread, msg string) { fmt.Println(msg) },
Load: func(_ *starlark.Thread, module string) (starlark.StringDict, error) {
// Tunnel the cycle-checker state for this "thread of loading".
return c.get(cc, module)
},
}
b, err := c.readFile(module)
if err != nil {
return nil, err
}
return starlark.ExecFile(thread, module, b, c.globals)
}
// -- concurrent cycle checking --
// A cycleChecker is used for concurrent deadlock detection.
// Each top-level call to Load creates its own cycleChecker,
// which is passed to all recursive calls it makes.
// It corresponds to a logical thread in the deadlock detection literature.
type cycleChecker struct {
waitsFor unsafe.Pointer // an *entry; see cycleCheck
}
func (cc *cycleChecker) setWaitsFor(e *entry) {
atomic.StorePointer(&cc.waitsFor, unsafe.Pointer(e))
}
func (e *entry) setOwner(cc *cycleChecker) {
atomic.StorePointer(&e.owner, unsafe.Pointer(cc))
}
// cycleCheck reports whether there is a path in the waits-for graph
// from resource 'e' to thread 'me'.
//
// The waits-for graph (WFG) is a bipartite graph whose nodes are
// alternately of type entry and cycleChecker. Each node has at most
// one outgoing edge. An entry has an "owner" edge to a cycleChecker
// while it is being readied by that cycleChecker, and a cycleChecker
// has a "waits-for" edge to an entry while it is waiting for that entry
// to become ready.
//
// Before adding a waits-for edge, the cache checks whether the new edge
// would form a cycle. If so, this indicates that the load graph is
// cyclic and that the following wait operation would deadlock.
func cycleCheck(e *entry, me *cycleChecker) error {
for e != nil {
cc := (*cycleChecker)(atomic.LoadPointer(&e.owner))
if cc == nil {
break
}
if cc == me {
return fmt.Errorf("cycle in load graph")
}
e = (*entry)(atomic.LoadPointer(&cc.waitsFor))
}
return nil
}