首页
留言
导航
统计
Search
1
追番推荐!免费看动漫的网站 - 支持在线观看和磁力下载
2,508 阅读
2
推荐31个docker应用,每一个都很实用
1,311 阅读
3
PVE自动启动 虚拟机 | 容器 顺序设置及参数说明
931 阅读
4
一条命令,永久激活!Office 2024!
618 阅读
5
优选 Cloudflare 官方 / 中转 IP
489 阅读
默认分类
服务器
宝塔
VPS
Docker
OpenWRT
Nginx
群晖
前端编程
Vue
React
Angular
NodeJS
uni-app
后端编程
Java
Python
SpringBoot
SpringCloud
流程引擎
检索引擎
Linux
CentOS
Ubuntu
Debian
数据库
Redis
MySQL
Oracle
虚拟机
VMware
VirtualBox
PVE
Hyper-V
计算机
网络技术
网站源码
主题模板
登录
Search
标签搜索
Java
小程序
Redis
SpringBoot
docker
Typecho
Cloudflare
docker部署
虚拟机
WordPress
群晖
uni-app
CentOS
Vue
Java类库
Linux命令
防火墙配置
Mysql
脚本
Nginx
微醺
累计撰写
264
篇文章
累计收到
11
条评论
首页
栏目
默认分类
服务器
宝塔
VPS
Docker
OpenWRT
Nginx
群晖
前端编程
Vue
React
Angular
NodeJS
uni-app
后端编程
Java
Python
SpringBoot
SpringCloud
流程引擎
检索引擎
Linux
CentOS
Ubuntu
Debian
数据库
Redis
MySQL
Oracle
虚拟机
VMware
VirtualBox
PVE
Hyper-V
计算机
网络技术
网站源码
主题模板
页面
留言
导航
统计
搜索到
7
篇与
的结果
2023-12-27
超越Redis,新一代Redis Plus来了,性能炸裂!
线程模型链接管理锁机制Active-Replica今天给大家介绍的是 KeyDB ,KeyDB项目是从redis fork出来的分支。众所周知redis是一个单线程的kv内存存储系统,而KeyDB在100%兼容redis API的情况下将redis改造成多线程。上次也跟大家说了,项目地址是:https://github.com/EQ-Alpha/KeyDB线程模型KeyDB 将 redis 原来的主线程拆分成了 主线程 和 worker线程 。每个worker线程都是io线程,负责监听端口,accept请求,读取数据和解析协议。如图所示:KeyDB使用了SO_REUSEPORT特性,多个线程可以绑定监听同个端口。每个worker线程做了cpu绑核,读取数据也使用了SO_INCOMING_CPU特性,指定cpu接收数据。解析协议之后每个线程都会去操作内存中的数据,由一把全局锁来控制多线程访问内存数据。主线程其实也是一个worker线程,包括了worker线程的工作内容,同时也包括只有主线程才可以完成的工作内容。在worker线程数组中下标为0的就是主线程。主线程的主要工作在实现 serverCron ,包括:处理统计客户端链接管理db数据的resize和reshard处理aofreplication主备同步cluster模式下的任务链接管理在redis中所有链接管理都是在一个线程中完成的。在KeyDB的设计中,每个worker线程负责一组链接,所有的链接插入到本线程的链接列表中维护。链接的产生、工作、销毁必须在同个线程中。每个链接新增一个字段int iel; /* the event loop index we're registered with */用来表示链接属于哪个线程接管。KeyDB维护了三个关键的数据结构做链接管理:clients_pending_write:线程专属的链表,维护同步给客户链接发送数据的队列clients_pending_asyncwrite:线程专属的链表,维护异步给客户链接发送数据的队列clients_to_close:全局链表,维护需要异步关闭的客户链接分成同步和异步两个队列,是因为redis有些联动api,比如pub/sub,pub之后需要给sub的客户端发送消息,pub执行的线程和sub的客户端所在线程不是同一个线程,为了处理这种情况,KeyDB将需要给非本线程的客户端发送数据维护在异步队列中。同步发送的逻辑比较简单,都是在本线程中完成,以下图来说明如何同步给客户端发送数据:如上文所提到的,一个链接的创建、接收数据、发送数据、释放链接都必须在同个线程执行。异步发送涉及到两个线程之间的交互。KeyDB通过管道在两个线程中传递消息:int fdCmdWrite; //写管道 int fdCmdRead; //读管道本地线程需要异步发送数据时,先检查client是否属于本地线程,非本地线程获取到client专属的线程ID,之后给专属的线程管到发送AE_ASYNC_OP::CreateFileEvent的操作,要求添加写socket事件。专属线程在处理管道消息时将对应的请求添加到写事件中,如图所示:redis有些关闭客户端的请求并非完全是在链接所在的线程执行关闭,所以在这里维护了一个全局的异步关闭链表。锁机制KeyDB实现了一套类似spinlock的锁机制,称之为fastlock。fastlock的主要数据结构有:int fdCmdWrite; //写管道 int fdCmdRead; //读管道使用原子操作__atomic_load_2,__atomic_fetch_add,__atomic_compare_exchange来通过比较m_active=m_avail判断是否可以获取锁。fastlock提供了两种获取锁的方式:try_lock:一次获取失败,直接返回lock:忙等,每1024 * 1024次忙等后使用sched_yield 主动交出cpu,挪到cpu的任务末尾等待执行。在KeyDB中将try_lock和事件结合起来,来避免忙等的情况发生。每个客户端有一个专属的lock,在读取客户端数据之前会先尝试加锁,如果失败,则退出,因为数据还未读取,所以在下个epoll_wait处理事件循环中可以再次处理。Active-ReplicaKeyDB实现了多活的机制,每个replica可设置成可写非只读,replica之间互相同步数据。主要特性有:每个replica有个uuid标志,用来去除环形复制新增加rreplay API,将增量命令打包成rreplay命令,带上本地的uuidkey,value加上时间戳版本号,作为冲突校验,如果本地有相同的key且时间戳版本号大于同步过来的数据,新写入失败。采用当前时间戳向左移20位,再加上后44位自增的方式来获取key的时间戳版本号。项目地址:https://github.com/EQ-Alpha/KeyDB
2023年12月27日
56 阅读
0 评论
0 点赞
2023-10-21
JAVA实现订单 30 分钟未支付则自动取消,我有五种方案!
引言方案分析(1)数据库轮询(2)JDK的延迟队列(3)时间轮算法(4)redis缓存(5)使用消息队列总结引言在开发中,往往会遇到一些关于延时任务的需求。例如:生成订单30分钟未支付,则自动取消;生成订单60秒后,给用户发短信。对上述的任务,我们给一个专业的名字来形容,那就是 延时任务 。那么这里就会产生一个问题,这个 延时任务和定时任务 的区别究竟在哪里呢?一共有如下几点区别:定时任务有明确的触发时间,延时任务没有;定时任务有执行周期,而延时任务在某事件触发后一段时间内执行,没有执行周期;定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务。下面,我们以判断订单是否超时为例,进行方案分析。方案分析(1) 数据库轮询思路 该方案通常是在小型项目中使用,即通过一个线程定时的去扫描数据库,通过订单时间来判断是否有超时的订单,然后进行update或delete等操作。实现 博主当年早期是用 quartz 来实现的(实习那会的事),简单介绍一下 maven 项目引入一个依赖如下所示<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.2</version> </dependency>调用Demo类MyJob如下所示package com.rjzheng.delay1; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SchedulerFactory; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; public class MyJob implements Job { public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("要去数据库扫描啦。。。"); } public static void main(String[] args) throws Exception { // 创建任务 JobDetail jobDetail = JobBuilder.newJob(MyJob.class) .withIdentity("job1", "group1").build(); // 创建触发器 每3秒钟执行一次 Trigger trigger = TriggerBuilder .newTrigger() .withIdentity("trigger1", "group3") .withSchedule( SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(3).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); // 将任务及其触发器放入调度器 scheduler.scheduleJob(jobDetail, trigger); // 调度器开始调度任务 scheduler.start(); } }运行代码,可发现每隔3秒,输出如下要去数据库扫描啦。。。优缺点 优点: 简单易行,支持集群操作。缺点:(1)对服务器内存消耗大;(2)存在延迟,比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟;(3)假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大。(2) JDK的延迟队列思路 该方案是利用 JDK 自带的 DelayQueue 来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,是必须实现Delayed接口的。DelayedQueue实现工作流程如下图所示其中poll(): 获取并移除队列的超时元素,没有则返回空;take(): 获取并移除队列的超时元素,如果没有则wait当前线程,直到有元素满足超时条件,返回结果。实现定义一个类OrderDelay实现Delayed,代码如下:package com.rjzheng.delay2; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class OrderDelay implements Delayed { private String orderId; private long timeout; OrderDelay(String orderId, long timeout) { this.orderId = orderId; this.timeout = timeout + System.nanoTime(); } public int compareTo(Delayed other) { if (other == this) return 0; OrderDelay t = (OrderDelay) other; long d = (getDelay(TimeUnit.NANOSECONDS) - t .getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } // 返回距离你自定义的超时时间还有多少 public long getDelay(TimeUnit unit) { return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS); } void print() { System.out.println(orderId+"编号的订单要删除啦。。。。"); } }运行的测试Demo为,我们设定延迟时间为3秒。package com.rjzheng.delay2; import java.util.ArrayList; import java.util.List; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; public class DelayQueueDemo { public static void main(String[] args) { // TODO Auto-generated method stub List<String> list = new ArrayList<String>(); list.add("00000001"); list.add("00000002"); list.add("00000003"); list.add("00000004"); list.add("00000005"); DelayQueue<OrderDelay> queue = new DelayQueue<OrderDelay>(); long start = System.currentTimeMillis(); for(int i = 0;i<5;i++){ //延迟三秒取出 queue.put(new OrderDelay(list.get(i), TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS))); try { queue.take().print(); System.out.println("After " + (System.currentTimeMillis()-start) + " MilliSeconds"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }输出如下:00000001编号的订单要删除啦。。。。 After 3003 MilliSeconds 00000002编号的订单要删除啦。。。。 After 6006 MilliSeconds 00000003编号的订单要删除啦。。。。 After 9006 MilliSeconds 00000004编号的订单要删除啦。。。。 After 12008 MilliSeconds 00000005编号的订单要删除啦。。。。 After 15009 MilliSeconds可以看到都是延迟3秒,订单被删除。优缺点 优点: 效率高,任务触发时间延迟低。缺点:(1) 服务器重启后,数据全部消失,怕宕机;(2) 集群扩展相当麻烦;(3) 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常;(4) 代码复杂度较高。(3) 时间轮算法思路 先上一张时间轮的图(这图到处都是啦)时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位),例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)。实现 我们用Netty的HashedWheelTimer来实现 给Pom加上下面的依赖:<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.24.Final</version> </dependency>测试代码HashedWheelTimerTest,如下所示:package com.rjzheng.delay3; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; import java.util.concurrent.TimeUnit; public class HashedWheelTimerTest { static class MyTimerTask implements TimerTask{ boolean flag; public MyTimerTask(boolean flag){ this.flag = flag; } public void run(Timeout timeout) throws Exception { // TODO Auto-generated method stub System.out.println("要去数据库删除订单了。。。。"); this.flag =false; } } public static void main(String[] argv) { MyTimerTask timerTask = new MyTimerTask(true); Timer timer = new HashedWheelTimer(); timer.newTimeout(timerTask, 5, TimeUnit.SECONDS); int i = 1; while(timerTask.flag){ try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(i+"秒过去了"); i++; } } }输出如下:1秒过去了 2秒过去了 3秒过去了 4秒过去了 5秒过去了 要去数据库删除订单了。。。。 6秒过去了优缺点 优点: 效率高,任务触发时间延迟时间比delayQueue低,代码复杂度比delayQueue低。缺点:(1) 服务器重启后,数据全部消失,怕宕机;(2) 集群扩展相当麻烦;(3) 因为内存条件限制的原因,比如下单未付款的订单数太多,那么很容易就出现OOM异常。(4) redis缓存思路一 利用 redis 的 zset ,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值。zset常用命令添加元素:ZADD key score member [[score member] [score member] ...]按顺序查询元素:ZRANGE key start stop [WITHSCORES]查询元素score:ZSCORE key member移除元素:ZREM key member [member ...]测试如下: # 添加单个元素 redis> ZADD page_rank 10 google.com (integer) 1 # 添加多个元素 redis> ZADD page_rank 9 baidu.com 8 bing.com (integer) 2 redis> ZRANGE page_rank 0 -1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9" 5) "google.com" 6) "10" # 查询元素的score值 redis> ZSCORE page_rank bing.com "8" # 移除单个元素 redis> ZREM page_rank google.com (integer) 1 redis> ZRANGE page_rank 0 -1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9"那么如何实现呢?我们将订单超时时间戳与订单号分别设置为 score 和 member ,系统扫描第一个元素判断是否超时,具体如下图所示:实现一package com.rjzheng.delay4; import java.util.Calendar; import java.util.Set; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple; public class AppTest { private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedisPool = new JedisPool(ADDR, PORT); public static Jedis getJedis() { return jedisPool.getResource(); } //生产者,生成5个订单放进去 public void productionDelayMessage(){ for(int i=0;i<5;i++){ //延迟3秒 Calendar cal1 = Calendar.getInstance(); cal1.add(Calendar.SECOND, 3); int second3later = (int) (cal1.getTimeInMillis() / 1000); AppTest.getJedis().zadd("OrderId", second3later,"OID0000001"+i); System.out.println(System.currentTimeMillis()+"ms:redis生成了一个订单任务:订单ID为"+"OID0000001"+i); } } //消费者,取订单 public void consumerDelayMessage(){ Jedis jedis = AppTest.getJedis(); while(true){ Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1); if(items == null || items.isEmpty()){ System.out.println("当前没有等待的任务"); try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } continue; } int score = (int) ((Tuple)items.toArray()[0]).getScore(); Calendar cal = Calendar.getInstance(); int nowSecond = (int) (cal.getTimeInMillis() / 1000); if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); jedis.zrem("OrderId", orderId); System.out.println(System.currentTimeMillis() +"ms:redis消费了一个任务:消费的订单OrderId为"+orderId); } } } public static void main(String[] args) { AppTest appTest =new AppTest(); appTest.productionDelayMessage(); appTest.consumerDelayMessage(); } }此时对应输出如下:1525086085261ms:redis生成了一个订单任务:订单ID为OID00000010 1525086085263ms:redis生成了一个订单任务:订单ID为OID00000011 1525086085266ms:redis生成了一个订单任务:订单ID为OID00000012 1525086085268ms:redis生成了一个订单任务:订单ID为OID00000013 1525086085270ms:redis生成了一个订单任务:订单ID为OID00000014 1525086088000ms:redis消费了一个任务:消费的订单OrderId为OID00000010 1525086088001ms:redis消费了一个任务:消费的订单OrderId为OID00000011 1525086088002ms:redis消费了一个任务:消费的订单OrderId为OID00000012 1525086088003ms:redis消费了一个任务:消费的订单OrderId为OID00000013 1525086088004ms:redis消费了一个任务:消费的订单OrderId为OID00000014 当前没有等待的任务 当前没有等待的任务 当前没有等待的任务可以看到,几乎都是3秒之后,消费订单。然而,这一版存在一个致命的硬伤,在高并发条件下,多消费者会取到同一个订单号,我们上测试代码 ThreadTest 。package com.rjzheng.delay4; import java.util.concurrent.CountDownLatch; public class ThreadTest { private static final int threadNum = 10; private static CountDownLatch cdl = new CountDownLatch(threadNum); static class DelayMessage implements Runnable{ public void run() { try { cdl.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } AppTest appTest =new AppTest(); appTest.consumerDelayMessage(); } } public static void main(String[] args) { AppTest appTest =new AppTest(); appTest.productionDelayMessage(); for(int i=0;i<threadNum;i++){ new Thread(new DelayMessage()).start(); cdl.countDown(); } } }输出如下所示:1525087157727ms:redis生成了一个订单任务:订单ID为OID00000010 1525087157734ms:redis生成了一个订单任务:订单ID为OID00000011 1525087157738ms:redis生成了一个订单任务:订单ID为OID00000012 1525087157747ms:redis生成了一个订单任务:订单ID为OID00000013 1525087157753ms:redis生成了一个订单任务:订单ID为OID00000014 1525087160009ms:redis消费了一个任务:消费的订单OrderId为OID00000010 1525087160011ms:redis消费了一个任务:消费的订单OrderId为OID00000010 1525087160012ms:redis消费了一个任务:消费的订单OrderId为OID00000010 1525087160022ms:redis消费了一个任务:消费的订单OrderId为OID00000011 1525087160023ms:redis消费了一个任务:消费的订单OrderId为OID00000011 1525087160029ms:redis消费了一个任务:消费的订单OrderId为OID00000011 1525087160038ms:redis消费了一个任务:消费的订单OrderId为OID00000012 1525087160045ms:redis消费了一个任务:消费的订单OrderId为OID00000012 1525087160048ms:redis消费了一个任务:消费的订单OrderId为OID00000012 1525087160053ms:redis消费了一个任务:消费的订单OrderId为OID00000013 1525087160064ms:redis消费了一个任务:消费的订单OrderId为OID00000013 1525087160065ms:redis消费了一个任务:消费的订单OrderId为OID00000014 1525087160069ms:redis消费了一个任务:消费的订单OrderId为OID00000014 当前没有等待的任务 当前没有等待的任务 当前没有等待的任务 当前没有等待的任务显然,出现了多个线程消费同一个资源的情况。解决方案(1) 用分布式锁,但是用分布式锁,性能下降了,该方案不细说;(2) 对ZREM的返回值进行判断,只有大于0的时候,才消费数据,于是将 consumerDelayMessage() 方法里的。if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); jedis.zrem("OrderId", orderId); System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId); }修改为:if(nowSecond >= score){ String orderId = ((Tuple)items.toArray()[0]).getElement(); Long num = jedis.zrem("OrderId", orderId); if( num != null && num>0){ System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId); } }在这种修改后,重新运行ThreadTest类,发现输出正常了。思路二 该方案使用 redis 的 Keyspace Notifications ,中文翻译就是键空间机制,就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。是需要 redis版本2.8以上 。实现二 在 redis.conf 中,加入一条配置:notify-keyspace-events Ex运行代码如下:package com.rjzheng.delay5; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; public class RedisTest { private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedis = new JedisPool(ADDR, PORT); private static RedisSub sub = new RedisSub(); public static void init() { new Thread(new Runnable() { public void run() { jedis.getResource().subscribe(sub, "__keyevent@0__:expired"); } }).start(); } public static void main(String[] args) throws InterruptedException { init(); for(int i =0;i<10;i++){ String orderId = "OID000000"+i; jedis.getResource().setex(orderId, 3, orderId); System.out.println(System.currentTimeMillis()+"ms:"+orderId+"订单生成"); } } static class RedisSub extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println(System.currentTimeMillis()+"ms:"+message+"订单取消"); } } }输出如下:1525096202813ms:OID0000000订单生成 1525096202818ms:OID0000001订单生成 1525096202824ms:OID0000002订单生成 1525096202826ms:OID0000003订单生成 1525096202830ms:OID0000004订单生成 1525096202834ms:OID0000005订单生成 1525096202839ms:OID0000006订单生成 1525096205819ms:OID0000000订单取消 1525096205920ms:OID0000005订单取消 1525096205920ms:OID0000004订单取消 1525096205920ms:OID0000001订单取消 1525096205920ms:OID0000003订单取消 1525096205920ms:OID0000006订单取消 1525096205920ms:OID0000002订单取消可以明显看到3秒过后,订单取消了。ps: redis的pub/sub 机制存在一个硬伤,官网内容如下:原: Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.翻: Redis的发布/订阅目前是即发即弃(fire and forget)模式的,因此无法实现事件的可靠通知。也就是说,如果发布/订阅的客户端断链之后又重连,则在客户端断链期间的所有事件都丢失了。因此,方案二不是太推荐。当然,如果你对可靠性要求不高,可以使用。优缺点 优点:(1) 由于使用Redis作为消息通道,消息都存储在Redis中。如果发送程序或者任务处理程序挂了,重启之后,还有重新处理数据的可能性;(2) 做集群扩展相当方便;(3) 时间准确度高。缺点: (1) 需要额外进行redis维护。(5) 使用消息队列我们可以采用 RabbitMQ 的延时队列。RabbitMQ 具有以下两个特性,可以实现延迟队列:(1)RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter(2)lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能,具体的,我改天再写一篇文章,这里再讲下去,篇幅太长。优缺点优点: 高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。缺点:本身的易用度要依赖于rabbitMq的运维。因为要引用rabbitMq,所以复杂度和成本变高。总结本文总结了目前互联网中,绝大部分的延时任务的实现方案。希望大家在工作中能够有所收获。
2023年10月21日
23 阅读
0 评论
0 点赞
2023-09-25
《Redis学习笔记》
史上最全的 Redis 知识总结!Redis作为稳居世界排名第一的KV内存数据库,同时也是最受欢迎的分布式缓存中间件,是应对高并发,大流量,低延迟业务场景的不二选择。面试必问!今天给大家分享的是某大厂的一位大佬整理的 《Redis学习笔记》 ,图文并茂,特别详细,基本上涵盖了你需要的所有 Redis 所有知识点。建议大家至少看 3 遍。学习资料Redis学习笔记.pdf
2023年09月25日
23 阅读
0 评论
0 点赞
2023-07-24
SpringBoot 项目使用 Redis 对用户 IP 进行接口限流
一、思路使用接口限流的主要目的在于提高系统的稳定性,防止接口被恶意打击(短时间内大量请求)。比如要求某接口在1分钟内请求次数不超过1000次,那么应该如何设计代码呢?下面讲两种思路,如果想看代码可直接翻到后面的代码部分。1.1 固定时间段(旧思路)1.1.1 思路描述 该方案的思路是:使用Redis记录固定时间段内某用户IP访问某接口的次数,其中:Redis的key:用户IP + 接口方法名Redis的value:当前接口访问次数。当用户在近期内第一次访问该接口时,向Redis中设置一个包含了用户IP和接口方法名的key,value的值初始化为1(表示第一次访问当前接口)。同时,设置该key的过期时间(比如为60秒)。之后,只要这个key还未过期,用户每次访问该接口都会导致value自增1次。用户每次访问接口前,先从Redis中拿到当前接口访问次数,如果发现访问次数大于规定的次数(如超过1000次),则向用户返回接口访问失败的标识。1.1.2 思路缺陷 该方案的缺点在于,限流时间段是固定的。比如要求某接口在1分钟内请求次数不超过1000次,观察以下流程:可以发现,00:59和01:01之间仅仅间隔了2秒,但接口却被访问了1000+999=1999次,是限流次数(1000次)的2倍!所以在该方案中,限流次数的设置可能不起作用,仍然可能在短时间内造成大量访问。1.2 滑动窗口(新思路)1.2.1 思路描述 为了避免出现方案1中由于键过期导致的短期访问量增大的情况,我们可以改变一下思路,也就是把固定的时间段改成动态的:假设某个接口在10秒内只允许访问5次。用户每次访问接口时,记录当前用户访问的时间点(时间戳),并计算前10秒内用户访问该接口的总次数。如果总次数大于限流次数,则不允许用户访问该接口。这样就能保证在任意时刻用户的访问次数不会超过1000次。如下图,假设用户在0:19时间点访问接口,经检查其前10秒内访问次数为5次,则允许本次访问。假设用户0:20时间点访问接口,经检查其前10秒内访问次数为6次(超出限流次数5次),则不允许本次访问。1.2.2 Redis部分的实现1)选用何种 Redis 数据结构首先是需要确定使用哪个Redis数据结构。用户每次访问时,需要用一个key记录用户访问的时间点,而且还需要利用这些时间点进行范围检查。为何选择 zSet 数据结构为了能够实现范围检查,可以考虑使用Redis中的zSet有序集合。添加一个zSet元素的命令如下:ZADD [key] [score] [member]它有一个关键的属性score,通过它可以记录当前member的优先级。于是我们可以把score设置成用户访问接口的时间戳,以便于通过score进行范围检查。key则记录用户IP和接口方法名,至于member设置成什么没有影响,一个member记录了用户访问接口的时间点。因此member也可以设置成时间戳。3)zSet 如何进行范围检查(检查前几秒的访问次数)思路是,把特定时间间隔之前的member都删掉,留下的member就是时间间隔之内的总访问次数。然后统计当前key中的member有多少个即可。① 把特定时间间隔之前的member都删掉。zSet有如下命令,用于删除score范围在[min~max]之间的member:Zremrangebyscore [key] [min] [max]假设限流时间设置为5秒,当前用户访问接口时,获取当前系统时间戳为currentTimeMill,那么删除的score范围可以设置为:min = 0 max = currentTimeMill - 5 * 1000相当于把5秒之前的所有member都删除了,只留下前5秒内的key。② 统计特定key中已存在的member有多少个。zSet有如下命令,用于统计某个key的member总数: ZCARD [key]统计的key的member总数,就是当前接口已经访问的次数。如果该数目大于限流次数,则说明当前的访问应被限流。二、代码实现主要是使用注解 + AOP的形式实现。2.1 固定时间段思路使用了lua脚本。参考:https://blog.csdn.net/qq_43641418/article/details/1277644622.1.1 限流注解@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface RateLimiter { /** * 限流时间,单位秒 */ int time() default 5; /** * 限流次数 */ int count() default 10; }2.1.2 定义lua脚本 在 resources/lua 下新建 limit.lua :-- 获取redis键 local key = KEYS[1] -- 获取第一个参数(次数) local count = tonumber(ARGV[1]) -- 获取第二个参数(时间) local time = tonumber(ARGV[2]) -- 获取当前流量 local current = redis.call('get', key); -- 如果current值存在,且值大于规定的次数,则拒绝放行(直接返回当前流量) if current and tonumber(current) > count then return tonumber(current) end -- 如果值小于规定次数,或值不存在,则允许放行,当前流量数+1 (值不存在情况下,可以自增变为1) current = redis.call('incr', key); -- 如果是第一次进来,那么开始设置键的过期时间。 if tonumber(current) == 1 then redis.call('expire', key, time); end -- 返回当前流量 return tonumber(current)2.1.3 注入Lua执行脚本 关键代码是 limitScript() 方法@Configuration public class RedisConfig { @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(connectionFactory); // 使用Jackson2JsonRedisSerialize 替换默认序列化(默认采用的是JDK序列化) Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); redisTemplate.setKeySerializer(jackson2JsonRedisSerializer); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); return redisTemplate; } /** * 解析lua脚本的bean */ @Bean("limitScript") public DefaultRedisScript<Long> limitScript() { DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/limit.lua"))); redisScript.setResultType(Long.class); return redisScript; } }2.1.3 定义Aop切面类@Slf4j @Aspect @Component public class RateLimiterAspect { @Autowired private RedisTemplate redisTemplate; @Autowired private RedisScript<Long> limitScript; @Before("@annotation(rateLimiter)") public void doBefore(JoinPoint point, RateLimiter rateLimiter) throws Throwable { int time = rateLimiter.time(); int count = rateLimiter.count(); String combineKey = getCombineKey(rateLimiter.type(), point); List<String> keys = Collections.singletonList(combineKey); try { Long number = (Long) redisTemplate.execute(limitScript, keys, count, time); // 当前流量number已超过限制,则抛出异常 if (number == null || number.intValue() > count) { throw new RuntimeException("访问过于频繁,请稍后再试"); } log.info("[limit] 限制请求数'{}',当前请求数'{}',缓存key'{}'", count, number.intValue(), combineKey); } catch (Exception ex) { ex.printStackTrace(); throw new RuntimeException("服务器限流异常,请稍候再试"); } } /** * 把用户IP和接口方法名拼接成 redis 的 key * @param point 切入点 * @return 组合key */ private String getCombineKey(JoinPoint point) { StringBuilder sb = new StringBuilder("rate_limit:"); ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); sb.append( Utils.getIpAddress(request) ); MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); Class<?> targetClass = method.getDeclaringClass(); // keyPrefix + "-" + class + "-" + method return sb.append("-").append( targetClass.getName() ) .append("-").append(method.getName()).toString(); } }2.2 滑动窗口思路2.2.1 限流注解@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface RateLimiter { /** * 限流时间,单位秒 */ int time() default 5; /** * 限流次数 */ int count() default 10; }2.2.2 定义Aop切面类@Slf4j @Aspect @Component public class RateLimiterAspect { @Autowired private RedisTemplate redisTemplate; /** * 实现限流(新思路) * @param point * @param rateLimiter * @throws Throwable */ @SuppressWarnings("unchecked") @Before("@annotation(rateLimiter)") public void doBefore(JoinPoint point, RateLimiter rateLimiter) throws Throwable { // 在 {time} 秒内仅允许访问 {count} 次。 int time = rateLimiter.time(); int count = rateLimiter.count(); // 根据用户IP(可选)和接口方法,构造key String combineKey = getCombineKey(rateLimiter.type(), point); // 限流逻辑实现 ZSetOperations zSetOperations = redisTemplate.opsForZSet(); // 记录本次访问的时间结点 long currentMs = System.currentTimeMillis(); zSetOperations.add(combineKey, currentMs, currentMs); // 这一步是为了防止member一直存在于内存中 redisTemplate.expire(combineKey, time, TimeUnit.SECONDS); // 移除{time}秒之前的访问记录(滑动窗口思想) zSetOperations.removeRangeByScore(combineKey, 0, currentMs - time * 1000); // 获得当前窗口内的访问记录数 Long currCount = zSetOperations.zCard(combineKey); // 限流判断 if (currCount > count) { log.error("[limit] 限制请求数'{}',当前请求数'{}',缓存key'{}'", count, currCount, combineKey); throw new RuntimeException("访问过于频繁,请稍后再试!"); } } /** * 把用户IP和接口方法名拼接成 redis 的 key * @param point 切入点 * @return 组合key */ private String getCombineKey(JoinPoint point) { StringBuilder sb = new StringBuilder("rate_limit:"); ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); sb.append( Utils.getIpAddress(request) ); MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); Class<?> targetClass = method.getDeclaringClass(); // keyPrefix + "-" + class + "-" + method return sb.append("-").append( targetClass.getName() ) .append("-").append(method.getName()).toString(); } }
2023年07月24日
44 阅读
0 评论
0 点赞
2023-03-11
Spring Boot + Redis 解决重复提交问题,一定用的到
前言在实际的开发项目中,一个对外暴露的接口往往会面临很多次请求,我们来解释一下幂等的概念:任意多次执行所产生的影响均与一次执行的影响相同。按照这个含义,最终的含义就是 对数据库的影响只能是一次性的,不能重复处理。如何保证其幂等性,通常有以下手段:数据库建立唯一性索引,可以保证最终插入数据库的只有一条数据token机制,每次接口请求前先获取一个token,然后再下次请求的时候在请求的header体中加上这个token,后台进行验证,如果验证通过删除token,下次请求再次判断token悲观锁或者乐观锁,悲观锁可以保证每次for update的时候其他sql无法update数据(在数据库引擎是innodb的时候,select的条件必须是唯一索引,防止锁全表)先查询后判断,首先通过查询数据库是否存在数据,如果存在证明已经请求过了,直接拒绝该请求,如果没有存在,就证明是第一次进来,直接放行。Redis实现自动幂等的原理图:搭建Redis的服务Api1、首先是搭建 Redis 服务器。2、引入 springboot 中到的 redis 的 stater ,或者 Spring 封装的 jedis 也可以,后面主要用到的 api 就是它的 set 方法和 exists 方法,这里我们使用 springboot 的封装好的 redisTemplate/** * redis工具类 */ @Component public class RedisService { @Autowired private RedisTemplate redisTemplate; /** * 写入缓存 * @param key * @param value * @return */ public boolean set(final String key, Object value) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.set(key, value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存设置时效时间 * @param key * @param value * @return */ public boolean setEx(final String key, Object value, Long expireTime) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.set(key, value); redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 判断缓存中是否有对应的value * @param key * @return */ public boolean exists(final String key) { return redisTemplate.hasKey(key); } /** * 读取缓存 * @param key * @return */ public Object get(final String key) { Object result = null; ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); result = operations.get(key); return result; } /** * 删除对应的value * @param key */ public boolean remove(final String key) { if (exists(key)) { Boolean delete = redisTemplate.delete(key); return delete; } return false; } }自定义注解AutoIdempotent自定义一个注解,定义此注解的主要目的是把它添加在需要实现幂等的方法上,凡是某个方法注解了它,都会实现自动幂等。后台利用反射如果扫描到这个注解,就会处理这个方法实现自动幂等,使用元注解 ElementType.METHOD 表示它只能放在方法上, etentionPolicy.RUNTIME 表示它在运行时@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface AutoIdempotent { }Token创建和检验1、Token服务接口 我们新建一个接口,创建 token 服务,里面主要是两个方法,一个用来创建 token ,一个用来验证 token 。创建 token 主要产生的是一个字符串,检验 token 的话主要是传达 request 对象,为什么要传 request 对象呢?主要作用就是获取 header 里面的 token ,然后检验,通过抛出的 Exception 来获取具体的报错信息返回给前端public interface TokenService { /** * 创建token * @return */ public String createToken(); /** * 检验token * @param request * @return */ public boolean checkToken(HttpServletRequest request) throws Exception; }2、Token的服务实现类 token 引用了 redis 服务,创建 token 采用随机算法工具类生成随机 uuid 字符串,然后放入到 redis 中(为了防止数据的冗余保留,这里设置过期时间为10000秒,具体可视业务而定),如果放入成功,最后返回这个 token 值。 checkToken 方法就是从 header 中获取 token 到值(如果 header 中拿不到,就从 paramter 中获取),如若不存在,直接抛出异常。这个异常信息可以被拦截器捕捉到,然后返回给前端。@Service public class TokenServiceImpl implements TokenService { @Autowired private RedisService redisService; /** * 创建token * * @return */ @Override public String createToken() { String str = RandomUtil.randomUUID(); StrBuilder token = new StrBuilder(); try { token.append(Constant.Redis.TOKEN_PREFIX).append(str); redisService.setEx(token.toString(), token.toString(),10000L); boolean notEmpty = StrUtil.isNotEmpty(token.toString()); if (notEmpty) { return token.toString(); } }catch (Exception ex){ ex.printStackTrace(); } return null; } /** * 检验token * * @param request * @return */ @Override public boolean checkToken(HttpServletRequest request) throws Exception { String token = request.getHeader(Constant.TOKEN_NAME); if (StrUtil.isBlank(token)) {// header中不存在token token = request.getParameter(Constant.TOKEN_NAME); if (StrUtil.isBlank(token)) {// parameter中也不存在token throw new ServiceException(Constant.ResponseCode.ILLEGAL_ARGUMENT, 100); } } if (!redisService.exists(token)) { throw new ServiceException(Constant.ResponseCode.REPETITIVE_OPERATION, 200); } boolean remove = redisService.remove(token); if (!remove) { throw new ServiceException(Constant.ResponseCode.REPETITIVE_OPERATION, 200); } return true; } }拦截器的配置1、Web配置类 实现WebMvcConfigurerAdapter,主要作用就是添加autoIdempotentInterceptor到配置类中,这样我们到拦截器才能生效,注意使用@Configuration注解,这样在容器启动是时候就可以添加进入context中@Configuration public class WebConfiguration extends WebMvcConfigurerAdapter { @Resource private AutoIdempotentInterceptor autoIdempotentInterceptor; /** * 添加拦截器 * @param registry */ @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(autoIdempotentInterceptor); super.addInterceptors(registry); } }2、拦截处理器 主要的功能是拦截扫描到 AutoIdempotent 到注解到方法,然后调用 tokenService 的 checkToken() 方法校验token是否正确,如果捕捉到异常就将异常信息渲染成json返回给前端/** * 拦截器 */ @Component public class AutoIdempotentInterceptor implements HandlerInterceptor { @Autowired private TokenService tokenService; /** * 预处理 * * @param request * @param response * @param handler * @return * @throws Exception */ @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (!(handler instanceof HandlerMethod)) { return true; } HandlerMethod handlerMethod = (HandlerMethod) handler; Method method = handlerMethod.getMethod(); //被ApiIdempotment标记的扫描 AutoIdempotent methodAnnotation = method.getAnnotation(AutoIdempotent.class); if (methodAnnotation != null) { try { return tokenService.checkToken(request);// 幂等性校验, 校验通过则放行, 校验失败则抛出异常, 并通过统一异常处理返回友好提示 }catch (Exception ex){ ResultVo failedResult = ResultVo.getFailedResult(101, ex.getMessage()); writeReturnJson(response, JSONUtil.toJsonStr(failedResult)); throw ex; } } //必须返回true,否则会被拦截一切请求 return true; } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { } /** * 返回的json值 * @param response * @param json * @throws Exception */ private void writeReturnJson(HttpServletResponse response, String json) throws Exception{ PrintWriter writer = null; response.setCharacterEncoding("UTF-8"); response.setContentType("text/html; charset=utf-8"); try { writer = response.getWriter(); writer.print(json); } catch (IOException e) { } finally { if (writer != null) writer.close(); } } }测试用例1、模拟业务请求类 首先我们需要通过 /get/token 路径通过 getToken() 方法去获取具体的 token ,然后我们调用 testIdempotence 方法,这个方法上面注解了 @AutoIdempotent ,拦截器会拦截所有的请求,当判断到处理的方法上面有该注解的时候,就会调用 TokenService 中的 checkToken() 方法,如果捕获到异常会将异常抛出调用者,下面我们来模拟请求一下:@RestController public class BusinessController { @Resource private TokenService tokenService; @Resource private TestService testService; @PostMapping("/get/token") public String getToken(){ String token = tokenService.createToken(); if (StrUtil.isNotEmpty(token)) { ResultVo resultVo = new ResultVo(); resultVo.setCode(Constant.code_success); resultVo.setMessage(Constant.SUCCESS); resultVo.setData(token); return JSONUtil.toJsonStr(resultVo); } return StrUtil.EMPTY; } @AutoIdempotent @PostMapping("/test/Idempotence") public String testIdempotence() { String businessResult = testService.testIdempotence(); if (StrUtil.isNotEmpty(businessResult)) { ResultVo successResult = ResultVo.getSuccessResult(businessResult); return JSONUtil.toJsonStr(successResult); } return StrUtil.EMPTY; } }2、使用postman请求 首先访问get/token路径获取到具体到token:利用获取到到token,然后放到具体请求到header中,可以看到第一次请求成功,接着我们请求第二次:第二次请求,返回到是重复性操作,可见重复性验证通过,再多次请求到时候我们只让其第一次成功,第二次就是失败:总结本文介绍了使用 springboot 和 拦截器 、 redis 来优雅的实现接口幂等,对于幂等在实际的开发过程中是十分重要的,因为一个接口可能会被无数的客户端调用,如何保证其不影响后台的业务处理,如何保证其只影响数据一次是非常重要的,它可以防止产生脏数据或者乱数据,也可以减少并发量,实乃十分有益的一件事。而传统的做法是每次判断数据,这种做法不够智能化和自动化,比较麻烦。而今天的这种自动化处理也可以提升程序的伸缩性。
2023年03月11日
36 阅读
0 评论
0 点赞
2023-02-16
Docker部署 Mysql、redis、Rabbitmq、Vue、Java 项目
Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中,然后发布到任何流行的 Linux 或 Windows 机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口。本文主要讲解如何在Linux环境下使用 Docker 部署前后端分离项目,其中涉及到使用 Docker 安装本人项目相关的一些环境 ,例如mysql、rabbitmq、redis,基于CenterOS7.0。Docker 环境安装1.安装 Docker 客户端# step 1: 安装必要的一些系统工具 sudo yum install -y yum-utils device-mapper-persistent-data lvm2 # Step 2: 添加软件源信息 sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo # Step 3: 更新并安装 Docker-CE sudo yum makecache fast sudo yum -y install docker-ce # Step 4: 开启Docker服务 sudo service docker start2.配置镜像加速器sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-'EOF' { "registry-mirrors": ["https://sq2b0kv9.mirror.aliyuncs.com"] } EOF sudo systemctl daemon-reload sudo systemctl restart docker安装 PortainerPortainer 是一个轻量级 Web 端的 Docker 管理 UI,Portainer 够轻松地管理不同的 Docker 环境(Docker 主机或集群)。Portainer 的部署和使用十分简单。Portainer 可以部署为 Linux 容器或 Windows 本机容器,也支持其他平台。Portainer 允许您管理所有 Docker 资源(容器、映像、卷、网络等)!它与独立的 Docker 引擎和 Docker 集群模式兼容。1.安装# 拉取官方镜像 docker pull portainer/portainer # 运行镜像到容器 docker run -d -p 9000:9000\ --restart=always\ -v /var/run/docker.sock:/var/run/docker.sock\ -m 20M --oom-kill-disable --memory-swap=-1\ --name portainer\ portainer/portainer2.访问页面访问地址:http://localhost:9000,第一次打开需要设置用户名、密码,docker 模式我一般选择 Local 本机模式。通过此工具我们可以更加简便的对镜像和容器进行操作和管理。登录页 面板页 安装 mysql# docker search mysql 可通过此命令查看可用版本 # 拉取mysql镜像,默认会拉取最新版本,我这里加上版本号 docker pull mysql:8.0.0 # 查看镜像是否拉取成功 docker images # 在/home/docker/mysql目录下创建mysql挂载目录 mkdir {data,logs,conf} # 运行容器 docker run -d -p 3306:3306 -v /home/docker/mysql/my.cnf:/etc/mysql/conf.d/mysqld.cnf -v /home/docker/mysql/data:/var/lib/mysql -v /home/docker/mysql/logs:/var/log/mysql -e MYSQL_ROOT_PASSWORD=12345 --name mysql_test mysql:8.0.0说明:--name:容器名-e:配置信息,此处配置 mysql 的 root 用户登陆密码-d:后台运行容器,保证在退出终端后容器继续运行-p:端口映射,此处映射 主机 3306 端口 到 容器的 3306 端口-v:挂载目录此处需要注意不要直接挂载容器中的 mysql 配置文件目录,可能会将容器内的配置文件目录清空。个人建议将容器中的 my.cnf 文件复制出来进行选择性的修改,再挂载 mysql.cnf 文件即可。docker cp :用于容器与主机之间的数据拷贝。# 语法 docker cp [OPTIONS] CONTAINER:SRC_PATH DEST_PATH|- # 实例 docker cp 96f7f14e99ab:/etc/mysql/conf.d/mysqld.cnf /home/docker/mysql/my.cnf安装 redis因为 redis 默认配置只能够本地连接,不能进行远程访问,使用 Redis 客户端工具连接都会报错,因此需要手动挂载 redis 配置文件。# /home/docker/redis目录下新增挂载文件夹 mkdir {data,conf} # 下载最新版本的Redis镜像 docker pull redis # 新增redis配置文件 cd /home/docker/redis/conf touch redis.conf vim redis.conf添加以下内容#bind 127.0.0.1 protected-mode no appendonly yes requirepass 123456说明:bind 127.0.0.1 ,注释掉这部分,这是限制 redis 只能本地访问protected-mode:默认 yes,开启保护模式,限制为本地访问appendonly:redis 持久化(可选)requirepass:设置访问密码为 123456运行容器docker run --name myredis -p 6379:6379 -v /home/docker/redis/data:/data -v /home/docker/redis/conf/redis.conf:/etc/redis/redis.conf -d redis redis-server /etc/redis/redis.conf说明:--name:容器名称-p :表示将服务器的 6379(冒号前的 6379)端口映射到 docker 的 6379(冒号后的 6379)端口-d :表示以后台服务的形式运行 redis-v :挂载宿主机目录redis redis-server /etc/redis/redis.conf:表示运行 redis 服务器程序,并且指定运行时的配置文件经过以上步骤,便可以通过 redis 客户端工具进行连接,如果连接不上,检查安全组和服务器防火墙端口是否开放安装 rabbitmq# 拉取带图形化管理界面的镜像 docker pull rabbitmq:3.7.7-management # 根据下载的镜像创建和启动容器 docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9说明:-d:后台运行容器;--name:指定容器名;-p:指定服务运行的端口(5672:应用访问端口;15672:控制台 Web 端口号);-v:映射目录或文件;--hostname :主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);-e:指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)Rabbitmq 访问地址:http://localhost:15672 至此,基本的运行环境都安装完毕,下面就是关键的打包步骤了。Vue 前端项目打包将 dist 下的所有文件目录拷贝到 SpringBoot 后端项目的 resources\static 目录下,static 目录需要新建。如果你的项目中用到了 shiro 或者 spring security 等安全框架,需要对静态资源放行。以上配置完成后,先在本地运行,再用 maven 进行打包。将 jar 包上传到服务器后,就要开始制作自己的镜像了,首先在与 jar 包同目录下新建 Dockerfile 文件。# 新建Dockerfile文件 touch Dockerfile # 编写Dockerfile文件 vim Dockerfile加入以下内容# Docker image for springboot file run # VERSION 0.0.1 FROM java:8 # VOLUME 指定了临时文件目录为/tmp。 # 其效果是在主机 /var/lib/docker 目录下创建了一个临时文件,并链接到容器的/tmp VOLUME /tmp # 将jar包添加到容器中并更名为app.jar ADD demo-01.jar app.jar # 运行jar包 RUN bash -c 'touch /app.jar' ENTRYPOINT ["java","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]执行 docker build -t [镜像名称] . ,至此镜像文件就制作完成了。docker images查看镜像是否存在。最后一步,创建并启动容器,docker run --name [容器名称] -d -p 80:8080 [镜像名]。
2023年02月16日
64 阅读
0 评论
0 点赞
2023-02-10
Windows下搭建Redis集群的方法步骤
Windows下搭建Redis集群的方法步骤本文主要介绍了Windows下搭建Redis集群的方法步骤,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下目录Redis集群:在Windows系统下搭建Redis集群:1.下载并安装Redis2.下载并安装ruby3.创建Redis集群Redis集群:如果部署到多台电脑,就跟普通的集群一样;因为Redis是单线程处理的,多核CPU也只能使用一个核,所以部署在同一台电脑上,通过运行多个Redis实例组成集群,然后能提高CPU的利用率。在Windows系统下搭建Redis集群:需要4个部件:Redis、Ruby语言运行环境、Redis的Ruby驱动redis-xxxx.gem、创建Redis集群的工具redis-trib.rb安装Redis,并运行3个实例(Redis集群需要至少3个以上节点,低于3个无法创建);使用redis-trib.rb工具来创建Redis集群,由于该文件是用ruby语言写的,所以需要安装Ruby开发环境,以及驱动redis-xxxx.gem1.下载并安装Redis其GitHub路径如下:https://github.com/MSOpenTech/redis/releases/Redis提供msi和zip格式的下载文件,这里下载zip格式 3.0.504版本将下载到的Redis-x64-3.0.504.zip解压即可,为了方便使用,建议放在盘符根目录下,并修改目录名为Redis,如:C:\Redis 或者D:\Redis 通过配置文件来启动3个不同的Redis实例,由于Redis默认端口为6379,所以这里使用了6380、6381、6382来运行3个Redis实例。注意:为了避免不必要的错误,配置文件尽量保存为utf8格式,并且不要包含注释;配置文件中以下两种保存日志的方式(保存在文件中、保存到System Log中)请根据需求选择其中一种即可:loglevel notice #日志的记录级别,notice是适合生产环境的 logfile "D:/Redis/Logs/redis6380_log.txt" #指定log的保持路径,默认是创建在Redis安装目录下,如果有子目录需要手动创建,如此处的Logs目录 syslog-enabled yes #是否使用系统日志 syslog-ident redis6380 #在系统日志的标识名这里使用了保存在文件中的方式,所以先在Redis目录D:/Redis下新建Logs文件夹[/code]redis.6380.conf 内容如下:port 6380 loglevel notice logfile "D:/Redis/Logs/redis6380_log.txt" appendonly yes appendfilename "appendonly.6380.aof" cluster-enabled yes cluster-config-file nodes.6380.conf cluster-node-timeout 15000 cluster-slave-validity-factor 10 cluster-migration-barrier 1 cluster-require-full-coverage yesredis.6381.conf 内容如下:port 6381 loglevel notice logfile "D:/Redis/Logs/redis6381_log.txt" appendonly yes appendfilename "appendonly.6381.aof" cluster-enabled yes cluster-config-file nodes.6381.conf cluster-node-timeout 15000 cluster-slave-validity-factor 10 cluster-migration-barrier 1 cluster-require-full-coverage yesredis.6382.conf 内容如下:port 6382 loglevel notice logfile "D:/Redis/Logs/redis6382_log.txt" appendonly yes appendfilename "appendonly.6382.aof" cluster-enabled yes cluster-config-file nodes.6382.conf cluster-node-timeout 15000 cluster-slave-validity-factor 10 cluster-migration-barrier 1 cluster-require-full-coverage yes配置内容的解释如下:port 6380 #端口号 loglevel notice #日志的记录级别,notice是适合生产环境的 logfile "Logs/redis6380_log.txt" #指定log的保持路径,默认是创建在Redis安装目录下,如果有子目录需要手动创建,如此处的Logs目录 syslog-enabled yes #是否使用系统日志 syslog-ident redis6380 #在系统日志的标识名 appendonly yes #数据的保存为aof格式 appendfilename "appendonly.6380.aof" #数据保存文件 cluster-enabled yes #是否开启集群 cluster-config-file nodes.6380.conf cluster-node-timeout 15000 cluster-slave-validity-factor 10 cluster-migration-barrier 1 cluster-require-full-coverage yes将上述配置文件保存到Redis目录下,并使用这些配置文件安装3个redis服务,命令如下:注意:redis.6380.conf等配置文件最好使用完整路径,避免重启Redis集群出现问题,博主的安装目录为D:/RedisD:/Redis/redis-server.exe --service-install D:/Redis/redis.6380.conf --service-name redis6380 D:/Redis/redis-server.exe --service-install D:/Redis/redis.6381.conf --service-name redis6381 D:/Redis/redis-server.exe --service-install D:/Redis/redis.6382.conf --service-name redis6382启动这3个服务,命令如下:D:/Redis/redis-server.exe --service-start --service-name Redis6380 D:/Redis/redis-server.exe --service-start --service-name Redis6381 D:/Redis/redis-server.exe --service-start --service-name Redis6382执行结果:2.下载并安装ruby2.1. 下载路径如下:http://dl.bintray.com/oneclick/rubyinstaller/rubyinstaller-2.2.4-x64.exe下载后,双击安装即可,同样,为了操作方便,也是建议安装在盘符根目录下,如: C:\Ruby22-x64 ,安装时这里选中后两个选项,意思是将ruby添加到系统的环境变量中,在cmd命令中能直接使用ruby的命令2.2.下载ruby环境下Redis的驱动,考虑到兼容性,这里下载的是3.2.2版本https://rubygems.org/gems/redis/versions/3.2.2注意:下载在页面右下角相关连接一项中安装该驱动,命令如下:gem install --local path_to_gem/filename.gem实际操作如下:2.3.下载Redis官方提供的创建Redis集群的ruby脚本文件redis-trib.rb,路径如下:https://raw.githubusercontent.com/MSOpenTech/redis/3.0/src/redis-trib.rb打开该链接如果没有下载,而是打开一个页面,那么将该页面保存为redis-trib.rb建议保存到Redis的目录下。注意:因为redis-trib.rb是ruby代码,必须用ruby来打开,若redis-trib.rb无法识别,需要手动选择该文件的打开方式:**选择ruby为的打开方式后,redis-trib.rb的logo都会发生改变,如下图:3.创建Redis集群CMD下切换到Redis目录,使用redis-trib.rb来创建Redis集群:redis-trib.rb create --replicas 0 127.0.0.1:6380 127.0.0.1:6381 127.0.0.1:6382 执行结果:当出现提示时,需要手动输入yes,输入后,当出现以下内容,说明已经创建了Redis集群检验是否真的创建成功,输入以下命令:redis-trib.rb check 127.0.0.1:6380出现以下信息,说明创建的Redis集群是没问题的使用Redis客户端Redis-cli.exe来查看数据记录数,以及集群相关信息D:/Redis/redis-cli.exe -c -p 6380-c 表示 cluster-p 表示 port 端口号输入dbsize查询 记录总数dbsize或者一次输入完整命令:D:/Redis/redis-cli.exe -c -p 6380 dbsize结果如下:输入cluster info可以从客户端的查看集群的信息:cluster info结果如下: 到此这篇关于Windows下搭建Redis集群的方法步骤的文章就介绍到这了。
2023年02月10日
63 阅读
0 评论
0 点赞