博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
akka设计模式系列-Aggregate模式
阅读量:6274 次
发布时间:2019-06-22

本文共 3575 字,大约阅读时间需要 11 分钟。

  所谓的Aggregate模式,其实就是聚合模式,跟masterWorker模式有点类似,但其出发点不同。masterWorker模式是指master向worker发送命令,worker完成某种业务逻辑。而聚合模式则刚好相反,由各个worker完成某种业务逻辑后,把结果汇总发给某个actor,这个actor不一定是masterActor。

class AggregateMasterActor extends Actor{  override def receive: Receive = {    case cmd: AggregateCommand.Aggregate =>      // 将此次汇总结果汇报给from,为了简化,此处用self替代      val from = self      val backendActor = context.actorOf(Props(new AggregateMasterBackendActor(from,cmd.parallel)),s"AggregateMasterBackendActor-${cmd.at}")      backendActor ! cmd    case AggregateBackendEvent.WorkDone(sum) =>      val from = sender()      println(s"AggregateMasterActor [${self.path.name}] 收到 ${from.path.name} 汇总结果 $sum")  }}class AggregateMasterBackendActor(replyTo:ActorRef,parallel:Int) extends Actor{  var counter = 0  var sum = 0L  override def receive: Receive = {    case AggregateCommand.Aggregate(_) =>      println(s"AggregateMasterBackendActor [${self.path.name}] 开始工作,parallel $parallel,工作结果汇总给 ${replyTo.path.name}")      1 to parallel foreach { i =>        val worker = context.actorOf(Props(new AggregateWorker(self)),s"AggregateWorker-$i")        worker ! AggregateBackendCommand.Aggregate(i,parallel)      }    case AggregateWorkerEvent.WorkDone(result) =>      counter += 1      sum += result      if(counter == parallel){        replyTo ! AggregateBackendEvent.WorkDone(sum)        context.stop(self)        println(s"AggregateMasterBackendActor [${self.path.name}] 工作结束退出")      }  }}class AggregateWorker(replyTo:ActorRef) extends Actor{  def calcResult(index:Int,parallel:Int):Long = index * parallel  override def receive: Receive = {    case AggregateBackendCommand.Aggregate(index,parallel) =>      println(s"AggregateWorker [${self.path.name}] 开始工作 index=$index,工作汇总给 ${replyTo.path.name}")      val result = calcResult(index,parallel)      replyTo ! AggregateWorkerEvent.WorkDone(result)      println(s"AggregateWorker [${self.path.name}] 工作结束退出")      context.stop(self)  }}object AggregatePattern {  def main(args: Array[String]): Unit = {    val system = ActorSystem("AggregatePattern",ConfigFactory.load())    val aggregateMasterActor =  system.actorOf(Props(new AggregateMasterActor),"AggregateMasterActor")    aggregateMasterActor ! AggregateCommand.Aggregate(3)  }}

 输出:

AggregateMasterBackendActor [AggregateMasterBackendActor-1531383454073] 开始工作,parallel 3,工作结果汇总给 AggregateMasterActorAggregateWorker [AggregateWorker-1] 开始工作 index=1,工作汇总给 AggregateMasterBackendActor-1531383454073AggregateWorker [AggregateWorker-2] 开始工作 index=2,工作汇总给 AggregateMasterBackendActor-1531383454073AggregateWorker [AggregateWorker-3] 开始工作 index=3,工作汇总给 AggregateMasterBackendActor-1531383454073AggregateWorker [AggregateWorker-1] 工作结束退出AggregateWorker [AggregateWorker-3] 工作结束退出AggregateWorker [AggregateWorker-2] 工作结束退出AggregateMasterBackendActor [AggregateMasterBackendActor-1531383454073] 工作结束退出AggregateMasterActor [AggregateMasterActor] 收到 AggregateMasterBackendActor-1531383454073 汇总结果 18

  从代码来看该设计模式也比较简单,就是由Master创建以临时的子actor,此处命名为MasterBackend,将汇报对象的actorRef以构造函数的形式传递给MasterBackend,此处为了简单用self替代;MasterBackend根据并行参数,创建对应个数的workerActor,并把本身的actorRef以构造函数的形式传递给workerActor,workerActor执行具体的业务逻辑,并将汇总结果,发送给replyTo(也就是MasterBackend);MasterBackend收到workerActor的汇总结果,根据并行参数,判断所有子actor是否执行结束,若执行结束,此次计算完成,将汇总后的结果,发送给replyTo(也就是MasterActor)。

  上面这种设计模式有一个明显的好处,就是Master可以迅速创建大量的聚合工作而不阻塞,因为它收到命令后,只是简单的创建MasterBackend,工作交给它去执行,这个过程非常快。如果某个聚合工作比较慢,并不会影响其他任务。

  之所以说这个设计模式非常重要,是因为在spark/storm等大多分布式框架中都有它的影子。他们都选择将功能进行拆解,专门的节点或actor分别负责任务的接收、创建、执行、汇总这些工作,工作之间互不影响。如果能够深刻的理解这种设计模式,你将会设计出一个架构分层合理、互相解耦的高质量应用系统。

 

转载地址:http://grgpa.baihongyu.com/

你可能感兴趣的文章
企业能源管理系统的基本要求和主要内容
查看>>
JAVA基础学习之-AQS的实现原理分析
查看>>
IT兄弟连 JavaWeb教程 监听器4
查看>>
[喵咪BELK实战(3)] logstash+filebeat搭建
查看>>
线程中无法注入bean
查看>>
jetty的xml配置文件
查看>>
Hyper-V:虚拟网络配置
查看>>
按位运算符操作
查看>>
java8对接口的改变
查看>>
springboot中使用filter时注入bean为null的解决办法
查看>>
唠唠SE的IO-04——缓冲输入输出流
查看>>
hive join 数据倾斜 真实案例
查看>>
Object-C代码练习【文件管理练习(每秒写入一个时间到文件)】
查看>>
Redis列表
查看>>
文件查找工具之find命令详解
查看>>
linux命令 — lsof 查看进程打开那些文件 或者 查看文件给那个进程使用
查看>>
PHP+Swoole及时通讯
查看>>
centos安装图形
查看>>
SpringCloud(第 012 篇)电影微服务接入 Feign 进行客户端负载均衡,通过 FeignClient 调用远程 Http 微服务...
查看>>
mysql tomcat redis nginx 版本的查看方法
查看>>