博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器
阅读量:5842 次
发布时间:2019-06-18

本文共 4604 字,大约阅读时间需要 15 分钟。

hot3.png

     阅读背景:您需要对Zk,Kafka有基础的了解

     本章主题:详尽的梳理ZkCoordinator的过程

package com.mixbox.storm.kafka;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;import java.util.*;import static com.mixbox.storm.kafka.KafkaUtils.taskId;/** *  *  * ZKCoordinator 协调器 *  * @author Yin Shuai */public class ZkCoordinator implements PartitionCoordinator {	public static final Logger LOG = LoggerFactory			.getLogger(ZkCoordinator.class);	SpoutConfig _spoutConfig;	int _taskIndex;	int _totalTasks;		String _topologyInstanceId;		// 每一个分区对应着一个分区管理器	Map
 _managers = new HashMap(); //缓存的List List
 _cachedList; //上次刷新的时间 Long _lastRefreshTime = null; //刷新频率 毫秒 int _refreshFreqMs; //动态分区连接 DynamicPartitionConnections _connections; //动态BrokersReader DynamicBrokersReader _reader; ZkState _state; Map _stormConf; /**  *   * @param connections  *            动态的 分区连接  * @param stormConf  *            Storm的配置文件  * @param spoutConfig  *            Storm sput的配置文件  * @param state  *            对于ZKState的连接  * @param taskIndex  *            任务  * @param totalTasks  *            总共的任务  * @param topologyInstanceId  *            拓扑的实例ID  */ public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); } public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader; } /**  * @param stormConf  * @param spoutConfig  * @return  */ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) { ZkHosts hosts = (ZkHosts) spoutConfig.hosts; return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); } @Override public List
 getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { refresh(); _lastRefreshTime = System.currentTimeMillis(); } return _cachedList; } /**  * 简单的刷新的行为  *   */ void refresh() { try { LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); // 拿到所有的分区信息 GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); // 拿到自己任务的所有分区 List
 mine = KafkaUtils.calculatePartitionsForTask( brokerInfo, _totalTasks, _taskIndex); // 拿到当前任务的分区 Set
 curr = _managers.keySet(); // 构造一个集合 Set
 newPartitions = new HashSet
(mine); // 在new分区中,移除掉所有 自己拥有的分区 newPartitions.removeAll(curr); // 要删除的分区 Set
 deletedPartitions = new HashSet
(curr); // deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); for (Partition id : deletedPartitions) { PartitionManager man = _managers.remove(id); man.close(); } LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } } catch (Exception e) { throw new RuntimeException(e); } _cachedList = new ArrayList
(_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); } @Override public PartitionManager getManager(Partition partition) { return _managers.get(partition); }}

   1 : 首先 ZKCoorDinator 实现 PartitionCoordinator的接口

               

package com.mixbox.storm.kafka;import java.util.List;/** * @author Yin Shuai */public interface PartitionCoordinator {		/**	 * 拿到我管理的分区列表  List{PartitionManager}	 * @return	 */	List
 getMyManagedPartitions(); /**  * @param 依据制定的分区partition,去getManager  * @return  */ PartitionManager getManager(Partition partition);}

         第一个方法拿到所有的   PartitionManager

         第二个方法依据特定的   Partition去得到一个分区管理器

            对于PartitionManager 请参看本空间的另外一篇博文:

             Storm-kafka【接口实现】4-2:PartitionManager: 分区管理器

转载于:https://my.oschina.net/infiniteSpace/blog/293975

你可能感兴趣的文章
抽象类的调用
查看>>
使用硬盘,安装双系统,Win7+CentOS
查看>>
Javascript学习总结
查看>>
php 用正则替换中文字符一系列问题解决
查看>>
ActiveMQ应用笔记一:基本概念&安装
查看>>
大话数据结构之四(串)
查看>>
加热炉简是新来的整个系统的板
查看>>
Mockito使用注意事项
查看>>
[LeetCode] Palindrome Linked List 回文链表
查看>>
UVA - 825Walking on the Safe Side(dp)
查看>>
android大概是通过logcat拦截Log
查看>>
关于codeMirror插件使用的一个坑
查看>>
评论:人才流失强力折射出现实畸形人才观
查看>>
git服务器gitlab之搭建和使用--灰常好的git服务器【转】
查看>>
基于机器学习的web异常检测——基于HMM的状态序列建模,将原始数据转化为状态机表示,然后求解概率判断异常与否...
查看>>
分享一种需求评审的方案
查看>>
虚拟运营商10月或大面积放号 哭穷背后仍有赢家
查看>>
Server2016开发环境配置
查看>>
分布式光伏发电建设中的逆变器及其选型
查看>>
增强网络安全防御 推动物联网走向应用
查看>>