对于高并发我们以最常见最具代表性的商城商品详情页为例,来看看我们如何从最简单的crud进行优化的。
1.直接访问数据库
? ? ? ? 我们直接访问数据库。
public PmsProductParam getProductInfo1(Long id) {
PmsProductParam productInfo = portalProductDao.getProductInfo(id);
if (null == productInfo) {
return null;
}
FlashPromotionParam promotion = flashPromotionProductDao.getFlashPromotion(id);
if (!ObjectUtils.isEmpty(promotion)) {
productInfo.setFlashPromotionCount(promotion.getRelation().get(0).getFlashPromotionCount());
productInfo.setFlashPromotionLimit(promotion.getRelation().get(0).getFlashPromotionLimit());
productInfo.setFlashPromotionPrice(promotion.getRelation().get(0).getFlashPromotionPrice());
productInfo.setFlashPromotionRelationId(promotion.getRelation().get(0).getId());
productInfo.setFlashPromotionEndDate(promotion.getEndDate());
productInfo.setFlashPromotionStartDate(promotion.getStartDate());
productInfo.setFlashPromotionStatus(promotion.getStatus());
}
return productInfo;
}
? ? ? ? 当我们的单机访问量过大,必然会降低我们的吞吐量,延长我们的访问时间,甚至可能造成数据库连接超时的报错。
? ? ? ? 记录下我们的压测结果,并且能够看到我们存在7.16%的异常值,说明出现了数据库连接超时的报错。
2.引入redis作为缓存
? ? ? ? 直接上代码,我们先从redis中取,如果拿不到的话再去查询数据库并设置redis,拿到了就直接返回。这里需要注意一点,就是我们在设置redis时要设置超时时间,以保证数据库和redis的最终一致性,如果想要保证时时一致性的话,可以使用canal。
public PmsProductParam getProductInfo(Long id){
PmsProductParam productInfo = null;
//从redis取
productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE+id,PmsProductParam.class);
if(productInfo !=null){
return productInfo;
}
if(productInfo == null){
productInfo = portalProductDao.getProductInfo(id);
//需要设置超时时间,最终一致性来保证redis和mysql之间数据的一致性
redisOpsUtil.set(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE+id,productInfo,3600,TimeUnit.SECONDS);
}
return productInfo;
}
? ? ? ? 来看看我们的压测结果,发现异常消失,吞吐量和平均值得到了大幅度提升,ok??
? ? ? ? ?no,no,no,我们来看看后台的sql打印。正常来说,我们的sql只会打印一次,然后其他的查询都会从redis中进行获取。那实际结果呢?
? ? ? ? 我们可以看到查询语句出现了两百次之多!出现了缓存击穿的问题,在高并发下有超出我们预期的请求打到了数据库!
????????分析原因:在第一个请求从redis没有获取到数据从而查询数据库并且没有设置redis的过程中,同时也有请求到达并访问了redis,这时这些请求也要访问数据库,所以我们需要对查询数据库的这段代码上锁。下面我们将用redission框架的分布式锁来改良代码。
?3.引入redis作为缓存,引入分布式锁来解决出现的并发问题(缓存击穿)
? ? ? ? 上代码
public PmsProductParam getProductInfo(Long id){
PmsProductParam productInfo = null;
productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);
if(productInfo != null){
return productInfo;
}
RLock lock = redission.getLock(lockPath + id);
try {
try {
if(lock.tryLock(0,5000,TimeUnit.SECONDS)){
productInfo = portalProductDao.getProductInfo(id);
if(productInfo == null){
return null;
}
redisOpsUtil.set(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE+id,productInfo);
}else {
productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);
}
}catch (InterruptedException e){
e.printStackTrace();
}
}finally {
if(lock.isLocked()){
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
return productInfo;
}
? ? ? ? 看我们后台的sql打印
? ? ? ? OK,解决了我们的缓存击穿问题。
??????? 利用分布式锁可以很好的解决我们的并发问题,但是锁机制必然导致我们的吞吐量下降,这个时候可以利用我们的本地缓存来提高并发。为了保证我们的双写一致,本地缓存也需要设置一个失效时间,虽然map可以实现,但是过于复杂。我们这里使用google的guava框架来实现我们的本地缓存。
?4.引入redis缓存,设置分布式锁,引入本地缓存
public PmsProductParam getProductInfo121(Long id){
PmsProductParam productInfo = null;
productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);
if(productInfo != null){
return productInfo;
}
RLock lock = redission.getLock(lockPath + id);
try {
try {
if(lock.tryLock(0,5000,TimeUnit.SECONDS)){
productInfo = portalProductDao.getProductInfo(id);
if(productInfo == null){
return null;
}
redisOpsUtil.set(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE+id,productInfo);
}else {
productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);
}
}catch (InterruptedException e){
e.printStackTrace();
}
}finally {
if(lock.isLocked()){
if(lock.isHeldByCurrentThread()){
lock.unlock();
}
}
}
return productInfo;
}
?
public PmsProductParam getProductInfo4(Long id) {
PmsProductParam productInfo = null;
productInfo = cache.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id);
if (null != productInfo) {
return productInfo;
}
productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);
if (productInfo != null) {
log.info("get redis productId:" + productInfo);
cache.setLocalCache(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, productInfo);
return productInfo;
}
RLock lock = redission.getLock(lockPath + id);
try {
try {
if (lock.tryLock(0,5000,TimeUnit.SECONDS)) {
productInfo = portalProductDao.getProductInfo(id);
if (null == productInfo) {
return null;
}
log.info("set db productId:" + productInfo);
redisOpsUtil.set(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, productInfo, 3600, TimeUnit.SECONDS);
cache.setLocalCache(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, productInfo);
} else {
log.info("get redis2 productId:" + productInfo);
productInfo = redisOpsUtil.get(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, PmsProductParam.class);
if (productInfo != null) {
cache.setLocalCache(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + id, productInfo);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
if (lock.isLocked()) {
if (lock.isHeldByCurrentThread())
lock.unlock();
}
}
return productInfo;
}
? ? ? ? ?压测可以看到我们的吞吐量和平均值均大于我们不上锁的时候。
5.解决缓存穿透的问题
? ? ? ? 当我们遭受恶意攻击,去查询不存在的数据时,就会存在大量请求直接打到我们的数据库。这种问题叫做缓存穿透。
? ? ? ? 一般解决这种问题的办法就是使用布隆过滤器。布隆过滤器怎么实现呢?
????????我们可以在启动时查询全库并保存在redis,保存的数据是货品id经过哈希计算的偏移量,所以可以快速的进行操作。然后通过拦截器进行拦截,如果redis存在则拦截,不存在则直接打回,可以有效的避免无效请求。
|