[TOC]
Quartz
分为三个部分:e
Job&Detial(任务):定时任务的执行方法,与Trigger配套的
Trigger(触发器):规定什么时候触发,与Job&Detail配套的
Scheduler(调度器):单例,把Trigger丢里面由调度器调度,只需要一个Scheduler,配置不同的Trigger;可以理解成类似线程池的东西
原理:ScheduledThreadPoolExecutor线程池 + 通过Object类的wait()和notify()或者Condition类的await()\signal()进行等待和唤醒、锁保证线程安全 来进行调度
Scheduler有两个调度线程:regular Scheduler Thread(执行常规调度)和Misfire Scheduler Thread(执行错失的任务),Regular Thread 轮询所有Trigger,如果有将要触发的Trigger(用wait和notifyAll实现),则从任务线程池中获取一个空闲线程,然后执行与改Trigger关联的job;Misfire Thraed则是扫描所有的trigger,查看是否有错失的,如果有的话,根据一定的策略进行处理
默认是并发的,即如果当前任务没有完成,会自动开一个任务执行
注意在分布式集群的情况下,多台机子有相同的定时任务,会出错,此时通过共享数据库的方式实现
Quartz的解决方案:
quartz集群分为水平集群和垂直集群,水平集群即将定时任务节点部署在不同的服务器,其最大的问题就是时钟同步问题,若时钟不能同步,则会导致集群中各个节点状态紊乱,造成不可预知的后果;垂直集群则是集群各节点部署在同一台服务器,时钟同步自然不是问题,但存在单点故障问题,服务器宕机会严重影响服务的可用性
在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。
如果此节点执行失败,则此任务则会被分派到另一节点执行,中途也会自动检查失效的定时调度,发现不成功的,其他节点立马接过来继续完成定时任务。Quartz有11个定时任务调度表
参考
Quartz原理解密
深入解读Quartz的原理
Quartz 2.2 的实现原理和运行过程
其他定时器
Timer:这是java自带的java.util.Timer类,这个类允许你调度一个java.util.TimerTask任务。使用这种方式可以让你的程序按照某一个频度执行,但不能在指定时间运行。一般用的较少。单线程,任务一多会阻塞;一个任务出异常其他任务都受影响;受系统时间影响
ScheduledExecutorService:也jdk自带的一个类;是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。线程池+延时队列DelayedQueue(数组、最小堆, 最近要执行的任务放在堆顶) 实现,如果堆顶任务时间未到就阻塞(通过自旋+condition.await\signal实现)。不受系统时间影响
Spring 中的 @Schedule 注解
参考:Java 定时任务实现原理详解
Java优先级队列DelayedWorkQueue原理分析
CORS
浏览器的同源政策的同源指的是:协议相同、域名相同、端口相同,如果非同源,有三种行为会受到限制:Cookie、LocalStorage和IndexDB无法读取;DOM无法获得、AJAX请求不能发送。
前后端分离的场景下,由于浏览器的同源策略,导致浏览器内的请求不同的源的后端是会失败,常见的解决跨域方法是使用CORS,现在常见的web框架都支持CORS,开启即可。
解决跨域的方法除了CORS,还有jsonp,不过已经很少使用了,jsonp本质是利用浏览器允许加载不同源的js文件即标签等,将跨域请求标签里,返回一段可执行的js代码,其中包含了请求结果,通常是json格式,前端通过返回的js代码执行回调获取结果。
详情见 跨域资源共享 CORS 详解
对于跨域产生的问题,如CSRF跨域请求攻击的解决方案,可参考:美团:如何防止csrf
session和cookie
首先HTTP是无状态的,因此需要通过session、cookie来达到记录用户状态的目的。
传统的session、cookie:session存用户信息,保存在服务端中,cookie里存session对应的sessionId,保存在客户端中,用于找到对应的session,每次请求都会带上该cookie来表示此用户。
如果客户端禁用掉cookie,可以使用LocalStorage存储,每次请求都以参数的形式传递;
cookie可设置长时间保持,session有效时间通常比较短,安全性上来将,cookie存储在客户端,容易被窃取,session存储在服务段,相对安全;
由于现在实例的部署不可能只部署一个,一般都是集群部署,因此session不可以只存在一个实例的内存中,因此引入Redis来存用户的登录信息
现在一般使用 token + Redis来实现 cookie - session 机制,本质上差不多,前端的cookie更多的是存token的信息而已,token也可以存在LocalStorage或sessionStorage中,发送请求时一般是把token的值放在请求头中,而不会把cookie发给后端,这样可以避免当用户禁用cookie导致功能不可用,还有CSRF问题。
JWT
JWT = JSON WEB TOKEN
原理
JWT实际上是一个token(令牌),分为三部分:Header(头部)、Payload(负载)、Signature(签名)。
Header(头部) :两部分组成,记录令牌类型和JWT的签名算法,一般是HMACSHA256。
Payload(负载): 记录用户登录信息(官方规范默认是不加密的,分为官方字段和私有字段)。
Signature(签名) :记录将 Header、Payload和服务端的密钥组合起来,使用Header(头部)里规定的方式加密。
比如header里保存的加密方式是HMACSHA256,签名 Signature = HMACSHA256(base64URL(header) + "." + base64URL(payload) + "." + 保存在后端的密钥)
最后的JWT = base64URL(Header) + "." + base64URL(Payload) + "." + Signature
,后端收到该JWT后验证该签名是否正确,来判断JWT里的用户信息是否可靠。
base64 :64指的是A-Z,a-z,0-9,+,/,将待转换的字符串转成二进制流,每3个8位转成4个6位,6位的二进制数转成十进制,根据码表找到对应的字符,以=号做后缀,凑齐位数
一般是为了解决一些字符编码的问题,将非ASCII字符转化为ASCII字符,还有就是可以对数据做简单加密,base64URL在base64的基础上增加对一些符号的编解码,比如把"-“替换成”+",使得它可以出现在url中。
HMACSHA256 :摘要算法,一般用于验证签名是否一致
使用
可以存储在浏览器的本地缓存localStorage或者cookie中,发送请求的时候放在cookie里,或者放在请求头中
JWT的目的是让服务器不保存任何session数据,让后端变成无状态的,因此没办法主动废弃某个token,一旦签发了JWT,在到期之前就会始终有效,如果想要实现这种功能,必然需要在后端保存JWT,就违背了JWT的设计初衷了。
要让JWT实现 续签 和 主动过期功能,必定需要在后端保存JWT
jwt主动过期问题,使用黑名单即可;分成两点,客户端要求失效,服务端记录token到黑名单;用户重置密码,服务端记录uid-time键值对,在此之前的token全部失效;客户端把保存的jwt删掉是没用的,此时的jwt依然有效,只是客户端没记录而已
jwt续签问题,一种解决方式是jwt中存储过期时间,服务端设置刷新时间,请求时判断是否在过期时间或刷新时间,在刷新时间内进行token刷新,失效token记入黑名单;
而黑名单过大问题,可以采用记录UID-刷新时间方式解决,判断jwt签发时间,jwt签发时间小于UID-刷新时间的记为失效
个人认为JWT的生成方式本身是有一套规范的,在实际使用过程中也可以对他进行改动,本质上还是一个签名校验而已,一般会对JWT进行魔改,比如使用Header(头部)里的加密方式加密Signature(签名),Signature(签名)加密Header(头部) 和Payload(负载) 这两部分,服务器里的私钥解密Payload(负载),得到需要的登录信息,不通过简单的base64URL编码,不对外暴露,签名算法或者签名里的密钥的方式可以改成其他等。
JWT参考:JWT 超详细分析
CAS模型 - SSO(单点登录)
可参考:CAS实现单点登录SSO执行原理探究 ,讲得算是比较明白,这里是总结基于CAS模式改的单点登录模式
第一次访问时,由于没有访问的token,会引导至登录
第一次访问
第二次访问
这里在总结一下关于GrantTicket和ServiceTicket,跟CAS模型中提到的TGT、ST、PGT这些东西是类似的,本质是作为验证的票据,图中的GrantTicket、ServiceTicket、token含义如下
GrantTicket:全局会话票据,保存在登录页,通过GrantTicket才能换取ServiceTicket;
ServiceTicket表示访问资源的一次性票据,根据ServiceTicket换取token,换取后失效;
token:登录凭证
GT、ST和token都是保存在Redis中的,他们在Redis中的存储结构如下
key:TOKEN_${Token的值}
value:
{
"createTime": 1565961654807,
"accountId": "123",
// 用户其他信息
"grantTicket": ${GrantTicket的值} // token关联GT,用于注销时实现全局注销
}
key:GRANT_TICKET_${GrantTicket的值}
value:
{
"createTime": 1565961654807,
"accountId": "123",
}
key:SERVICE_TICKET_${ServiceTicket的值}
value:
{
"createTime": 1565961654807,
"grantTicket": ${GrantTicket的值} // ST关联GT,用于判断该ST是否有效,换取token后删除
}
// token与grantTicket的记录,注销时,根据token中关联的GT,找到所有与之关联的token,进行删除,这里推荐使用Redis的scan命令进行分段查询,原因是Redis是单线程的,如果数据量太大使用keys命令遍历太久,阻塞Redis接收其他命令
key:{grantTicket}-{token}
value:无
基于OAuth2.0的第三方登录
可参考:理解OAuth 2.0 ,这样基本就入门了,这里是总结项目中如何接入,一般在集成facebook和google的第三方登录也是类似的流程机制,这里只用到了access_token,对于refresh_token,是用来延长access_token的过期时间的,减少短时间内的重复登录,这里就没有涉及到了
基于OAuth2的第三方登录
为什么要后端要根据code + clientId + secret换成access_token,再根据access_token换用户个人信息?
为什么后端不直接code + clientId + secret换用户个人信息呢?
主要还是为了安全,防止中间人攻击
重定向的参数是带在url里的,是直接暴露在客户端的,如果直接返回access_token就不安全,因此才多了code这一层,为了降低code被拦截泄漏后的风险,code的过期时间一般都很短,且是一次性的;
另外就是后端对于外部的请求都是不信任的,因此接收到的参数(code)首先还要配合凭证去验证其合法性,对于验证通过后获得的access_token也有更多的操作空间,由后端持有,不会暴露出去
像上图那种登录方案,后端只需要用户个人信息换完token就算完事了,所以看起来好像直接使用code + clientId + secret换用户个人信息就行,但是如果此时需要再获取用户的其他信息,就没有没办法再用code去换了,只能要求用户再次登录,此时如果有access_token就显得多么重要了
压测
总结一下做过的压测,压测工具jmetter,利用jmette可以多线程并发请求和可以实时查看简易报告的能力
先对被压测服务的接口针对不同场景编写压测用例,设定好TPS的起始和目标值,作为压测计划
画压测机器部署关系图,部署压测环境
编写压测脚本,压测脚本越简单越好,尽量让压测工具不影响被压测服务,脚本最重要的几个设置 : 发起请求时的并发线程数、响应的断言、TPS数,其他那些花里胡哨的输出树状图,饼图啊那些都不用配了,用最简单的报告输出即可
部署完后,将脚本配置放到jmeter的机器上,启动压测
nohup java -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
输出到当前目录下的test.out文件里,这里启动是使用默认参数启动,如果对jmetter的JVM设置有要求,也可以在启动时指定JVM参数,如
nohup java -server -XX:+HeapDumpOnOutOfMemoryError -Xms512m -Xmx512m -XX:+UseG1GC -XX:MaxGCPauseMillis=250 -XX:G1ReservePercent=20 -Djava.security.egd=file:/dev/urandom -jar bin/ApacheJMeter.jar -n -t jmetter脚本路径/config.jmx > test.out &
压测开启后可以打开test.out文件查看压测报告
一般是按照TPS从小往大压,小的TPS压,在正常延时的情况下可以先判断程序是否有问题,比如内存泄漏,内存溢出,没问题了再逐步往大了压。如果先从大往小压,延时又上不去,此时判断不了是程序内部问题还是过大的TPS导致。压测时间一般最少压一天
输出压测报告
一般有如下几个点要注意,这些点到时也要输出到压测报告上
监控点
说明
jmetter端的TPS、延时、错误率
观察TPS是否符合预期、延时是否达到预期且稳定、错误率要为0。当程序正常时降低RT的手段 :减少不必要的日志输出、业务逻辑算法是否还有优化空间,是否有IO占用或者频繁序列化反序列化、内部队列是否阻塞
被压测服务的gc
fgc,ygc不要太频繁,一般来说fgc 一小时要小于3~4次 ;ygc一分钟要小于3~4次为佳 。
jmetter端的CPU、内存使用率等
注意jmetter端的CPU是否过高或波动很大,避免影响压测结论
被压测服务端的CPU、磁盘、内存使用率等
如果cpu过高,如果连续达到90以上,基本上是内存泄漏导致了频繁的fgc;磁盘的占用情况,注意生成的日志是否把磁盘占满了
使用 jstat -gcutil [pid] [时间间隔,每几秒打印] [打印次数]
查看GC情况
当被压测端的gc不正常时,应尽量保存事发环境
1、收集内存使用基本情况统计:jmap -heap [pid] > [文件名,如heap.log]
2、收集线程堆栈运行信息:jstack [pid] > [文件名,如stack.log]
3、收集内存详细使用信息,生成dump内存快照:jmap -dump:format=b,file=[文件名,如heap.dump] [pid]
一般使用eclipse mat工具进行内存快照的分析,排查出内存泄漏的问题。
mat的使用参见:Eclipse MAT内存分析工具
一般压测脚本的模板:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version= "1.2" properties= "3.2" jmeter= "3.2 r1790748" >
<hashTree>
<TestPlan guiclass= "TestPlanGui" testclass= "TestPlan" testname= "测试计划" enabled= "true" >
<!-- 一般写压测计划中的序号+名称 -->
<stringProp name= "TestPlan.comments" ></stringProp>
<boolProp name= "TestPlan.functional_mode" > false</boolProp>
<boolProp name= "TestPlan.serialize_threadgroups" > false</boolProp>
<elementProp name= "TestPlan.user_defined_variables" elementType= "Arguments" guiclass= "ArgumentsPanel" testclass= "Arguments" testname= "用户定义的变量" enabled= "true" >
<collectionProp name= "Arguments.arguments" />
</elementProp>
<stringProp name= "TestPlan.user_define_classpath" ></stringProp>
</TestPlan>
<hashTree>
<ThreadGroup guiclass= "ThreadGroupGui" testclass= "ThreadGroup" testname= "Thread Group" enabled= "true" >
<stringProp name= "ThreadGroup.on_sample_error" > continue</stringProp>
<elementProp name= "ThreadGroup.main_controller" elementType= "LoopController" guiclass= "LoopControlPanel" testclass= "LoopController" testname= "循环控制器" enabled= "true" >
<boolProp name= "LoopController.continue_forever" > false</boolProp>
<intProp name= "LoopController.loops" > -1</intProp>
</elementProp>
<stringProp name= "ThreadGroup.num_threads" > 500</stringProp> <!-- 发起请求时的并发线程数,这里设置为500个并发线程,表示使用这么多的线程数来达到下面设置的TPS数 -->
<stringProp name= "ThreadGroup.ramp_time" > 8</stringProp>
<longProp name= "ThreadGroup.start_time" > 1509332694000</longProp>
<longProp name= "ThreadGroup.end_time" > 1509332694000</longProp>
<boolProp name= "ThreadGroup.scheduler" > false</boolProp>
<stringProp name= "ThreadGroup.duration" ></stringProp>
<stringProp name= "ThreadGroup.delay" ></stringProp>
</ThreadGroup>
<hashTree>
<HTTPSamplerProxy guiclass= "HttpTestSampleGui" testclass= "HTTPSamplerProxy" testname= "click http request" enabled= "true" >
<elementProp name= "HTTPsampler.Arguments" elementType= "Arguments" guiclass= "HTTPArgumentsPanel" testclass= "Arguments" testname= "用户定义的变量" enabled= "true" >
<collectionProp name= "Arguments.arguments" />
</elementProp>
<stringProp name= "HTTPSampler.domain" > 192.168.1.123</stringProp> <!-- 此处为被压测服务的host -->
<stringProp name= "HTTPSampler.port" > 12345</stringProp> <!-- 此处为被压测服务的port -->
<stringProp name= "HTTPSampler.protocol" > http</stringProp>
<stringProp name= "HTTPSampler.contentEncoding" ></stringProp>
<stringProp name= "HTTPSampler.path" > ${__StringFromFile(/home/urls.log,,,)}</stringProp> <!-- 发起的http请求uri从文件读取,文件路径 -->
<stringProp name= "HTTPSampler.method" > GET</stringProp>
<boolProp name= "HTTPSampler.follow_redirects" > false</boolProp>
<boolProp name= "HTTPSampler.auto_redirects" > false</boolProp>
<boolProp name= "HTTPSampler.use_keepalive" > true</boolProp>
<boolProp name= "HTTPSampler.DO_MULTIPART_POST" > false</boolProp>
<stringProp name= "HTTPSampler.embedded_url_re" ></stringProp>
<stringProp name= "HTTPSampler.implementation" > Java</stringProp>
<stringProp name= "HTTPSampler.connect_timeout" ></stringProp>
<stringProp name= "HTTPSampler.response_timeout" ></stringProp>
</HTTPSamplerProxy>
<hashTree/>
<ResponseAssertion guiclass= "AssertionGui" testclass= "ResponseAssertion" testname= "Response Assertion" enabled= "true" >
<collectionProp name= "Asserion.test_strings" >
<stringProp name= "49586" > 200</stringProp> <!-- http请求的响应断言,要求返回的http code为200才判定为成功 -->
</collectionProp>
<stringProp name= "Assertion.test_field" > Assertion.response_code</stringProp>
<boolProp name= "Assertion.assume_success" > false</boolProp>
<intProp name= "Assertion.test_type" > 8</intProp>
</ResponseAssertion>
<hashTree/>
<ConstantThroughputTimer guiclass= "TestBeanGUI" testclass= "ConstantThroughputTimer" testname= "Constant Throughput Timer" enabled= "true" >
<intProp name= "calcMode" > 1</intProp>
<doubleProp>
<name> throughput</name>
<value> 30000.0</value> <!-- 1分钟内发起的请求数,换算为tps为500 -->
<savedValue> 0.0</savedValue>
</doubleProp>
</ConstantThroughputTimer>
<hashTree/>
</hashTree>
</hashTree>
<WorkBench guiclass= "WorkBenchGui" testclass= "WorkBench" testname= "工作台" enabled= "true" >
<boolProp name= "WorkBench.save" > true</boolProp>
</WorkBench>
<hashTree/>
</hashTree>
</jmeterTestPlan>
调优
参考:https://tech.meituan.com/2016/12/02/performance-tunning.html
布隆过滤器
本质上是基于hash的概率性数据结构,是一个很长的二进制数组,主要用于判断元素可能存在集合中,或者一定不在集合中。
原理
有一个长度为m的bit数组,初始每个bit都是0,另外还有k个hash函数;
当加入一个元素时,先调用k个hash函数得到k个结果,将这k个结果与bit数组长度取模得到k个数组下标,将这k个数组下标对应的值置为 1;
查询元素时,同样经过上面步骤的计算,最终得到k个数组下标,判断这些下标对应的值是否为1,如果为1,说明元素可能存在,如果有一个不为1,说明元素一定不存在,返回结果;
误判率计算
涉及到3个重要的参数:
m
表示bit数组的长度
k
表示散列函数的个数
n
表示插入的元素个数
布隆过滤器中,一个元素插入后,某个bit为0的概率是(1 − 1/m)^k
n元素插入后,某个bit为0的概率是(1 − 1/m)^(n*k)
false positive的概率是(1−(1−1/m)^n*k)^k
因为需要的是k
个不同的bit被设置成1,概率是大约是(1−e^(−k*n/m))^k
实现
可以基于redis实现,但这里只给出go版本的实现,支持并发安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
const (
mod7 = 1 << 3 - 1
bitPerByte = 8
)
type Filter struct {
lock * sync . RWMutex
concurrent bool
// 长度之所以要取2的指数是因为要将取模操作优化成与操作, % 等于 &(2^n-1)
m uint64 // bit array of m bits, m will be ceiling to power of 2
n uint64 // number of inserted elements
log2m uint64 // log_2 of m
k uint64 // the number of hash function
keys [] byte // byte array to store hash value
}
func New ( size uint64 , k uint64 , race bool ) * Filter {
log2 := uint64 ( math . Ceil ( math . Log2 ( float64 ( size ))))
filter := & Filter {
m : 1 << log2 ,
log2m : log2 ,
k : k ,
keys : make ([] byte , 1 << log2 ),
concurrent : race ,
}
if filter . concurrent {
filter . lock = & sync . RWMutex {}
}
return filter
}
func ( f * Filter ) Add ( data [] byte ) * Filter {
if f . concurrent {
f . lock . Lock ()
defer f . lock . Unlock ()
}
h := baseHash ( data )
for i := uint64 ( 0 ); i < f . k ; i ++ {
loc := location ( h , i )
slot , mod := f . location ( loc )
f . keys [ slot ] |= 1 << mod
}
f . n ++
return f
}
// location returns the bit position in byte array
// & (f.m - 1) is the quick way for mod operation
func ( f * Filter ) location ( h uint64 ) ( uint64 , uint64 ) {
slot := ( h / bitPerByte ) & ( f . m - 1 )
mod := h & mod7
return slot , mod
}
// baseHash returns the murmur3 128-bit hash
func baseHash ( data [] byte ) [] uint64 {
a1 := [] byte { 1 } // to grab another bit of data
hasher := murmur3 . New128 ()
hasher . Write ( data ) // #nosec
v1 , v2 := hasher . Sum128 ()
hasher . Write ( a1 ) // #nosec
v3 , v4 := hasher . Sum128 ()
return [] uint64 {
v1 , v2 , v3 , v4 ,
}
}
淘汰算法
LRU
java实现,非线程安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// 注意每一次对节点有操作对需要同时操作 nodeMap和lruQueue
// LRU本质是利用 hashMap 和 双向链表 实现
public class LRUCache {
private Map < String , Node > nodeMap ;
private DoubleLinkedList lruQueue ;
private int size = 0 ;
public LRUCache () {
this ( 3 );
}
public LRUCache ( int size ) {
this . size = size ;
this . nodeMap = new HashMap <>();
this . lruQueue = new DoubleLinkedList ();
}
public String get ( String key ) {
Node n = nodeMap . get ( key );
if ( n == null ) {
return null ;
}
// 获取后直接放到到最前的位置
put ( n . key , n . value );
return n . value ;
}
public void put ( String key , String value ) {
Node newNode = new Node ( key , value );
// 如果包含,则放到最前
if ( nodeMap . containsKey ( key )) {
lruQueue . remove ( nodeMap . get ( key ));
lruQueue . addFirst ( newNode );
// 记得更新map
nodeMap . put ( key , newNode );
} else {
// 如果满了,则移除最后一个
if ( size <= lruQueue . size ()) {
Node last = lruQueue . removeLast ();
nodeMap . remove ( last . key );
}
nodeMap . put ( key , newNode );
lruQueue . addFirst ( newNode );
}
}
private class Node {
private String key ;
private String value ;
private Node pre ;
private Node next ;
public Node ( String key , String value ) {
this . key = key ;
this . value = value ;
}
}
// 封装双向链表方法,构建时要注意前后节点指向和空指针问题
private class DoubleLinkedList {
private Node head ;
private Node tail ;
private int count = 0 ;
public void addFirst ( Node n ) {
if ( head == null ) {
tail = n ;
}
count ++;
n . next = head ;
if ( head != null ) {
head . pre = n ;
}
head = n ;
}
public Node removeLast () {
if ( count == 0 ) {
return null ;
}
Node result = tail ;
if ( tail . pre != null ) {
tail . pre . next = null ;
} else {
head = null ;
tail = null ;
}
count --;
return result ;
}
public void remove ( Node n ) {
if ( count == 0 ) {
return ;
}
count --;
if ( n . pre != null ) {
n . pre . next = n . next ;
} else {
head = n . next ;
}
if ( n . next != null ) {
n . next . pre = n . pre ;
} else {
tail = n . pre ;
}
}
public int size () {
return count ;
}
}
}
golang实现,非并发安全,下面这种写法是平铺了双向链表,链表的长度通过map的长度计算得到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
type LRUCache struct {
nodeMap map [ string ] * Node
head * Node
tail * Node
cap int
}
type Node struct {
key string
val string
next * Node
pre * Node
}
func NewLRUCache ( capacity int ) LRUCache {
return LRUCache { nodeMap : make ( map [ string ] * Node ), cap : capacity }
}
func ( lru * LRUCache ) Get ( key string ) string {
if existNode , exist := lru . nodeMap [ key ]; exist {
lru . remove ( existNode )
lru . addFirst ( existNode )
return existNode . val
}
return ""
}
func ( lru * LRUCache ) Put ( key string , val string ) {
if existNode , exist := lru . nodeMap [ key ]; exist {
existNode . val = val
lru . remove ( existNode )
lru . addFirst ( existNode )
return
} else {
newNode := & node { key : key , val : val }
lru . nodeMap [ key ] = newNode
lru . addFirst ( newNode )
}
if len ( lru . nodeMap ) > lru . cap {
// 一定要先delete,如果先remove,会导致找不回tail的key进行delete,
delete ( lru . nodeMap , lru . tail . key )
lru . remove ( lru . tail )
}
}
func ( lru * LRUCache ) addFirst ( n * Node ) {
n . pre = nil
n . next = lru . head
if lru . head != nil {
lru . head . pre = n
}
lru . head = n
if lru . tail == nil {
lru . tail = n
lru . tail . next = nil
}
}
func ( lru * LRUCache ) remove ( n * Node ) {
// n的next和pre要置为null,防止内存泄漏
if n == lru . head {
lru . head = n . next
if n . next != nil {
n . next . pre = nil
}
n . next = nil
return
}
if n == lru . tail {
lru . tail = n . pre
n . pre . next = nil
n . pre = nil
return
}
n . pre . next = n . next
n . next . pre = n . pre
}
关于并发安全,最简单的实现就是在调用Get和Put方法时加读写锁,但是这种做法锁的粒度比较大,每次会锁住整个底层的双向链表和map,导致在高并发情况下吞吐量不高,优化的思路就是对map分片,通过分片上锁来减小锁的粒度,然后再双向链表节点的操作上进行优化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func New ( capacity int ) LRUCache {
shards := make ( map [ string ] * LRUCacheShard , 256 )
for i := 0 ; i < 256 ; i ++ {
shards [ fmt . Sprintf ( "%02x" , i )] = & LRUCacheShard {
Cap : capacity ,
Keys : make ( map [ int ] * list . Element ),
List : list . New (),
}
}
return LRUCache {
shards : shards ,
}
}
func ( c * LRUCache ) Get ( key int ) int {
shard := c . GetShard ( key )
shard . RLock ()
defer shard . RUnlock ()
……
}
func ( c * LRUCache ) Put ( key int , value int ) {
shard := c . GetShard ( key )
shard . Lock ()
defer shard . Unlock ()
……
}
func ( c * LRUCache ) GetShard ( key int ) ( shard * LRUCacheShard ) {
hasher := sha1 . New ()
hasher . Write ([] byte ( key ))
shardKey := fmt . Sprintf ( "%x" , hasher . Sum ( nil ))[ 0 : 2 ]
return c . shards [ shardKey ]
}
其中,在Get方法中,如果存在,还需要修改key所在节点的位置,直接调put即可,当然这种方式的粒度还是比较大,再次优化的思路是对map的操作还是得上锁,但对双向链表的操作无需上锁,双向链表移动节点和删除节点可以同时操作,可以通过两个channel实现,参考
LFU
java实现,非线程安全
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* 频次最少使用
* 设定容量,每次get key会修改使用次数和使用时间,当满容量时,移除次数最少的那个
* 如果有多个key的使用次数一样,则移除使用时间最旧的那个
*/
public class LFUCache {
Map < String , String > keyValMap ;
Map < String , Integer > key2FreqMap ; // key和使用频率的映射
// 用LinkHashSet来模拟使用时间,使用LinkHashSet还有一个目的是便于根据key进行删除
Map < Integer , LinkedHashSet < String >> freq2KeysMap ; // 使用频率和key的映射
int minFreq ;
int cap ;
public LFUCache ( int cap ) {
keyValMap = new HashMap <>();
key2FreqMap = new HashMap <>();
freq2KeysMap = new HashMap <>();
this . cap = cap ;
this . minFreq = 0 ;
}
public String get ( String key ) {
if (! keyValMap . containsKey ( key )) {
return "" ;
}
increaseFreq ( key );
return keyValMap . get ( key );
}
public void put ( String key , String val ) {
if ( this . cap <= 0 ) {
return ;
}
if ( keyValMap . containsKey ( key )) {
keyValMap . put ( key , val );
increaseFreq ( key );
return ;
}
if ( this . cap <= keyValMap . size ()) {
removeMinFreqKey ();
}
keyValMap . put ( key , val );
key2FreqMap . put ( key , 1 );
freq2KeysMap . putIfAbsent ( 1 , new LinkedHashSet <>());
freq2KeysMap . get ( 1 ). add ( key );
this . minFreq = 1 ;
}
void increaseFreq ( String key ) {
int freq = key2FreqMap . get ( key );
key2FreqMap . put ( key , freq + 1 );
freq2KeysMap . get ( freq ). remove ( key );
freq2KeysMap . putIfAbsent ( freq + 1 , new LinkedHashSet <>());
freq2KeysMap . get ( freq + 1 ). add ( key );
if ( freq2KeysMap . get ( freq ). isEmpty ()) {
freq2KeysMap . remove ( freq );
if ( freq == this . minFreq ) {
this . minFreq ++;
}
}
}
void removeMinFreqKey () {
LinkedHashSet < String > keys = freq2KeysMap . get ( this . minFreq );
String delKey = keys . iterator (). next ();
keys . remove ( delKey );
if ( keys . isEmpty ()) {
freq2KeysMap . remove ( this . minFreq );
// 这里无需更新 minFreq 的值,因为该方法是在插入新key时使用,此时minFreq一定是1
}
keyValMap . remove ( delKey );
key2FreqMap . remove ( delKey );
}
}
golang实现,非并发安全,利用 优先级队列(最小堆) + map实现,使用的是go自带的heap数据结构,通过Item数组结构实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import "container/heap"
type LFUNode struct {
key string
val string
freq int // 用于优先级,key的使用频率
count int // 用于当freq相同时的比较淘汰,总的获取次数,类似时间戳的概念
index int // 最小堆中元素的下标,用于重建最小堆.
}
type PriorityQueue [] * LFUNode
func ( pq PriorityQueue ) Len () int { return len ( pq ) }
func ( pq PriorityQueue ) Less ( i , j int ) bool {
// 优先根据使用频率排列,相等时才使用count,从小到大排序
if pq [ i ]. freq == pq [ j ]. freq {
return pq [ i ]. count < pq [ j ]. count
}
return pq [ i ]. freq < pq [ j ]. freq
}
func ( pq PriorityQueue ) Swap ( i , j int ) {
pq [ i ], pq [ j ] = pq [ j ], pq [ i ]
pq [ i ]. index = i
pq [ j ]. index = j
}
func ( pq * PriorityQueue ) Push ( x interface {}) {
n := len ( * pq )
node := x .( * LFUNode )
node . index = n
* pq = append ( * pq , node )
}
func ( pq * PriorityQueue ) Pop () interface {} {
old := * pq
n := len ( old )
node := old [ n - 1 ]
old [ n - 1 ] = nil // 防止内存泄露
node . index = - 1 // pop时重置下标保证安全
* pq = old [ 0 : n - 1 ]
return node
}
// 更新最小堆里的元素
func ( pq * PriorityQueue ) update ( node * LFUNode , value string , frequency int , count int ) {
node . val = value
node . count = count
node . freq = frequency
heap . Fix ( pq , node . index )
}
// ==========================================
type LFUCache struct {
cap int
pq PriorityQueue
nodeMap map [ string ] * LFUNode
counter int
}
func NewLFUCache ( capacity int ) LFUCache {
return LFUCache {
pq : PriorityQueue {},
nodeMap : make ( map [ string ] * LFUNode , capacity ),
cap : capacity ,
}
}
func ( lfu * LFUCache ) Get ( key string ) string {
if lfu . cap == 0 {
return ""
}
if node , ok := lfu . nodeMap [ key ]; ok {
lfu . counter ++
lfu . pq . update ( node , node . val , node . freq + 1 , lfu . counter )
return node . val
}
return ""
}
func ( lfu * LFUCache ) Put ( key string , value string ) {
if lfu . cap == 0 {
return
}
lfu . counter ++
// 如果存在,增加 frequency,再调整堆
if node , ok := lfu . nodeMap [ key ]; ok {
lfu . pq . update ( node , value , node . freq + 1 , lfu . counter )
return
}
// 如果不存在且缓存满了,需要删除。在 hashmap 和 pq 中删除。
if len ( lfu . pq ) == lfu . cap {
node := heap . Pop ( & lfu . pq ).( * LFUNode )
delete ( lfu . nodeMap , node . key )
}
// 新建结点,在 hashmap 和 pq 中添加。
node := & LFUNode {
val : value ,
key : key ,
count : lfu . counter ,
}
heap . Push ( & lfu . pq , node )
lfu . nodeMap [ key ] = node
}
另一个版本实现,利用go本身提供的双向链表,但是由于go没泛型,在类型推断方面导致整体性能不会很高
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
type LFUCache2 struct {
nodeMap map [ string ] * list . Element
freq2NodeList map [ int ] * list . List
cap int
min int
}
type node struct {
key string
value string
freq int
}
func NewLFUCache2 ( capacity int ) LFUCache2 {
return LFUCache2 { nodeMap : make ( map [ string ] * list . Element ),
freq2NodeList : make ( map [ int ] * list . List ),
cap : capacity ,
min : 0 ,
}
}
func ( lfu * LFUCache2 ) Get ( key string ) string {
value , ok := lfu . nodeMap [ key ]
if ! ok {
return ""
}
currentNode := value . Value .( * node )
lfu . freq2NodeList [ currentNode . freq ]. Remove ( value )
currentNode . freq ++
if _ , ok := lfu . freq2NodeList [ currentNode . freq ]; ! ok {
lfu . freq2NodeList [ currentNode . freq ] = list . New ()
}
newList := lfu . freq2NodeList [ currentNode . freq ]
newNode := newList . PushFront ( currentNode )
lfu . nodeMap [ key ] = newNode
if currentNode . freq - 1 == lfu . min && lfu . freq2NodeList [ currentNode . freq - 1 ]. Len () == 0 {
lfu . min ++
}
return currentNode . value
}
func ( lfu * LFUCache2 ) Put ( key string , value string ) {
if lfu . cap == 0 {
return
}
// 如果存在,更新访问次数
if currentValue , ok := lfu . nodeMap [ key ]; ok {
currentNode := currentValue . Value .( * node )
currentNode . value = value
lfu . Get ( key )
return
}
// 如果不存在且缓存满了,需要删除
if lfu . cap == len ( lfu . nodeMap ) {
currentList := lfu . freq2NodeList [ lfu . min ]
backNode := currentList . Back ()
delete ( lfu . nodeMap , backNode . Value .( * node ). key )
currentList . Remove ( backNode )
}
// 新建结点,插入到 2 个 map 中
lfu . min = 1
currentNode := & node {
key : key ,
value : value ,
freq : 1 ,
}
if _ , ok := lfu . freq2NodeList [ 1 ]; ! ok {
lfu . freq2NodeList [ 1 ] = list . New ()
}
newList := lfu . freq2NodeList [ 1 ]
newNode := newList . PushFront ( currentNode )
lfu . nodeMap [ key ] = newNode
}
参考
布隆过滤器原理及golang实现
LRU / LFU 的青铜与王者