caseOnStart => endpoint.onStart() if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) { inbox.synchronized { if (!stopped) { enableConcurrent = true } } }
caseOnStop => val activeThreads = inbox.synchronized { inbox.numActiveThreads } assert(activeThreads == 1, s"There should be only a single active thread but found $activeThreads threads.") dispatcher.removeRpcEndpointRef(endpoint) endpoint.onStop() assert(isEmpty, "OnStop should be the last message")
inbox.synchronized { // "enableConcurrent" will be set to false after `onStop` is called, so we should check it // every time. if (!enableConcurrent && numActiveThreads != 1) { // If we are not the only one worker, exit numActiveThreads -= 1 return } message = messages.poll() if (message == null) { numActiveThreads -= 1 return } } } }
// Track the receivers whose inboxes may contain messages. privateval receivers = newLinkedBlockingQueue[EndpointData]
/** Thread pool used for dispatching messages. */ privateval threadpool: ThreadPoolExecutor = { val availableCores = if (numUsableCores > 0) numUsableCores elseRuntime.getRuntime.availableProcessors() val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads", math.max(2, availableCores)) val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop") for (i <- 0 until numThreads) { pool.execute(newMessageLoop) } pool } }
defregisterRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = { val addr = RpcEndpointAddress(nettyEnv.address, name) val endpointRef = newNettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv) synchronized { if (stopped) { thrownewIllegalStateException("RpcEnv has been stopped") } if (endpoints.putIfAbsent(name, newEndpointData(name, endpoint, endpointRef)) != null) { thrownewIllegalArgumentException(s"There is already an RpcEndpoint called $name") } val data = endpoints.get(name) endpointRefs.put(data.endpoint, data.ref) receivers.offer(data) // for the OnStart message } endpointRef }
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
privatedefpostMessage( endpointName: String, message: InboxMessage, callbackIfStopped: (Exception) => Unit): Unit = { val error = synchronized { val data = endpoints.get(endpointName) if (stopped) { Some(newRpcEnvStoppedException()) } elseif (data == null) { Some(newSparkException(s"Could not find $endpointName.")) } else { data.inbox.post(message) //向相应的RpcEndpointData的收件箱添加消息 receivers.offer(data) //同时,向receivers消息队列添加消息,以便MessageLoop线程处理消息 None } } // We don't need to call `onStop` in the `synchronized` block error.foreach(callbackIfStopped) }
/** Message loop used for dispatching messages. */ privateclassMessageLoopextendsRunnable{ overridedefrun(): Unit = { try { while (true) { try { val data = receivers.take() if (data == PoisonPill) { // Put PoisonPill back so that other MessageLoops can see it. receivers.offer(PoisonPill) return } data.inbox.process(Dispatcher.this) } catch { caseNonFatal(e) => logError(e.getMessage, e) } } } catch { ... ... } } }