博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm-源码分析-LocalState (backtype.storm.utils)
阅读量:6457 次
发布时间:2019-06-23

本文共 4105 字,大约阅读时间需要 13 分钟。

LocalState

A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes. Every read/write hits disk.

基于map实现, 每次读写都需要从磁盘上将数据读出, 并反序列化成map, 这个过程称为snapshot. 所以说是比较简单和低效的, 只能用于读取配置或参数, 这种偶尔读取的场景.

public synchronized Map
snapshot() throws IOException { int attempts = 0; while(true) { String latestPath = _vs.mostRecentVersionPath(); if(latestPath==null) return new HashMap
(); try { return (Map
) Utils.deserialize(FileUtils.readFileToByteArray(new File(latestPath))); } catch(IOException e) { attempts++; if(attempts >= 10) { throw e; } } }

读写操作都是基于map的操作, get和put, 但是put需要做persist操作. 

这里使用synchronized来做对象的线程间同步, 对于一个LocalState对象, 所有synchronized标有的函数只能被串行操作.

public Object get(Object key) throws IOException {        return snapshot().get(key);    }    public synchronized void put(Object key, Object val, boolean cleanup) throws IOException {        Map
curr = snapshot(); curr.put(key, val); persist(curr, cleanup); }

当然不止这么简单, 为了达到atomic, 还使用了VersionedStore, 参考下一章 

persist不会去update现有的文件, 而是不断的产生递增version的文件, 故每一批更新都会产生一个新的文件

把需要写入的数据序列化 

创建新的versionfile的path 
把数据写入versionfile 
调用succeedVersion, 创建tokenfile以标志versionfile的写入完成 
清除旧版本, 只保留4个版本

private void persist(Map
val, boolean cleanup) throws IOException { byte[] toWrite = Utils.serialize(val); String newPath = _vs.createVersion(); FileUtils.writeByteArrayToFile(new File(newPath), toWrite); _vs.succeedVersion(newPath); if(cleanup) _vs.cleanup(4); }

 

VersionedStore

public VersionedStore(String path) throws IOException {      _root = path;      mkdirs(_root);    }

这个store, 其实就是_root目录下的一堆文件 

文件分两种, 
VersionFile, _root + version, 真正的数据存储文件 
TokenFile, _root + version + “.version”, 标志位文件, 标志version文件是否完成写操作, 以避免读到正在更新的文件 

getAllVersions就是读出所有_root目录下的所有完成写操作的文件, 读出version, 并做从大到小的排序

public List
getAllVersions() throws IOException { List
ret = new ArrayList
(); for(String s: listDir(_root)) { if(s.endsWith(FINISHED_VERSION_SUFFIX)) { ret.add(validateAndGetVersion(s)); } } Collections.sort(ret); Collections.reverse(ret); return ret; }

 

找到最新的版本文件

public Long mostRecentVersion() throws IOException {        List
all = getAllVersions(); if(all.size()==0) return null; return all.get(0);

 

创建新版本号, 用当前时间作为version

public String createVersion() throws IOException {        Long mostRecent = mostRecentVersion();        long version = Time.currentTimeMillis();        if(mostRecent!=null && version <= mostRecent) {            version = mostRecent + 1;        }        return createVersion(version);    }    public String createVersion(long version) throws IOException {        String ret = versionPath(version);        if(getAllVersions().contains(version))            throw new RuntimeException("Version already exists or data already exists");        else            return ret;    }
 

创建tokenfile, 以标记versionfile写完成

public void succeedVersion(String path) throws IOException {        long version = validateAndGetVersion(path);        // should rewrite this to do a file move        createNewFile(tokenPath(version));    }
 

清除旧的版本, 只保留versionsToKeep个, 清除操作就是删除versionfile和tokenfile

public void cleanup(int versionsToKeep) throws IOException {        List
versions = getAllVersions(); if(versionsToKeep >= 0) { versions = versions.subList(0, Math.min(versions.size(), versionsToKeep)); } HashSet
keepers = new HashSet
(versions); for(String p: listDir(_root)) { Long v = parseVersion(p); if(v!=null && !keepers.contains(v)) { deleteVersion(v); } } }
 
本文章摘自博客园,原文发布日期:2013-07-11

转载地址:http://lnszo.baihongyu.com/

你可能感兴趣的文章
面象过程与面象对象
查看>>
用CSS实现图片水印效果代码
查看>>
谷歌设置支持webgl
查看>>
P3402 【模板】可持久化并查集
查看>>
js的AJAX请求有关知识总结
查看>>
Eclipse添加新server时无法选择Tomcat7的问题
查看>>
L207
查看>>
nginx 配置https 负载均衡
查看>>
listing_windows形式输出直线结构体的起点、终点信息
查看>>
双拓扑排序 HDOJ 5098 Smart Software Installer
查看>>
三分 POJ 2420 A Star not a Tree?
查看>>
Java多线程和线程池
查看>>
36.Node.js 工具模块--OS模块系统操作
查看>>
存储过程报错行提示
查看>>
第一篇markdown博文
查看>>
Leetcode 4 - median-of-two-sorted-arrays
查看>>
noj 2033 一页书的书 [ dp + 组合数 ]
查看>>
ERDAS软件应用(四)遥感影像数据增强
查看>>
修改OBS为仅直播音频
查看>>
完整版:《开源框架实战宝典电子书V1.0.0》内测版下载地址!
查看>>