|
11 | 11 | using Marten.Internal.Storage; |
12 | 12 | using Marten.Linq.QueryHandlers; |
13 | 13 | using Marten.Schema; |
| 14 | +using Marten.Storage; |
14 | 15 | using Npgsql; |
15 | 16 | using Weasel.Core; |
16 | 17 | using Weasel.Postgresql; |
@@ -73,8 +74,18 @@ public FetchAsyncPlan(EventGraph events, IEventIdentityStrategy<TId> identityStr |
73 | 74 | _storage = storage; |
74 | 75 | _aggregator = _events.Options.Projections.AggregatorFor<TDoc>(); |
75 | 76 |
|
76 | | - _versionSelectionSql = |
77 | | - $" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = "; |
| 77 | + if (_events.TenancyStyle == TenancyStyle.Single) |
| 78 | + { |
| 79 | + _versionSelectionSql = |
| 80 | + $" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = "; |
| 81 | + } |
| 82 | + else |
| 83 | + { |
| 84 | + _versionSelectionSql = |
| 85 | + $" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id and d.tenant_id = a.tenant_id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = "; |
| 86 | + } |
| 87 | + |
| 88 | + |
78 | 89 | } |
79 | 90 |
|
80 | 91 | public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, CancellationToken cancellation = default) |
@@ -177,6 +188,14 @@ private void writeEventFetchStatement(TId id, |
177 | 188 | builder.Append(_initialSql); |
178 | 189 | builder.Append(_versionSelectionSql); |
179 | 190 | builder.AppendParameter(id); |
| 191 | + |
| 192 | + // You must do this for performance even if the stream ids were |
| 193 | + // magically unique across tenants |
| 194 | + if (_events.TenancyStyle == TenancyStyle.Conjoined) |
| 195 | + { |
| 196 | + builder.Append(" and d.tenant_id = "); |
| 197 | + builder.AppendParameter(builder.TenantId); |
| 198 | + } |
180 | 199 | } |
181 | 200 |
|
182 | 201 | public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id, long expectedStartingVersion, |
|
0 commit comments