Skip to content

Commit 7cc39aa

Browse files
committed
feature: Postgres support
1 parent e82d246 commit 7cc39aa

File tree

12 files changed

+1719
-2
lines changed

12 files changed

+1719
-2
lines changed

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ run:
4848
run_mongodb:
4949
docker-compose up -d mongodb
5050

51+
.PHONY: run_postgres
52+
run_postgres:
53+
docker-compose up -d postgres
54+
5155
.PHONY: run_gpubsub
5256
run_gpubsub:
5357
docker-compose up -d gpubsub

docker-compose.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ services:
4343
volumes:
4444
- mongodb:/data/db
4545

46+
postgres:
47+
image: postgres:14
48+
ports:
49+
- "5432:5432"
50+
environment:
51+
POSTGRES_PASSWORD: "password"
52+
# PGDATA: "/data/postgres"
53+
# volumes:
54+
# - postgres:/data/postgres
55+
4656
gpubsub:
4757
image: gcr.io/google.com/cloudsdktool/cloud-sdk:355.0.0-emulators
4858
ports:
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Copyright (c) 2021 - The Event Horizon authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package postgres
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"go.mongodb.org/mongo-driver/bson"
22+
"go.mongodb.org/mongo-driver/mongo"
23+
24+
// Register uuid.UUID as BSON type.
25+
_ "github.com/looplab/eventhorizon/codec/bson"
26+
27+
eh "github.com/looplab/eventhorizon"
28+
)
29+
30+
// Replace implements the Replace method of the eventhorizon.EventStore interface.
31+
func (s *EventStore) Replace(ctx context.Context, event eh.Event) error {
32+
sess, err := s.client.StartSession(nil)
33+
if err != nil {
34+
return eh.EventStoreError{
35+
Err: eh.ErrCouldNotSaveEvents,
36+
BaseErr: err,
37+
}
38+
}
39+
defer sess.EndSession(ctx)
40+
41+
if _, err := sess.WithTransaction(ctx, func(txCtx mongo.SessionContext) (interface{}, error) {
42+
// First check if the aggregate exists, the not found error in the update
43+
// query can mean both that the aggregate or the event is not found.
44+
if n, err := s.events.CountDocuments(ctx,
45+
bson.M{"aggregate_id": event.AggregateID()}); n == 0 {
46+
return nil, eh.ErrAggregateNotFound
47+
} else if err != nil {
48+
return nil, err
49+
}
50+
51+
// Create the event record for the Database.
52+
e, err := newEvt(ctx, event)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
// Copy the event position from the old event (and set in metadata).
58+
res := s.events.FindOne(ctx, bson.M{
59+
"aggregate_id": event.AggregateID(),
60+
"version": event.Version(),
61+
})
62+
if res.Err() != nil {
63+
if res.Err() == mongo.ErrNoDocuments {
64+
return nil, eh.ErrInvalidEvent
65+
}
66+
return nil, fmt.Errorf("could not find event to replace: %w", res.Err())
67+
}
68+
var eventToReplace evt
69+
if err := res.Decode(&eventToReplace); err != nil {
70+
return nil, fmt.Errorf("could not decode event to replace: %w", err)
71+
}
72+
e.Position = eventToReplace.Position
73+
e.Metadata["position"] = eventToReplace.Position
74+
75+
// Find and replace the event.
76+
if r, err := s.events.ReplaceOne(ctx, bson.M{
77+
"aggregate_id": event.AggregateID(),
78+
"version": event.Version(),
79+
}, e); err != nil {
80+
return nil, err
81+
} else if r.MatchedCount == 0 {
82+
return nil, eh.ErrInvalidEvent
83+
}
84+
return nil, nil
85+
}); err != nil {
86+
// Return some errors intact.
87+
if err == eh.ErrAggregateNotFound || err == eh.ErrInvalidEvent {
88+
return err
89+
}
90+
return eh.EventStoreError{
91+
Err: eh.ErrCouldNotSaveEvents,
92+
BaseErr: err,
93+
}
94+
}
95+
96+
return nil
97+
}
98+
99+
// RenameEvent implements the RenameEvent method of the eventhorizon.EventStore interface.
100+
func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) error {
101+
// Find and rename all events.
102+
// TODO: Maybe use change info.
103+
if _, err := s.events.UpdateMany(ctx,
104+
bson.M{
105+
"event_type": from.String(),
106+
},
107+
bson.M{
108+
"$set": bson.M{"event_type": to.String()},
109+
},
110+
); err != nil {
111+
return eh.EventStoreError{
112+
Err: eh.ErrCouldNotSaveEvents,
113+
BaseErr: err,
114+
}
115+
}
116+
117+
return nil
118+
}
119+
120+
// Clear clears the event storage.
121+
func (s *EventStore) Clear(ctx context.Context) error {
122+
if err := s.events.Drop(ctx); err != nil {
123+
return eh.EventStoreError{
124+
Err: fmt.Errorf("could not clear events collection: %w", err),
125+
}
126+
}
127+
if err := s.streams.Drop(ctx); err != nil {
128+
return eh.EventStoreError{
129+
Err: fmt.Errorf("could not clear streams collection: %w", err),
130+
}
131+
}
132+
return nil
133+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright (c) 2021 - The Event Horizon authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package postgres
16+
17+
import (
18+
"context"
19+
"crypto/rand"
20+
"encoding/hex"
21+
"os"
22+
"testing"
23+
24+
"github.com/looplab/eventhorizon/eventstore"
25+
)
26+
27+
func TestEventStoreMaintenanceIntegration(t *testing.T) {
28+
if testing.Short() {
29+
t.Skip("skipping integration test")
30+
}
31+
32+
// Use MongoDB in Docker with fallback to localhost.
33+
addr := os.Getenv("MONGODB_ADDR")
34+
if addr == "" {
35+
addr = "localhost:27017"
36+
}
37+
url := "mongodb://" + addr
38+
39+
// Get a random DB name.
40+
b := make([]byte, 4)
41+
if _, err := rand.Read(b); err != nil {
42+
t.Fatal(err)
43+
}
44+
db := "test-" + hex.EncodeToString(b)
45+
t.Log("using DB:", db)
46+
47+
store, err := NewEventStore(url, db)
48+
if err != nil {
49+
t.Fatal("there should be no error:", err)
50+
}
51+
if store == nil {
52+
t.Fatal("there should be a store")
53+
}
54+
defer store.Close(context.Background())
55+
56+
eventstore.MaintenanceAcceptanceTest(t, store, store, context.Background())
57+
}

0 commit comments

Comments
 (0)