多线程设计模式 - Master-Worker模式
并发设计模式属于设计优化的一部分,它对于一些常用的多线程结构的总结和抽象。与串行相比并行程序结构通常较为复杂,因此合理的使用并行模式在多线程并发中更具有意义。
1. Master-Worker模式
- - Master-Worker模式是常用的并行模式。它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳和总结。
- - 其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。
示例:下
说明:看类注释即可明了 不明白可以去屎了1 //Task.java 2 public class Task { 3 4 private int id; 5 private String name; 6 private int price; 7 public int getId() { 8 return id; 9 } 10 public void setId(int id) { 11 this.id = id; 12 } 13 public String getName() { 14 return name; 15 } 16 public void setName(String name) { 17 this.name = name; 18 } 19 public int getPrice() { 20 return price; 21 } 22 public void setPrice(int price) { 23 this.price = price; 24 } 25 } 26 //Worker.java 27 public class Worker implements Runnable{ 28 29 private ConcurrentLinkedQueueworkQueue; 30 private ConcurrentHashMap resultMap; 31 32 @Override 33 public void run() { 34 while(true){ 35 Task input = this.workQueue.poll(); 36 if(input==null)break; 37 //真正的去做业务处理 38 Object output = MyWorker.handle(input); 39 this.resultMap.put(Integer.toString(input.getId()), output); 40 } 41 } 42 43 public static Object handle(Task input){ 44 return null; 45 } 46 47 public void setWorkerQueue(ConcurrentLinkedQueue workQueue) { 48 this.workQueue=workQueue; 49 } 50 51 public void setResultMap(ConcurrentHashMap resultMap) { 52 this.resultMap=resultMap; 53 54 } 55 } 56 57 //MyWorker.java继承Worker.java,重写handle方法 58 public class MyWorker extends Worker{ 59 60 public static Object handle(Task input) { 61 Object output = null; 62 try { 63 //处理task的耗时,可能是数据的加工,也可能是操作数据库 64 Thread.sleep(500); 65 output = input.getPrice(); 66 } catch (InterruptedException e) { 67 e.printStackTrace(); 68 } 69 return output; 70 } 71 72 } 73 //Master.java * 74 public class Master { 75 76 //1 应该有一个承装任务的集合 77 private ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue (); 78 //2 使用普通的HashMap去承装所有的worker对象 79 private HashMap workers = new HashMap (); 80 //3 使用一个容器承装每一个worker并发执行的结果集 81 private ConcurrentHashMap resultMap = new ConcurrentHashMap (); 82 //4 构造方法 83 public Master(Worker worker,int workerCount){ 84 //每一个worker对象都需要有master的应用workQueue用于任务的领取,resultMap用于任务的提交 85 worker.setWorkerQueue(this.workQueue); 86 worker.setResultMap(this.resultMap); 87 88 for(int i=0;i me:workers.entrySet()){100 me.getValue().start();101 }102 }103 //8 判断线程是否执行完毕104 public boolean isComplete() {105 for(Map.Entry me:workers.entrySet()){106 if(me.getValue().getState()!=Thread.State.TERMINATED){107 return false;108 }109 }110 return true;111 }112 //9 返回结果集数据113 public int getResult() {114 int ret = 0;115 for(Map.Entry me:resultMap.entrySet()){116 //汇总逻辑117 ret+=(Integer)me.getValue();118 }119 return ret;120 }121 }122 //主函数123 public class Main {124 125 public static void main(String[] args) {126 System.out.println("我的机器可用processor的数量:"+Runtime.getRuntime().availableProcessors());127 Master master = new Master(new MyWorker(),10);128 129 Random r = new Random();130 for(int i=0;i<100;i++){131 Task task = new Task();132 task.setId(i);133 task.setName("任务"+i);134 task.setPrice(r.nextInt(1000));135 master.submit(task);136 }137 master.execute();138 139 long start = System.currentTimeMillis();140 while(true){141 if(master.isComplete()){142 long end = System.currentTimeMillis() -start;143 int ret = master.getResult();144 System.out.println("最终结果:"+ret+",执行耗时:"+end+"毫秒");145 break;146 }147 }148 }149 150 }