Skip to content

Commit c5e6c08

Browse files
committed
url-encoder service is added.
1 parent 05adab6 commit c5e6c08

File tree

14 files changed

+303
-32
lines changed

14 files changed

+303
-32
lines changed

.idea/url-shortener.iml

+9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

+6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/event_handler/main.go

+24-15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"github.com/apache/pulsar-client-go/pulsar"
67
"github.com/cemayan/url-shortener/common"
@@ -11,7 +12,7 @@ import (
1112
"github.com/cemayan/url-shortener/internal/event_handler/adapter/database"
1213
"github.com/cemayan/url-shortener/internal/event_handler/domain/model"
1314
cockroack_output "github.com/cemayan/url-shortener/internal/event_handler/domain/port/output"
14-
"github.com/cemayan/url-shortener/internal/event_handler/util"
15+
"github.com/cemayan/url-shortener/internal/event_handler/helper"
1516
"github.com/cemayan/url-shortener/managers/db"
1617
"github.com/cemayan/url-shortener/managers/hook"
1718
"github.com/cemayan/url-shortener/managers/mq"
@@ -53,7 +54,7 @@ func init() {
5354
_db := dbHandler.New()
5455
database.DB = _db
5556

56-
util.MigrateDB(_db, _log.WithFields(logrus.Fields{"service": "database"}))
57+
helper.MigrateDB(_db, _log.WithFields(logrus.Fields{"service": "database"}))
5758

5859
fluentdHook := hook.NewFluentdHook()
5960
_log.Hooks.Add(fluentdHook)
@@ -92,23 +93,31 @@ func main() {
9293
_log.WithFields(logrus.Fields{"method": "GetSchemaValue", "message": err.Error(), "data": string(msg.Payload())}).Log(logrus.ErrorLevel)
9394
}
9495

95-
_log.Println(event)
96+
if event.EventName == common.UrlCreated {
9697

97-
err = consumer.Ack(msg)
98-
if err != nil {
99-
_log.WithFields(logrus.Fields{"method": "ConsumerAck", "message": err.Error(), "data": fmt.Sprintf("%v||%v", msg.Topic(), msg.ID().EntryID())}).Log(logrus.ErrorLevel)
100-
}
98+
var cockroachPort cockroack_output.CockroachPort
99+
cockroachPort = cockroach.NewUserUrlRepo(database.DB, _log.WithFields(logrus.Fields{"service": "cockroach-repo"}))
101100

102-
var cockroachPort cockroack_output.CockroachPort
103-
cockroachPort = cockroach.NewUserUrlRepo(database.DB, _log.WithFields(logrus.Fields{"service": "cockroach-repo"}))
101+
var eventDetail common.EventDataDetail
104102

105-
_, err = cockroachPort.CreateUserUrl(&model.UserUrl{
106-
UserId: "",
107-
ShortUrl: "short",
108-
LongUrl: "long",
109-
})
103+
err := json.Unmarshal(event.EventData, &eventDetail)
104+
if err != nil {
105+
return
106+
}
107+
108+
_, err = cockroachPort.CreateUserUrl(&model.UserUrl{
109+
UserId: event.UserId,
110+
ShortUrl: eventDetail.ShortUrl,
111+
LongUrl: eventDetail.LongUrl,
112+
})
113+
if err != nil {
114+
return
115+
}
116+
}
117+
118+
err = consumer.Ack(msg)
110119
if err != nil {
111-
return
120+
_log.WithFields(logrus.Fields{"method": "ConsumerAck", "message": err.Error(), "data": fmt.Sprintf("%v||%v", msg.Topic(), msg.ID().EntryID())}).Log(logrus.ErrorLevel)
112121
}
113122

114123
}

cmd/url_encoder/main.go

+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"github.com/apache/pulsar-client-go/pulsar"
7+
"github.com/cemayan/url-shortener/common"
8+
pulsar_handler "github.com/cemayan/url-shortener/common/adapters/pulsar"
9+
"github.com/cemayan/url-shortener/common/ports/output"
10+
"github.com/cemayan/url-shortener/config/url_encoder"
11+
"github.com/cemayan/url-shortener/internal/api/write/adapter/mongodb"
12+
"github.com/cemayan/url-shortener/internal/api/write/domain/model"
13+
"github.com/cemayan/url-shortener/managers/db"
14+
"github.com/cemayan/url-shortener/managers/hook"
15+
"github.com/cemayan/url-shortener/managers/mq"
16+
"github.com/cemayan/url-shortener/util"
17+
"github.com/sirupsen/logrus"
18+
"github.com/spf13/viper"
19+
"go.mongodb.org/mongo-driver/bson"
20+
"go.mongodb.org/mongo-driver/bson/primitive"
21+
"math/rand"
22+
"os"
23+
"time"
24+
)
25+
26+
var _log *logrus.Logger
27+
var configs *url_encoder.AppConfig
28+
var v *viper.Viper
29+
30+
func init() {
31+
_log = logrus.New()
32+
if os.Getenv("env") == "dev" {
33+
_log.SetFormatter(&logrus.TextFormatter{
34+
DisableColors: false,
35+
FullTimestamp: true,
36+
})
37+
} else {
38+
_log.SetFormatter(&logrus.JSONFormatter{})
39+
_log.SetOutput(os.Stdout)
40+
}
41+
_log.Out = os.Stdout
42+
43+
v = viper.New()
44+
_configs := url_encoder.NewConfig(v)
45+
46+
env := os.Getenv("env")
47+
appConfig, err := _configs.GetConfig(env)
48+
configs = appConfig
49+
if err != nil {
50+
return
51+
}
52+
53+
fluentdHook := hook.NewFluentdHook()
54+
_log.Hooks.Add(fluentdHook)
55+
}
56+
57+
func main() {
58+
59+
var mongoManager db.MongodbManager
60+
mongoManager = db.NewMongodbManager(configs.Mongo, _log.WithFields(logrus.Fields{"service": "mongo-db-manager"}))
61+
62+
var mongoPort output.MongoPort
63+
mongoPort = mongodb.NewApiRepo(mongoManager.New(), configs.Mongo, _log.WithFields(logrus.Fields{"service": "mongo-db-manager"}))
64+
65+
client, err := pulsar.NewClient(pulsar.ClientOptions{
66+
URL: configs.Pulsar.Url,
67+
OperationTimeout: 30 * time.Second,
68+
ConnectionTimeout: 30 * time.Second,
69+
})
70+
if err != nil {
71+
_log.WithFields(logrus.Fields{"method": "NewPulsarClient", "message": err.Error()}).Log(logrus.FatalLevel)
72+
return
73+
}
74+
75+
var pulsarManager mq.PulsarManager
76+
pulsarManager = mq.NewPulsarManager(client, configs.Pulsar, _log.WithFields(logrus.Fields{"service": "event-handler"}))
77+
78+
var pulsarPort output.PulsarPort
79+
pulsarPort = pulsar_handler.NewPulsarHandler(pulsarManager, configs.Pulsar, _log.WithFields(logrus.Fields{"service": "event-handler"}))
80+
81+
channel := make(chan pulsar.ConsumerMessage, 10)
82+
83+
consumer := pulsarPort.ConsumeEvent(configs.Pulsar.Topic, fmt.Sprintf("%s-%v", "sub", rand.Int63()), channel)
84+
defer consumer.Close()
85+
86+
for cm := range channel {
87+
consumer := cm.Consumer
88+
msg := cm.Message
89+
var event common.EventModel
90+
91+
err := msg.GetSchemaValue(&event)
92+
if err != nil {
93+
_log.WithFields(logrus.Fields{"method": "GetSchemaValue", "message": err.Error(), "data": string(msg.Payload())}).Log(logrus.ErrorLevel)
94+
}
95+
96+
if event.EventName == common.UrlEncoded {
97+
98+
metadata := util.GetEventMetadata()
99+
100+
var eventDetail common.EventDataDetail
101+
102+
err := json.Unmarshal(event.EventData, &eventDetail)
103+
if err != nil {
104+
return
105+
}
106+
107+
eventDetail.ShortUrl = "shortttt"
108+
109+
err = mongoPort.CreateEvent(model.Events{
110+
ID: primitive.NewObjectID(),
111+
EventData: bson.M{
112+
"shortUrl": "shortttt",
113+
"longUrl": eventDetail.LongUrl,
114+
},
115+
EventDate: metadata.EventDate,
116+
EventName: common.UrlCreated,
117+
AggregateType: "",
118+
AggregateId: metadata.AggregateId,
119+
UserId: event.UserId,
120+
})
121+
if err != nil {
122+
//TODO
123+
return
124+
} else {
125+
126+
eventDetailBytes, err := json.Marshal(eventDetail)
127+
if err != nil {
128+
return
129+
}
130+
131+
err = pulsarPort.PublishEvent(common.EventModel{
132+
AggregateId: metadata.AggregateId,
133+
AggregateType: event.AggregateType,
134+
EventData: eventDetailBytes,
135+
EventDate: metadata.EventDate,
136+
EventName: common.UrlCreated,
137+
UserId: event.UserId,
138+
}, configs.Pulsar.Topic)
139+
if err != nil {
140+
//TODO
141+
return
142+
}
143+
}
144+
145+
}
146+
147+
err = consumer.Ack(msg)
148+
if err != nil {
149+
_log.WithFields(logrus.Fields{"method": "ConsumerAck", "message": err.Error(), "data": fmt.Sprintf("%v||%v", msg.Topic(), msg.ID().EntryID())}).Log(logrus.ErrorLevel)
150+
}
151+
152+
}
153+
}

config/event_handler/config-dev.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ cockroach:
44
port: 26257
55
user: root
66
mongo:
7-
dbName: eseventstore
8-
url: mongodb://localhost:27017
7+
dbName: urlShortener
8+
uri: mongodb://localhost:27017
99
pulsar:
1010
url: pulsar://localhost:6650
1111
topic: events

config/url_encoder/config-dev.yml

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
cockroach:
2+
host: localhost
3+
name: us_metadata
4+
port: 26257
5+
user: root
6+
mongo:
7+
dbName: urlShortener
8+
uri: mongodb://localhost:27017
9+
pulsar:
10+
url: pulsar://localhost:6650
11+
topic: events
12+
secret: secret
13+

config/url_encoder/config.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package url_encoder
2+
3+
import (
4+
"errors"
5+
"github.com/cemayan/url-shortener/common"
6+
log "github.com/sirupsen/logrus"
7+
"github.com/spf13/viper"
8+
)
9+
10+
type Configer interface {
11+
LoadLocalFileConfig(filename string) (*viper.Viper, error)
12+
ParseConfig(v *viper.Viper) (*AppConfig, error)
13+
GetConfig(configPath string) (*AppConfig, error)
14+
}
15+
16+
type Config struct {
17+
viper *viper.Viper
18+
}
19+
20+
// AppConfig is representation of a OS Env values
21+
type AppConfig struct {
22+
Secret string `json:"secret"`
23+
Cockroach common.Cockroach `json:"cockroach" `
24+
Kafka common.Kafka `json:"kafka"`
25+
Pulsar common.Pulsar `json:"pulsar"`
26+
Mongo common.Mongo `json:"mongo"`
27+
}
28+
29+
// Load config file from given path
30+
func (cfg Config) LoadLocalFileConfig(filename string) (*viper.Viper, error) {
31+
32+
cfg.viper.SetConfigType("yaml")
33+
cfg.viper.AddConfigPath(".")
34+
cfg.viper.SetConfigName(filename)
35+
36+
cfg.viper.AutomaticEnv()
37+
38+
if err := cfg.viper.ReadInConfig(); err != nil {
39+
if _, ok := err.(viper.ConfigFileNotFoundError); ok {
40+
return nil, errors.New("config file not found")
41+
}
42+
return nil, err
43+
}
44+
45+
return cfg.viper, nil
46+
}
47+
48+
// Parse config file
49+
func (cfg Config) ParseConfig(v *viper.Viper) (*AppConfig, error) {
50+
var c AppConfig
51+
52+
err := v.Unmarshal(&c)
53+
if err != nil {
54+
log.Printf("unable to decode into struct, %v", err)
55+
return nil, err
56+
}
57+
58+
return &c, nil
59+
}
60+
61+
// Get config
62+
func (cfg Config) GetConfig(env string) (*AppConfig, error) {
63+
64+
var path string
65+
if env == "dev" {
66+
path = "./config/url_encoder/config-dev"
67+
}
68+
69+
v, err := cfg.LoadLocalFileConfig(path)
70+
if err != nil {
71+
return nil, err
72+
}
73+
74+
_cfg, err := cfg.ParseConfig(v)
75+
if err != nil {
76+
return nil, err
77+
}
78+
return _cfg, nil
79+
}
80+
81+
func NewConfig(viper *viper.Viper) Configer {
82+
return &Config{viper: viper}
83+
}

internal/api/write/adapter/mongodb/api_repo.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
package mongodb
22

33
import (
4-
"github.com/cemayan/url-shortener/config/api"
4+
"github.com/cemayan/url-shortener/common"
5+
"github.com/cemayan/url-shortener/common/ports/output"
56
"github.com/cemayan/url-shortener/internal/api/write/adapter/database"
67
"github.com/cemayan/url-shortener/internal/api/write/domain/model"
7-
"github.com/cemayan/url-shortener/internal/api/write/domain/port/output"
88
log "github.com/sirupsen/logrus"
99
"go.mongodb.org/mongo-driver/mongo"
1010
)
1111

1212
type ApiRepoSvc struct {
1313
mongoClient *mongo.Client
14-
configs *api.AppConfig
14+
configs common.Mongo
1515
log *log.Entry
1616
}
1717

1818
func (a ApiRepoSvc) CreateEvent(event model.Events) error {
19-
collection := a.mongoClient.Database(a.configs.Mongo.DbName).Collection("events")
19+
collection := a.mongoClient.Database(a.configs.DbName).Collection("events")
2020
_, err := collection.InsertOne(database.MongoDBContext, &event)
2121
if err != nil {
2222
a.log.WithFields(log.Fields{"method": "SaveRecord"}).Errorf("An errror occurred %v", err.Error())
@@ -26,7 +26,7 @@ func (a ApiRepoSvc) CreateEvent(event model.Events) error {
2626
return nil
2727
}
2828

29-
func NewApiRepo(mongoClient *mongo.Client, configs *api.AppConfig, log *log.Entry) output.MongoPort {
29+
func NewApiRepo(mongoClient *mongo.Client, configs common.Mongo, log *log.Entry) output.MongoPort {
3030
return &ApiRepoSvc{
3131
mongoClient: mongoClient,
3232
configs: configs,

0 commit comments

Comments
 (0)