Redis笔记

chapter1.

Redis没有表和Schema(Schema就是数据库对象的集合,这个集合包含了各种对象如:表、视图、存储过程、索引等

Redis用 key-value存储数据,所有的key必须是string类型

Redis的命令不区分大小写,但存储的数据区分(包括key-value)

chapter2.

数据类型:(一个key只对应一种类型的value)

1.string

(关于双引号,应该默认是字符串,加引号保证空格的存在。单引号也可以)

SET 执行成功返回OK

(如果键值已经存在,那么覆盖先前的值。可以用EXIST命令来测试key的存在性。也可以用SETNX,即当key不存在时set。 set if not exist。如果设置成功,SETNX返回1.否则返回0)

命令选项:NX,XX(存在时候才set)

从Redis 2.6.12版本开始,SET命令的行为可以通过一系列参数来修改:

EX second:设置键的过期时间为second秒。SET key value EX second效果等同于 SETEX second value

PX millisecond: 设置键的过期时间为millisecond毫秒。SET key value PX m == PSETEX m value

NX:只有在键不存在时,才对键进行设置操作。SET key value NX == SETNX key value

XX:只有在键存在时,才对键进行设置操作。

选项例子:

redis 127.0.0.1:6379> SET not-exists-key “value” NX OK # 键不存在,设置成功

redis 127.0.0.1:6379> SET exists-key “value” XX

# EX 和 PX 可以同时出现,但后面给出的选项会覆盖前面给出的选项 redis 127.0.0.1:6379> SET key “value” EX 1000 PX 5000000 OK

redis 127.0.0.1:6379> SET key-with-expire-and-NX “hello” EX 10086 NX OK //我猜是EX时hello。NX时10086

GET 不存在时返回 (nil)

STRLEN 计算字符串长度

APPEND 向一个字符串末尾追加字符串。如果该字符串不存在,先新建一个空字符串,再append

SETRANGE 参数: stringA, int, stringB

对stringA从偏移量int开始,直到结尾,用stringB替换(偏移量从0开始)

同理,如果stringA不存在,也是先new一个空字符串。如果int为0,那么就是new。

如果int不为0,前面的字符为 \x00 (好像是C++里的空字符)

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\7edf3f2832d645dca2c218773e8e0077$5pwjcaa(}cs.png)

\xhh表示十六进制,\x00 == 0x00, \x77 == 0x77。代码的作用应该就是看一下res地址对应的起始两个字节的值是否为0x00和0x77,对应于ascii字符的NULL和w。

OBJECT命令:可以查看value的内部编码形式 使用形式:OBJECT ENCODING stringA

(OBJECT还有其他功能,略)

string的三种编码形式:

int。存储64位有符号整数表示的字符串

embstr。长度小于等于44子节点字符串(使得在内存使用和性能方面更有效率)

raw。长度大于44字节的字符串

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\801aa6d8307c4dea977870465435f2f2\clipboard.png)

2.列表 list (就是stack,queue,array等等的)

LPUSH key1 value1 value2

往 key1中添加 value1,value2值。返回添加元素之后key1的值。

如果key1已经存在,就是在后面继续添加,反之新建再insert

RPUSH (将value添加到list的右端)

LINSERT key1 BRFORE / AFTER value1

添加到value1的前面/后面

只有当list存在才添加。 LPUSHX, RPUSHX

LRANGE 获取一定范围里的value

LRANCE key1 int1 int2

即获取int1 ~ int2中的元素。index从0开始(跟数组一样)

(如果int2为 -1,表明读到结尾。所以 [0, -1] 就是输出所有value。

[2, -1]就是从第3个元素开始,输出到结尾)

获取某个index下的value:

LRANCE key1 index1 index1 即可。

也可以: LINDEX key1 index

LPOP RPOP 删除一个元素

list的index定义:从左到右: 0 ~ N - 1 从右到左: -1 ~ -N

(所以0 ~ -1 表示整个列表)

LTRIM key1 int1 int2

删除key1中int1 ~ int2 之外的value

例子:LTRIM key1 1 -1

即删除index为1到结尾以外的所有元素,即,只删除index为0的元素

LSET key1 int value

设置index为int的值,更改为value

LPOP和RPOP有对应的阻塞版本,即当list为空时,阻塞版本会将客户端阻塞。

BLPOP BRPOP

同时还要设置超时时间,如果为0,表示永久等待。(在任务调度场景十分有用)

例子:

worker1> BRPOP que1 0

worker2> BRPOP que1 0

两个终端,阻塞执行。

此时对que1添加一个value1,worker1先执行,解除阻塞,把value1删除。

然后再对que1添加value2,value3,此时worker2解除阻塞,把value2删除。

最后que1里剩下了value3(先添加value2,立即删除,所以是剩下value3)

3.哈希 HASH 类型

相当于Map类型,即key里面还存储了其他的key-value,但存储实现方式是hashing。

(Redis本身其实就相当于一个hash。Redis的数据对象也可以再次使用hash,只要字段和值都是string类型。为了与Redis的key区分,hash里的“key-value”应该说成 “field-value”)

HMSET key1 field1 value1 field2 value2 ……

设置多个hash字段,以及属性。(其实field-value集就是key1的value)

HMGET key1 field1 field2 ……

获取多个hash的value

HGET HSET (获取/设置单个)

HEXISTS 是否存在某个字段

HGETALL 获取所有field和value (阻塞,hash特别大时会导致Redis阻塞)

HDEL 删除field

可以用HSCAN替代HGETALL,此处略。

4.集合 SET类型

集合,SET,与Java一样,唯一,无序。(可用于去重)

SADD 添加

SISMEMBER 测试一个value是否在SET中

SREM 删除

SCARD 获取集合中成员的数量

SMEMBERS 列出所有value

同理,SMEMBERS会导致阻塞,可以改用SSCAN。略

集合间的操作: SINTER KEY [KEY …] 交集

​ SUNION KEY [KEY …] 并集

​ SDIFF 差集

将集合运算的结果保存到K中:SINTERSTORE K KEY [KEY…]

SUNIONSTORE SDIFFSTORE

SET的编码,有两种方式:

intset : 对于value全是整数,且元素个数小于set-max-intset-entries时(默认为512)

(可以节省占用空间)

hashtable: intset不适用时的默认编码

5.有序集合 SORTED SET类型

有序。 实现方式为:每个value还要拥有一个用于排序的权重。

ZADD key1 weight1 value1 weight2 value2 …… 添加。

(与SET命令类似,可以使用NX,XX等选项)

ZREVRANGE key1 int1 int2 [WITHSCORES]

获取排序。int1 ~ int2的value,0 ~ -1 即整个,和list一样。

WITHSCORES可选,表示 是否要列出 weight权重。

ZINCRBY key1 int value1

给value1的权重增加int

ZREVRANK key1 value1

获取value1的排名(第一为 0)

ZSCORE key1 value1

获取value1的权重

ZUNIONSTORE 合并两个SORTED SET。略

ZRANGEBYSCORE key MIN MAX(从小到大), ZREVRANGEBYSCORE

ZCOUNT KEY MIN MAX(指定范围内的成员个数)

集合之间的操作:ZINTERSTORE, ZUNIONSTORE (没有ZDIFFSTORE, ZUNION)

6.HyperLogLog (HLL)类型

唯一计数类型。虽然可以用SET来计数,但需要考虑内存消耗和性能下降问题。

如果我们不需要获取SET的内容,只想获得不同value的个数,就可以考虑用HLL

PFADD 添加

PFCOUNT 统计数量

PFMERGE 添加多个

HLL对象的两种存储方式:

Sparse(稀疏):对于长度小于hll-sparse-max-bytes(默认为3000)的HLL,采用此。

(存储效率更高,但可能会消耗更多的CPU资源)

Dense(稠密):当Sparse不适用时。

7.Geo类型

存储地理位置相关的数据类型

GEOADD, GEOPOS(获取), GEODIST(比较距离)

等等。略

key的管理。

DBSIZE: 显示数据库的size,即数据库一共有多少个key

获取所有的key:

KEYS *

scan 0

KEYS的后面的参数是 正则表达式,scan后面的参数是cursor,游标,不懂。

删除key:

DEL UNLINK

(二者都可以同时删除多个,并且返回删除的key数量)

UNLINK主要用于执行大KEY的异步删除(即非阻塞?)当删除string以外的数据类型,当数量很大时,很可能导致服务器延迟,此时应该用UNLINK(UNLINK会在另一个线程,而不是主事件循环线程中执行删除操作,因此不会阻塞事件的处理。)

EXISTS 判断是否存在某个key

TYPE 获取key的(值)数据类型

RENAME former_name new_name 重命名

迁移键 (其实这些比较复杂少用的,到时要用到肯定还是上网查的,目前还没用到死记硬背真的没必要!倒是几种基本类型的常用操作确实值得记忆一下!)

①MOVE KEY DB (redis内部可以有多个DB,用于内部迁移。但一般一个数据库即可!

②dump + restore

③migrate

(只需知道②是非原子性的,而③是原子性的即可!)

遍历键:keys ,或者scan(渐进式,游标。。)

关于多数据库,实际上是一个废弃的功能。并不能实现0号数据库是正式数据库,而1号数据库是测试数据库等等的想法。Redis已经在逐渐弱化这个功能,例如Redis的分布式实现Redis Cluster只允许使用0号数据库,留着只为了向下兼容。而且因为Redis是单线程的,所以使用的还是一个CPU,如果其中一个存在慢查询,那么依然会影响其他数据库(甚至会导致业务问题的定位非常困难)

PS:如果真的要使用多个数据库的功能,完全可以在一台机器上部署多个Redis实例,彼此之间使用端口号来进行区分(计算机一般有多个CPU,这样既保证了业务之间不会受到影响,又合理地使用了CPU资源)

清除所有数据:flushdb,flushall(后者是清除所有数据库) 慎用,会把所有数据都删掉,而且会阻塞。别乱用。!!!

chapter3.

数据特性:

1.位图 bitmap

(也称位数组或位向量),是由比特位bit组成的数组(其值只有0或者1)可用于记录每个用户是否属于某个表(使用过某个功能)等等。

(bitmap并不是一种新的数据类型,它实际的底层数据是字符串。)

-————————–

bitmap vs SET

显然,bitmap用于计数,而SET也可以,那么二者有什么区别呢?

对于bitmap:无论用户是否使用过某个功能,每个用户都需要占用bitmap中的1个比特(0/1)

对于SET:假设另起一个8字节的整型来存储用户ID(表示使用过某个功能),那么只有当用户使用过该功能,才需要存储该用户ID(一个ID 8字节)

假设有20亿个用户,某功能有80%的用户使用过。那么:

对于bitmap:需要在内存中分配20亿个bit,即大约250MB

对于SET:需要 20e x 80% x 8 大约是12.8G

结论:当一个功能比较流行,更密集,即如果使用bitmap,1的概率更大的时候,用bitmap更好。反之,如果一个功能比较稀疏,使用人数较少,那么直接使用SET会更好(比如上述例子改为1%)

-—————————

SETBIT key1 value1 1 / 0

对key1的value1设置相应的bit值,0或者1

GETBIT key1 value1

查询value1对应的bit值,返回0或者1

BITCOUNT key1

查询key1中所有的value,bit为1的个数。

BITOP [OPERATOR] result key1 key2

BITOP用于进行位操作,包括:AND,OR,XOR,NOT(NOT只需要指定1个key,其他2个)

即对key1和key2进行位操作,然后将结果存储在result键中。

2.设置key的过期时间

除了使用DEL或者UNLINK删除key,还可以通过设置key的超时时间让Redis自动地删除key

EXPIRE key1 int

将键key1的超时时间设置为int

TTL key1

查看key1在过期前的剩余时间。

如果该key并没有过期时间,那么将返回-1.

如果该key已经过期,那么将返回-2

当key过期之后是否立刻删除?

并不,但当客户端试图访问过期key时,Redis会立即将其从内存中删除。(被动过期)

而对于那些已经过期但还没被访问的key,有一个基于概率的算法来进行主动删除。(略)

当发现有太多已经过期的key没有被删除时,可以通过执行SCAN命令来触发被动过期

如何清除一个key的过期时间?

1.PERSIST 使一个key成为持久的key

2.当key的value被替换或者删除。包括SET,GETSET, *STORE等等的命令会清除过期时间

(但修改list,set,hash的元素并不会清除过期时间,因为修改元素的操作并不会替换key所关联的值对象)

3.被另一个没有过期时间的key重命名

3.SORT命令

当我们需要获取一个Redis列表或者集合的已排序副本,或者以某种非权重的顺序对SORTED SET中的元素进行排序时候,就需要用到SORT命令

SORT key1 对 list / set/ sorted set 进行排序

(默认只能对数字排序,int/ double)

对于字符串,需要加ALPHA修饰符。

SORT key1 ALPHA

SORT key1 ALPHA LIMIT int1 int2

排序之后,获取int1 ~ int2的元素。(index从0开始)

除了默认数字排序以及ALPHA,还可以通过其他key的权重来进行排序。

SORT key1 BY key2~x(用通配符表示多个)

(此处key2的value,hong123_104 > hong123_455 > hong123_333 > 200 > 365)

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\ab95885bd2bc4ec9bbadda92013433f6\clipboard.png)

用处:

比如A_userid是一个用户的记录。 count_userid是一个用户的使用次数。(每一个userid都有对应)

那么当我们需要对A_userid的value进行排序(里面的value并不是使用次数,比如,是消费总额),而我们是希望通过“使用次数”这个属性来排序,那么就用到了 BY 修饰符。

SORT key1 BY key2* DESC/ASC(默认ASC)GET xxx STORE key3

GET选项:表示要获取哪一个类型的值,比如是key2,key1里的*,而不一定是value

(GET可使用多次)

STORE选项:把GET得到的结果存储到key3

4.管道 pipeline

Redis客户端与服务端使用RESP协议进行通信,大致的通信过程如下:

1,客户端向服务端发送一个命令

2,服务器接受该命令并放入执行队列(Redis是单线程的执行模型)

3,命令执行

4,服务器将命令执行的结果返回到客户端

第2,3步耗费的时间取决于Redis服务器(一般较短),主要的时延还是1,4步的网络传输。

使用pipeline好处:客户端将多个命令打包在一起,并将它们一次性发送,而不再等待每个单独命令的执行结果。同时,pipeline是在服务器执行完所有的命令后再返回结果。

因此即使是执行多个命令,第1,4步也只发生一次,所以总的执行时间会大大减少。

使用方法:在Redis-cli中加入 –pipe选项

5.Redis事务 transaction

关键命令:

MULTI:用于组装一个命令,表示命令的开始部分。

EXEC:用于执行一个事务

DISCARD:取消事务

WATCH:用于监视key。当这些key在事务执行之前发生改变,那么事务将被取消。

Redis事务与关系数据库事务的区别:Redis事务没有回滚。

所以当所有正确的命令入队列之后,如果在执行过程发生错误,位于发生错误命令之后的其他命令将继续执行,而不会回滚。

6.发布订阅 PubSub

即终端分为了发布者和订阅者。订阅者可以订阅指定的频道(发布者),当发布者发送信息时,会一次性发送给所有的订阅者,同时发布者也可以单独给某位订阅者发送信息(类似公众号的模式)

命令:

SUBSCRIBE 订阅 (如果该频道从未被订阅过,那么会自动创建该频道)

UNSUBSCRIBE 取消订阅

PUBLISH 发布信息(一个发布者可以是多个频道的所有者,因此需要选择发布到哪个频道)

7,Lua脚本

chapter4.

Redis常用场景:

1.存储Session。

Session一般存储在外部存储系统,如果一个web服务器宕机,其他服务器仍然可以从外部存储中获取Session并继续服务。而与关系数据库相比,Redis的访问延迟非常低(存储在内存),所以用于存储Session非常合适。同时Redis的对key过期机制非常契合Session的有效期。

2.分析,排行,队列,最新的N个记录等等

这些功能使用SQL也可以做到,但SQL查询要比Redis查询慢得多。

3.缓存

因为Redis是基于内存的数据存储系统,所以在关系数据库前面增加Redis作为缓存,通常能够加速数据库的查询过程。

例如,在查询关系数据库之前,先在Redis中查找记录。如果找到就直接使用,如果找不到就从关系数据库中查找,并将记录放置到Redis中。在向关系数据库写入时,我们也将记录写入Redis。同时为了限制缓存的大小,可以设置过期时间,或者LRU等等的收回策略。

Redis的缺点:

因为Redis默认将全部数据都放在内存中,所以Redis的容量有限,不能作为单一的数据库来存储大量的数据(虽然有一些基于云的Redis提供了使用SSD作为数据存储后端的选项)。

其次,因为Redis事务并不完全符合关系数据库的ACID规范。如果要使用完全符合ACID规范的事务,就不能使用Redis。

其他命令:

FLUSHALL:删除所有的数据库的所有keys。

CONFIG SET XXX XX 设置conf配置文件,将XXX的属性值改成XX

同理还有 CONFIG GET XXX

INFO MEMORY 查看Redis的内存使用情况

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\2d1e6c0c369b432aa5976bc06ebecf77\clipboard.png)

普通Java程序使用Redis:使用Jedis即可。

1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

Spring程序使用Redis:https://blog.csdn.net/Evankaka/article/details/50396325

依赖包,除了redis.clients,还需要:

1
2
3
4
5
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.7.2.RELEASE</version>
</dependency>

chapter5:

Redis复制,从一个Redis服务器,将data全部复制到另一个。略 INFO REPLICATION

chapter6:

由于Redis是在内存中存储数据的,所以当服务器重新启动时,所有的数据都将丢失。所以我们需要:像上一章一样复制复制备份到另一个服务器,或者持久化到disk。

持久化到磁盘的机制:RDB, AOF

RDB:可以看做是Redis在某一个时间点上的快点(snapshot),适合于备份和错误恢复

AOF:一个写入操作的日志,将在服务器启动时被重新放。

使用RDB的操作:

CONFIG SET SAVE “900 1” (还是使用CONFIG SET修改配置文件)

永久启用RDB:cat conf/redis.conf | grep “^save”

RDB的结果会生成yige.rdb文件。二进制形式。略

由于RDB并不能提供非常强的一致性,虽然可以定期将数据保存到RDB,但在崩溃时,保存的时刻到崩溃时刻中间的数据将会丢失。

AOF是一种只记录Redis写入命令的追加式文件,因为每个写入命令都会被追加到文件中,所以AOF的数据一致性更高。

启用AOF持久化:CONFIG SET APPENDONLY YES

永久启动: cat conf/dis.conf | grep “^appendonly”

二者可以同时使用。

chapter7:高可用和集群Redis

当Redis的数据越来越大的时候,安全性会受到损害(宕机的时候恢复的时间越来越长)同时key越来越多,内存的使用率越来越大,内存的大小成为了性能的瓶颈。因此配置多个node(多个Redis服务器的集群),可以增加安全性和可用性。

chapter8:生产环境部署

即在Linux上部署Redis,成为真正的服务器,可不仅仅是一个测试demo。这个时候还需要配置各种参数,如客户端连接参数,数据库本身参数,key的管理,LRU,LFU等等的算法策略,还有日志。

chapter9:管理Redis

即Redis服务上线之后,对日常的Redis进行运维操作。

查看参数,备份,监控内存使用情况,监控客户端等等。

chapter10:Redis的故障诊断

chapter11:使用Redis模块拓展Redis 略略略

————————–Redis Dev & Ops

chap2:

keys *, dbsize:

dbsize直接获取Redis内置的一个变量,时间复杂度为1

而keys会遍历所有键,所以时间复杂度是n(当Redis保存了大量键的时候,就不应该使用)

exists key, del key,

expire key seconds(设置过期时间

ttl key :返回剩余过期时间。 大于等于0就是剩余值,-1是没设置过期时间,-2是键不存在

type key, object encoding key : type是返回键的类型,object encoding是返回内部编码实现类型

(根据具体情况计算性能,底层选择不同的类型:

string字符串: raw, int, embstr

hash哈希: hashtable, ziplist

list列表: linkedlist, ziplist

set集合: hashtable, intset

zset有序集合(Sorted Set): skiplist, ziplist

incr key:使key自增1.(如果不是整数,会报错。如果不存在这个key,创建一个,默认值为0,incr后就是变成1。 必须是整数,浮点数也会报错)

Redis使用了单线程 + IO Multiplexing(多路复用)模型来实现高性能的内存数据库服务

(所谓的IO Multiplexing,指由N个client要访问,然后IO根据哪个client ready了,就选择那个(就像是一个电路图:

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\1e1c5c3688d74413b43baa95057179b0\clipboard.png)

IO Multiplexing主要是可以提高连接数。但始终还是单线程,因此如果其中一个命令的执行时间特别长,那么就会造成其他命令的阻塞。所以Redis是面向快速执行场景的数据库。

(内存的处理速度是磁盘IO的10万倍,所以Redis的效率是很快的,能达到每秒万级别)

(内存是100ns, Disk seek是10 000 000ns)

①字符串类型 string:

key都是字符串类型,字符串可以是字符串,数字,二进制(图片,音频,视频

(所以 set 1 123也是可行的。key为1,value为123)

set 选项: ex秒级过期时间, px 毫秒级, nx:键必须不存在,才可以设置成功。 xx:与nx相反

例子:setnx hello redis (nx可以用作分布式锁的一种实现方案,因为只有一个可以set成功)

mset : 批量设置值 例子: mset key1 value1 [key2 value2 ……]

get, mget同理。

批量命令可以提升开发效率。如n次get,需要n次网络时间+n次命令时间。

而mget只需要1次网络时间+n次命令时间

(批量操作发送的命令数是有节制的,否则可能造成Redis阻塞或者网络阻塞)

del key [key ……] del可以删除多个。

incr,decr

incrby key increment(指定自增数值,可为负数) decrby key decrement (不能为浮点数

incrbyfloat key increment(每次增加的值为浮点数,同时key也能为浮点数,不然执行一次就没了)

append key value 在尾部追加值

strlen key 字符串长度

getset 设置,并且返回原来的值

setrange key offset value 设定offset偏移值的那一位,字符由value替代(从0开始

例子: set redis pest setrange redis 0 b get redis ==》 best

getrange key start end (偏移量从0开始。 [start, end] (没有-1)

总结:del,mset,mget这三个的时间复杂度为O(n),getrange为O(n),n是字符串的长度,如果字符串不是很长,可以视为O(1)。其他都是O(1)

字符串内部编码: int, embstr(小于等于39个字节的字符串) , raw(大于39个字节)

②哈希 hashing

即value本身又是一个键值对。

设置值: hset key field value (同样还有hsetnx。相当于set和setnx的区别

获取值: hget key field (一个key存储多个 field - value)

删除field: hdel key field1 [field2 ……] 返回成功删除field的个数

计算field的个数: hlen key

批量set/get : hmget key field [ field …] hmget key field value [ field value……]

hexists key field 判断是否存在

hkeys key 获取所有的field(感觉叫hfields更恰当,但确实是hkeys

hvals key

hgetall key 获取所有的field-value

(当哈希元素比较多,hgetall可能会阻塞。如果只需要获取部分,可以使用hmget。如果一定要获取全部field-value,可以使用hscan,该命令会渐进式地遍历哈希类型。)

hincrby key field increment 给key.field增加increment(可以为负数) hincrbyfloat

hstrlen key field

总结:hdel,hmget,hmset是O(k),k是field的个数。hgetall,hkeys, hvals是O(n),n是field的总数

其他都为O(1)

哈希内部编码:ziplist(压缩列表):当元素个数小于hash-mmmax-ziplist-entries(默认是512),同时所有值都小于hash-max-ziplist-value(默认是64字节),就使用ziplist(结构更紧凑,节省内存)

hashtable(哈希表):当无法满足ziplist时,ziplist的读写效率会下降,而hashtable的读写为O(1)

ps:对于一个用户的各种信息属性,可以单独用不同的key,但这样用户信息性差,一般不会使用这种方案。比较好的方案是使用哈希类型,提高内聚性,控制在ziplist之内,有效减少内存空间的使用。(必须的时候使用hashtable也不是不行)

哈希类型是稀疏的,而关系数据库是完全结构化的,当在关系数据库增加新的列,所有行都要设置值(如果没有指定就设置为NULL)。但可以做复杂的关系查询,而Redis比较困难。

③列表 list

rpush,lpush(二者都可以插入多个) linsert key before | after pivot value

lrange key start end 查找

……

懒了,其实各种类型的操作就略过吧,上面已经看过一次了,把上面的复习一遍就好。主要看它举的实例并理解。

chap3:各种功能topic

①慢查询

Redis有一个表专门记录命令执行较长的命令(只包含执行时间,不包含客户端等待时间)

参数:slowlog-log-slower-than (超出该值则记录,单位是微秒。)

1 秒 = 1000 毫秒 = 1000 000微秒 (即默认是10毫秒)

slowlog-max-len :慢查询日志的最大记录,如果超出上限,则最早insert的那条删除。

(slowlog-max-len设置大一点,不会耗费很多内存空间的)

设置命令:config xx int 持久化到配置文件: config rewrite

获取日志: slowlog get [n] // n表示指定条数

slowlog len : 获取慢查询日志列表的当前长度

slowlog reset:重置(即clear

②redis shell

redis-cli的各种参数:

-h : host -p: port -a: auth

-r 重复执行n次: redis-cli -r 3 ping

(即重复执行3次ping命令,ping命令会测试服务器的连通性,如果连通,返回PONG)

-i 表示每隔几秒执行一次命令(与-r连用,单位是秒)

例子:redis-cli -a xxx -r 10 -i 1 info | grep used_memory_human 每隔1秒输出内存的使用量

-x:将stdin读取,作为redis-cli的最后一个参数,例如;

echo “gogogo” | redis-cli -a xxx -x set hello # 之后hello的值就被设置为”gogogo\n”

值得注意的是,echo输入,会存在一个换行符,进入redis-cli里再set才不会有换行符。

–scan和–pattern :扫描指定模式的keys,相当于使用scan命令

–slave:将当前客户端模拟成一个子节点,然后获取当前Redis结点的更新操作。

(可以用于记录当前连接Redis结点的一些更新操作)

(在一个客户端中使用redis-cli –slave -a xxx,然后其他客户端的更新,当前都会有命令记录)

–rdb:请求Redis实例生成并发送RDB持久化文件,保存在本地。

–pipe:把批量的命令一次发送到一个管道中,一次执行

–bigkeys:使用scan命令获取占用内存比较大的键值。(这些键可能是系统的瓶颈)

–eval : 执行指定的Lua脚本

–latency :测试到目标Redis的网络延迟 redis-cli -h {另一台机器的host IP} –latency

–latency-history :上面的只有一条记录,而这个是每执行15秒就生成另一条记录(可以用作对比),同时也可以使用-i参数来控制间隔时间。

-latency-dist 使用统计图表的形式从控制台输出信息

info : redis-cli info 输出redis的所有信息(很长,配合| grep使用)

redis-cli –stat:实时获取Redis的重要统计信息(比info少很多,但可以实时查看一些增量的数据)

–raw , –no-raw 使得返回的结果是原始格式 / 不是原始格式

当存储中文时,保存在redis里的中文会被转义成16进制。此时无论是命令行时get还是进入redis-cli之后get,返回的结果都是二进制格式,如: \xe4\xbd\xa0\xe5\xa5\xbd

如果使用–raw,则返回原始格式,即中文: redis-cli –raw get hello (–raw必须在get前面)

PS:但是在进入redis-cli之后的get好像无法指定参数,所以不能返回raw,只能在命令行时指定–raw

③redis-server

redis-server是Redis的启动脚本。可选参数只有:–test-memory,用于检测当前的OS能否稳定地分配指定容量的内存给Redis(防止因为内存问题造成Redis崩溃)

(检测时间较长,但输出passed this test表示检测完毕。 单位是MB,所以1024就是1G)

一般只是用于调试跟测试,快速沾满机器内存做一些边界测试

④redis-benchmark

用于为Redis做基准性能测试

-c:表示客户端的并发数量(默认50)

-n:表示客户端的请求总量(默认是100000)

例子:redis-benchmark -c 100 -n 20000,代表100个客户端同时请求Redis,一共执行20000次。

-q:只显示 requests per second信息。

-r:插入更多的键。如-r 10000,插入10000个随机的键(仅作测试使用)

-P:表示每个请求的pipeline的数据量(默认是1)

(我发现恰当指定pipeline的值,比默认是1要快。)

-k:表示客户端使用使用keepalive,1为使用,0为不使用,默认是1

-t 指定命令测试

–csv:按照csv格式输出,可以到处到excel等(注意是两条杠)

⑤Pipeline

Redis客户端的一条命令的4个过程:发送命令,命令排队,命令执行,返回结果。

(第一步跟第四步称为Round Trip Time,RTT,往返时间)

虽然有部分批量操作的命令,但大部分命令都没有批量命令,此时需要使用pipeline,一次过发送多个命令,减少RTT的次数(由于Redis的处理速度很快,只要Pipeline包含的命令数不是太多,就能有效提升效率)

(高级语言的客户端中基本都支持Pipeline。比如Jedis。)

提升的结论:

1.一般Pipleline的执行速度会比逐条执行要快(Redis的性能瓶颈是网络,即RTT)

2.客户端和服务端的网络时延越大(即RTT越大),那么Pipeline的提升效果越明显。

原生批量命令与Pipeline的区别:

1.原生批量命令是原子的,而Pipeline是非原子的。

2.原生批量是一个命令对应多个key,而Pipeline是支持多个命令的

3.原生批量是Redis服务端支持实现的,而Pipeline需要服务端和客户端的共同实现。

Ps:Pipeline一次组装的命令个数不能太多,否则会增加客户端的等待时间,以及网络阻塞,可以将一次包含大量命令的Pipeline拆分成多次较小的Pipeline来完成。

Pipeline虽然只能操作1个Redis实例,但即使在分布式Redis的场景中,也可以作为批量操作的重要优化手段。

⑥事务

Redis提供了简单的事务功能,但又跟关系数据库的事务不太一样。它无法保证很多方面的数据一致性,因为需要复杂事务的业务不应该放在Redis中实现。

multi:开启一个事务(事务里不可以嵌套事务

……中间执行的各种命令都会输出QUEUED,即存储在事务里,但还没开始执行。

exec:执行事务。 discard:取消事务

Redis的事务不提供回滚功能,如果执行出错,那么会出现问题:

1.当命令出现语法错误

此时Redis在执行每一条命令之前就已经发现了,因而这时候执行exec会显示错误。然后每一条语句都会失效。(因为根本就没有去执行,并不是回滚)

2.当命令出现运行时错误

有一些语句,不存在语法错误,但在运行时才会出现错误,这时Redis在检查语句的时候是不能发现的,只能在执行的时候才发现。比如对于SET跟SORTED SET,对一个SET使用SSET的ZADD操作,并不属于语法错误,但在运行时会出错。这时候事务不会回滚,因而在出错之前的语句,仍然已经执行成功,而且出错之后的语句也会执行!无论如何,这种问题只能靠程序员自己避免。

watch: 使得客户端监控1个或者多个键,有效期直到执行了unwatch或者exec。

当客户端A监控的这些keys,只要有其中一个被修改了(除非是在A的事务中进行修改的),那么该客户端A下的第一个事务不会执行。(返回nil)

小细节点:

1.如果是在当前客户端A的事务中修改了被监控的值,那么是可以成功执行的,因为要保证的是,在执行事务的时候,该值不会在事务在之外发生了变化。

2.如果是在当前客户端A的事务之外修改了被监控的值,那么也是不可以执行的。如果是在其他客户端B中修改,那么一定不能执行(即使是在B的事务环境中执行,也不行)

3.准确地说,并不是执行exec会取消,而是当事务(即multi)结束的时候,watch的键会取消。所以,如果执行了multi……discard,这时候其实watch已经失效了。后面如果继续使用multi,是可以执行事务的。所以单独地使用exec / discard(即在没有multi的前提下运行这两条命令),watch是仍然存在的。

⑦Lua

语法省略。

Redis使用Lua的方法:eval,evalsha

eval当作命令的时候: eval 脚本内容 key个数 key列表 参数列表

例子:eval ‘return “hello “ .. KEYS[1] .. ARGV[1] ‘ 1 redis world

输出:”hello redisworld”

如果Lua脚本较长,可以使用–eval直接执行文件

evalsha:将Lua加载到Redis,得到该脚本的SHA1检验和,之后evalsha使用SHA1作为参数就可以直接执行对应的Lua脚本,避免每次都发送Lua脚本的开销,而且脚本也能常驻在服务端了。

script load:将脚本内容加载到Redis内存中:redis-cli script load “xxx” 返回SHA1值。

执行脚本: evalsha 脚本SHA1值 key个数 key列表 参数列表

Lua的redis.call可以实现对Redis的访问:

redis.call(“set”, “hello”, “world”) redis.call(“get”, ”hello”)

pcall也可以调用Redis,但如果call执行失败,脚本会直接结束,而pcall会忽略错误继续执行(根据具体的情况进行函数的选择)

Lua对于Redis的好处:

1.Lua在Redis中的原子执行的,执行中途不会插入其他命令

2.Lua可以自定制命令,并且可以将这些命令常驻在Redis内存中,达到复用的效果

3.Lua可以将多条命令一次性打包,有效地减少网络开销。

举一个Lua的例子:

假设有user1,user2,user3,user4这几个键,分别对应一个数字。

如get user1 =》 123 ,user2 = 444 , user3 = 743 , user4 = 814

有一个列表,存储了这几个键:

lpush hot user1, user2, user3, user4

现在我们需要对列表内所有的键对应的值进行+1操作,并且保证是原子执行,返回操作的个数。

Lua脚本:

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\a8de5b73eb2142e490223c07e13bee29\clipboard.png)

执行Lua脚本: redis-cli -a xxx –eval my.lua hot(即执行my.lua脚本,第一个key参数是hot列表)

Redis管理Lua脚本:

script load:将Lua脚本加载到Redis

script exists sha1 [sha1 …]:判断sha1是否已经加载到内存

script flush:清除Redis内存以及加载的所有Lua脚本(此时需要重新load

script kill:杀掉正在执行的Lua脚本(如果Lua比较耗时,或者存在死循环等问题,就可以考虑kill

ps:Redis提供了一个参数,是Lua脚本的超时时间,但仅仅是当Lua脚本超过该时间之后,向其他命令发出BUSY的信号,但并不会停止掉服务端和客户端的脚本执行。其他命令会收到“Busy Redis is busy running a script”的错误。

但是,如果Lua脚本正在执行写操作,那么script kill将不能生效,此时只能使用shutdown save停掉Redis服务。(如果是死循环的重复写操作,那么只能shutdown save了)

⑧bitmap,HyprtLogLog,订阅,GEO等等(上面的笔记有

chap4:客户端

Redis的CS模型:

Redis制定了RESP(Redis Serialization Protocol,Redis序列化协议),实现客户端与服务端的正常交互。这种协议简单高效,能被机器解析,同时也容易理解。

发送命令的格式:先指定参数个数(前面加一个*,然后每一个参数前面用$加数字,表示该参数的长度。)

例子:SET hello world

底层发送的命令格式:(下面的各种参数与\r\n之间是没有空格的,只是这里增加以下可读性

*3 \r\n $3 \r\n SET \r\n $5 \r\n hello \r\n $5 \r\n world \r\n

上面的每一部分解释:

首先,每一part都是单独的一行(\r\n就是换行了),然后*3表示有3个参数, $3表示第一个参数的长度3,第一个参数是SET。$5表示第二个参数长度5,第二个参数是hello.第三个的长度也是$5,值为world

返回结果的格式:

有5种:

1.状态回复: 在RESP中,第一个字节是 +

2.错误回复:在RESP中,第一个字节是 -

3.整数回复:在RESP中,第一个字节是 :

4.字符串回复:在RESP中,第一个字节是 $

5.多条字符串回复:在RESP中,第一个字节是 *

从redis-cli.c的源码的 cliFormatReplyTTY方法,可以看到一共有这几种执行结果:

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\8cc4710fe2404a6582bb758a14fd3161\clipboard.png)

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\a3fbdf67f35f4db3a293a700bf15f534\clipboard.png)

一共有REDIS_REPLY_ERROR, STATUS, INTEGER, STRING, NIL, ARRAY

Jedis的重要类,重要方法,重要功能等等:

1.基本连接: Jedis

但默认是直连,于是每一次都要新建/关闭TCP的开销,资源无法控制(无法限制Jedis对象的个数),Jedis线程不安全。而使用连接池,可以事先初始化好Jedis连接,降低TCP的开销。而且借用和归还到连接池的操作都是在本地进行的,只有少量的并发同步开销。

2.Jedis连接池:JedisPool

一般还有它的配置类GenericObjectPoolConfig类

1
2
3
4
5
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="100" />
<property name="maxWaitMillis" value="1000" />
<property name="testOnBorrow" value="true" />
</bean>

(此处的JedisPoolConfig是GenericObjectPoolConfig的子类,只有一个默认构造方法,初始化一下:

1
2
3
4
5
6
7
8
public class JedisPoolConfig extends GenericObjectPoolConfig {
public JedisPoolConfig() {
this.setTestWhileIdle(true);
this.setMinEvictableIdleTimeMillis(60000L);
this.setTimeBetweenEvictionRunsMillis(30000L);
this.setNumTestsPerEvictionRun(-1);
}
}

PS:书上列了一堆GenericObjectPoolConfig的属性,因为BaseObjectPoolConfig还有很多。

1
2
3
4
5
6
7
8
9
10
11
12
public class GenericObjectPoolConfig extends BaseObjectPoolConfig {
public static final int DEFAULT_MAX_TOTAL = 8;
public static final int DEFAULT_MAX_IDLE = 8;
public static final int DEFAULT_MIN_IDLE = 0;
private int maxTotal = 8;
private int maxIdle = 8;
private int minIdle = 0;

public GenericObjectPoolConfig() {

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public abstract class BaseObjectPoolConfig implements Cloneable {
public static final boolean DEFAULT_LIFO = true;
public static final boolean DEFAULT_FAIRNESS = false;
public static final long DEFAULT_MAX_WAIT_MILLIS = -1L;
public static final long DEFALUT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 1800000L;
public static final long DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS = -1L;
public static final int DEFAULT_NUM_TESTS_PER_EVICTION_RUN = 3;
public static final boolean DEFAULT_TEST_ON_CREATE = false;
public static final boolean DEFAULT_TEST_ON_BORROW = false;
public static final boolean DEFAULT_TEST_ON_RETURN = false;
public static final boolean DEFALUT_TEST_WHILE_IDLE = false;
public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = -1L;
public static final boolean DEFAULT_BLOCK_WHEN_EXHAUSTED = true;
public static final boolean DEFAULT_JMX_ENABLE = true;
public static final String DEFAULT_JMX_NAME_PREFIX = "pool";
public static final String DEFAULT_JMX_NAME_BASE = null;
public static final String DEFAULT_EVICTION_POLICY_CLASS_NAME = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
private boolean lifo = true;
private boolean fairness = false;
private long maxWaitMillis = -1L;
private long minEvictableIdleTimeMillis = 1800000L;
private long softMinEvictableIdleTimeMillis = 1800000L;
private int numTestsPerEvictionRun = 3;
private String evictionPolicyClassName = "org.apache.commons.pool2.impl.DefaultEvictionPolicy"
private boolean testOnCreate = false;
private boolean testOnBorrow = false;
private boolean testOnReturn = false;
private boolean testWhileIdle = false;
private long timeBetweenEvictionRunsMillis = -1L;
private boolean blockWhenExhausted = true;
private boolean jmxEnabled = true;
private String jmxNamePrefix = "pool";
}

3.看一下当时在Spring下自己的项目的Redis完整配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="300" />
<property name="maxWaitMillis" value="1000" />
<property name="testOnBorrow" value="true" />
<property name="maxTotal" value="8" />
</bean>

<bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="poolConfig" ref="poolConfig" />
<property name="port" value="6379" />
<property name="hostName" value="xxx.xx.xx.xxx" />
<property name="password" value="xxxx" />
<property name="timeout" value="100000" />
<property name="database" value="0"/>
</bean>

<!-- org.springframework.data.redis.core.RedisTemplate-->
<bean id="redisTemplate" class="com.hong.utils.redis.MyRedisTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="valueSerializer">
<bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>
</property>
<property name="hashValueSerializer">
<bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>
</property>
</bean>

<bean id="methodCacheInterceptor" class="org.springframework.cache.support.SimpleCacheManager">
<property name="caches">
<set>
<bean class="com.hong.utils.redis.RedisUtil">
<property name="redisTemplate" ref="redisTemplate"/>
<property name="name" value="common" />
</bean>
</set>
</property>
</bean>

<bean id="redisUtil" class="com.hong.utils.redis.RedisUtil" >
<property name="redisTemplate" ref="redisTemplate"/>
<!-- name属性的值,是用于等会的注解当中 -->
<property name="name" value="common" />
</bean>

还是很好理解的,首先JedisPoolConfig是连接池的配置。

然后JedisConnectionFactory就是连接的配置(连接池要预先创建连接,那么自然需要一个Factory去创建连接,而至于hostName,password,timeout,port等等的,确实就应该写在factory里了,同时还传入了poolConfig配置,database就是默认是第一个数据库吧,也没什么好说的)

然后是RedisTemplate,这个也很显然,就跟JdbcTemplate,就是封装了原生的Redis API,然后提供了更易操作的方法接口罢了。我们看一下方法:

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\3e927f8193dc487b9d462e096537527a\clipboard.png)

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\a5493d3d73cb441ca0a50ab95ad2323f\clipboard.png)

方法确实很多哈,这里也没必要截完了,反正都是封装好了的Redis原生API,不妨随意选择一个方法去查看?就选keys方法吧,返回所有的键,很简单的方法逻辑,这里重在理解方法的主要构成,没必要选择太复杂的方法逻辑:

1
2
3
4
5
6
7
8
9
public Set<K> keys(K pattern) {
final byte[] rawKey = this.rawKey(pattern);
Set<byte[]> rawKeys = (Set)this.execute(new RedisCallback<Set<byte[]>>() {
public Set<byte[]> doInRedis(RedisConnection connection) {
return connection.keys(rawKey);
}
}, true);
return this.keySerializer != null ? SerializationUtils.deserialize(rawKeys, this.keySerializer) : rawKeys;
}

看不出什么,但感觉关键就是把具体的参数传到execute方法,然后再执行,再看几个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected List<Object> execRaw() {
return (List)this.execute(new RedisCallback<List<Object>>() {
public List(Object) doInRedis(RedisConnection connection) throws DataAccessException {
return connection.exec();
}
});
}

public void delete(K key) {
final byte[] rawKey = this.rawKey(key);
this.execute(new RedisCallback<Object>() {
public Object doInRedis(RedisConnection connection) {
connection.del(new byte[][]{rawKey});
return null;
}
}, true);
}

显然,都是把不同的参数传到了execute方法。所以说就是对execute方法进行重载咯!我们查一下,确实,一堆的execute重载方法:

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\bf4ed38156ac4f6f811fff3169568859\clipboard.png)

那我们还是继续拿keys指向的那个execute来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");

RedisConnectionFactory factory = getConnectionFactory();
RedisConnection conn = null;
try {

if (enableTransactionSupport) {
// only bind resources in case of potential transaction synchronization
conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
} else {
conn = RedisConnectionUtils.getConnection(factory);
}

boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);

RedisConnection connToUse = preProcessConnection(conn, existingConnection);

boolean pipelineStatus = connToUse.isPipelined();
if (pipeline && !pipelineStatus) {
connToUse.openPipeline();
}

RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
T result = action.doInRedis(connToExpose);

// close pipeline
if (pipeline && !pipelineStatus) {
connToUse.closePipeline();
}

// TODO: any other connection processing?
return postProcessResult(result, connToUse, existingConnection);
} finally {

if (!enableTransactionSupport) {
RedisConnectionUtils.releaseConnection(conn, factory);
}
}
}

前面两行Assert,普通的断言语句。然后后面的几行就很显然了,先获取ConnectionFactory,然后再在try-catch块里获取connection。具体的怎么getConnectionFactory这种是涉及到比较复杂的很多类的跳转的,但本质上都只是设计模式的运用,此处可以不管。接下来的就是Redis的操作了,先判断一下事务同步管理器,连接预处理等等的,最后再放入pipeline执行(因为keys是一个批处理操作,原子性的)。最后再返回postProcessResult。话说再刷新了一下,这段代码更新了,这个var11确实没有存在的必要,直接return postPro…即可。但总体来说,确实也是获取到ConnectionFactory,然后生成连接,再操作。那么,RedisTemplate的作用确实也跟我们预期的一样。

除此之外,网上也有提到,Redis可以直接存储Java对象,那么就需要序列化跟反序列化。 在Spring中,是通过StringRedisSerializer跟JdkSerializationRedisSerializer实现的。那么,RedisTemplate下面传入的两个蜜汁property也是可以理解了。至于传入connectionFactory,那是当然的了,我们上面看源码也已经看到了,RedisTemplate是需要调用connectionFactory生成连接的)

接下来最后这两个,MethodCacheInterceptor跟RedisUtil。我第一次使用的时候,直接拿RedisUtil当作RedisTemplate使用了,只是简单的set,get,那确实是足够的(RedisUtil是我的自定义类,只有最简单的set,get,remove)。其实认真一点看类名,就知道它的作用了,MethodCacheInterceptor,Redis的作用是什么?就是Cache。那么Method,Interceptor?显然,就是对方法进行拦截,确定哪些属性要考虑缓存的问题。如果需要,那么就要用到RedisUtil(基本就是最简单的set,get,remove,所以使用一个迷你版的RedisUtil就可以满足RedisTemplate的功能了)

至于MethodCacheInterceptor,是自己写的方法,我们看一下这份网上cv的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public class MethodCacheInterceptor implements MethodInterceptor {
private Logger logger = Logger.getLogger(MethodCacheInterceptor.class);
private RedisUtil redisUtil;
private List<String> targetNamesList; // 不加入缓存的service名称
private List<String> methodNamesList; // 不加入缓存的方法名称
private Long defaultCacheExpireTime; // 缓存默认的过期时间
private Long xxxRecordManagerTime; //
private Long xxxSetRecordManagerTime; //


public MethodCacheInterceptor() {
try {
File f = new File("D:\\cacheConf.properties");
//配置文件位置直接被写死,有需要自己修改下
InputStream in = new FileInputStream(f);
// InputStream in = getClass().getClassLoader().getResourceAsStream(
// "D:\\lunaJee-workspace\\msm\\msm_core\\src\\main\\java\\com\\mucfc\\msm\\common\\cacheConf.properties");
Properties p = new Properties();
p.load(in);
// 分割字符串
String[] targetNames = p.getProperty("targetNames").split(",");
String[] methodNames = p.getProperty("methodNames").split(",");

// 加载过期时间设置
defaultCacheExpireTime = Long.valueOf(p.getProperty("defaultCacheExpireTime"));
xxxRecordManagerTime = Long.valueOf(p.getProperty("com.service.impl.xxxRecordManager"));
xxxSetRecordManagerTime = Long.valueOf(p.getProperty("com.service.impl.xxxSetRecordManager"));
// 创建list
targetNamesList = new ArrayList<String>(targetNames.length);
methodNamesList = new ArrayList<String>(methodNames.length);
Integer maxLen = targetNames.length > methodNames.length ? targetNames.length
: methodNames.length;
// 将不需要缓存的类名和方法名添加到list中
for (int i = 0; i < maxLen; i++) {
if (i < targetNames.length) {
targetNamesList.add(targetNames[i]);
}
if (i < methodNames.length) {
methodNamesList.add(methodNames[i]);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Object value = null;
String targetName = invocation.getThis().getClass().getName();
String methodName = invocation.getMethod().getName();
if (!isAddCache(targetName, methodName)) // 不需要缓存的内容
return invocation.proceed(); // 执行方法返回结果
Object[] arguments = invocation.getArguments();
String key = getCacheKey(targetName, methodName, arguments);
System.out.println(key);
try {
if (redisUtil.exists(key)) // 判断是否有缓存
return redisUtil.get(key);

value = invocation.proceed(); // 写入缓存
if (value != null) {
final String tkey = key;
final Object tvalue = value;
new Thread(new Runnable() {
@Override
public void run() {
if (tkey.startsWith("com.service.impl.xxxRecordManager")) {
redisUtil.set(tkey, tvalue, xxxRecordManagerTime);
} else if (tkey.startsWith("com.service.impl.xxxSetRecordManager")) {
redisUtil.set(tkey, tvalue, xxxSetRecordManagerTime);
} else {
redisUtil.set(tkey, tvalue, defaultCacheExpireTime);
}
}
}).start();
}
} catch (Exception e) {
e.printStackTrace();
if (value == null) {
return invocation.proceed();
}
}
return value;
}

/**
* 是否加入缓存
*
* @return
*/
private boolean isAddCache(String targetName, String methodName) {
boolean flag = true;
if (targetNamesList.contains(targetName)
|| methodNamesList.contains(methodName)) {
flag = false;
}
return flag;
}

/**
* 创建缓存key
*
* @param targetName
* @param methodName
* @param arguments
*/
private String getCacheKey(String targetName, String methodName,
Object[] arguments) {
StringBuffer sbu = new StringBuffer();
sbu.append(targetName).append("_").append(methodName);
if ((arguments != null) && (arguments.length != 0)) {
for (int i = 0; i < arguments.length; i++) {
sbu.append("_").append(arguments[i]);
}
}
return sbu.toString();
}

public void setRedisUtil(RedisUtil redisUtil) {
this.redisUtil = redisUtil;
}
}

首先第一个是构造方法,里面就是对Interceptor的配置,比如过期时间啊,要拦截哪部分的方法啊等等的。这段代码显然有点糙(毕竟上面的都是优雅的源码,这个虽然也是网上大佬写的,但还是有差距哈)。然后invoke方法,这个是最关键的方法。我们的这个类实现了MethodInterceptor接口,这个接口就是只有这么1个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface MethodInterceptor extends Interceptor {

/**
* Implement this method to perform extra treatments before and
* after the invocation. Polite implementations would certainly
* like to invoke {@link Joinpoint#proceed()}.
*
* @param invocation the method invocation joinpoint
* @return the result of the call to {@link
* Joinpoint#proceed()}, might be intercepted by the
* interceptor.
*
* @throws Throwable if the interceptors or the
* target-object throws an exception. */
Object invoke(MethodInvocation invocation) throws Throwable;
}

从注释也可以看得出来,用于对方法在before或者after进行加强(此处有点像AOP)。而对于invoke方法,前面首先是要读取配置辣,然后try-catch也是比较显然的,判断是否有缓存,如果有,直接return,如果没有,写入缓存(其实这里的interceptor的invoke方法写得不够好,应该要分情况去缓存的,这个大概只是一个demo,还要修改很多,但这里暂时不讨论)。后面的这个Thread我没看懂是为何,既然用到了RedisUtil,应该跟它demo的逻辑有关系,但跟整体的Cache关系不大,略。至于其他的方法,都是辅助这个invoke方法的,比如判断是否要写入缓存,是否存在键等等,关键还是invoke。

至此,对当时Spring整合Redis的全部配置文件,每一个导入的类作用是什么,为什么要有这些property,应该是能有比较清晰的认知了。

4.Jedis的一些重要方法

其实最关键的框架也就是上面的第3点了。这一部分还有的就是Pipeline,调用Lua脚本等等,也是直接调用就行(看着方法名,方法参数来就行,只要对Redis的原生操作足够熟悉)。

Pipeline就直接使用Pipeline类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
private Pipeline.MultiResponseBuilder currentMulti;

public Pipeline() {
}

protected <T> Response<T> getResponse(Builder<T> builder) {
if (this.currentMulti != null) {
super.getResponse(BuilderFactory.STRING);
Response<T> lr = new Response(builder);
this.currentMulti.addResponse(lr);
return lr;
} else {
return super.getResponse(builder);
}
}

public void setClient(Client client) {
this.client = client;
}

protected Client getClient(byte[] key) {
return this.client;
}

protected Client getClient(String key) {
return this.client;
}

public void clear() {
if (this.isInMulti()) {
this.discard();
}

this.sync();
}

//...
}

调用Lua是在Jedis下的方法:

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\60fbd8249e904bc3a36241008b228f76\clipboard.png)

看了一下,RedisTemplate搜eval竟然没有相关方法。然后再搜了一下script,就出来了:

![img](E:\daily program\YNote\qqF3F14E026172AA79052A8219DFDB8FD2\c46dfa03d3e647adb313655d32a4e859\clipboard.png)

显然,这个就是调用脚本的总方法了。原来没有eval是因为,RedisTemplate还有更大的野心,它要一个方法来执行所有的跟Redis兼容的脚本。。那么看来Jedis也有其他的调用脚本的方法,此处就不再赘述。

客户端管理的API

client list:列出与Redis服务端相连的所有客户端连接信息。下面的客户端的属性:

标识:id,addr,name,fd(socket的文件描述符,如果fd=-1,表示客户端不是外部客户端,而是Redis内部的伪装客户端)

输入缓冲区: qbuf, qbuf-free。缓冲区会动态调整,只是要求每个客户端缓冲区的大小不能超过1G。如果超过了,客户端将会被关闭。而且缓冲区不受maxmomory控制,假设一个Redis实例设置了maxmemory为4G,已经存储了2G,但是如果此时输入缓冲区使用了3G,加起来就超过了maxmemory的限制,可能会产生数据丢失,键值淘汰,OOM等情况。

导致输入缓冲区过大的主要原因:Redis的处理速度跟不上输入缓冲区的输入速度,并且每次进入输入缓冲区的命令包含了大量的bigkey,从而造成了输入缓冲区过大的情况。或者是Redis发生了阻塞,短期内不能处理命令,造成客户端输入的命令积压在了输入缓冲区。

监控输入缓冲区的方法:

1.client list(收集qbuf和qbuf-free,找到可能出现问题的客户端)

2.info clients,找到最大的输入缓冲区,设置client_biggest_input_buf参数,超过一定值告警

输出缓冲区:obl,oll,omem。与输入缓冲区不一样,输出区可以通过参数设置。输出区根据客户端的不同分为3种:普通客户端,发布订阅客户端,slave客户端(子客户端)。包含:固定缓冲区(16KB,用于返回比较小的执行结果0,动态缓冲区(返回比较大的结果)

客户端的存活状态:age:连接时间 idle:最近一次空闲时间

通过设置参数来控制客户端: config set maxclients(最大客户端连接数,默认10000)

config set timeout(客户端空闲时间超过这个值,就会被关闭,默认是0,即不关闭)

(一般开发的时候用默认的就行,实际发布和运维的时候,idle是必须大于0的)

flag:参数:表示客户端的类型。

N:普通客户端。M:master。S:slave。O:正在执行monitor命令。x:正在执行事务

b:正在等待阻塞事件 u:客户端未被阻塞 d:一个被watch的键已被修改,无法exec事务

c:回复完整的输出后,关闭连接 A:尽可能快地关闭连接

其他诸如:client setName/getName , kill port, pause timeout(毫秒),不再赘述

monitor:可以监控其他客户端的命令调用(在哪个时间,调用了哪个命令)。但在并发量过大的时候,monitor客户端的输出缓冲会暴涨,可能会瞬间占用大量内存。

tcp-keepalive:检测TCP连接活性的周期,默认是0.防止大量的死连接占用系统资源。

tcp-backlog:是一个队列,用于接受的TCP连接。此参数就是队列的大小,默认511,一般不修改。

-------------本文结束感谢您的阅读-------------