How does Spark start a worker?

Asked by a user, 2022-04-23 20:27


2 answers

Answer from 懂视网, 2022-05-04 03:50


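Typed Actors in Akka are an implementation of the Active Objects pattern: method invocations become asynchronous dispatches instead of the synchronous calls that have been the default since Smalltalk came out.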

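Typed Actors consist of two "parts": a public interface and an implementation. If you have done any "enterprise" Java development, this will look very familiar. As with normal actors, you have an external API (an instance of the public interface) that delegates method calls asynchronously to a private instance of the implementation class.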

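The advantage of Typed Actors over normal actors is that they have a static contract, so you don't need to define your own messages. The downside is that they limit what you can and cannot do. For example, you cannot use become/unbecome.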

Typed Actors are implemented using JDK Proxies, which provide a fairly simple API for intercepting method calls.
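The following minimal sketch makes the proxy mechanism concrete. It is plain Scala and uses only the JDK's java.lang.reflect.Proxy, not Akka. The names Squarer, SquarerImpl, ActiveObjectHandler and ActiveObject are hypothetical and are modelled on the Squarer example later in this answer. CompletableFuture stands in for Akka's Future. As an assumption of this sketch, every intercepted call is queued on a single-thread executor that plays the role of the mailbox. It shows the Active Object idea, not Akka's actual TypedActor internals.

```scala
import java.lang.reflect.{InvocationHandler, InvocationTargetException, Method, Proxy}
import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutorService, Executors, ThreadFactory}
import java.util.function.{Function => JFunction, Supplier}

// Hypothetical interface modelled on the Squarer example (not Akka's API).
trait Squarer {
  def squareDontCare(i: Int): Unit               // fire-and-forget
  def square(i: Int): CompletableFuture[Integer] // non-blocking request-reply
  def squareNow(i: Int): Int                     // blocking request-reply
}

class SquarerImpl extends Squarer {
  @volatile var lastSquared: Int = 0
  def squareDontCare(i: Int): Unit = { lastSquared = i * i }
  def square(i: Int): CompletableFuture[Integer] =
    CompletableFuture.completedFuture(Integer.valueOf(i * i))
  def squareNow(i: Int): Int = i * i
}

// Intercepts every call made on the proxy and queues it on the mailbox executor.
class ActiveObjectHandler(target: AnyRef, mailbox: ExecutorService) extends InvocationHandler {
  private def call(m: Method, args: Array[AnyRef]): AnyRef = {
    val actualArgs = if (args == null) Array.empty[AnyRef] else args
    try m.invoke(target, actualArgs: _*)
    catch { case e: InvocationTargetException => throw e.getCause }
  }

  override def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
    val job = CompletableFuture.supplyAsync(new Supplier[AnyRef] {
      def get(): AnyRef = call(m, args)
    }, mailbox)
    if (m.getReturnType == java.lang.Void.TYPE) {
      null // fire-and-forget: the caller does not wait
    } else if (classOf[CompletionStage[_]].isAssignableFrom(m.getReturnType)) {
      // non-blocking: return a future that completes after the queued call has run
      job.thenCompose[AnyRef](new JFunction[AnyRef, CompletionStage[AnyRef]] {
        def apply(v: AnyRef): CompletionStage[AnyRef] = v.asInstanceOf[CompletionStage[AnyRef]]
      })
    } else {
      job.get() // blocking: wait for the result
    }
  }
}

object ActiveObject {
  // Daemon thread, so an idle mailbox does not keep the JVM alive.
  def newMailbox(): ExecutorService = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = { val t = new Thread(r, "mailbox"); t.setDaemon(true); t }
  })

  def create[T <: AnyRef](iface: Class[T], target: T): T =
    Proxy.newProxyInstance(iface.getClassLoader, Array[Class[_]](iface),
      new ActiveObjectHandler(target, newMailbox())).asInstanceOf[T]
}

val impl = new SquarerImpl
val squarer: Squarer = ActiveObject.create(classOf[Squarer], impl)
squarer.squareDontCare(10)       // returns immediately
val pending = squarer.square(10) // returns a future right away
println(pending.get())           // 100
println(squarer.squareNow(7))    // 49; the caller blocks until it has been computed
```

The mailbox has a single thread, so queued calls run strictly in the order they were made.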

Note

Just like normal Akka actors, Typed Actors process one message at a time.

When to use Typed Actors

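Typed Actors work well as a bridge between an actor system and non-actor code, because they let you write ordinary object-oriented code on the outside. Don't overuse them, though.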

Toolbox

// Returns the Typed Actor Extension
TypedActorExtension extension =
  TypedActor.get(system); // system is an instance of ActorSystem

// Returns whether the reference is a Typed Actor Proxy or not
TypedActor.get(system).isTypedActor(someReference);

// Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor.get(system).getActorRefFor(someReference);

// Returns the current ActorContext;
// only valid within methods of a TypedActor implementation
ActorContext context = TypedActor.context();

// Returns the external proxy of the current Typed Actor;
// only valid within methods of a TypedActor implementation
Squarer sq = TypedActor.<Squarer>self();

// Returns a contextual instance of the Typed Actor Extension.
// This means that if you create other Typed Actors with it,
// they will become children of the current Typed Actor.
TypedActor.get(TypedActor.context());

A worked example, with explanations

package practise.akka.typedactors

import akka.dispatch.Future
import akka.japi.Option

/**
 * The public interface: each method here is a method of the Typed Actor.
 */
public interface Squarer {
    void squareDontCare(int i); // fire-and-forget

    Future<Integer> square(int i); // non-blocking send-request-reply

    Option<Integer> squareNowPlease(int i); // blocking send-request-reply

    int squareNow(int i); // blocking send-request-reply
}


package practise.akka.typedactors

import akka.dispatch.Future
import akka.dispatch.Futures
import akka.actor.TypedActor
import akka.japi.Option
import akka.actor.ActorContext
import groovy.util.logging.Log4j
import akka.actor.ActorRef

/**
 * The implementation. Also implementing akka.actor.TypedActor.Receiver lets it receive
 * ordinary messages (messages that are not method calls).
 */
@Log4j
class SquarerImpl implements Squarer, akka.actor.TypedActor.Receiver {
    private String name;

    public SquarerImpl() {
        this.name = "default";
    }

    public SquarerImpl(String name) {
        this.name = name;
    }

    public void squareDontCare(int i) {
        // The thread name in the log shows the call is processed asynchronously
        log.debug("squareDontCare: fire-and-forget, returns no result, same as ActorRef.tell ---- " + i)
        int sq = i * i; // Nobody cares :(

        // Returns the current ActorContext;
        // only valid within methods of a TypedActor implementation
        ActorContext context = TypedActor.context();
        println "context ---- " + context

        // Returns the external proxy of the current Typed Actor;
        // only valid within methods of a TypedActor implementation
        Squarer mysq = TypedActor.<Squarer>self();
        println "--self --" + mysq
    }

    public Future<Integer> square(int i) {
        log.debug("square send-request-reply Future ---- " + i) // the thread name shows asynchronous processing
        return Futures.successful(i * i, TypedActor.dispatcher());
    }

    public Option<Integer> squareNowPlease(int i) {
        log.debug("squareNowPlease send-request-reply Option ---- " + i) // the thread name shows asynchronous processing
        return Option.some(i * i);
    }

    public int squareNow(int i) {
        log.debug("squareNow send-request-reply result ---- " + i) // the thread name shows asynchronous processing
        return i * i;
    }

    @Override
    void onReceive(Object o, ActorRef actorRef) {
        log.debug("TypedActor received message ---- ${o} --- from: ${actorRef}")
    }
}


package practise.akka.typedactors

import akka.actor.ActorSystem
import akka.actor.TypedActor
import akka.actor.TypedProps
import com.typesafe.config.ConfigFactory
import akka.japi.Creator
import groovy.util.logging.Log4j
import akka.actor.ActorContext

/**
 * Creates the Typed Actors.
 */
@Log4j
class TypedActorsFactory {

    ActorSystem system

    private final String config = """akka {
    loglevel = "${log?.debugEnabled ? "DEBUG" : "INFO"}"
    actor.provider = "akka.remote.RemoteActorRefProvider"
    remote.netty.hostname = "127.0.0.1"
    remote.netty.port = 2552
    remote.log-received-messages = on
    remote.log-sent-messages = on
}"""

    TypedActorsFactory(String sysName) {
        this.system = ActorSystem.create(sysName, ConfigFactory.parseString(config))
    }

    Squarer getTypedActorDefault() {
        // What is returned here is the proxy
        Squarer mySquarer =
            TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));
        return mySquarer
    }

    Squarer getTypedActor(String name) {
        Squarer otherSquarer =
            TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class,
                new Creator<SquarerImpl>() {
                    // creates the concrete implementation instance
                    public SquarerImpl create() { return new SquarerImpl(name); }
                }),
                name); // the actor's name: akka://sys@host:port/user/name
        return otherSquarer
    }

}


Now let's try it out with a few test cases:

package practise.akka.typedactors

import akka.actor.ActorRef
import akka.actor.TypedActor
import akka.actor.UntypedActorContext
import akka.dispatch.Future
import com.baoxian.akka.AkkaClientNoReply
import com.baoxian.akka.AkkaServerApp

class TestTypedActors extends GroovyTestCase {

    void testTypeActor() {
        println("----")
        TypedActorsFactory factory = new TypedActorsFactory("typedServer")
//        Squarer squarer = factory?.getTypedActorDefault() // creates the proxy
        Squarer squarer = factory?.getTypedActor("serv") // concrete implementation
        squarer?.squareDontCare(10)
        Future future = squarer?.square(10)
        AkkaServerApp app = new AkkaServerApp("tmp", "127.0.0.1", 6666, "result") // a receiver I built myself
        app.messageProcessor = { msg, UntypedActorContext context ->
            log.info("Result: " + msg)
        }
        app.startup()
        // Pipe the Future's result to the receiver; the result shows up in the log
        akka.pattern.Patterns.pipe(future).to(app.serverActor)
        println "----" + squarer?.squareNowPlease(10)?.get()
        println "----" + squarer?.squareNow(10)

        // Returns the Typed Actor Extension
        TypedActor.get(factory.system)
        // Returns the Akka actor behind an external Typed Actor proxy
        ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer);
        actor.tell("message") // received in SquarerImpl.onReceive
        sleep(1000 * 60 * 10) // keep the system running for 10 minutes

//        TypedActor.get(factory.system).stop(squarer); // stops the Typed Actor behind this proxy asynchronously, as soon as possible
        // Stops the Typed Actor asynchronously once it has finished all calls made to it before this one
        TypedActor.get(factory.system).poisonPill(squarer);
    }

    void testRemoteTypedActor() {
        AkkaClientNoReply client = new AkkaClientNoReply("akka://typedServer@127.0.0.1:2552/user/serv")
        client.send("remote message") // received in SquarerImpl.onReceive
        sleep(1000)
        client.shutdown()
    }
}

Answer from a helpful user, 2022-05-04 00:58

Analysis based on the Spark 1.3.1 source code.
Spark Master startup: source analysis
1. start-master.sh invokes the main method of Master, which calls startSystemAndActor:
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf) // start the ActorSystem and the Master actor
  actorSystem.awaitTermination()
}
2. startSystemAndActor starts the ActorSystem and creates the Master actor:
def startSystemAndActor(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
    securityManager = securityMgr)
  val actor = actorSystem.actorOf(
    Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
  val timeout = AkkaUtils.askTimeout(conf)
  val portsRequest = actor.ask(BoundPortsRequest)(timeout)
  val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
  (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
}
3. AkkaUtils.createActorSystem creates the ActorSystem:
def createActorSystem(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager): (ActorSystem, Int) = {
  val startService: Int => (ActorSystem, Int) = { actualPort =>
    doCreateActorSystem(name, host, actualPort, conf, securityManager)
  }
  Utils.startServiceOnPort(port, startService, conf, name)
}
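4. Utils.startServiceOnPort starts the service on the given port by calling startService, i.e. doCreateActorSystem, which creates the ActorSystem.
5. Once the ActorSystem has been created, the Master actor is created in it.
6. Creating the actor runs Master's primary constructor, after which preStart() is executed.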
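Step 4 is also where port conflicts are handled: if the port is already in use, the service is retried on the next port. Below is a simplified sketch of that retry loop in plain Scala. It is not Spark's code. PortRetry and fakeStart are hypothetical, and a port counts as taken when startService throws java.net.BindException. maxRetries defaults to 16 to match the default of Spark's spark.port.maxRetries setting. The real Utils.startServiceOnPort also logs every attempt and recognises more kinds of bind failures.

```scala
import java.net.BindException

// Simplified sketch of the retry idea behind Spark's Utils.startServiceOnPort (not Spark's code):
// try startPort, then startPort + 1, ... for up to maxRetries extra attempts.
object PortRetry {
  def startServiceOnPort[T](
      startPort: Int,
      startService: Int => (T, Int),
      maxRetries: Int = 16): (T, Int) = {
    require(startPort == 0 || (startPort >= 1024 && startPort < 65536), s"bad start port $startPort")
    require(maxRetries >= 0, "maxRetries must be non-negative")
    var result: Option[(T, Int)] = None
    var lastError: BindException = null
    var offset = 0
    while (result.isEmpty && offset <= maxRetries) {
      // Port 0 asks the OS for any free port, so it is never incremented;
      // otherwise wrap around without ever trying a privileged port (< 1024).
      val tryPort =
        if (startPort == 0) 0
        else ((startPort + offset - 1024) % (65536 - 1024)) + 1024
      try {
        result = Some(startService(tryPort))
      } catch {
        case e: BindException => lastError = e // port taken: move on to the next one
      }
      offset += 1
    }
    result.getOrElse(
      throw new BindException(s"Failed after $offset attempts: ${lastError.getMessage}"))
  }
}

// Hypothetical service for illustration: ports 7077 and 7078 are already in use.
val busyPorts = Set(7077, 7078)
val fakeStart: Int => (String, Int) = { port =>
  if (busyPorts.contains(port)) throw new BindException(s"Address already in use: $port")
  else (s"service@$port", port)
}

println(PortRetry.startServiceOnPort(7077, fakeStart)) // (service@7079,7079)
```

Passing the start function as a parameter mirrors how createActorSystem hands doCreateActorSystem to startServiceOnPort. The retry policy stays independent of what is being started.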

Spark Worker startup: source analysis
1. start-slaves.sh invokes the main method of the Worker class:
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new WorkerArguments(argStrings, conf)
  val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir)
  actorSystem.awaitTermination()
}
2. startSystemAndActor starts the ActorSystem and creates the Worker actor:
def startSystemAndActor(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

  // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
  val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
  val actorName = "Worker"
  val securityMgr = new SecurityManager(conf)
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
    conf = conf, securityManager = securityMgr)
  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
  actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
    masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
  (actorSystem, boundPort)
}
3. AkkaUtils.createActorSystem creates the ActorSystem (the same code the Master uses):
def createActorSystem(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager): (ActorSystem, Int) = {
  val startService: Int => (ActorSystem, Int) = { actualPort =>
    doCreateActorSystem(name, host, actualPort, conf, securityManager)
  }
  Utils.startServiceOnPort(port, startService, conf, name)
}
4. Once the ActorSystem has been created, Worker's primary constructor runs, followed by preStart:
override def preStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  createWorkDir()
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  shuffleService.startIfEnabled()
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()
  registerWithMaster()

  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
5. registerWithMaster registers the newly started Worker with the Master:
def registerWithMaster() {
  // DisassociatedEvent may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      registered = false
      tryRegisterAllMasters()
      connectionAttemptCount = 0
      registrationRetryTimer = Some {
        context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
          INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
      }
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
6. tryRegisterAllMasters sends a RegisterWorker message to every Master:
private def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
7. The Master's receiveWithLogging receives the message and runs:
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else if (idToWorker.contains(id)) {
    sender ! RegisterWorkerFailed("Duplicate worker ID")
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    } else {
      val workerAddress = worker.actor.path.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress)
    }
  }
}
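8. If registration fails, the Master replies to the Worker with a failure message. If it succeeds, the Master replies with its own details (masterUrl and masterWebUiUrl).
9. After replying, the Master calls schedule(). No application has been submitted yet, so no resources are allocated at this point.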
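On the Master side, steps 7 to 9 come down to a small decision. The sketch below is a simplified, hypothetical model of it in plain Scala, with no Akka. A reply of None stands for the STANDBY case, where the Master stays silent. Worker addresses are plain host:port strings instead of Akka actor paths. The re-registration check is reduced to "this address is already registered"; the real registerWorker also tolerates dead or unknown workers at the same address. MasterModel, waitingApps and allocations are illustrative stand-ins, not Spark names.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of how the Master handles RegisterWorker (no Akka).
sealed trait RegisterReply
final case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends RegisterReply
final case class RegisterWorkerFailed(message: String) extends RegisterReply

final case class WorkerInfo(id: String, host: String, port: Int, cores: Int, memoryMb: Int) {
  def address: String = s"$host:$port" // stand-in for worker.actor.path.address
}

class MasterModel(masterUrl: String, masterWebUiUrl: String, standby: Boolean) {
  private val idToWorker = mutable.Map.empty[String, WorkerInfo]
  private val addressToWorker = mutable.Map.empty[String, WorkerInfo]
  val waitingApps = mutable.Buffer.empty[String] // no application submitted yet
  var allocations = 0

  def workerCount: Int = idToWorker.size

  // None models "ignore, don't send response" while the Master is in STANDBY.
  def handleRegister(w: WorkerInfo): Option[RegisterReply] = {
    if (standby) None
    else if (idToWorker.contains(w.id)) Some(RegisterWorkerFailed("Duplicate worker ID"))
    else if (addressToWorker.contains(w.address))
      Some(RegisterWorkerFailed("Attempted to re-register worker at same address: " + w.address))
    else {
      idToWorker(w.id) = w
      addressToWorker(w.address) = w
      val reply = RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule() // runs after a successful registration
      Some(reply)
    }
  }

  // Stand-in for Master.schedule(): one allocation per waiting app, so nothing happens yet.
  private def schedule(): Unit = allocations += waitingApps.size
}

val master = new MasterModel("spark://127.0.0.1:7077", "http://127.0.0.1:8080", standby = false)
val w1 = WorkerInfo("worker-1", "10.0.0.5", 40001, cores = 4, memoryMb = 8192)
println(master.handleRegister(w1)) // Some(RegisteredWorker(spark://127.0.0.1:7077,http://127.0.0.1:8080))
println(master.handleRegister(w1)) // Some(RegisterWorkerFailed(Duplicate worker ID))
println(master.handleRegister(w1.copy(id = "worker-2"))) // fails: same address as worker-1
```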

With that, the whole Spark cluster has finished starting up.
