• GO实现Redis:GO实现内存数据库(3)



    • 实现Redis的database层(核心层:处理命令并返回)
    • https://github.com/csgopher/go-redis
    • 本文涉及以下文件:
      dict:定义字典的一些方法
      sync_dict:实现dict
      db:分数据库
      command:定义指令
      ping,keys,string:指令的具体处理逻辑
      database:单机版数据库

    datastruct/dict/dict.go

    // Consumer is the callback invoked for each key/value pair while traversing
    // a dictionary. Returning true continues the traversal; returning false
    // stops it.
    type Consumer func(key string, val interface{}) bool
    
    // Dict is the abstract key/value dictionary used as the storage structure of
    // a database. Any type implementing this interface can be swapped in.
    type Dict interface {
       // Get returns the value stored under key and whether the key exists.
       Get(key string) (val interface{}, exists bool)
       // Len returns the number of entries.
       Len() int
       // Put stores val under key. NOTE(review): result is presumably the number
       // of newly added keys (1 on insert, 0 on overwrite) — confirm per implementation.
       Put(key string, val interface{}) (result int)
       // PutIfAbsent stores val only when key is not present yet; result reports
       // how many entries were written.
       PutIfAbsent(key string, val interface{}) (result int)
       // PutIfExists stores val only when key is already present; result reports
       // how many entries were written.
       PutIfExists(key string, val interface{}) (result int)
       // Remove deletes key; result reports how many entries were removed.
       Remove(key string) (result int)
       // ForEach passes every key/value pair to consumer.
       ForEach(consumer Consumer)
       // Keys returns all keys.
       Keys() []string
       // RandomKeys returns limit keys. NOTE(review): presumably duplicates are
       // allowed — confirm per implementation.
       RandomKeys(limit int) []string
       // RandomDistinctKeys returns up to limit keys without duplicates.
       RandomDistinctKeys(limit int) []string
       // Clear removes every entry.
       Clear()
    }
    

    Consumer:遍历字典所有的键值对,返回值是布尔,true继续遍历,false停止遍历

    Dict接口:Redis数据结构的接口。这里使用sync.Map作为字典的实现,如果想用别的数据结构,换一个实现即可


    datastruct/dict/sync_dict.go

    // SyncDict implements the Dict interface on top of sync.Map.
    type SyncDict struct {
       // m is the underlying storage.
       m sync.Map
    }
    
    // MakeSyncDict returns a pointer to a new, empty SyncDict.
    func MakeSyncDict() *SyncDict {
       return new(SyncDict)
    }
    
    // Get looks key up in the underlying sync.Map and returns the stored value
    // together with a flag telling whether the key was present.
    func (dict *SyncDict) Get(key string) (val interface{}, exists bool) {
       return dict.m.Load(key)
    }
    
    // Len counts the entries by walking the whole map, because sync.Map keeps
    // no size counter of its own.
    func (dict *SyncDict) Len() int {
       var count int
       countEntry := func(_, _ interface{}) bool {
          count++
          return true
       }
       dict.m.Range(countEntry)
       return count
    }
    
    // Put stores val under key unconditionally. It returns 1 when the key was
    // not present before the store and 0 when an existing value was replaced.
    func (dict *SyncDict) Put(key string, val interface{}) (result int) {
       _, present := dict.m.Load(key)
       dict.m.Store(key, val)
       result = 1
       if present {
          result = 0
       }
       return result
    }
    
    // PutIfAbsent stores val under key only when the key is not present yet.
    // It returns 1 when the value was stored and 0 when the key already existed.
    //
    // LoadOrStore performs the existence check and the store as one atomic
    // operation, so two concurrent callers cannot both observe "absent" and
    // both report a successful insert.
    func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {
       if _, loaded := dict.m.LoadOrStore(key, val); loaded {
          return 0
       }
       return 1
    }
    
    // PutIfExists replaces the value under key only when the key is already
    // present. It returns 1 when the value was replaced and 0 otherwise.
    func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {
       if _, present := dict.m.Load(key); !present {
          return 0
       }
       dict.m.Store(key, val)
       return 1
    }
    
    // Remove deletes key and returns 1 when an entry was actually removed, or 0
    // when the key was not present.
    //
    // LoadAndDelete checks and deletes in one atomic step, so concurrent
    // removals of the same key cannot both report a successful delete.
    func (dict *SyncDict) Remove(key string) (result int) {
       if _, loaded := dict.m.LoadAndDelete(key); loaded {
          return 1
       }
       return 0
    }
    
    // ForEach passes every key/value pair to consumer. The traversal stops as
    // soon as consumer returns false, as the Consumer contract requires.
    func (dict *SyncDict) ForEach(consumer Consumer) {
       dict.m.Range(func(key, value interface{}) bool {
          // Forward the consumer's verdict so it can end the traversal early.
          return consumer(key.(string), value)
       })
    }
    
    // Keys returns all keys found during one traversal of the map.
    //
    // Len() is only used as a capacity hint: the map may change between the
    // count and the traversal, so keys are appended rather than written by
    // index. That avoids an out-of-range panic if the map grows, and empty
    // trailing entries if it shrinks.
    func (dict *SyncDict) Keys() []string {
       result := make([]string, 0, dict.Len())
       dict.m.Range(func(key, value interface{}) bool {
          result = append(result, key.(string))
          return true
       })
       return result
    }
    
    // RandomKeys returns up to limit keys; the same key may appear more than
    // once. Each pick takes the first entry yielded by a fresh Range call.
    // NOTE(review): how random the picks are depends on sync.Map's iteration
    // order — confirm that this is sufficient for callers.
    //
    // A non-positive limit or an empty dictionary yields an empty slice rather
    // than a panic or a slice padded with empty strings.
    func (dict *SyncDict) RandomKeys(limit int) []string {
       if limit <= 0 {
          return []string{}
       }
       result := make([]string, 0, limit)
       for i := 0; i < limit; i++ {
          picked := false
          dict.m.Range(func(key, value interface{}) bool {
             result = append(result, key.(string))
             picked = true
             return false // only the first entry is needed
          })
          if !picked {
             // Nothing to pick from: the dictionary is empty.
             break
          }
       }
       return result
    }
    
    // RandomDistinctKeys returns at most limit different keys, taken from a
    // single traversal of the map.
    //
    // A non-positive limit yields an empty slice (the original indexed into a
    // zero-length slice and panicked for limit == 0). When the dictionary holds
    // fewer than limit keys, only the existing keys are returned, with no
    // empty-string padding.
    func (dict *SyncDict) RandomDistinctKeys(limit int) []string {
       if limit <= 0 {
          return []string{}
       }
       result := make([]string, 0, limit)
       dict.m.Range(func(key, value interface{}) bool {
          result = append(result, key.(string))
          // Keep going only while more keys are still wanted.
          return len(result) < limit
       })
       return result
    }
    
    // Clear removes every entry from the dictionary.
    //
    // The entries are deleted one by one instead of overwriting *dict with a
    // fresh value: a sync.Map must not be copied after first use, and replacing
    // the struct while other goroutines are using it is a data race.
    func (dict *SyncDict) Clear() {
       dict.m.Range(func(key, _ interface{}) bool {
          dict.m.Delete(key)
          return true
       })
    }
    

    使用sync.Map实现Dict接口

    database/db.go

    // DB is one of the numbered databases of the Redis server.
    type DB struct {
    	// index is the number of this database; NewDatabase sets it to the
    	// position inside Database.dbSet.
    	index int
    	// data holds the key space of this database.
    	data  dict.Dict
    }
    
    // ExecFunc is the signature every Redis command implementation has: it
    // receives the target DB and the arguments without the command name.
    type ExecFunc func(db *DB, args [][]byte) resp.Reply
    
    // CmdLine is an alias for one command line: the command name followed by its
    // arguments, each as raw bytes.
    type CmdLine = [][]byte
    
    // makeDB builds a DB whose key space is a fresh SyncDict; index keeps its
    // zero value until the caller assigns it.
    func makeDB() *DB {
    	return &DB{data: dict.MakeSyncDict()}
    }
    
    // Exec runs one command line against this DB. The command name (cmdLine[0])
    // is looked up case-insensitively in cmdTable; an unknown command or a wrong
    // number of arguments produces an error reply. Otherwise the registered
    // executor is called with the arguments that follow the command name.
    func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply {
    	name := strings.ToLower(string(cmdLine[0]))
    	entry, known := cmdTable[name]
    	switch {
    	case !known:
    		return reply.MakeErrReply("ERR unknown command '" + name + "'")
    	case !validateArity(entry.arity, cmdLine):
    		return reply.MakeArgNumErrReply(name)
    	}
    	// Strip the command name, e.g. "set k v" becomes "k v".
    	return entry.executor(db, cmdLine[1:])
    }
    
    // validateArity reports whether cmdArgs (command name included) has an
    // acceptable number of elements.
    //
    // Fixed length:    "set k v"            => arity = 3, exactly 3 elements.
    // Variable length: "exists k1 k2 k3 ..." => arity = -2, at least 2 elements.
    func validateArity(arity int, cmdArgs [][]byte) bool {
    	got := len(cmdArgs)
    	if arity < 0 {
    		return got >= -arity
    	}
    	return got == arity
    }
    
    // GetEntity returns the entity stored under key and whether the key exists.
    // If the stored value is not a *database.DataEntity the failed type
    // assertion is ignored, so the result is (nil, true).
    func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {
    	if raw, found := db.data.Get(key); found {
    		entity, _ := raw.(*database.DataEntity)
    		return entity, true
    	}
    	return nil, false
    }
    
    // PutEntity stores entity under key in this DB's dictionary and returns the
    // result of Dict.Put unchanged.
    func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
    	return db.data.Put(key, entity)
    }
    
    // PutIfExists forwards to Dict.PutIfExists on this DB's dictionary and
    // returns its result unchanged.
    func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
    	return db.data.PutIfExists(key, entity)
    }
    
    // PutIfAbsent forwards to Dict.PutIfAbsent on this DB's dictionary and
    // returns its result unchanged.
    func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
    	return db.data.PutIfAbsent(key, entity)
    }
    
    // Remove deletes key from this DB's dictionary; the count returned by
    // Dict.Remove is discarded.
    func (db *DB) Remove(key string) {
    	db.data.Remove(key)
    }
    
    // Removes deletes the given keys and returns how many of them were actually
    // removed.
    //
    // The count comes straight from Dict.Remove (1 when an entry was removed,
    // 0 otherwise), which avoids a second lookup per key and the gap between a
    // separate existence check and the delete.
    func (db *DB) Removes(keys ...string) (deleted int) {
    	for _, key := range keys {
    		deleted += db.data.Remove(key)
    	}
    	return deleted
    }
    
    // Flush empties this DB by clearing its dictionary.
    func (db *DB) Flush() {
    	db.data.Clear()
    }
    

    实现Redis中的分数据库
    ExecFunc:所有Redis的指令都写成这样的类型
    validateArity方法:

    • 定长:set k v => arity=3;
    • 变长:exists k1 k2 k3 ... => arity=-2,表示参数>=2个

    database/command.go

    // cmdTable maps a lower-cased command name to its command definition; it is
    // filled through RegisterCommand.
    var cmdTable = make(map[string]*command)
    
    // command describes one Redis command such as ping or keys.
    type command struct {
       // executor is the function that carries out the command.
       executor ExecFunc
       // arity is the expected element count of the command line, checked by
       // validateArity: non-negative means exactly that many, negative -n means
       // at least n.
       arity    int 
    }
    
    // RegisterCommand adds a command to cmdTable under the lower-cased name,
    // replacing any entry already registered under that name.
    func RegisterCommand(name string, executor ExecFunc, arity int) {
       entry := &command{executor: executor, arity: arity}
       cmdTable[strings.ToLower(name)] = entry
    }
    

    command:每一个command结构体都是一个指令,例如ping,keys等等
    arity:参数数量
    cmdTable:记录所有指令和command结构体的关系
    RegisterCommand:注册指令的实现,在程序启动时由各指令文件的init方法调用

    database/ping.go

    // Ping implements the ping command. With no argument it answers with a
    // PongReply, with one argument it echoes that argument as a status reply,
    // and with more arguments it returns an argument-count error.
    func Ping(db *DB, args [][]byte) resp.Reply {
        switch len(args) {
        case 0:
            return &reply.PongReply{}
        case 1:
            return reply.MakeStatusReply(string(args[0]))
        default:
            return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")
        }
    }
    
    // init registers the ping command at program start.
    //
    // The arity is -1 (at least the command name) rather than 1, so that
    // "ping <message>" reaches Ping instead of being rejected by validateArity.
    // Ping itself rejects more than one argument.
    func init() {
        RegisterCommand("ping", Ping, -1)
    }
    

    init方法:在启动程序时就会调用这个方法,用于初始化

    database/keys.go

    // execDel handles "del k1 k2 k3 ...": it removes the given keys and replies
    // with the number of keys that were removed.
    func execDel(db *DB, args [][]byte) resp.Reply {
       names := make([]string, 0, len(args))
       for _, raw := range args {
          names = append(names, string(raw))
       }
       return reply.MakeIntReply(int64(db.Removes(names...)))
    }
    
    // execExists handles "exists k1 k2 k3 ...": it replies with how many of the
    // given keys exist. A key named several times is counted each time.
    func execExists(db *DB, args [][]byte) resp.Reply {
       var found int64
       for i := range args {
          if _, ok := db.GetEntity(string(args[i])); ok {
             found++
          }
       }
       return reply.MakeIntReply(found)
    }
    
    // execFlushDB handles "flushdb": it empties the DB and replies OK. The
    // arguments are not inspected.
    func execFlushDB(db *DB, args [][]byte) resp.Reply {
       db.Flush()
       return &reply.OkReply{}
    }
    
    // execType handles "type k1": it replies "none" for a missing key and
    // "string" when the stored data is a []byte. Any other data type yields an
    // UnknownErrReply.
    func execType(db *DB, args [][]byte) resp.Reply {
       entity, found := db.GetEntity(string(args[0]))
       if !found {
          return reply.MakeStatusReply("none")
       }
       if _, isString := entity.Data.([]byte); isString {
          return reply.MakeStatusReply("string")
       }
       return &reply.UnknownErrReply{}
    }
    
    // execRename handles "rename k1 k2": it moves the value of k1 to k2,
    // overwriting k2 if it exists. It replies with an error when k1 does not
    // exist and with OK otherwise.
    func execRename(db *DB, args [][]byte) resp.Reply {
       if len(args) != 2 {
          return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
       }
       src := string(args[0])
       dest := string(args[1])

       entity, ok := db.GetEntity(src)
       if !ok {
          return reply.MakeErrReply("no such key")
       }
       if src == dest {
          // Renaming a key to itself is a no-op. Without this guard the put
          // followed by the remove below would delete the key.
          return &reply.OkReply{}
       }
       db.PutEntity(dest, entity)
       db.Remove(src)
       return &reply.OkReply{}
    }
    
    // execRenameNx handles "renamenx k1 k2": it moves the value of k1 to k2 only
    // when k2 does not exist yet. It replies 0 when k2 already exists, an error
    // when k1 is missing, and 1 after a successful rename. The existence of k2
    // is checked before the existence of k1.
    func execRenameNx(db *DB, args [][]byte) resp.Reply {
       src, dest := string(args[0]), string(args[1])

       if _, taken := db.GetEntity(dest); taken {
          return reply.MakeIntReply(0)
       }

       entity, found := db.GetEntity(src)
       if !found {
          return reply.MakeErrReply("no such key")
       }

       db.Removes(src, dest)
       db.PutEntity(dest, entity)
       return reply.MakeIntReply(1)
    }
    
    // execKeys handles "keys <pattern>": it replies with every key of the DB that
    // matches the wildcard pattern given as the first argument.
    func execKeys(db *DB, args [][]byte) resp.Reply {
       matcher := wildcard.CompilePattern(string(args[0]))
       matched := make([][]byte, 0)
       collect := func(key string, _ interface{}) bool {
          if matcher.IsMatch(key) {
             matched = append(matched, []byte(key))
          }
          return true // always visit every key
       }
       db.data.ForEach(collect)
       return reply.MakeMultiBulkReply(matched)
    }
    
    // init registers the key-related commands at program start. The arity
    // counts the command name itself; a negative arity -n means "at least n"
    // elements (see validateArity).
    func init() {
       RegisterCommand("Del", execDel, -2)
       RegisterCommand("Exists", execExists, -2)
       RegisterCommand("Keys", execKeys, 2)
       RegisterCommand("FlushDB", execFlushDB, -1)
       RegisterCommand("Type", execType, 2)
       RegisterCommand("Rename", execRename, 3)
       RegisterCommand("RenameNx", execRenameNx, 3)
    }
    

    keys.go实现以下指令:
    execDel:del k1 k2 k3 ...
    execExists:exists k1 k2 k3 ...
    execFlushDB:flushdb
    execType:type k1
    execRename:rename k1 k2
    execRenameNx:renamenx k1 k2
    execKeys:keys(依赖lib包的工具类wildcard.go)

    database/string.go

    // execGet handles "get k1": it replies with the string value of the key, a
    // null bulk reply when no value is found, or the error reply produced by
    // getAsString.
    func execGet(db *DB, args [][]byte) resp.Reply {
       value, errReply := db.getAsString(string(args[0]))
       switch {
       case errReply != nil:
          return errReply
       case value == nil:
          return &reply.NullBulkReply{}
       }
       return reply.MakeBulkReply(value)
    }
    
    // getAsString fetches the value stored under key as a byte slice. It
    // returns (nil, nil) when the key does not exist and a WrongTypeErrReply
    // when the stored data is not a []byte.
    func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {
       entity, found := db.GetEntity(key)
       if !found {
          return nil, nil
       }
       if data, isBytes := entity.Data.([]byte); isBytes {
          return data, nil
       }
       return nil, &reply.WrongTypeErrReply{}
    }
    
    // execSet handles "set k v": it stores v under k, replacing any previous
    // value, and replies OK. Arguments after the value are not inspected.
    func execSet(db *DB, args [][]byte) resp.Reply {
       db.PutEntity(string(args[0]), &database.DataEntity{Data: args[1]})
       return &reply.OkReply{}
    }
    
    // execSetNX handles "setnx k v": it stores v under k only when k does not
    // exist yet and replies with the integer returned by PutIfAbsent.
    func execSetNX(db *DB, args [][]byte) resp.Reply {
       key, value := string(args[0]), args[1]
       inserted := db.PutIfAbsent(key, &database.DataEntity{Data: value})
       return reply.MakeIntReply(int64(inserted))
    }
    
    // execGetSet handles "getset k v": it stores v under k and replies with the
    // previous value, or with a null bulk reply when there was none.
    //
    // The old value is read through getAsString, as in execGet. If the key
    // holds data that is not a string, the wrong-type error is returned and the
    // key is left untouched, instead of panicking on an unchecked type
    // assertion after the key has already been overwritten.
    func execGetSet(db *DB, args [][]byte) resp.Reply {
       key := string(args[0])
       value := args[1]

       old, errReply := db.getAsString(key)
       if errReply != nil {
          return errReply
       }
       db.PutEntity(key, &database.DataEntity{Data: value})
       if old == nil {
          return reply.MakeNullBulkReply()
       }
       return reply.MakeBulkReply(old)
    }
    
    // execStrLen handles "strlen k": it replies with the length in bytes of the
    // string stored under k.
    //
    // A missing key yields the integer 0 (Redis STRLEN semantics) rather than a
    // null bulk reply, and a value that is not a string yields the wrong-type
    // error from getAsString instead of a panic.
    func execStrLen(db *DB, args [][]byte) resp.Reply {
       key := string(args[0])
       value, errReply := db.getAsString(key)
       if errReply != nil {
          return errReply
       }
       // len of a nil slice is 0, which covers the missing-key case.
       return reply.MakeIntReply(int64(len(value)))
    }
    
    // init registers the string commands at program start. The arity counts
    // the command name itself; a negative arity -n means "at least n" elements
    // (see validateArity), so "set" accepts extra trailing arguments.
    func init() {
       RegisterCommand("Get", execGet, 2)
       RegisterCommand("Set", execSet, -3)
       RegisterCommand("SetNx", execSetNX, 3)
       RegisterCommand("GetSet", execGetSet, 3)
       RegisterCommand("StrLen", execStrLen, 2)
    }
    

    string.go实现以下指令:
    execGet:get k1
    execSet:set k v
    execSetNX:setnx k v
    execGetSet:getset k v 返回旧值
    execStrLen:strlen k

    database/database.go

    // Database is a set of DBs, i.e. the standalone (single-node) database.
    type Database struct {
       // dbSet holds the DBs; the slice index is the DB number.
       dbSet []*DB
    }
    
    // NewDatabase creates a Database holding config.Properties.Databases DBs.
    // When that setting is 0 it is first set to 16. Every DB gets its position
    // in the set as its index.
    func NewDatabase() *Database {
       if config.Properties.Databases == 0 {
          config.Properties.Databases = 16
       }
       mdb := &Database{dbSet: make([]*DB, config.Properties.Databases)}
       for idx := 0; idx < len(mdb.dbSet); idx++ {
          db := makeDB()
          db.index = idx
          mdb.dbSet[idx] = db
       }
       return mdb
    }
    
    // Exec executes one command line for connection c. "select" is handled here
    // because it changes the connection's current DB; every other command is
    // forwarded to the DB currently selected by the connection.
    //
    // A panic raised while executing the command is recovered and logged. In
    // that case an UnknownErrReply is returned, so the caller never receives a
    // nil reply.
    func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {
       defer func() {
          if err := recover(); err != nil {
             logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
             // Without this assignment the named result would stay nil.
             result = &reply.UnknownErrReply{}
          }
       }()

       cmdName := strings.ToLower(string(cmdLine[0]))
       if cmdName == "select" {
          if len(cmdLine) != 2 {
             return reply.MakeArgNumErrReply("select")
          }
          return execSelect(c, mdb, cmdLine[1:])
       }
       dbIndex := c.GetDBIndex()
       selectedDB := mdb.dbSet[dbIndex]
       return selectedDB.Exec(c, cmdLine)
    }
    
    // execSelect handles "select <index>", e.g. "select 2": it makes the DB with
    // the given index the current DB of connection c.
    //
    // It replies with an error when the index is not a number or lies outside
    // [0, len(dbSet)). Negative indexes are rejected here because they would
    // otherwise cause an out-of-range panic on the next command.
    func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {
       dbIndex, err := strconv.Atoi(string(args[0]))
       if err != nil {
          return reply.MakeErrReply("ERR invalid DB index")
       }
       if dbIndex < 0 || dbIndex >= len(mdb.dbSet) {
          return reply.MakeErrReply("ERR DB index is out of range")
       }
       c.SelectDB(dbIndex)
       return reply.MakeOkReply()
    }
    
    // Close currently does nothing.
    func (mdb *Database) Close() {
    }
    
    // AfterClientClose currently does nothing.
    func (mdb *Database) AfterClientClose(c resp.Connection) {
    }
    

    Database:一组db的集合
    Exec:执行切换db指令或者其他指令
    execSelect方法:选择db(指令:select 2)

    resp/handler/handler.go

    import (
    	database2 "go-redis/database"
    )
    
    // MakeHandler creates a RespHandler backed by the standalone database
    // returned by database2.NewDatabase.
    func MakeHandler() *RespHandler {
       var db database.Database = database2.NewDatabase()
       return &RespHandler{db: db}
    }
    

    修改实现协议层handler的database实现

    架构小结

    TCP层服务TCP的连接,然后将连接交给RESP协议层的handler,handler监听客户端的连接,将指令解析后发给管道,管道转给database层(database/database.go),核心层根据命令类型执行不同的方法,然后返回。

  • 相关阅读:
    debian安装qq及更新
    无人机/FPV穿越机的遥控器/接收机等配件厂商
    Docker网络模式与数据卷
    在“美国死海”边的科研盛会 ACM CCS‘24 截稿日期逼近 行动要快
    计算机毕业设计Java酒店信息管理(源码+系统+mysql数据库+lw文档)
    C# 将List拆分成多个子集合
    2019年互联网高频Java面试题指南!互联网升职加薪方案!
    CAN通信-应用
    数据湖统一元数据与权限
    【高危安全通告】微软8月多个漏洞修复
  • 原文地址:https://www.cnblogs.com/csgopher/p/17249335.html