主页 > 合法imtoken钱包下载 > 以太坊源码分析(三十六)ethdb源码分析
以太坊源码分析(三十六)ethdb源码分析
go-ethereum 的所有数据都存储在 levelDB,Google 开源的 KeyValue 文件数据库,整个区块链的所有数据都存储在一个 levelDB 数据库中。 LevelDB支持按文件大小拆分文件的功能,所以我们看到的区块链中的数据都是小文件,但这些小文件其实都是同一个levelDB实例。 这里简单看下levelDB的go package代码。
levelDB官网介绍的特性
**特征**:
- key和value都是任意长度的字节数组;
- entry(即一条KV记录)默认按照key的字典顺序存储,当然开发者也可以重载这个排序函数;
- 提供基本操作接口:Put()、Delete()、Get()、Batch();
- 支持将批处理操作作为原子操作执行;
- 可以创建数据全景快照,并允许在快照中搜索数据;
- 可以通过前向(或后向)迭代器遍历数据(迭代器会隐式创建一个快照);
- 使用 Snappy 自动压缩数据;
- 便携性;
**限制** :
- 非关系数据模型(NoSQL),不支持sql语句,也不支持索引;
- 一次只允许一个进程访问特定的数据库;
- 没有内置C/S架构以太坊开源代码,开发者可以使用LevelDB库自行封装一个服务器;
源码所在目录在ethereum/ethdb目录下。代码比较简单,分为以下三个文件
- database.go levelDB的封装代码
- memory_database.go 是基于内存的测试用数据库,不会持久化为文件,仅供测试用
- interface.go 定义数据库的接口
- database_test.go 测试用例
## 界面.go
看下面的代码,基本定义了KeyValue数据库的基本操作,如Put、Get、Has、Delete等,levelDB不支持SQL,基本可以理解为数据结构中的一个Map。
包 ethdb
const IdealBatchSize = 100 * 1024
// Putter 包装了批处理和常规数据库都支持的数据库写操作。
//Putter接口定义了批量操作和普通操作的写接口
输入推杆接口 {
Put(key []byte, value []byte) 错误
}
// 数据库包装了所有的数据库操作。 所有方法都可以安全地同时使用。
//数据库接口定义了所有的数据库操作,所有的方法都是多线程安全的。
类型数据库接口{
推杆
Get(key []byte) ([]byte, 错误)
有(键[]字节)(布尔值,错误)
删除(键[]字节)错误
关闭()
NewBatch() 批次
}
// Batch 是一个只写数据库,它将更改提交到其主机数据库
// 当 Write 被调用时。 批处理不能同时使用。
//批量操作接口不能同时被多个线程使用。 当调用 Write 方法时,数据库将提交写入的更改。
类型批处理界面{
推杆
ValueSize() int // 批次中的数据量
写入()错误
}
## 内存数据库.g
这基本上是一个封装内存的 Map 结构。 然后使用锁来保护资源不受多个线程的影响。
类型 MemDatabase 结构 {
数据库映射[字符串][]字节
锁同步.RWMutex
}
func NewMemDatabase() (*MemDatabase, error) {
返回 &MemDatabase{
数据库:制作(地图[字符串][]字节),
}, 无
}
func (db *MemDatabase) Put(key []byte, value []byte) error {
D b。 锁。 锁()
推迟分贝。 锁。 开锁()
db.db[字符串(键)] = common.CopyBytes(值)
返回零
}
func (db *MemDatabase) Has(key []byte) (bool, error) {
D b。 锁。 锁()
推迟分贝。 锁。 解锁()
_, 好的 := db.db[string(key)]
返回 ok,nil
}
然后就是Batch的操作。 也比较简单,一看就懂。
输入 kv struct{ k, v []byte }
类型 memBatch 结构 {
db *内存数据库
写 []kv
大小整数
}
func (b *memBatch) Put(key, value []byte) error {
b.writes = append(b.writes, kv{common.CopyBytes(key), common.CopyBytes(value)})
b. 大小 += 长度(值)
返回零
}
func (b *memBatch) Write() 错误 {
b. D b。 锁。 锁()
推迟 b.db.lock.Unlock()
对于 _, kv := range b.writes {
b.db.db[字符串(kv.k)] = kv.v
}
返回零
}
##数据库.go
这是以太坊客户端实际使用的代码,封装了levelDB接口。
进口 (
“strconv”
“字符串”
“同步”
“时间”
“github.com/ethereum/go-ethereum/log”
“github.com/ethereum/go-ethereum/metrics”
“github.com/syndtr/goleveldb/leveldb”
“github.com/syndtr/goleveldb/leveldb/errors”
“github.com/syndtr/goleveldb/leveldb/filter”
“github.com/syndtr/goleveldb/leveldb/iterator”
“github.com/syndtr/goleveldb/leveldb/opt”
gometrics“github.com/rcrowley/go-metrics”
)
使用了 github.com/syndtr/goleveldb/leveldb 的 leveldb 包,所以可以在那里找到一些使用过的文档。 可以看出数据结构主要是增加了很多Metrics来记录数据库的使用情况,并且增加了quitChan来处理停止时的一些情况,后面会分析。 如果对下面的代码有疑问,应该再过滤一下: filter.NewBloomFilter(10) 这个可以暂时忽略。 这是levelDB中性能优化的一个选项以太坊开源代码,可以忽略。
键入 LDBDatabase 结构 {
fn string // 报告文件名
db *leveldb.DB // LevelDB 实例
getTimer gometrics.Timer // 用于测量数据库获取请求计数和延迟的计时器
putTimer gometrics.Timer // 用于测量数据库放置请求计数和延迟的计时器
...指标
quitLock sync.Mutex // 保护退出通道访问的互斥量
quitChan chan chan error // 关闭数据库前退出通道停止metrics收集
日志日志。 Logger // 跟踪数据库路径的上下文记录器
}
// NewLDBDatabase 返回一个 LevelDB 包装对象。
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
记录器:=日志。 New("数据库", 文件)
// 确保我们有一些最小的缓存和文件保证
如果缓存 < 16 {
缓存=16
}
如果句柄 < 16 {
手柄 = 16
}
logger.Info("分配的缓存和文件句柄", "cache", cache, "handles", handles)
// 打开数据库并恢复任何潜在的损坏
db, err := leveldb.OpenFile(file, &opt.Options{
OpenFilesCacheCapacity:句柄,
BlockCacheCapacity: 缓存 / 2 * opt.MiB,
WriteBuffer: cache / 4 * opt.MiB, // 其中两个在内部使用
过滤器:过滤器。 新布隆过滤器(10),
})
如果 _,已损坏:= err.(*errors.ErrCorrupted); 损坏的{
数据库,错误 = leveldb。 恢复文件(文件,无)
}
//(重新)检查错误并在打开数据库失败时中止
如果错误!=无{
返回零,错误
}
返回 &LDB数据库{
fn:文件,
分贝:分贝,
日志:记录器,
}, 无
}
看看下面Put和Has的代码,因为github.com/syndtr/goleveldb/leveldb封装的代码支持多线程同时访问,所以下面的代码是没有锁保护的,大家可以关注一下。 这里大部分代码都是直接调用leveldb包,就不详细介绍了。 比较有趣的地方之一是 Metrics 代码。
// Put 将给定的键/值放入队列
func (db *LDBDatabase) Put(key []byte, value []byte) error {
// 如果需要,测量数据库放置延迟
如果 db.putTimer != nil {
推迟 db.putTimer.UpdateSince(time.Now())
}
// 生成要写入磁盘的数据,更新仪表并写入
//值=rle。 压缩(值)
如果 db.writeMeter != nil {
db.writeMeter.Mark(int64(len(value)))
}
返回 db.db.Put(key, value, nil)
}
func (db *LDBDatabase) Has(key []byte) (bool, error) {
返回 db.db.Has(key, nil)
}
###指标处理
之前创建NewLDBDatabase的时候,很多内部的Metrics都没有初始化。 此时,Metrics 为 nil。 在 Meter 方法中初始化 Metrics。 从外部传入一个prefix参数,然后创建各种Metrics(如何创建Meter会在后面的Meter专题中分析),然后创建quitChan。 最后启动一个线程调用db.meter方法。
// Meter 配置数据库指标收集器和
func (db *LDBDatabase) Meter(前缀字符串) {
// 如果指标系统被禁用,则短路计量
如果!指标。 启用{
返回
}
// 在请求的前缀处初始化所有指标收集器
db.getTimer = metrics.NewTimer(prefix + "user/gets")
db.putTimer = metrics.NewTimer(prefix + "user/puts")
db.delTimer = metrics.NewTimer(prefix + "user/dels")
db.missMeter = metrics.NewMeter(prefix + "user/misses")
db.readMeter = metrics.NewMeter(prefix + "user/reads")
db.writeMeter = metrics.NewMeter(prefix + "user/writes")
db.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
db.compReadMeter = metrics.NewMeter(prefix + "compact/input")
db.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
// 为定期收集器创建一个退出通道并运行它
db.quitLock.Lock()
D b。 quitChan = make(陈陈错误)
db.quitLock.Unlock()
去 db.meter(3 * time.Second)
}
此方法每 3 秒获取 leveldb 内部计数器并将它们发布到指标子系统。 这是一个无限循环的方法,直到 quitChan 收到退出信号。
// meter 定期检索内部 leveldb 计数器并将它们报告给
// 指标子系统。
// 这是统计表的样子(当前):
//下面的注释是调用db.db.GetProperty("leveldb.stats")返回的字符串。 后续代码需要解析这个字符串,将信息写入Meter。
// 压实
// 等级 | 表 | 大小(MB) | 时间(秒) | 阅读(MB) | 写入(MB)
// ------+------------+------------+------------ ----+----------------+--------------
// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
// 2 | 523| 1000.37159 | 7.26059 | 66.86342 | 66.77884
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
func (db *LDBDatabase) meter(刷新时间.Duration) {
// 创建计数器来存储当前值和先前值
计数器 := make([][]float64, 2)
因为我:= 0; 我 < 2; 我++ {
计数器[i] = make([]float64, 3)
}
// 无限迭代并收集统计数据
因为我:= 1; ; 我++ {
// 检索数据库统计信息
统计数据,错误:= db.db.GetProperty("leveldb.stats")
如果错误!=无{
db.log.Error("读取数据库统计失败", "err", err)
返回
}
// 找到压缩表,跳过表头
行:=字符串。 拆分(统计数据,“\n”)
对于 len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
行 = 行 [1:]
}
如果 len(行)