- 1、有哪些信誉好的足球投注网站(book118)网站文档一经付费(服务费),不意味着购买了该文档的版权,仅供个人/单位学习、研究之用,不得用于商业用途,未经授权,严禁复制、发行、汇编、翻译或者网络传播等,侵权必究。。
- 2、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考,付费前请自行鉴别。如您付费,意味着您自己接受本站规则且自行承担风险,本站不退款、不进行额外附加服务;查看《如何避免下载的几个坑》。如果您已付费下载过本站文档,您可以点击 这里二次下载。
- 3、如文档侵犯商业秘密、侵犯著作权、侵犯人身权等,请点击“版权申诉”(推荐),也可以打举报电话:400-050-0827(电话支持时间:9:00-18:30)。
- 4、该文档为VIP文档,如果想要下载,成为VIP会员后,下载免费。
- 5、成为VIP后,下载本文档将扣除1次下载权益。下载后,不支持退款、换文档。如有疑问请联系我们。
- 6、成为VIP后,您将拥有八大权益,权益包括:VIP文档下载权益、阅读免打扰、文档格式转换、高级专利检索、专属身份标志、高级客服、多端互通、版权登记。
- 7、VIP文档为合作方或网友上传,每下载1次, 网站将根据用户上传文档的质量评分、类型等,对文档贡献者给予高额补贴、流量扶持。如果你也想贡献VIP文档。上传文档
查看更多
04.Scala编程实战
Scala编程实战 课程目标 目标:熟练使用Scala编写程序 项目概述 需求 目前大多数的分布式架构底层通信都是通过RPC实现的,RPC框架非常多,比如前我们学过的Hadoop项目的RPC通信框架,但是Hadoop在设计之初就是为了运行长达数小时的批量而设计的,在某些极端的情况下,任务提交的延迟很高,所有Hadoop的RPC显得有些笨重。 Spark 的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模型实现,Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松实现分布式RPC功能。 Akka简介 Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。 Actor模型:在计算机科学领域,Actor模型是一个并行计算(Concurrent Computation)模型,它把actor作为并行计算的基本元素来对待:为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。 Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性: 1.提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发 2.提供了异步非阻塞的、高性能的事件驱动编程模型 3.超级轻量级事件处理(每GB堆内存几百万Actor) 项目实现 架构图 重要类介绍 ActorSystem 在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。 Actor 在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。 preStart()方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。 receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。 Master类 package cn.itcast.sparkimport scala.concurrent.duration._import akka.actor.{Props, ActorSystem, Actor}import akka.actor.Actor.Receiveimport com.typesafe.config.ConfigFactoryimport scala.collection.mutable/** * Master为整个集群中的主节点 * Master继承了Actor */class Master extends Actor{ //保存WorkerID和Work信息的map val idToWorker = new mutable.HashMap[String, WorkerInfo] //保存所有Worker信息的Set val workers = new mutable.HashSet[WorkerInfo] //Worker超时时间 val WORKER_TIMEOUT = 10 * 1000 //重新receive方法 //导入隐式转换,用于启动定时器 import context.dispatcher //构造方法执行完执行一次 override def preStart(): Unit = { //启动定时器,定时执行 context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker) } //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息 override def receive: Receive = { //Worker向Master发送的注册消息 case RegisterWorker(id, workerHost, memory, cores) = { if(!idToWorker.contains(id)) { val worker = new WorkerI
文档评论(0)