Skip to content

Commit 123a66e

Browse files
authored
Merge pull request #4 from optimism-java/pebble
feat: add pebble storage
2 parents dbb9004 + 855d27b commit 123a66e

File tree

3 files changed

+493
-0
lines changed

3 files changed

+493
-0
lines changed

storage/content_storage.go

+3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ import (
77
)
88

99
var ErrContentNotFound = fmt.Errorf("content not found")
10+
var ErrInsufficientRadius = fmt.Errorf("insufficient radius")
1011

1112
var MaxDistance = uint256.MustFromHex("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
1213

14+
var SizeKey = []byte("size")
15+
1316
type ContentType byte
1417

1518
type ContentKey struct {

storage/pebble/storage.go

+324
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
package ethpepple
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"runtime"
7+
"sync"
8+
"sync/atomic"
9+
10+
"github.com/cockroachdb/pebble"
11+
"github.com/cockroachdb/pebble/bloom"
12+
"github.com/ethereum/go-ethereum/common"
13+
"github.com/ethereum/go-ethereum/log"
14+
"github.com/ethereum/go-ethereum/p2p/enode"
15+
"github.com/holiman/uint256"
16+
"github.com/optimism-java/shisui2/storage"
17+
)
18+
19+
const (
20+
// minCache is the minimum amount of memory in megabytes to allocate to pebble
21+
// read and write caching, split half and half.
22+
minCache = 16
23+
24+
// minHandles is the minimum number of files handles to allocate to the open
25+
// database files.
26+
minHandles = 16
27+
28+
// 5% of the content will be deleted when the storage capacity is hit and radius gets adjusted.
29+
contentDeletionFraction = 0.05
30+
)
31+
32+
var _ storage.ContentStorage = &ContentStorage{}
33+
34+
type PeppleStorageConfig struct {
35+
StorageCapacityMB uint64
36+
DB *pebble.DB
37+
NodeId enode.ID
38+
NetworkName string
39+
}
40+
41+
func NewPeppleDB(dataDir string, cache, handles int, namespace string) (*pebble.DB, error) {
42+
// Ensure we have some minimal caching and file guarantees
43+
if cache < minCache {
44+
cache = minCache
45+
}
46+
if handles < minHandles {
47+
handles = minHandles
48+
}
49+
logger := log.New("database", namespace)
50+
logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)
51+
52+
// The max memtable size is limited by the uint32 offsets stored in
53+
// internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
54+
//
55+
// - MaxUint32 on 64-bit platforms;
56+
// - MaxInt on 32-bit platforms.
57+
//
58+
// It is used when slices are limited to Uint32 on 64-bit platforms (the
59+
// length limit for slices is naturally MaxInt on 32-bit platforms).
60+
//
61+
// Taken from https://github.com/cockroachdb/pebble/blob/master/internal/constants/constants.go
62+
maxMemTableSize := (1<<31)<<(^uint(0)>>63) - 1
63+
64+
// Two memory tables is configured which is identical to leveldb,
65+
// including a frozen memory table and another live one.
66+
memTableLimit := 2
67+
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
68+
69+
// The memory table size is currently capped at maxMemTableSize-1 due to a
70+
// known bug in the pebble where maxMemTableSize is not recognized as a
71+
// valid size.
72+
//
73+
// TODO use the maxMemTableSize as the maximum table size once the issue
74+
// in pebble is fixed.
75+
if memTableSize >= maxMemTableSize {
76+
memTableSize = maxMemTableSize - 1
77+
}
78+
opt := &pebble.Options{
79+
// Pebble has a single combined cache area and the write
80+
// buffers are taken from this too. Assign all available
81+
// memory allowance for cache.
82+
Cache: pebble.NewCache(int64(cache * 1024 * 1024)),
83+
MaxOpenFiles: handles,
84+
85+
// The size of memory table(as well as the write buffer).
86+
// Note, there may have more than two memory tables in the system.
87+
MemTableSize: uint64(memTableSize),
88+
89+
// MemTableStopWritesThreshold places a hard limit on the size
90+
// of the existent MemTables(including the frozen one).
91+
// Note, this must be the number of tables not the size of all memtables
92+
// according to https://github.com/cockroachdb/pebble/blob/master/options.go#L738-L742
93+
// and to https://github.com/cockroachdb/pebble/blob/master/db.go#L1892-L1903.
94+
MemTableStopWritesThreshold: memTableLimit,
95+
96+
// The default compaction concurrency(1 thread),
97+
// Here use all available CPUs for faster compaction.
98+
MaxConcurrentCompactions: runtime.NumCPU,
99+
100+
// Per-level options. Options for at least one level must be specified. The
101+
// options for the last level are used for all subsequent levels.
102+
Levels: []pebble.LevelOptions{
103+
{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
104+
{TargetFileSize: 4 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
105+
{TargetFileSize: 8 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
106+
{TargetFileSize: 16 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
107+
{TargetFileSize: 32 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
108+
{TargetFileSize: 64 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
109+
{TargetFileSize: 128 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
110+
},
111+
ReadOnly: false,
112+
}
113+
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
114+
// for more details.
115+
opt.Experimental.ReadSamplingMultiplier = -1
116+
db, err := pebble.Open(dataDir+"/"+namespace, opt)
117+
return db, err
118+
}
119+
120+
type ContentStorage struct {
121+
nodeId enode.ID
122+
storageCapacityInBytes uint64
123+
radius atomic.Value
124+
log log.Logger
125+
db *pebble.DB
126+
size atomic.Uint64
127+
writeOptions *pebble.WriteOptions
128+
bytePool sync.Pool
129+
}
130+
131+
func NewPeppleStorage(config PeppleStorageConfig) (storage.ContentStorage, error) {
132+
cs := &ContentStorage{
133+
nodeId: config.NodeId,
134+
db: config.DB,
135+
storageCapacityInBytes: config.StorageCapacityMB * 1000_000,
136+
log: log.New("storage", config.NetworkName),
137+
writeOptions: &pebble.WriteOptions{Sync: false},
138+
bytePool: sync.Pool{
139+
New: func() interface{} {
140+
out := make([]byte, 8)
141+
return &out
142+
},
143+
},
144+
}
145+
cs.radius.Store(storage.MaxDistance)
146+
147+
val, _, err := cs.db.Get(storage.SizeKey)
148+
if err != nil && err != pebble.ErrNotFound {
149+
return nil, err
150+
}
151+
if err == nil {
152+
size := binary.BigEndian.Uint64(val)
153+
// init stage, no need to use lock
154+
cs.size.Store(size)
155+
if size > cs.storageCapacityInBytes {
156+
err := cs.prune()
157+
if err != nil {
158+
return nil, err
159+
}
160+
}
161+
}
162+
163+
iter, err := cs.db.NewIter(nil)
164+
if err != nil {
165+
return nil, err
166+
}
167+
defer iter.Close()
168+
if iter.Last() && iter.Valid() {
169+
distance := iter.Key()
170+
dis := uint256.NewInt(0)
171+
err = dis.UnmarshalSSZ(distance)
172+
if err != nil {
173+
return nil, err
174+
}
175+
cs.radius.Store(dis)
176+
}
177+
return cs, nil
178+
}
179+
180+
// Get implements storage.ContentStorage.
181+
func (c *ContentStorage) Get(contentKey []byte, contentId []byte) ([]byte, error) {
182+
distance := xor(contentId, c.nodeId[:])
183+
data, closer, err := c.db.Get(distance)
184+
if err != nil && err != pebble.ErrNotFound {
185+
return nil, err
186+
}
187+
if err == pebble.ErrNotFound {
188+
return nil, storage.ErrContentNotFound
189+
}
190+
closer.Close()
191+
return data, nil
192+
}
193+
194+
// Put implements storage.ContentStorage.
195+
func (c *ContentStorage) Put(contentKey []byte, contentId []byte, content []byte) error {
196+
distance := xor(contentId, c.nodeId[:])
197+
valid, err := c.inRadius(distance)
198+
if err != nil {
199+
return err
200+
}
201+
if !valid {
202+
return storage.ErrInsufficientRadius
203+
}
204+
length := uint64(len(contentId)) + uint64(len(content))
205+
newSize := c.size.Add(length)
206+
207+
buf := c.bytePool.Get().(*[]byte)
208+
defer c.bytePool.Put(buf)
209+
binary.BigEndian.PutUint64(*buf, newSize)
210+
batch := c.db.NewBatch()
211+
212+
err = batch.Set(storage.SizeKey, *buf, c.writeOptions)
213+
if err != nil {
214+
return err
215+
}
216+
err = batch.Set(distance, content, c.writeOptions)
217+
if err != nil {
218+
return err
219+
}
220+
err = batch.Commit(c.writeOptions)
221+
if err != nil {
222+
return err
223+
}
224+
225+
if newSize > c.storageCapacityInBytes {
226+
err := c.prune()
227+
if err != nil {
228+
return err
229+
}
230+
}
231+
return nil
232+
}
233+
234+
// Radius implements storage.ContentStorage.
235+
func (c *ContentStorage) Radius() *uint256.Int {
236+
radius := c.radius.Load()
237+
val := radius.(*uint256.Int)
238+
return val
239+
}
240+
241+
func (c *ContentStorage) Close() error {
242+
return c.db.Close()
243+
}
244+
245+
func (c *ContentStorage) prune() error {
246+
expectSize := uint64(float64(c.storageCapacityInBytes) * contentDeletionFraction)
247+
var curentSize uint64 = 0
248+
249+
// get the keys to be deleted order by distance desc
250+
iter, err := c.db.NewIter(nil)
251+
if err != nil {
252+
return err
253+
}
254+
255+
batch := c.db.NewBatch()
256+
for iter.Last(); iter.Valid(); iter.Prev() {
257+
if bytes.Equal(iter.Key(), storage.SizeKey) {
258+
continue
259+
}
260+
if curentSize < expectSize {
261+
err := batch.Delete(iter.Key(), nil)
262+
if err != nil {
263+
return err
264+
}
265+
curentSize += uint64(len(iter.Key())) + uint64(len(iter.Value()))
266+
} else {
267+
distance := iter.Key()
268+
dis := uint256.NewInt(0)
269+
err = dis.UnmarshalSSZ(distance)
270+
if err != nil {
271+
return err
272+
}
273+
c.radius.Store(dis)
274+
break
275+
}
276+
}
277+
newSize := c.size.Add(-curentSize)
278+
buf := c.bytePool.Get().(*[]byte)
279+
defer c.bytePool.Put(buf)
280+
binary.BigEndian.PutUint64(*buf, newSize)
281+
err = batch.Set(storage.SizeKey, *buf, c.writeOptions)
282+
if err != nil {
283+
return err
284+
}
285+
err = batch.Commit(&pebble.WriteOptions{Sync: true})
286+
if err != nil {
287+
return err
288+
}
289+
go func() {
290+
start := uint256.NewInt(0).Bytes32()
291+
end := storage.MaxDistance.Bytes32()
292+
err := c.db.Compact(start[:], end[:], true)
293+
if err != nil {
294+
c.log.Error("compact err %v", err)
295+
}
296+
}()
297+
return nil
298+
}
299+
300+
func (c *ContentStorage) inRadius(distance []byte) (bool, error) {
301+
dis := uint256.NewInt(0)
302+
err := dis.UnmarshalSSZ(distance)
303+
if err != nil {
304+
return false, err
305+
}
306+
val := c.radius.Load()
307+
radius := val.(*uint256.Int)
308+
return radius.Gt(dis), nil
309+
}
310+
311+
func xor(contentId, nodeId []byte) []byte {
312+
// length of contentId maybe not 32bytes
313+
padding := make([]byte, 32)
314+
if len(contentId) != len(nodeId) {
315+
copy(padding, contentId)
316+
} else {
317+
padding = contentId
318+
}
319+
res := make([]byte, len(padding))
320+
for i := range padding {
321+
res[i] = padding[i] ^ nodeId[i]
322+
}
323+
return res
324+
}

0 commit comments

Comments
 (0)