Skip to content

Commit 6f2e122

Browse files
committed
Merge branch 'main' into bld-debug-flaky-go-tests
2 parents f2db619 + 364957e commit 6f2e122

File tree

13 files changed

+258
-49
lines changed

13 files changed

+258
-49
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
name: Tilt Setup & Pre-Build
2+
description: Set up Tilt prereqs and pre-build/pull images
3+
runs:
4+
using: "composite"
5+
steps:
6+
- name: Start minikube
7+
uses: medyagh/setup-minikube@latest
8+
with:
9+
driver: none # uses Docker engine on host instead of Docker-in-Docker
10+
# tilt ci can automatically build images while bringing up the cluster. However, this results in flaky runs for our usage, so we split up image building and pod deployment into separate steps. See https://github.com/chroma-core/chroma/pull/4720.
11+
- name: Install Tilt, bake images, pre-pull external images
12+
shell: bash
13+
env:
14+
TILT_VERSION: "0.34.2"
15+
run: |
16+
parallel --tag --linebuffer ::: \
17+
"bash -c 'curl -fsSL https://github.com/tilt-dev/tilt/releases/download/v${TILT_VERSION}/tilt.${TILT_VERSION}.linux.x86_64.tar.gz | tar -xzv -C /usr/local/bin tilt'" \
18+
"docker buildx bake -f ${{ github.action_path }}/docker-bake.hcl --load" \
19+
"bash ${{ github.action_path }}/pull_external_images.sh"
20+
working-directory: ${{ github.action_path }}/../../../ # this allows other repos to reuse this workflow when this repo may not be the current working directory

.github/actions/tilt/action.yaml

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,8 @@ description: "This action starts Tilt services"
33
runs:
44
using: "composite"
55
steps:
6-
- name: Start minikube
7-
uses: medyagh/setup-minikube@latest
8-
with:
9-
driver: none # uses Docker engine on host instead of Docker-in-Docker
10-
# tilt ci can automatically build images while bringing up the cluster. However, this results in flaky runs for our usage so we split up image building and pod deployment into separate steps. See https://github.com/chroma-core/chroma/pull/4720.
11-
- name: Install Tilt, bake images, pre-pull external images
12-
shell: bash
13-
env:
14-
TILT_VERSION: "0.34.2"
15-
run: |
16-
parallel --tag --linebuffer ::: \
17-
"bash -c 'curl -fsSL https://github.com/tilt-dev/tilt/releases/download/v${TILT_VERSION}/tilt.${TILT_VERSION}.linux.x86_64.tar.gz | tar -xzv -C /usr/local/bin tilt'" \
18-
"docker buildx bake -f .github/actions/tilt/docker-bake.hcl --load" \
19-
"bash .github/actions/tilt/pull_external_images.sh"
6+
- name: Tilt Setup & Pre-Build
7+
uses: ./.github/actions/tilt-setup-prebuild
208
- name: Start Tilt
219
shell: bash
2210
run: tilt ci

chromadb/test/utils/wait_for_version_increase.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ def wait_for_version_increase(
2323
while curr_version == initial_version:
2424
time.sleep(TIMEOUT_INTERVAL)
2525
if time.time() - initial_time > timeout:
26-
raise TimeoutError("Model was not updated in time")
26+
collection_id = client.get_collection(collection_name).id
27+
raise TimeoutError(f"Model was not updated in time for {collection_id}")
2728
curr_version = get_collection_version(client, collection_name)
2829

2930
return curr_version

go/pkg/log/repository/log.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,13 @@ func (r *LogRepository) ForkRecords(ctx context.Context, sourceCollectionID stri
168168
}
169169
}()
170170

171+
// NOTE(rescrv): Only the IsSealed field of sourceInfo should be read; if GetCollection fails, IsSealed is explicitly reset to false and the rest of the struct is not meaningful.
172+
var sourceInfo log.Collection
173+
sourceInfo, err = queriesWithTx.GetCollection(ctx, sourceCollectionID)
174+
if err != nil {
175+
sourceInfo.IsSealed = false
176+
}
177+
171178
var sourceBounds log.GetBoundsForCollectionRow
172179
sourceBounds, err = queriesWithTx.GetBoundsForCollection(ctx, sourceCollectionID)
173180
if err != nil {
@@ -207,6 +214,7 @@ func (r *LogRepository) ForkRecords(ctx context.Context, sourceCollectionID stri
207214
ID: targetCollectionID,
208215
RecordCompactionOffsetPosition: int64(compactionOffset),
209216
RecordEnumerationOffsetPosition: int64(enumerationOffset),
217+
IsSealed: sourceInfo.IsSealed,
210218
})
211219
if err != nil {
212220
trace_log.Error("Error in updating offset for target collection", zap.Error(err), zap.String("collectionId", targetCollectionID))

go/pkg/log/repository/log_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,11 @@ func (suite *LogTestSuite) SetupSuite() {
3030
connectionString, err := libs2.StartPgContainer(ctx)
3131
config.DATABASE_URL = connectionString
3232
assert.NoError(suite.t, err, "Failed to start pg container")
33-
34-
err = libs2.RunMigration(ctx, connectionString)
35-
assert.NoError(suite.t, err, "Failed to run migration")
36-
37-
// Create connection AFTER migration to ensure schema visibility
3833
var conn *pgxpool.Pool
3934
conn, err = libs2.NewPgConnection(ctx, config)
4035
assert.NoError(suite.t, err, "Failed to create new pg connection")
41-
36+
err = libs2.RunMigration(ctx, connectionString)
37+
assert.NoError(suite.t, err, "Failed to run migration")
4238
suite.sysDb = sysdb.NewMockSysDB()
4339
suite.lr = NewLogRepository(conn, suite.sysDb)
4440
}
@@ -120,17 +116,17 @@ func (suite *LogTestSuite) TestUniqueConstraintPushLogs() {
120116
func (suite *LogTestSuite) TestSealedLogWontPush() {
121117
ctx := context.Background()
122118
collectionId := types.NewUniqueID()
123-
params := log.InsertCollectionParams {
124-
ID: collectionId.String(),
119+
params := log.InsertCollectionParams{
120+
ID: collectionId.String(),
125121
RecordEnumerationOffsetPosition: 1,
126-
RecordCompactionOffsetPosition: 0,
122+
RecordCompactionOffsetPosition: 0,
127123
}
128124
_, err := suite.lr.queries.InsertCollection(ctx, params)
129125
assert.NoError(suite.t, err, "Initializing log should not fail.")
130126
_, err = suite.lr.queries.SealLog(ctx, collectionId.String())
131127
assert.NoError(suite.t, err, "Sealing log should not fail.")
132128
var isSealed bool
133-
_, isSealed, err = suite.lr.InsertRecords(ctx, collectionId.String(), [][]byte{{1,2,3}})
129+
_, isSealed, err = suite.lr.InsertRecords(ctx, collectionId.String(), [][]byte{{1, 2, 3}})
134130
assert.NoError(suite.t, err, "Failed to push logs")
135131
assert.True(suite.t, isSealed, "Did not report was sealed")
136132
}

go/pkg/log/store/db/queries.sql.go

Lines changed: 26 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/pkg/log/store/queries/queries.sql

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ FROM collection
44
WHERE id = $1
55
FOR UPDATE;
66

7+
-- name: GetCollection :one
8+
SELECT *
9+
FROM collection
10+
WHERE id = $1;
11+
712
-- name: InsertRecord :copyfrom
813
INSERT INTO record_log (collection_id, "offset", record, timestamp) values($1, $2, $3, $4);
914

@@ -30,7 +35,7 @@ UPDATE collection set record_compaction_offset_position = $2 where id = $1;
3035
UPDATE collection set record_enumeration_offset_position = $2 where id = $1;
3136

3237
-- name: InsertCollection :one
33-
INSERT INTO collection (id, record_enumeration_offset_position, record_compaction_offset_position) values($1, $2, $3) returning *;
38+
INSERT INTO collection (id, record_enumeration_offset_position, record_compaction_offset_position, is_sealed) values($1, $2, $3, $4) returning *;
3439

3540
-- name: PurgeRecords :exec
3641
DELETE FROM record_log r using collection c where r.collection_id = c.id and r.offset <= c.record_compaction_offset_position;

rust/frontend/sample_configs/tilt_config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,3 +56,5 @@ circuit_breaker:
5656
requests: 1000
5757
enable_span_indexing: true
5858
default_knn_index: "spann"
59+
tenants_to_migrate_immediately:
60+
- "default_tenant"

rust/log-service/src/lib.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,14 @@ impl LogServer {
896896
self.metrics
897897
.dirty_log_records_read
898898
.add(dirty_markers.len() as u64, &[]);
899+
let Some((last_record_inserted, _)) = dirty_markers.last() else {
900+
let backpressure = vec![];
901+
self.set_backpressure(&backpressure);
902+
let mut need_to_compact = self.need_to_compact.lock();
903+
let mut rollups = HashMap::new();
904+
std::mem::swap(&mut *need_to_compact, &mut rollups);
905+
return Ok(());
906+
};
899907
let mut rollups = DirtyMarker::coalesce_markers(&dirty_markers)?;
900908
self.enrich_dirty_log(&mut rollups).await?;
901909
let mut markers = vec![];
@@ -919,7 +927,8 @@ impl LogServer {
919927
markers.push(serde_json::to_string(&DirtyMarker::Cleared).map(Vec::from)?);
920928
}
921929
let mut new_cursor = cursor.clone();
922-
new_cursor.position = self.dirty_log.append_many(markers).await?;
930+
self.dirty_log.append_many(markers).await?;
931+
new_cursor.position = *last_record_inserted + 1u64;
923932
let Some(cursors) = self.dirty_log.cursors(CursorStoreOptions::default()) else {
924933
return Err(Error::CouldNotGetDirtyLogCursors);
925934
};
@@ -1212,23 +1221,11 @@ impl LogServer {
12121221
max_bytes: None,
12131222
max_records: Some(pull_logs.batch_size as u64),
12141223
};
1215-
// NOTE(rescrv): Log records are immutable, so if a manifest includes our range we can
1216-
// serve it directly from the scan_from_manifest call.
1217-
let (manifest_start, manifest_limit) = (
1218-
manifest.minimum_log_position().offset() as i64,
1219-
manifest.maximum_log_position().offset() as i64,
1220-
);
1221-
if manifest_start <= pull_logs.start_from_offset
1222-
&& pull_logs.start_from_offset + pull_logs.batch_size as i64 <= manifest_limit
1223-
{
1224-
LogReader::scan_from_manifest(
1225-
&manifest,
1226-
LogPosition::from_offset(pull_logs.start_from_offset as u64),
1227-
limits,
1228-
)
1229-
} else {
1230-
None
1231-
}
1224+
LogReader::scan_from_manifest(
1225+
&manifest,
1226+
LogPosition::from_offset(pull_logs.start_from_offset as u64),
1227+
limits,
1228+
)
12321229
} else {
12331230
None
12341231
}
@@ -1289,7 +1286,7 @@ impl LogServer {
12891286
let prefix = storage_prefix_for_log(collection_id);
12901287
if let Some(cache) = self.cache.as_ref() {
12911288
let cache_key = format!("{collection_id}::{}", fragment.path);
1292-
let cache_span = tracing::info_span!("cache get");
1289+
let cache_span = tracing::info_span!("cache get", cache_key = ?cache_key);
12931290
if let Ok(Some(answer)) = cache.get(&cache_key).instrument(cache_span).await
12941291
{
12951292
return Ok(Arc::new(answer.bytes));

rust/tracing/src/init_tracer.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub fn init_global_filter_layer() -> Box<dyn Layer<Registry> + Send + Sync> {
3838
"wal3",
3939
"worker",
4040
"garbage_collector",
41+
"continuous_verification",
4142
]
4243
.into_iter()
4344
.map(|s| s.to_string() + "=trace")
@@ -147,6 +148,10 @@ pub fn init_stdout_layer() -> Box<dyn Layer<Registry> + Send + Sync> {
147148
.module_path()
148149
.unwrap_or("")
149150
.starts_with("hosted-frontend")
151+
|| metadata
152+
.module_path()
153+
.unwrap_or("")
154+
.starts_with("continuous_verification")
150155
}))
151156
.with_filter(tracing_subscriber::filter::LevelFilter::INFO)
152157
.boxed()

0 commit comments

Comments
 (0)