(资料图片仅供参考)
redis.conf
appendonly yes
appendfilename appendonly.aof
aof/aof.go
type CmdLine = [][]byteconst ( aofQueueSize = 1 << 16)type payload struct { cmdLine CmdLine dbIndex int}type AofHandler struct { db databaseface.Database aofChan chan *payload aofFile *os.File aofFilename string currentDB int}func NewAOFHandler(db databaseface.Database) (*AofHandler, error) { handler := &AofHandler{} handler.aofFilename = config.Properties.AppendFilename handler.db = db handler.LoadAof() aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { return nil, err } handler.aofFile = aofFile handler.aofChan = make(chan *payload, aofQueueSize) go func() { handler.handleAof() }() return handler, nil}func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) { if config.Properties.AppendOnly && handler.aofChan != nil { handler.aofChan <- &payload{ cmdLine: cmdLine, dbIndex: dbIndex, } }}func (handler *AofHandler) handleAof() { handler.currentDB = 0 for p := range handler.aofChan { if p.dbIndex != handler.currentDB { // select db data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) continue } handler.currentDB = p.dbIndex } data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes() _, err := handler.aofFile.Write(data) if err != nil { logger.Warn(err) } }}func (handler *AofHandler) LoadAof() { file, err := os.Open(handler.aofFilename) if err != nil { logger.Warn(err) return } defer file.Close() ch := parser.ParseStream(file) fakeConn := &connection.Connection{} for p := range ch { if p.Err != nil { if p.Err == io.EOF { break } logger.Error("parse error: " + p.Err.Error()) continue } if p.Data == nil { logger.Error("empty payload") continue } r, ok := p.Data.(*reply.MultiBulkReply) if !ok { logger.Error("require multi bulk reply") continue } ret := handler.db.Exec(fakeConn, r.Args) if reply.IsErrorReply(ret) { logger.Error("exec err", err) } }}
AofHandler:1. 从管道中接收数据;2. 写入AOF文件。
AddAof:将用户的指令包装成payload放入管道。
handleAof:将管道中的payload写入磁盘。
LoadAof:重启Redis后加载aof文件。
database/database.go
type Database struct { dbSet []*DB aofHandler *aof.AofHandler}func NewDatabase() *Database { mdb := &Database{} if config.Properties.Databases == 0 { config.Properties.Databases = 16 } mdb.dbSet = make([]*DB, config.Properties.Databases) for i := range mdb.dbSet { singleDB := makeDB() singleDB.index = i mdb.dbSet[i] = singleDB } if config.Properties.AppendOnly { aofHandler, err := aof.NewAOFHandler(mdb) if err != nil { panic(err) } mdb.aofHandler = aofHandler for _, db := range mdb.dbSet { singleDB := db singleDB.addAof = func(line CmdLine) { mdb.aofHandler.AddAof(singleDB.index, line) } } } return mdb}
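将AOF加入到database里。
使用singleDB的原因:在 Go 1.22 之前,range 循环的迭代变量在整个循环中只有一个,每次迭代取到的地址都完全相同;闭包捕获的也是这同一个变量,循环结束后所有 addAof 都会指向最后一个 db。因此当我们想要访问数组中元素所在的地址时,不应该直接使用 range 返回的变量 db,而应该先用 singleDB := db 复制一份。(Go 1.22 起每次迭代都会创建新的循环变量,这一写法只在旧版本中是必需的。)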
database/db.go
type DB struct { index int data dict.Dict addAof func(CmdLine)}func makeDB() *DB {db := &DB{data: dict.MakeSyncDict(),addAof: func(line CmdLine) {},}return db}
由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof
database/keys.go
// execDel records a "del" command through db.addAof when at least one key was
// deleted, and replies with the number of deleted keys.
// The deletion logic itself is elided in this excerpt.
func execDel(db *DB, args [][]byte) resp.Reply {
	......
	// Only log the command when it actually changed something.
	if deleted > 0 {
		db.addAof(utils.ToCmdLine2("del", args...))
	}
	return reply.MakeIntReply(int64(deleted))
}

// execFlushDB flushes db, records a "flushdb" command through db.addAof and
// replies OK.
func execFlushDB(db *DB, args [][]byte) resp.Reply {
	db.Flush()
	db.addAof(utils.ToCmdLine2("flushdb", args...))
	return &reply.OkReply{}
}

// execRename records a "rename" command through db.addAof and replies OK.
// The rename logic itself is elided in this excerpt.
func execRename(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("rename", args...))
	return &reply.OkReply{}
}

// execRenameNx records a "renamenx" command through db.addAof and replies
// with the integer 1.
// The rename logic itself is elided in this excerpt.
func execRenameNx(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("renamenx", args...))
	return reply.MakeIntReply(1)
}
database/string.go
// execSet records a "set" command through db.addAof and replies OK.
// The logic that stores the value is elided in this excerpt.
func execSet(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("set", args...))
	return &reply.OkReply{}
}

// execSetNX records a "setnx" command through db.addAof and replies with
// result as an integer.
// The logic that computes result is elided in this excerpt.
func execSetNX(db *DB, args [][]byte) resp.Reply {
	......
	db.addAof(utils.ToCmdLine2("setnx", args...))
	return reply.MakeIntReply(int64(result))
}

// execGetSet stores args[1] under the key args[0] and records a "getset"
// command through db.addAof. The previous entity is looked up before the
// overwrite; the code that builds the reply from it is elided in this excerpt.
func execGetSet(db *DB, args [][]byte) resp.Reply {
	key := string(args[0])
	value := args[1]
	// Fetch the old entity first, because PutEntity below replaces it.
	entity, exists := db.GetEntity(key)
	db.PutEntity(key, &database.DataEntity{Data: value})
	db.addAof(utils.ToCmdLine2("getset", args...))
	......
}
添加addAof方法
测试命令:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n
X 关闭
Copyright © 2015-2022 东亚养生网版权所有 备案号:琼ICP备2022009675号-13 联系邮箱:435 227 67 @qq.com