Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added long OOO writes support for cwhisper for single retention metrics (WIP) #40

Open
wants to merge 2 commits into
base: aguzun/shortOOOforSingleArchive
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) {
i += FreeCompressedArchiveInfoSize - mixSpecSize

if FreeCompressedArchiveInfoSize < mixSpecSize {
panic("out of FreeCompressedArchiveInfoSize") // a panic that should never happens
panic("out of FreeCompressedArchiveInfoSize") // a panic that should never happen
}
}

Expand All @@ -137,7 +137,11 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) {
i += copy(b[i:], archive.buffer)
}
}

for _, oooArchive := range whisper.oooArchives {
i += packInt(b, oooArchive.offset, i)
i += packInt(b, oooArchive.secondsPerPoint, i)
i += packInt(b, oooArchive.numberOfPoints, i)
}
whisper.crc32 = crc32(b, 0)
packInt(b, int(whisper.crc32), whisper.crc32Offset())

Expand All @@ -152,6 +156,7 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) {
}

func (whisper *Whisper) readHeaderCompressed() (err error) {
// TODO check if we can do everything in one read
if _, err := whisper.file.Seek(int64(len(compressedMagicString)), 0); err != nil {
return err
}
Expand Down Expand Up @@ -290,7 +295,16 @@ func (whisper *Whisper) readHeaderCompressed() (err error) {
return fmt.Errorf("unable to read archive %d buffer: readed = %d want = %d", i, readed, arc.bufferSize)
}
}

// reading oooArchives info, works only for single archive metrics
if whisper.compVersion >= CompVersionLongOOOSingleArchive && len(whisper.archives) == 1 {
b = make([]byte, ArchiveInfoSize)
readed, err = whisper.file.Read(b)
if err != nil || readed != ArchiveInfoSize {
err = fmt.Errorf("unable to read ooo archive metadata: %s", err)
return
}
whisper.oooArchives = append(whisper.oooArchives, unpackArchiveInfo(b))
}
return nil
}

Expand Down Expand Up @@ -473,6 +487,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points

baseIntervalsPerUnit, currentUnit, minInterval := archive.getBufferInfo()
bufferUnitPointsCount := whisper.bufferUnitPointsCount(archive)
var oooDataPoints []dataPoint
for aindex := 0; aindex < len(alignedPoints); {
dp := alignedPoints[aindex]
dpBaseInterval := archive.AggregateInterval(dp.interval)
Expand All @@ -482,6 +497,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points
if minInterval != 0 && dpBaseInterval < minInterval { // TODO: check against cblock pn1.interval?
archive.stats.discard.oldInterval++
aindex++
oooDataPoints = append(oooDataPoints, dp)
continue
}

Expand Down Expand Up @@ -559,6 +575,13 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points
}
}
}
if len(oooDataPoints) > 0 && len(whisper.oooArchives) != 0 && whisper.oooArchives[0].SecondsPerPoint() == archive.SecondsPerPoint() {
// updating OOO archive, no propagation because OOO archive exits only for single archive
if err := whisper.archiveUpdateManyDataPoints(whisper.oooArchives[0], oooDataPoints, false); err != nil {
return err
}
archive.stats.discard.oldInterval -= uint32(len(oooDataPoints))
}

return nil
}
Expand Down
62 changes: 59 additions & 3 deletions compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,6 @@ func TestCompressedWhisperBufferOOOWrite(t *testing.T) {
points = append(points, p)
}
}

if diff := cmp.Diff(points, []TimeSeriesPoint{
{Time: 1544476080, Value: 666}, {Time: 1544476140, Value: 666}, {Time: 1544476200, Value: 666},
{Time: 1544476260, Value: 666}, {Time: 1544476320, Value: 666}, {Time: 1544477340, Value: 666},
Expand Down Expand Up @@ -842,7 +841,7 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) {
}
cwhisper.UpdateMany(points)

// buffer us flushed, can't accept OOO data not within the buffer
// buffer is flushed, can accept OOO data in OOO archive only

cwhisper.UpdateMany([]*TimeSeriesPoint{
{Value: 1000, Time: now + 1},
Expand All @@ -854,7 +853,7 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) {
}
if got, want := data.Points(), []TimeSeriesPoint{
{Time: now + 0, Value: 1},
{Time: now + 1, Value: 0},
{Time: now + 1, Value: 1000},
{Time: now + 2, Value: 1},
}; !reflect.DeepEqual(got, want) {
t.Errorf("data.Points() = %v; want %v", got, want)
Expand Down Expand Up @@ -926,6 +925,63 @@ func TestCompressTo(t *testing.T) {
}
}

func TestCompressToSingleArchive(t *testing.T) {
fpath := "compress_to_single_archive.wsp"
os.Remove(fpath)

whisper, err := CreateWithOptions(
fpath,
[]*Retention{
{secondsPerPoint: 1, numberOfPoints: 172800}, // 1s:2d
},
Average,
0,
&Options{Compressed: false, PointsPerBlock: 7200, InMemory: true},
)
if err != nil {
panic(err)
}
whisper.Close()
archive := whisper.archives[0]
var ps []*TimeSeriesPoint
for i := 0; i < archive.numberOfPoints; i++ {
start := Now().Add(time.Second * time.Duration(archive.secondsPerPoint*i) * -1)
ps = append(ps, &TimeSeriesPoint{
// Time: int(start.Add(time.Duration(i) * time.Second).Unix()),
Time: int(start.Unix()),
// Value: float64(i),
// Value: 2000.0 + float64(rand.Intn(100000))/100.0, // skipcq: GSC-G404
// Value: rand.NormFloat64(), // skipcq: GSC-G404
Value: float64(rand.Intn(100000)), // skipcq: GSC-G404
})
}
whisper, err = OpenWithOptions(fpath, &Options{InMemory: true})
if err != nil {
t.Fatal(err)
}
if err := whisper.UpdateMany(ps); err != nil {
t.Fatal(err)
}
if err := whisper.Close(); err != nil {
t.Fatal(err)
}
whisper.file.(*memFile).dumpOnDisk(fpath)

whisper, err = OpenWithOptions(fpath, &Options{})
if err != nil {
t.Fatal(err)
}
os.Remove(fpath + ".cwsp")
if err := whisper.CompressTo(fpath + ".cwsp"); err != nil {
t.Fatal(err)
}

t.Log("go", "run", "cmd/compare.go", "-v", fpath, fpath+".cwsp")
output, err := Compare(fpath, fpath+".cwsp", 0, false, "", false, false, 2)
if err != nil {
t.Fatalf("%s: %s", err, output)
}
}
func TestRandomReadWrite(t *testing.T) {
// os.Remove("test_random_read_write.wsp")
fileTs := time.Now().Unix()
Expand Down
10 changes: 8 additions & 2 deletions debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (whisper *Whisper) Dump(all, showDecompressionInfo bool) {
}
fmt.Printf("archives.%d.retention: %s%s\n", i, arc.Retention, agg)
}

for i, arc := range whisper.archives {
fmt.Printf("\nArchive %d info:\n", i)
if whisper.compressed {
Expand All @@ -103,11 +102,18 @@ func (whisper *Whisper) Dump(all, showDecompressionInfo bool) {
arc.dumpInfoStandard()
}
}
if len(whisper.oooArchives) > 0 {
fmt.Printf("\nOOO archive %d info:\n", 0)
whisper.oooArchives[0].dumpInfoStandard()
}

if !all {
return
}

if len(whisper.oooArchives) > 0 {
fmt.Printf("\nOOO archive %d data:\n", 0)
whisper.dumpDataPointsStandard(whisper.oooArchives[0])
}
for i, arc := range whisper.archives {
fmt.Printf("\nArchive %d data:\n", i)
if whisper.compressed {
Expand Down
Loading