singleflight.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package singleflight provides a duplicate function call suppression
// mechanism.
  6. package singleflight // import "golang.org/x/sync/singleflight"
  7. import (
  8. "bytes"
  9. "errors"
  10. "fmt"
  11. "runtime"
  12. "runtime/debug"
  13. "sync"
  14. )
  15. // errGoexit indicates the runtime.Goexit was called in
  16. // the user given function.
  17. var errGoexit = errors.New("runtime.Goexit was called")
  18. // A panicError is an arbitrary value recovered from a panic
  19. // with the stack trace during the execution of given function.
  20. type panicError struct {
  21. value interface{}
  22. stack []byte
  23. }
  24. // Error implements error interface.
  25. func (p *panicError) Error() string {
  26. return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
  27. }
  28. func newPanicError(v interface{}) error {
  29. stack := debug.Stack()
  30. // The first line of the stack trace is of the form "goroutine N [status]:"
  31. // but by the time the panic reaches Do the goroutine may no longer exist
  32. // and its status will have changed. Trim out the misleading line.
  33. if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
  34. stack = stack[line+1:]
  35. }
  36. return &panicError{value: v, stack: stack}
  37. }
  38. // call is an in-flight or completed singleflight.Do call
  39. type call struct {
  40. wg sync.WaitGroup
  41. // These fields are written once before the WaitGroup is done
  42. // and are only read after the WaitGroup is done.
  43. val interface{}
  44. err error
  45. // These fields are read and written with the singleflight
  46. // mutex held before the WaitGroup is done, and are read but
  47. // not written after the WaitGroup is done.
  48. dups int
  49. chans []chan<- Result
  50. }
  51. // Group represents a class of work and forms a namespace in
  52. // which units of work can be executed with duplicate suppression.
  53. type Group struct {
  54. mu sync.Mutex // protects m
  55. m map[string]*call // lazily initialized
  56. }
  57. // Result holds the results of Do, so they can be passed
  58. // on a channel.
  59. type Result struct {
  60. Val interface{}
  61. Err error
  62. Shared bool
  63. }
  64. // Do executes and returns the results of the given function, making
  65. // sure that only one execution is in-flight for a given key at a
  66. // time. If a duplicate comes in, the duplicate caller waits for the
  67. // original to complete and receives the same results.
  68. // The return value shared indicates whether v was given to multiple callers.
  69. func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  70. g.mu.Lock()
  71. if g.m == nil {
  72. g.m = make(map[string]*call)
  73. }
  74. if c, ok := g.m[key]; ok {
  75. c.dups++
  76. g.mu.Unlock()
  77. c.wg.Wait()
  78. if e, ok := c.err.(*panicError); ok {
  79. panic(e)
  80. } else if c.err == errGoexit {
  81. runtime.Goexit()
  82. }
  83. return c.val, c.err, true
  84. }
  85. c := new(call)
  86. c.wg.Add(1)
  87. g.m[key] = c
  88. g.mu.Unlock()
  89. g.doCall(c, key, fn)
  90. return c.val, c.err, c.dups > 0
  91. }
  92. // DoChan is like Do but returns a channel that will receive the
  93. // results when they are ready.
  94. //
  95. // The returned channel will not be closed.
  96. func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
  97. ch := make(chan Result, 1)
  98. g.mu.Lock()
  99. if g.m == nil {
  100. g.m = make(map[string]*call)
  101. }
  102. if c, ok := g.m[key]; ok {
  103. c.dups++
  104. c.chans = append(c.chans, ch)
  105. g.mu.Unlock()
  106. return ch
  107. }
  108. c := &call{chans: []chan<- Result{ch}}
  109. c.wg.Add(1)
  110. g.m[key] = c
  111. g.mu.Unlock()
  112. go g.doCall(c, key, fn)
  113. return ch
  114. }
  115. // doCall handles the single call for a key.
  116. func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
  117. normalReturn := false
  118. recovered := false
  119. // use double-defer to distinguish panic from runtime.Goexit,
  120. // more details see https://golang.org/cl/134395
  121. defer func() {
  122. // the given function invoked runtime.Goexit
  123. if !normalReturn && !recovered {
  124. c.err = errGoexit
  125. }
  126. g.mu.Lock()
  127. defer g.mu.Unlock()
  128. c.wg.Done()
  129. if g.m[key] == c {
  130. delete(g.m, key)
  131. }
  132. if e, ok := c.err.(*panicError); ok {
  133. // In order to prevent the waiting channels from being blocked forever,
  134. // needs to ensure that this panic cannot be recovered.
  135. if len(c.chans) > 0 {
  136. go panic(e)
  137. select {} // Keep this goroutine around so that it will appear in the crash dump.
  138. } else {
  139. panic(e)
  140. }
  141. } else if c.err == errGoexit {
  142. // Already in the process of goexit, no need to call again
  143. } else {
  144. // Normal return
  145. for _, ch := range c.chans {
  146. ch <- Result{c.val, c.err, c.dups > 0}
  147. }
  148. }
  149. }()
  150. func() {
  151. defer func() {
  152. if !normalReturn {
  153. // Ideally, we would wait to take a stack trace until we've determined
  154. // whether this is a panic or a runtime.Goexit.
  155. //
  156. // Unfortunately, the only way we can distinguish the two is to see
  157. // whether the recover stopped the goroutine from terminating, and by
  158. // the time we know that, the part of the stack trace relevant to the
  159. // panic has been discarded.
  160. if r := recover(); r != nil {
  161. c.err = newPanicError(r)
  162. }
  163. }
  164. }()
  165. c.val, c.err = fn()
  166. normalReturn = true
  167. }()
  168. if !normalReturn {
  169. recovered = true
  170. }
  171. }
  172. // Forget tells the singleflight to forget about a key. Future calls
  173. // to Do for this key will call the function rather than waiting for
  174. // an earlier call to complete.
  175. func (g *Group) Forget(key string) {
  176. g.mu.Lock()
  177. delete(g.m, key)
  178. g.mu.Unlock()
  179. }