博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊storm client的nimbus.seeds参数
阅读量:7230 次
发布时间:2019-06-29

本文共 8844 字,大约阅读时间需要 29 分钟。

本文主要研究一下storm client的nimbus.seeds参数

NIMBUS_SEEDS

storm-core-1.1.0-sources.jar!/org/apache/storm/Config.java

/**     * The host that the master server is running on, added only for backward compatibility,     * the usage deprecated in favor of nimbus.seeds config.     */    @Deprecated    @isString    public static final String NIMBUS_HOST = "nimbus.host";    /**     * List of seed nimbus hosts to use for leader nimbus discovery.     */    @isStringList    public static final String NIMBUS_SEEDS = "nimbus.seeds";复制代码
  • 可以看到这里废除了 nimbus.host 参数,而 nimbus.seeds 参数主要用于发现 nimbus leader

StormSubmitter

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {        if(!Utils.isValidConf(stormConf)) {            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");        }        stormConf = new HashMap(stormConf);        stormConf.putAll(Utils.readCommandLineOpts());        Map conf = Utils.readStormConfig();        conf.putAll(stormConf);        stormConf.putAll(prepareZookeeperAuthentication(conf));        validateConfs(conf, topology);        Map
passedCreds = new HashMap<>(); if (opts != null) { Credentials tmpCreds = opts.get_creds(); if (tmpCreds != null) { passedCreds = tmpCreds.get_creds(); } } Map
fullCreds = populateCredentials(conf, passedCreds); if (!fullCreds.isEmpty()) { if (opts == null) { opts = new SubmitOptions(TopologyInitialStatus.ACTIVE); } opts.set_creds(new Credentials(fullCreds)); } try { if (localNimbus!=null) { LOG.info("Submitting topology " + name + " in local mode"); if (opts!=null) { localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts); } else { // this is for backwards compatibility localNimbus.submitTopology(name, stormConf, topology); } LOG.info("Finished submitting topology: " + name); } else { String serConf = JSONValue.toJSONString(stormConf); try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) { if (topologyNameExists(name, client)) { throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); } // Dependency uploading only makes sense for distributed mode List
jarsBlobKeys = Collections.emptyList(); List
artifactsBlobKeys; DependencyUploader uploader = new DependencyUploader(); try { uploader.init(); jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader); artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader); } catch (Throwable e) { // remove uploaded jars blobs, not artifacts since they're shared across the cluster uploader.deleteBlobs(jarsBlobKeys); uploader.shutdown(); throw e; } try { setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys); submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client); } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { // remove uploaded jars blobs, not artifacts since they're shared across the cluster // Note that we don't handle TException to delete jars blobs // because it's safer to leave some blobs instead of topology not running uploader.deleteBlobs(jarsBlobKeys); throw e; } finally { uploader.shutdown(); } } } } catch(TException e) { throw new RuntimeException(e); } invokeSubmitterHook(name, asUser, conf, topology); }复制代码
  • StormSubmitter 的 submitTopologyAs 通过 NimbusClient.getConfiguredClientAs(conf, asUser) 创建 NimbusClient

NimbusClient

storm-core-1.1.0-sources.jar!/org/apache/storm/utils/NimbusClient.java

public static NimbusClient getConfiguredClientAs(Map conf, String asUser) {        if (conf.containsKey(Config.STORM_DO_AS_USER)) {            if (asUser != null && !asUser.isEmpty()) {                LOG.warn("You have specified a doAsUser as param {} and a doAsParam as config, config will take precedence."                        , asUser, conf.get(Config.STORM_DO_AS_USER));            }            asUser = (String) conf.get(Config.STORM_DO_AS_USER);        }        List
seeds; if(conf.containsKey(Config.NIMBUS_HOST)) { LOG.warn("Using deprecated config {} for backward compatibility. Please update your storm.yaml so it only has config {}", Config.NIMBUS_HOST, Config.NIMBUS_SEEDS); seeds = Lists.newArrayList(conf.get(Config.NIMBUS_HOST).toString()); } else { seeds = (List
) conf.get(Config.NIMBUS_SEEDS); } for (String host : seeds) { int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString()); NimbusSummary nimbusSummary; NimbusClient client = null; try { client = new NimbusClient(conf, host, port, null, asUser); nimbusSummary = client.getClient().getLeader(); if (nimbusSummary != null) { String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port(); LOG.info("Found leader nimbus : {}", leaderNimbus); if (nimbusSummary.get_host().equals(host) && nimbusSummary.get_port() == port) { NimbusClient ret = client; client = null; return ret; } try { return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser); } catch (TTransportException e) { throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e); } } } catch (Exception e) { LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host + ". will retry with a different seed host.", e); continue; } finally { if (client != null) { client.close(); } } throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try " + "again after some time."); } throw new NimbusLeaderNotFoundException( "Could not find leader nimbus from seed hosts " + seeds + ". " + "Did you specify a valid list of nimbus hosts for config " + Config.NIMBUS_SEEDS + "?"); }复制代码
  • 这里仍然兼容NIMBUS_HOST 参数,如果有 NIMBUS_HOST 参数则从中读取 seeds,没有则从 NIMBUS_SEEDS 参数获取
  • 之后遍历 seeds,根据每个 seed 创建 NimbusClient,然后调用 client.getClient().getLeader() 获取 leader 信息,如果获取成功,则判断 leader 是否当前连接的 seed,如果是则直接返回,如果不是则根据 leader 的 host 和 port 创建新的 NimbusClient 返回
  • 如果 nimbusSummary 为 null,则会抛出 NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.")
  • 如果连接 leader 出现异常,则遍历下一个 seed,进行 retry 操作,如果所有 seed 都 retry 失败,则跳出循环,最后抛出 NimbusLeaderNotFoundException("Could not find leader nimbus from seed hosts " + seeds + ". Did you specify a valid list of nimbus hosts for config nimbus.seeds?")

小结

  • 对于storm client 来说,nimbus.seeds 参数用于 client 进行寻找 nimbus leader,而 nimbus.host 参数已经被废弃
  • 寻找 nimbus leader 的过程就是挨个遍历 seeds 配置的 host,进行连接,然后获取 leader 的信息,如果获取成功但是 nimbusSummary 为 null,则抛出 NimbusLeaderNotFoundException("Could not find a nimbus leader, please try again after some time.")。
  • 如果有异常则遍历下一个 seed 进行 retry,如果都不成功,则最后跳出循环,抛出 NimbusLeaderNotFoundException("Could not find leader nimbus from seed hosts " + seeds + ". Did you specify a valid list of nimbus hosts for config nimbus.seeds?")

doc

转载地址:http://vgdfm.baihongyu.com/

你可能感兴趣的文章
阿里CTO:阿里所有技术和产品输出都将必须通过阿里云进行
查看>>
更好用的集群限流功能,Sentinel 发布 v1.4.2
查看>>
Python(生成执行文件)
查看>>
redis安装配置 - ttlsa教程系列之redis
查看>>
Linux --DHCP服务器配置;DHCP服务器中继
查看>>
IE版本多的可爱_已迁移
查看>>
eclipse查看jar包中class的中文注释乱码问题的解决
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
mariadb安装
查看>>
vue+vuex+axios+echarts画一个动态更新的中国地图
查看>>
5.8 volumetric post-processing--game programming gems5 笔记
查看>>
8086的地址空间
查看>>
Android开发动画效果被遮掉的解决方法
查看>>
Apache2.2.17源码编译安装以及配置虚拟主机
查看>>
2017年开发语言排名
查看>>
读二进制表的显示 Binary Watch
查看>>
我的友情链接
查看>>
linux基础:10、基础命令(4)
查看>>
linux中强大的screen命令
查看>>