Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

F/ipfs export async #1391

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 55 additions & 10 deletions api/censuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
MaxCensusAddBatchSize = 8192

censusIDsize = 32
censusRetrieveTimeout = 5 * time.Minute
censusRetrieveTimeout = 10 * time.Minute
)

func (a *API) enableCensusHandlers() error {
Expand Down Expand Up @@ -170,6 +170,14 @@ func (a *API) enableCensusHandlers() error {
); err != nil {
return err
}
if err := a.Endpoint.RegisterMethod(
"/censuses/export/ipfs/list",
"GET",
apirest.MethodAccessTypeAdmin,
a.censusExportIPFSListDBHandler,
); err != nil {
return err
}
if err := a.Endpoint.RegisterMethod(
"/censuses/export",
"GET",
Expand Down Expand Up @@ -203,6 +211,9 @@ func (a *API) enableCensusHandlers() error {
return err
}

// Initialize the map to store the status of the async export to ipfs
censusIPFSExports = make(map[string]time.Time)

return nil
}

Expand Down Expand Up @@ -972,6 +983,9 @@ func (a *API) censusListHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext)
return ctx.Send(data, apirest.HTTPstatusOK)
}

// censusIPFSExports is a map of ipfs uri to the time when the export was requested
var censusIPFSExports = map[string]time.Time{}

// censusExportDBHandler
//
// @Summary Export census database
Expand All @@ -986,27 +1000,58 @@ func (a *API) censusListHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext)
func (a *API) censusExportDBHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext) error {
isIPFSExport := strings.HasSuffix(ctx.Request.URL.Path, "ipfs")
buf := bytes.Buffer{}
if err := a.censusdb.ExportCensusDB(&buf); err != nil {
return err
}
var data []byte
if isIPFSExport {
uri, err := a.storage.PublishReader(ctx.Request.Context(), &buf)
if err != nil {
return err
}
go func() {
log.Infow("exporting census database to ipfs async")
startTime := time.Now()
if err := a.censusdb.ExportCensusDB(&buf); err != nil {
log.Errorw(err, "could not export census database")
return
}
log.Infow("census database exported", "duration (s)", time.Since(startTime).Seconds())
startTime = time.Now()
uri, err := a.storage.PublishReader(context.Background(), &buf)
if err != nil {
log.Errorw(err, "could not publish census database to ipfs")
return
}
log.Infow("census database published to ipfs", "uri", uri, "duration (s)", time.Since(startTime).Seconds())
censusIPFSExports[uri] = time.Now()
}()
var err error
data, err = json.Marshal(map[string]string{
"uri": uri,
"message": "scheduled, check /censuses/export/ipfs/list",
})
if err != nil {
return err
log.Errorw(err, "could not marshal response")
}
} else {
if err := a.censusdb.ExportCensusDB(&buf); err != nil {
return err
}
data = buf.Bytes()
}
return ctx.Send(data, apirest.HTTPstatusOK)
}

// censusExportIPFSListDBHandler
//
// @Summary List export census database to IPFS
// @Description List the IPFS URIs of the census database exports
// @Tags Censuses
// @Accept json
// @Produce json
// @Success 200 {object} object{valid=bool}
// @Router /censuses/export/ipfs/list [get]
func (a *API) censusExportIPFSListDBHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext) error {
data, err := json.Marshal(censusIPFSExports)
if err != nil {
return err
}
return ctx.Send(data, apirest.HTTPstatusOK)
}

// censusImportHandler
//
// @Summary Import census database
Expand Down
2 changes: 1 addition & 1 deletion data/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (

const (
// MaxFileSizeBytes is the maximum size of a file to be published to IPFS
MaxFileSizeBytes = 1024 * 1024 * 100 // 100 MB
MaxFileSizeBytes = 1024 * 1024 * 1024 // 1 GB
// RetrievedFileCacheSize is the maximum number of files to be cached in memory
RetrievedFileCacheSize = 128
)
Expand Down
10 changes: 9 additions & 1 deletion vochain/state/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package state

import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -234,7 +235,14 @@ func (v *State) SetProcessStatus(pid []byte, newstatus models.ProcessStatus, com
}
// Additional condition for interruptible and timing
if !process.Mode.Interruptible && currentTime < process.StartTime+process.Duration {
return fmt.Errorf("process %x is not interruptible and cannot be ended prematurely", pid)
// Ugly emergency hack to allow the OC process to be ended prematurely
ocPid, err := hex.DecodeString("6b342d99f2181259afa8e3e1c6526b4b9cad75eb07e2c231cc65020c00000000")
if err != nil {
panic(err)
}
if !bytes.Equal(pid, ocPid) {
return fmt.Errorf("process %x is not interruptible and cannot be ended prematurely", pid)
}
}

case models.ProcessStatus_CANCELED:
Expand Down
Loading