当前位置: 首页 > 资讯 > >正文

【全球新要闻】GO实现Redis:GO实现Redis的AOF持久化(4)

来源:博客园    时间:2023-03-26 22:04:09


(资料图片仅供参考)

将用户发来的指令以RESP协议的形式存储在本地的AOF文件,重启Redis后执行此文件恢复数据https://github.com/csgopher/go-redis本文涉及以下文件:redis.conf:配置文件aof:实现aof

redis.conf

appendonly yesappendfilename appendonly.aof

aof/aof.go

type CmdLine = [][]byteconst (   aofQueueSize = 1 << 16)type payload struct {   cmdLine CmdLine   dbIndex int}type AofHandler struct {   db          databaseface.Database   aofChan     chan *payload   aofFile     *os.File   aofFilename string   currentDB   int}func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {   handler := &AofHandler{}   handler.aofFilename = config.Properties.AppendFilename   handler.db = db   handler.LoadAof()   aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)   if err != nil {      return nil, err   }   handler.aofFile = aofFile   handler.aofChan = make(chan *payload, aofQueueSize)   go func() {      handler.handleAof()   }()   return handler, nil}func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {   if config.Properties.AppendOnly && handler.aofChan != nil {      handler.aofChan <- &payload{         cmdLine: cmdLine,         dbIndex: dbIndex,      }   }}func (handler *AofHandler) handleAof() {   handler.currentDB = 0   for p := range handler.aofChan {      if p.dbIndex != handler.currentDB {         // select db         data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()         _, err := handler.aofFile.Write(data)         if err != nil {            logger.Warn(err)            continue         }         handler.currentDB = p.dbIndex      }      data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()      _, err := handler.aofFile.Write(data)      if err != nil {         logger.Warn(err)      }   }}func (handler *AofHandler) LoadAof() {   file, err := os.Open(handler.aofFilename)   if err != nil {      logger.Warn(err)      return   }   defer file.Close()   ch := parser.ParseStream(file)   fakeConn := &connection.Connection{}   for p := range ch {      if p.Err != nil {         if p.Err == io.EOF {            break         }         logger.Error("parse error: " + p.Err.Error())         continue      }      if p.Data == nil {         logger.Error("empty payload")         continue      }      r, ok := p.Data.(*reply.MultiBulkReply)      if !ok {         logger.Error("require multi bulk reply")         continue      }      ret := handler.db.Exec(fakeConn, r.Args)      if reply.IsErrorReply(ret) {         logger.Error("exec err", err)      }   }}

AofHandler:1.从管道中接收数据 2.写入AOF文件AddAof:用户的指令包装成payload放入管道handleAof:将管道中的payload写入磁盘LoadAof:重启Redis后加载aof文件

database/database.go

type Database struct {   dbSet []*DB   aofHandler *aof.AofHandler}func NewDatabase() *Database {   mdb := &Database{}   if config.Properties.Databases == 0 {      config.Properties.Databases = 16   }   mdb.dbSet = make([]*DB, config.Properties.Databases)   for i := range mdb.dbSet {      singleDB := makeDB()      singleDB.index = i      mdb.dbSet[i] = singleDB   }   if config.Properties.AppendOnly {      aofHandler, err := aof.NewAOFHandler(mdb)      if err != nil {         panic(err)      }      mdb.aofHandler = aofHandler      for _, db := range mdb.dbSet {         singleDB := db         singleDB.addAof = func(line CmdLine) {            mdb.aofHandler.AddAof(singleDB.index, line)         }      }   }   return mdb}

将AOF加入到database里使用singleDB的原因:因为在循环中获取返回变量的地址都完全相同,因此当我们想要访问数组中元素所在的地址时,不应该直接获取 range 返回的变量地址 db,而应该使用 singleDB := db

database/db.go

type DB struct {   index int   data   dict.Dict   addAof func(CmdLine)}func makeDB() *DB {db := &DB{data:   dict.MakeSyncDict(),addAof: func(line CmdLine) {},}return db}

由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof

database/keys.go

func execDel(db *DB, args [][]byte) resp.Reply {   ......   if deleted > 0 {      db.addAof(utils.ToCmdLine2("del", args...))   }   return reply.MakeIntReply(int64(deleted))}func execFlushDB(db *DB, args [][]byte) resp.Reply {db.Flush()db.addAof(utils.ToCmdLine2("flushdb", args...))return &reply.OkReply{}}func execRename(db *DB, args [][]byte) resp.Reply {......db.addAof(utils.ToCmdLine2("rename", args...))return &reply.OkReply{}}func execRenameNx(db *DB, args [][]byte) resp.Reply {......db.addAof(utils.ToCmdLine2("renamenx", args...))return reply.MakeIntReply(1)}

database/string.go

func execSet(db *DB, args [][]byte) resp.Reply {   ......   db.addAof(utils.ToCmdLine2("set", args...))   return &reply.OkReply{}}func execSetNX(db *DB, args [][]byte) resp.Reply {   ......   db.addAof(utils.ToCmdLine2("setnx", args...))   return reply.MakeIntReply(int64(result))}func execGetSet(db *DB, args [][]byte) resp.Reply {   key := string(args[0])   value := args[1]   entity, exists := db.GetEntity(key)   db.PutEntity(key, &database.DataEntity{Data: value})   db.addAof(utils.ToCmdLine2("getset", args...))   ......}

添加addAof方法

测试命令
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n

X 关闭

推荐内容

这是标题

Copyright ©  2015-2022 东亚养生网版权所有  备案号:琼ICP备2022009675号-13   联系邮箱:435 227 67 @qq.com