chain.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package cron
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. "time"
  7. )
  8. // JobWrapper decorates the given Job with some behavior.
  9. type JobWrapper func(Job) Job
  10. // Chain is a sequence of JobWrappers that decorates submitted jobs with
  11. // cross-cutting behaviors like logging or synchronization.
  12. type Chain struct {
  13. wrappers []JobWrapper
  14. }
  15. // NewChain returns a Chain consisting of the given JobWrappers.
  16. func NewChain(c ...JobWrapper) Chain {
  17. return Chain{c}
  18. }
  19. // Then decorates the given job with all JobWrappers in the chain.
  20. //
  21. // This:
  22. // NewChain(m1, m2, m3).Then(job)
  23. // is equivalent to:
  24. // m1(m2(m3(job)))
  25. func (c Chain) Then(j Job) Job {
  26. for i := range c.wrappers {
  27. j = c.wrappers[len(c.wrappers)-i-1](j)
  28. }
  29. return j
  30. }
  31. // Recover panics in wrapped jobs and log them with the provided logger.
  32. func Recover(logger Logger) JobWrapper {
  33. return func(j Job) Job {
  34. return FuncJob(func() {
  35. defer func() {
  36. if r := recover(); r != nil {
  37. const size = 64 << 10
  38. buf := make([]byte, size)
  39. buf = buf[:runtime.Stack(buf, false)]
  40. err, ok := r.(error)
  41. if !ok {
  42. err = fmt.Errorf("%v", r)
  43. }
  44. logger.Error(err, "panic", "stack", "...\n"+string(buf))
  45. }
  46. }()
  47. j.Run()
  48. })
  49. }
  50. }
  51. // DelayIfStillRunning serializes jobs, delaying subsequent runs until the
  52. // previous one is complete. Jobs running after a delay of more than a minute
  53. // have the delay logged at Info.
  54. func DelayIfStillRunning(logger Logger) JobWrapper {
  55. return func(j Job) Job {
  56. var mu sync.Mutex
  57. return FuncJob(func() {
  58. start := time.Now()
  59. mu.Lock()
  60. defer mu.Unlock()
  61. if dur := time.Since(start); dur > time.Minute {
  62. logger.Info("delay", "duration", dur)
  63. }
  64. j.Run()
  65. })
  66. }
  67. }
  68. // SkipIfStillRunning skips an invocation of the Job if a previous invocation is
  69. // still running. It logs skips to the given logger at Info level.
  70. func SkipIfStillRunning(logger Logger) JobWrapper {
  71. return func(j Job) Job {
  72. var ch = make(chan struct{}, 1)
  73. ch <- struct{}{}
  74. return FuncJob(func() {
  75. select {
  76. case v := <-ch:
  77. j.Run()
  78. ch <- v
  79. default:
  80. logger.Info("skip")
  81. }
  82. })
  83. }
  84. }