123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- package redistool
- import (
- "fmt"
- "github.com/druidcaesa/gotool"
- "github.com/gomodule/redigo/redis"
- "log"
- "time"
- "ulink-admin/config"
- "ulink-admin/pkg/common"
- )
- // https://godoc.org/github.com/gomodule/redigo/redis#pkg-examples
- // https://github.com/gomodule/redigo
- // RedisClient redis client instance
- type RedisClient struct {
- pool *redis.Pool
- connOpt common.RedisConnOpt
- // 数据接收
- chanRx chan common.RedisDataArray
- // 是否退出
- isExit bool
- }
- // NewRedis new redis client
- func NewRedis(opt common.RedisConnOpt) *RedisClient {
- return &RedisClient{
- connOpt: opt,
- pool: newPool(opt),
- chanRx: make(chan common.RedisDataArray, 100),
- }
- }
- type KeyProxy struct {
- marketKey string
- serviceKey string
- }
- var proxy KeyProxy
- func init() {
- cfg := config.GetOmsApiCfg()
- proxy.marketKey = cfg.MarketAppId
- proxy.serviceKey = cfg.ServiceAppId
- }
- func ProxyKey(key string) string {
- proxyKey := fmt.Sprintf("%s.%s.%s", proxy.marketKey, proxy.serviceKey, key)
- println(proxyKey)
- return proxyKey
- }
- // newPool 线程池
- func newPool(opt common.RedisConnOpt) *redis.Pool {
- return &redis.Pool{
- MaxIdle: 3,
- IdleTimeout: 240 * time.Second,
- // MaxActive: 10,
- // Wait: true,
- Dial: func() (redis.Conn, error) {
- c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", opt.Host, opt.Port))
- if err != nil {
- log.Fatalf("Redis.Dial: %v", err)
- return nil, err
- }
- if gotool.StrUtils.HasNotEmpty(opt.Password) {
- if _, err := c.Do("AUTH", opt.Password); err != nil {
- c.Close()
- log.Fatalf("Redis.AUTH: %v", err)
- return nil, err
- }
- }
- if _, err := c.Do("SELECT", opt.Index); err != nil {
- c.Close()
- log.Fatalf("Redis.SELECT: %v", err)
- return nil, err
- }
- return c, nil
- },
- }
- }
- // Start 启动接收任务协程
- func (r *RedisClient) Start() {
- r.isExit = false
- // 开启协程用于循环接收数据
- go r.loopRead()
- }
- // Stop 停止接收任务
- func (r *RedisClient) Stop() {
- r.isExit = true
- // 关闭数据接收通道
- close(r.chanRx)
- // 关闭redis线程池
- r.pool.Close()
- }
- // Write 向redis中写入多组数据
- func (r *RedisClient) Write(data common.RedisDataArray) {
- r.chanRx <- data
- }
- // loopRead 循环接收数据
- func (r *RedisClient) loopRead() {
- for !r.isExit {
- select {
- case rx := <-r.chanRx:
- for _, it := range rx {
- if len(it.Key) > 0 {
- key := ProxyKey(it.Key)
- if len(it.Field) > 0 {
- if _, err := r.HSET(key, it.Field, it.Value); err != nil {
- log.Printf("[%s, %s, %s]: %s\n", key, it.Field, it.Value, err.Error())
- }
- } else {
- if _, err := r.SET(key, it.Value); err != nil {
- log.Printf("[%s, %s, %s]: %s\n", key, it.Field, it.Value, err.Error())
- }
- }
- if it.Expire > 0 {
- r.EXPIRE(key, it.Expire)
- }
- }
- }
- }
- }
- }
- // Error get redis connect error
- func (r *RedisClient) Error() error {
- conn := r.pool.Get()
- defer conn.Close()
- return conn.Err()
- }
- // 常用Redis操作命令的封装
- // http://redis.io/commands
- // KEYS get patten key array
- func (r *RedisClient) KEYS(patten string) ([]string, error) {
- conn := r.pool.Get()
- defer conn.Close()
- return redis.Strings(conn.Do("KEYS", patten))
- }
- // SCAN 获取大量key
- func (r *RedisClient) SCAN(patten string) ([]string, error) {
- conn := r.pool.Get()
- defer conn.Close()
- var out []string
- var cursor uint64 = 0xffffff
- isfirst := true
- for cursor != 0 {
- if isfirst {
- cursor = 0
- isfirst = false
- }
- arr, err := conn.Do("SCAN", cursor, "MATCH", patten, "COUNT", 100)
- if err != nil {
- return out, err
- }
- switch arr := arr.(type) {
- case []interface{}:
- cursor, _ = redis.Uint64(arr[0], nil)
- it, _ := redis.Strings(arr[1], nil)
- out = append(out, it...)
- }
- }
- out = gotool.StrArrayUtils.ArrayDuplication(out)
- return out, nil
- }
- // DEL delete k-v
- func (r *RedisClient) DEL(key string) (int, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.Int(conn.Do("DEL", key))
- }
- // DELALL delete key array
- func (r *RedisClient) DELALL(key []string) (int, error) {
- conn := r.pool.Get()
- defer conn.Close()
- arr := make([]interface{}, len(key))
- for i, v := range key {
- v = ProxyKey(v)
- arr[i] = v
- }
- return redis.Int(conn.Do("DEL", arr...))
- }
- // GET get k-v
- func (r *RedisClient) GET(key string) (string, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.String(conn.Do("GET", key))
- }
- // SET set k-v
- func (r *RedisClient) SET(key string, value string) (int64, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.Int64(conn.Do("SET", key, value))
- }
- // SETEX set k-v expire seconds
- func (r *RedisClient) SETEX(key string, sec int, value string) (string, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.String(conn.Do("SETEX", key, sec, value))
- }
- // EXPIRE set key expire seconds
- func (r *RedisClient) EXPIRE(key string, sec int64) (int64, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.Int64(conn.Do("EXPIRE", key, sec))
- }
- // HGETALL get map of key
- func (r *RedisClient) HGETALL(key string) (map[string]string, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.StringMap(conn.Do("HGETALL", key))
- }
- // HGET get value of key-field
- func (r *RedisClient) HGET(key string, field string) (string, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.String(conn.Do("HGET", key, field))
- }
- // HSET set value of key-field
- func (r *RedisClient) HSET(key string, field string, value string) (int64, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.Int64(conn.Do("HSET", key, field, value))
- }
- func (r *RedisClient) EXISTS(key string) (bool, error) {
- key = ProxyKey(key)
- conn := r.pool.Get()
- defer conn.Close()
- return redis.Bool(conn.Do("EXISTS", key))
- }
|