index.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package redistool
  2. import (
  3. "fmt"
  4. "github.com/druidcaesa/gotool"
  5. "github.com/gomodule/redigo/redis"
  6. "log"
  7. "time"
  8. "ulink-admin/config"
  9. "ulink-admin/pkg/common"
  10. )
  11. // https://godoc.org/github.com/gomodule/redigo/redis#pkg-examples
  12. // https://github.com/gomodule/redigo
  13. // RedisClient redis client instance
  14. type RedisClient struct {
  15. pool *redis.Pool
  16. connOpt common.RedisConnOpt
  17. // 数据接收
  18. chanRx chan common.RedisDataArray
  19. // 是否退出
  20. isExit bool
  21. }
  22. // NewRedis new redis client
  23. func NewRedis(opt common.RedisConnOpt) *RedisClient {
  24. return &RedisClient{
  25. connOpt: opt,
  26. pool: newPool(opt),
  27. chanRx: make(chan common.RedisDataArray, 100),
  28. }
  29. }
  30. type KeyProxy struct {
  31. marketKey string
  32. serviceKey string
  33. }
  34. var proxy KeyProxy
  35. func init() {
  36. cfg := config.GetOmsApiCfg()
  37. proxy.marketKey = cfg.MarketAppId
  38. proxy.serviceKey = cfg.ServiceAppId
  39. }
  40. func ProxyKey(key string) string {
  41. proxyKey := fmt.Sprintf("%s.%s.%s", proxy.marketKey, proxy.serviceKey, key)
  42. println(proxyKey)
  43. return proxyKey
  44. }
  45. // newPool 线程池
  46. func newPool(opt common.RedisConnOpt) *redis.Pool {
  47. return &redis.Pool{
  48. MaxIdle: 3,
  49. IdleTimeout: 240 * time.Second,
  50. // MaxActive: 10,
  51. // Wait: true,
  52. Dial: func() (redis.Conn, error) {
  53. c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", opt.Host, opt.Port))
  54. if err != nil {
  55. log.Fatalf("Redis.Dial: %v", err)
  56. return nil, err
  57. }
  58. if gotool.StrUtils.HasNotEmpty(opt.Password) {
  59. if _, err := c.Do("AUTH", opt.Password); err != nil {
  60. c.Close()
  61. log.Fatalf("Redis.AUTH: %v", err)
  62. return nil, err
  63. }
  64. }
  65. if _, err := c.Do("SELECT", opt.Index); err != nil {
  66. c.Close()
  67. log.Fatalf("Redis.SELECT: %v", err)
  68. return nil, err
  69. }
  70. return c, nil
  71. },
  72. }
  73. }
  74. // Start 启动接收任务协程
  75. func (r *RedisClient) Start() {
  76. r.isExit = false
  77. // 开启协程用于循环接收数据
  78. go r.loopRead()
  79. }
  80. // Stop 停止接收任务
  81. func (r *RedisClient) Stop() {
  82. r.isExit = true
  83. // 关闭数据接收通道
  84. close(r.chanRx)
  85. // 关闭redis线程池
  86. r.pool.Close()
  87. }
  88. // Write 向redis中写入多组数据
  89. func (r *RedisClient) Write(data common.RedisDataArray) {
  90. r.chanRx <- data
  91. }
  92. // loopRead 循环接收数据
  93. func (r *RedisClient) loopRead() {
  94. for !r.isExit {
  95. select {
  96. case rx := <-r.chanRx:
  97. for _, it := range rx {
  98. if len(it.Key) > 0 {
  99. key := ProxyKey(it.Key)
  100. if len(it.Field) > 0 {
  101. if _, err := r.HSET(key, it.Field, it.Value); err != nil {
  102. log.Printf("[%s, %s, %s]: %s\n", key, it.Field, it.Value, err.Error())
  103. }
  104. } else {
  105. if _, err := r.SET(key, it.Value); err != nil {
  106. log.Printf("[%s, %s, %s]: %s\n", key, it.Field, it.Value, err.Error())
  107. }
  108. }
  109. if it.Expire > 0 {
  110. r.EXPIRE(key, it.Expire)
  111. }
  112. }
  113. }
  114. }
  115. }
  116. }
  117. // Error get redis connect error
  118. func (r *RedisClient) Error() error {
  119. conn := r.pool.Get()
  120. defer conn.Close()
  121. return conn.Err()
  122. }
  123. // 常用Redis操作命令的封装
  124. // http://redis.io/commands
  125. // KEYS get patten key array
  126. func (r *RedisClient) KEYS(patten string) ([]string, error) {
  127. conn := r.pool.Get()
  128. defer conn.Close()
  129. return redis.Strings(conn.Do("KEYS", patten))
  130. }
  131. // SCAN 获取大量key
  132. func (r *RedisClient) SCAN(patten string) ([]string, error) {
  133. conn := r.pool.Get()
  134. defer conn.Close()
  135. var out []string
  136. var cursor uint64 = 0xffffff
  137. isfirst := true
  138. for cursor != 0 {
  139. if isfirst {
  140. cursor = 0
  141. isfirst = false
  142. }
  143. arr, err := conn.Do("SCAN", cursor, "MATCH", patten, "COUNT", 100)
  144. if err != nil {
  145. return out, err
  146. }
  147. switch arr := arr.(type) {
  148. case []interface{}:
  149. cursor, _ = redis.Uint64(arr[0], nil)
  150. it, _ := redis.Strings(arr[1], nil)
  151. out = append(out, it...)
  152. }
  153. }
  154. out = gotool.StrArrayUtils.ArrayDuplication(out)
  155. return out, nil
  156. }
  157. // DEL delete k-v
  158. func (r *RedisClient) DEL(key string) (int, error) {
  159. key = ProxyKey(key)
  160. conn := r.pool.Get()
  161. defer conn.Close()
  162. return redis.Int(conn.Do("DEL", key))
  163. }
  164. // DELALL delete key array
  165. func (r *RedisClient) DELALL(key []string) (int, error) {
  166. conn := r.pool.Get()
  167. defer conn.Close()
  168. arr := make([]interface{}, len(key))
  169. for i, v := range key {
  170. v = ProxyKey(v)
  171. arr[i] = v
  172. }
  173. return redis.Int(conn.Do("DEL", arr...))
  174. }
  175. // GET get k-v
  176. func (r *RedisClient) GET(key string) (string, error) {
  177. key = ProxyKey(key)
  178. conn := r.pool.Get()
  179. defer conn.Close()
  180. return redis.String(conn.Do("GET", key))
  181. }
  182. // SET set k-v
  183. func (r *RedisClient) SET(key string, value string) (int64, error) {
  184. key = ProxyKey(key)
  185. conn := r.pool.Get()
  186. defer conn.Close()
  187. return redis.Int64(conn.Do("SET", key, value))
  188. }
  189. // SETEX set k-v expire seconds
  190. func (r *RedisClient) SETEX(key string, sec int, value string) (string, error) {
  191. key = ProxyKey(key)
  192. conn := r.pool.Get()
  193. defer conn.Close()
  194. return redis.String(conn.Do("SETEX", key, sec, value))
  195. }
  196. // EXPIRE set key expire seconds
  197. func (r *RedisClient) EXPIRE(key string, sec int64) (int64, error) {
  198. key = ProxyKey(key)
  199. conn := r.pool.Get()
  200. defer conn.Close()
  201. return redis.Int64(conn.Do("EXPIRE", key, sec))
  202. }
  203. // HGETALL get map of key
  204. func (r *RedisClient) HGETALL(key string) (map[string]string, error) {
  205. key = ProxyKey(key)
  206. conn := r.pool.Get()
  207. defer conn.Close()
  208. return redis.StringMap(conn.Do("HGETALL", key))
  209. }
  210. // HGET get value of key-field
  211. func (r *RedisClient) HGET(key string, field string) (string, error) {
  212. key = ProxyKey(key)
  213. conn := r.pool.Get()
  214. defer conn.Close()
  215. return redis.String(conn.Do("HGET", key, field))
  216. }
  217. // HSET set value of key-field
  218. func (r *RedisClient) HSET(key string, field string, value string) (int64, error) {
  219. key = ProxyKey(key)
  220. conn := r.pool.Get()
  221. defer conn.Close()
  222. return redis.Int64(conn.Do("HSET", key, field, value))
  223. }
  224. func (r *RedisClient) EXISTS(key string) (bool, error) {
  225. key = ProxyKey(key)
  226. conn := r.pool.Get()
  227. defer conn.Close()
  228. return redis.Bool(conn.Do("EXISTS", key))
  229. }