stream.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /*
  2. * Copyright 2021 ByteDance Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package decoder
  17. import (
  18. `bytes`
  19. `io`
  20. `sync`
  21. `github.com/bytedance/sonic/internal/native`
  22. `github.com/bytedance/sonic/internal/native/types`
  23. `github.com/bytedance/sonic/option`
  24. )
  25. var (
  26. minLeftBufferShift uint = 1
  27. )
// StreamDecoder is the decoder context object for streaming input.
//
// It reads from an io.Reader and keeps bytes that were read but not yet
// decoded in buf. Decode resets the embedded Decoder onto the buffered bytes
// for each value it decodes.
type StreamDecoder struct {
	// r is the underlying source of input bytes.
	r io.Reader
	// buf holds bytes read from r that have not been consumed yet.
	buf []byte
	// scanp is the index in buf of the next unconsumed byte.
	scanp int
	// scanned is added to scanp by InputOffset to form the stream offset;
	// it grows as consumed bytes are dropped from the front of buf.
	scanned int64
	// err records a read, syntax, or decode error; once set, Decode keeps
	// returning it and More reports false.
	err error
	Decoder
}
  37. var bufPool = sync.Pool{
  38. New: func () interface{} {
  39. return make([]byte, 0, option.DefaultDecoderBufferSize)
  40. },
  41. }
  42. // NewStreamDecoder adapts to encoding/json.NewDecoder API.
  43. //
  44. // NewStreamDecoder returns a new decoder that reads from r.
  45. func NewStreamDecoder(r io.Reader) *StreamDecoder {
  46. return &StreamDecoder{r : r}
  47. }
  48. // Decode decodes input stream into val with corresponding data.
  49. // Redundantly bytes may be read and left in its buffer, and can be used at next call.
  50. // Either io error from underlying io.Reader (except io.EOF)
  51. // or syntax error from data will be recorded and stop subsequently decoding.
  52. func (self *StreamDecoder) Decode(val interface{}) (err error) {
  53. if self.err != nil {
  54. return self.err
  55. }
  56. var buf = self.buf[self.scanp:]
  57. var p = 0
  58. var recycle bool
  59. if cap(buf) == 0 {
  60. buf = bufPool.Get().([]byte)
  61. recycle = true
  62. }
  63. var first = true
  64. var repeat = true
  65. read_more:
  66. for {
  67. l := len(buf)
  68. realloc(&buf)
  69. n, err := self.r.Read(buf[l:cap(buf)])
  70. buf = buf[:l+n]
  71. if err != nil {
  72. repeat = false
  73. if err == io.EOF {
  74. if len(buf) == 0 {
  75. return err
  76. }
  77. break
  78. }
  79. self.err = err
  80. return err
  81. }
  82. if n > 0 || first {
  83. break
  84. }
  85. }
  86. first = false
  87. l := len(buf)
  88. if l > 0 {
  89. self.Decoder.Reset(string(buf))
  90. var x int
  91. if ret := native.SkipOneFast(&self.s, &x); ret < 0 {
  92. if repeat {
  93. goto read_more
  94. } else {
  95. err = SyntaxError{x, self.s, types.ParsingError(-ret), ""}
  96. self.err = err
  97. return
  98. }
  99. }
  100. err = self.Decoder.Decode(val)
  101. if err != nil {
  102. self.err = err
  103. }
  104. p = self.Decoder.Pos()
  105. self.scanned += int64(p)
  106. self.scanp = 0
  107. }
  108. if l > p {
  109. // remain undecoded bytes, so copy them into self.buf
  110. self.buf = append(self.buf[:0], buf[p:]...)
  111. } else {
  112. self.buf = nil
  113. recycle = true
  114. }
  115. if recycle {
  116. buf = buf[:0]
  117. bufPool.Put(buf)
  118. }
  119. return err
  120. }
  121. func (self StreamDecoder) repeatable(err error) bool {
  122. if ee, ok := err.(SyntaxError); ok &&
  123. (ee.Code == types.ERR_EOF || (ee.Code == types.ERR_INVALID_CHAR && self.i >= len(self.s)-1)) {
  124. return true
  125. }
  126. return false
  127. }
  128. // InputOffset returns the input stream byte offset of the current decoder position.
  129. // The offset gives the location of the end of the most recently returned token and the beginning of the next token.
  130. func (self *StreamDecoder) InputOffset() int64 {
  131. return self.scanned + int64(self.scanp)
  132. }
  133. // Buffered returns a reader of the data remaining in the Decoder's buffer.
  134. // The reader is valid until the next call to Decode.
  135. func (self *StreamDecoder) Buffered() io.Reader {
  136. return bytes.NewReader(self.buf[self.scanp:])
  137. }
  138. // More reports whether there is another element in the
  139. // current array or object being parsed.
  140. func (self *StreamDecoder) More() bool {
  141. if self.err != nil {
  142. return false
  143. }
  144. c, err := self.peek()
  145. return err == nil && c != ']' && c != '}'
  146. }
  147. func (self *StreamDecoder) peek() (byte, error) {
  148. var err error
  149. for {
  150. for i := self.scanp; i < len(self.buf); i++ {
  151. c := self.buf[i]
  152. if isSpace(c) {
  153. continue
  154. }
  155. self.scanp = i
  156. return c, nil
  157. }
  158. // buffer has been scanned, now report any error
  159. if err != nil {
  160. if err != io.EOF {
  161. self.err = err
  162. }
  163. return 0, err
  164. }
  165. err = self.refill()
  166. }
  167. }
  168. func isSpace(c byte) bool {
  169. return types.SPACE_MASK & (1 << c) != 0
  170. }
  171. func (self *StreamDecoder) refill() error {
  172. // Make room to read more into the buffer.
  173. // First slide down data already consumed.
  174. if self.scanp > 0 {
  175. self.scanned += int64(self.scanp)
  176. n := copy(self.buf, self.buf[self.scanp:])
  177. self.buf = self.buf[:n]
  178. self.scanp = 0
  179. }
  180. // Grow buffer if not large enough.
  181. realloc(&self.buf)
  182. // Read. Delay error for next iteration (after scan).
  183. n, err := self.r.Read(self.buf[len(self.buf):cap(self.buf)])
  184. self.buf = self.buf[0 : len(self.buf)+n]
  185. return err
  186. }
  187. func realloc(buf *[]byte) {
  188. l := uint(len(*buf))
  189. c := uint(cap(*buf))
  190. if c - l <= c >> minLeftBufferShift {
  191. e := l+(l>>minLeftBufferShift)
  192. if e < option.DefaultDecoderBufferSize {
  193. e = option.DefaultDecoderBufferSize
  194. }
  195. tmp := make([]byte, l, e)
  196. copy(tmp, *buf)
  197. *buf = tmp
  198. }
  199. }