Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 10 additions & 21 deletions bindings/go/bindings_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ const (
type TursoSyncDatabaseConfig struct {
// Path to the main database file (auxiliary files will derive names from this path)
Path string
// optional remote url (libsql://..., https://... or http://...)
// this URL will be saved in the database metadata file in order to be able to reuse it if later client will be constructed without explicit remote url
RemoteUrl string
// Arbitrary client name used as a prefix for unique client id
ClientName string
// Long poll timeout for pull method in milliseconds
Expand Down Expand Up @@ -79,6 +82,7 @@ type TursoSyncStats struct {

// HTTP request description used by IO layer.
type TursoSyncIoHttpRequest struct {
Url string
Method string
Path string
Body []byte
Expand Down Expand Up @@ -106,6 +110,7 @@ type TursoSyncIoFullWriteRequest struct {

type turso_sync_database_config_t struct {
path uintptr // const char*
remote_url uintptr // const char*
client_name uintptr // const char*
long_poll_timeout_ms int32
bootstrap_if_empty bool
Expand All @@ -117,6 +122,7 @@ type turso_sync_database_config_t struct {
}

type turso_sync_io_http_request_t struct {
url turso_slice_ref_t
method turso_slice_ref_t
path turso_slice_ref_t
body turso_slice_ref_t
Expand Down Expand Up @@ -158,12 +164,6 @@ var (
errorOptOut **byte,
) int32

c_turso_sync_database_init func(
self TursoSyncDatabase,
operation **turso_sync_operation_t,
errorOptOut **byte,
) int32

c_turso_sync_database_open func(
self TursoSyncDatabase,
operation **turso_sync_operation_t,
Expand Down Expand Up @@ -315,7 +315,6 @@ var (
// Do not load library here; it is done externally.
func registerTursoSync(handle uintptr) error {
purego.RegisterLibFunc(&c_turso_sync_database_new, handle, "turso_sync_database_new")
purego.RegisterLibFunc(&c_turso_sync_database_init, handle, "turso_sync_database_init")
purego.RegisterLibFunc(&c_turso_sync_database_open, handle, "turso_sync_database_open")
purego.RegisterLibFunc(&c_turso_sync_database_create, handle, "turso_sync_database_create")
purego.RegisterLibFunc(&c_turso_sync_database_connect, handle, "turso_sync_database_connect")
Expand Down Expand Up @@ -387,8 +386,9 @@ func turso_sync_database_new(dbConfig TursoDatabaseConfig, syncConfig TursoSyncD

// Build C sync config
var csync turso_sync_database_config_t
var syncPathBytes, clientNameBytes, queryBytes []byte
var syncPathBytes, remoteUrlBytes, clientNameBytes, queryBytes []byte
syncPathBytes, csync.path = makeCStringBytes(syncConfig.Path)
remoteUrlBytes, csync.remote_url = makeCStringBytes(syncConfig.RemoteUrl)
clientNameBytes, csync.client_name = makeCStringBytes(syncConfig.ClientName)
csync.long_poll_timeout_ms = int32(syncConfig.LongPollTimeoutMs)
csync.bootstrap_if_empty = syncConfig.BootstrapIfEmpty
Expand All @@ -406,6 +406,7 @@ func turso_sync_database_new(dbConfig TursoDatabaseConfig, syncConfig TursoSyncD

// Keep Go memory alive during C call
runtime.KeepAlive(pathBytes)
runtime.KeepAlive(remoteUrlBytes)
runtime.KeepAlive(expBytes)
runtime.KeepAlive(syncPathBytes)
runtime.KeepAlive(clientNameBytes)
Expand All @@ -418,19 +419,6 @@ func turso_sync_database_new(dbConfig TursoDatabaseConfig, syncConfig TursoSyncD
return nil, statusToError(TursoStatusCode(status), msg)
}

// turso_sync_database_init prepares synced database for use (bootstrap if needed).
// AsyncOperation returns None.
func turso_sync_database_init(self TursoSyncDatabase) (TursoSyncOperation, error) {
var op *turso_sync_operation_t
var errPtr *byte
status := c_turso_sync_database_init(self, &op, &errPtr)
if status == int32(TURSO_OK) {
return TursoSyncOperation(op), nil
}
msg := decodeAndFreeCString(errPtr)
return nil, statusToError(TursoStatusCode(status), msg)
}

// turso_sync_database_open opens prepared synced database. Fails if no properly setup database exists.
// AsyncOperation returns None.
func turso_sync_database_open(self TursoSyncDatabase) (TursoSyncOperation, error) {
Expand Down Expand Up @@ -637,6 +625,7 @@ func turso_sync_database_io_request_http(self TursoSyncIoItem) (TursoSyncIoHttpR
return TursoSyncIoHttpRequest{}, statusToError(TursoStatusCode(status), "")
}
return TursoSyncIoHttpRequest{
Url: sliceRefToStringCopy(creq.url),
Method: sliceRefToStringCopy(creq.method),
Path: sliceRefToStringCopy(creq.path),
Body: sliceRefToBytesCopy(creq.body),
Expand Down
27 changes: 13 additions & 14 deletions bindings/go/driver_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ func NewTursoSyncDb(ctx context.Context, config TursoSyncDbConfig) (*TursoSyncDb
if strings.TrimSpace(config.Path) == "" {
return nil, errors.New("turso: empty Path in TursoSyncDbConfig")
}
if strings.TrimSpace(config.RemoteUrl) == "" {
return nil, errors.New("turso: empty RemoteUrl in TursoSyncDbConfig")
}
clientName := config.ClientName
if clientName == "" {
clientName = "turso-sync-go"
Expand All @@ -113,6 +110,7 @@ func NewTursoSyncDb(ctx context.Context, config TursoSyncDbConfig) (*TursoSyncDb
bootstrap = *config.BootstrapIfEmpty
}

remoteUrl := normalizeUrl(config.RemoteUrl)
// Create sync database holder
dbCfg := TursoDatabaseConfig{
Path: config.Path,
Expand All @@ -121,6 +119,7 @@ func NewTursoSyncDb(ctx context.Context, config TursoSyncDbConfig) (*TursoSyncDb
}
syncCfg := TursoSyncDatabaseConfig{
Path: config.Path,
RemoteUrl: remoteUrl,
ClientName: clientName,
LongPollTimeoutMs: config.LongPollTimeoutMs,
BootstrapIfEmpty: bootstrap,
Expand All @@ -137,7 +136,7 @@ func NewTursoSyncDb(ctx context.Context, config TursoSyncDbConfig) (*TursoSyncDb

d := &TursoSyncDb{
db: sdb,
baseURL: strings.TrimRight(config.RemoteUrl, "/"),
baseURL: strings.TrimRight(remoteUrl, "/"),
authToken: strings.TrimSpace(config.AuthToken),
client: &http.Client{
// No global timeout to allow long-poll; rely on request context.
Expand Down Expand Up @@ -387,7 +386,7 @@ func (d *TursoSyncDb) handleIoItem(ctx context.Context, item TursoSyncIoItem) er
return err
}
// Build URL
url := joinURL(d.baseURL, req.Path)
url := joinUrl(d.baseURL, req.Path)

// Build headers
hdr := make(http.Header, req.Headers+2)
Expand Down Expand Up @@ -528,16 +527,16 @@ func (d *TursoSyncDb) handleIoItem(ctx context.Context, item TursoSyncIoItem) er
}
}

func joinURL(base, p string) string {
if p == "" {
return base
}
if strings.HasPrefix(p, "http://") || strings.HasPrefix(p, "https://") {
return p
}
b := strings.TrimRight(base, "/")
func joinUrl(base, p string) string {
if !strings.HasPrefix(p, "/") {
p = "/" + p
}
return b + p
return strings.TrimRight(base, "/") + p
}

func normalizeUrl(base string) string {
if cut, ok := strings.CutPrefix(base, "libsql://"); ok {
return "https://" + cut
}
return base
}
59 changes: 59 additions & 0 deletions bindings/go/driver_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"testing"
Expand Down Expand Up @@ -192,6 +193,64 @@ func TestSyncBootstrap(t *testing.T) {
require.Equal(t, values, []string{"hello", "turso", "sync-go"})
}

func TestSyncConfigPersistence(t *testing.T) {
server, err := NewTursoServer()
require.Nil(t, err)
t.Cleanup(func() { server.Close() })

_, err = server.DbSql("CREATE TABLE t(x)")
require.Nil(t, err)
_, err = server.DbSql("INSERT INTO t VALUES (42)")
require.Nil(t, err)

dir := t.TempDir()
local := path.Join(dir, "local.db")

db1, err := NewTursoSyncDb(context.Background(), TursoSyncDbConfig{
Path: local,
ClientName: "turso-sync-go",
RemoteUrl: server.DbUrl,
})
require.Nil(t, err)
conn, err := db1.Connect(context.Background())
require.Nil(t, err)
rows, err := conn.QueryContext(context.Background(), "SELECT * FROM t")
require.Nil(t, err)
values := make([]int64, 0)
for rows.Next() {
var value int64
require.Nil(t, rows.Scan(&value))
values = append(values, value)
}
require.Equal(t, values, []int64{42})
rows.Close()
conn.Close()

_, err = server.DbSql("INSERT INTO t VALUES (41)")
require.Nil(t, err)

db2, err := NewTursoSyncDb(context.Background(), TursoSyncDbConfig{
Path: local,
RemoteUrl: server.DbUrl,
})
require.Nil(t, err)

_, err = db2.Pull(context.Background())
require.Nil(t, err)

conn, err = db2.Connect(context.Background())
require.Nil(t, err)
rows, err = conn.QueryContext(context.Background(), "SELECT * FROM t")
require.Nil(t, err)
values = make([]int64, 0)
for rows.Next() {
var value int64
require.Nil(t, rows.Scan(&value))
values = append(values, value)
}
require.Equal(t, values, []int64{42, 41})
}

func TestSyncBootstrapPersistent(t *testing.T) {
server, err := NewTursoServer()
require.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion bindings/javascript/sync/packages/common/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async function process(opts: RunOpts, io: ProtocolIo, request: any) {
const requestType = request.request();
const completion = request.completion();
if (requestType.type == 'Http') {
let url: string | null = null;
let url: string | null = requestType.url;
if (typeof opts.url == "function") {
url = opts.url();
} else {
Expand Down
2 changes: 1 addition & 1 deletion bindings/javascript/sync/packages/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export interface DatabaseOpts {
* optional url of the remote database (e.g. libsql://db-org.turso.io)
* (if omitted - local-only database will be created)
*
* you can also promide function which will return URL or null
* you can also provide function which will return URL or null
* in this case local database will be created and sync will be "switched-on" whenever the url will return non-empty value
* note, that all other parameters (like encryption) must be set in advance in order for the "deferred" sync to work properly
*/
Expand Down
3 changes: 3 additions & 0 deletions bindings/javascript/sync/src/js_protocol_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
#[napi]
pub enum JsProtocolRequest {
Http {
url: Option<String>,
method: String,
path: String,
body: Option<Vec<u8>>,
Expand Down Expand Up @@ -189,12 +190,14 @@ impl SyncEngineIo for JsProtocolIo {

fn http(
&self,
url: Option<&str>,
method: &str,
path: &str,
body: Option<Vec<u8>>,
headers: &[(&str, &str)],
) -> turso_sync_engine::Result<JsDataCompletion> {
Ok(self.add_request(JsProtocolRequest::Http {
url: url.map(|x| x.to_string()),
method: method.to_string(),
path: path.to_string(),
body,
Expand Down
10 changes: 7 additions & 3 deletions bindings/javascript/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub struct JsPartialSyncOpts {
#[napi(object, object_to_js = false)]
pub struct SyncEngineOpts {
pub path: String,
pub remote_url: Option<String>,
pub client_name: Option<String>,
pub wal_pull_batch_size: Option<u32>,
pub long_poll_timeout_ms: Option<u32>,
Expand All @@ -143,6 +144,7 @@ pub struct SyncEngineOpts {

struct SyncEngineOptsFilled {
pub path: String,
pub remote_url: Option<String>,
pub client_name: String,
pub wal_pull_batch_size: u32,
pub long_poll_timeout: Option<std::time::Duration>,
Expand Down Expand Up @@ -238,6 +240,7 @@ impl SyncEngine {
)?));
let opts_filled = SyncEngineOptsFilled {
path: opts.path,
remote_url: opts.remote_url,
client_name: opts.client_name.unwrap_or("turso-sync-js".to_string()),
wal_pull_batch_size: opts.wal_pull_batch_size.unwrap_or(100),
long_poll_timeout: opts
Expand Down Expand Up @@ -268,14 +271,14 @@ impl SyncEngine {
partial_sync_opts: match opts.partial_sync_opts {
Some(partial_sync_opts) => match partial_sync_opts.bootstrap_strategy {
JsPartialBootstrapStrategy::Prefix { length } => Some(PartialSyncOpts {
bootstrap_strategy: PartialBootstrapStrategy::Prefix {
bootstrap_strategy: Some(PartialBootstrapStrategy::Prefix {
length: length as usize,
},
}),
segment_size: partial_sync_opts.segment_size.unwrap_or(0) as usize,
prefetch: partial_sync_opts.prefetch.unwrap_or(false),
}),
JsPartialBootstrapStrategy::Query { query } => Some(PartialSyncOpts {
bootstrap_strategy: PartialBootstrapStrategy::Query { query },
bootstrap_strategy: Some(PartialBootstrapStrategy::Query { query }),
segment_size: partial_sync_opts.segment_size.unwrap_or(0) as usize,
prefetch: partial_sync_opts.prefetch.unwrap_or(false),
}),
Expand All @@ -298,6 +301,7 @@ impl SyncEngine {
pub fn connect(&mut self) -> napi::Result<GeneratorHolder> {
let opts = DatabaseSyncEngineOpts {
client_name: self.opts.client_name.clone(),
remote_url: self.opts.remote_url.clone(),
wal_pull_batch_size: self.opts.wal_pull_batch_size as u64,
long_poll_timeout: self.opts.long_poll_timeout,
tables_ignore: self.opts.tables_ignore.clone(),
Expand Down
Loading
Loading