Skip to content

Commit

Permalink
Merge branch 'develop' into release/1.10.0
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-urbani committed Dec 11, 2020
2 parents 5671621 + 802dee2 commit 4556224
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 25 deletions.
3 changes: 2 additions & 1 deletion api/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package api
import (
"context"
"errors"

"github.com/ONSdigital/dp-graph/v2/graph"
"github.com/ONSdigital/dp-healthcheck/healthcheck"
kafka "github.com/ONSdigital/dp-kafka"
kafka "github.com/ONSdigital/dp-kafka/v2"
dphttp "github.com/ONSdigital/dp-net/http"
"github.com/ONSdigital/log.go/log"
"github.com/gorilla/mux"
Expand Down
62 changes: 43 additions & 19 deletions cmd/dp-import-tracker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/ONSdigital/dp-import-tracker/api"
"github.com/ONSdigital/dp-import-tracker/config"
"github.com/ONSdigital/dp-import/events"
kafka "github.com/ONSdigital/dp-kafka"
kafka "github.com/ONSdigital/dp-kafka/v2"
"github.com/ONSdigital/log.go/log"
)

Expand All @@ -31,6 +31,8 @@ var (
Version string
)

var bufferSize = 1

type insertResult int

const (
Expand Down Expand Up @@ -443,15 +445,27 @@ func main() {
// sensitive fields are omitted from config.String().
log.Event(ctx, "loaded config", log.INFO, log.Data{"config": cfg})

kafkaOffset := kafka.OffsetNewest

if cfg.KafkaOffsetOldest {
kafkaOffset = kafka.OffsetOldest
}

cgConfig := &kafka.ConsumerGroupConfig{
Offset: &kafkaOffset,
KafkaVersion: &cfg.KafkaVersion,
}

// Create InstanceEvent kafka consumer - exit on channel validation error. Non-initialised consumers will not error at creation time.
newInstanceEventConsumer, err := kafka.NewConsumerGroup(
ctx,
cfg.Brokers,
cfg.NewInstanceTopic,
cfg.NewInstanceConsumerGroup,
kafka.OffsetNewest,
true,
kafka.CreateConsumerGroupChannels(true))
kafka.CreateConsumerGroupChannels(bufferSize),
cgConfig,
)

if err != nil {
logFatal(ctx, "could not obtain consumer", err, log.Data{"topic": cfg.NewInstanceTopic})
}
Expand All @@ -462,9 +476,10 @@ func main() {
cfg.Brokers,
cfg.ObservationsInsertedTopic,
cfg.ObservationsInsertedConsumerGroup,
kafka.OffsetNewest,
true,
kafka.CreateConsumerGroupChannels(true))
kafka.CreateConsumerGroupChannels(bufferSize),
cgConfig,
)

if err != nil {
logFatal(ctx, "could not obtain consumer", err, log.Data{"topic": cfg.ObservationsInsertedTopic})
}
Expand All @@ -475,9 +490,10 @@ func main() {
cfg.Brokers,
cfg.HierarchyBuiltTopic,
cfg.HierarchyBuiltConsumerGroup,
kafka.OffsetNewest,
true,
kafka.CreateConsumerGroupChannels(true))
kafka.CreateConsumerGroupChannels(bufferSize),
cgConfig,
)

if err != nil {
logFatal(ctx, "could not obtain consumer", err, log.Data{"topic": cfg.HierarchyBuiltTopic})
}
Expand All @@ -488,20 +504,28 @@ func main() {
cfg.Brokers,
cfg.SearchBuiltTopic,
cfg.SearchBuiltConsumerGroup,
kafka.OffsetNewest,
true,
kafka.CreateConsumerGroupChannels(true))
kafka.CreateConsumerGroupChannels(bufferSize),
cgConfig,
)

if err != nil {
logFatal(ctx, "could not obtain consumer", err, log.Data{"topic": cfg.SearchBuiltTopic})
}

// Create DataImportComplete kafka producer - exit on channel validation error. Non-initialised producers will not error at creation time.

pChannels := kafka.CreateProducerChannels()
pConfig := &kafka.ProducerConfig{
KafkaVersion: &cfg.KafkaVersion,
}

dataImportCompleteProducer, err := kafka.NewProducer(
ctx,
cfg.Brokers,
cfg.DataImportCompleteTopic,
0,
kafka.CreateProducerChannels())
pChannels,
pConfig,
)
if err != nil {
logFatal(ctx, "observation import complete kafka producer error", err, nil)
}
Expand Down Expand Up @@ -590,7 +614,7 @@ func main() {
} else {
createInstanceChan <- newInstanceEvent
}
newInstanceEventConsumer.CommitAndRelease(newInstanceMessage)
newInstanceMessage.CommitAndRelease()
case insertedMessage := <-observationsInsertedEventConsumer.Channels().Upstream:
// This context will be obtained from the received kafka message in the future
kafkaContext := context.Background()
Expand All @@ -607,7 +631,7 @@ func main() {
continue
}
}
observationsInsertedEventConsumer.CommitAndRelease(insertedMessage)
insertedMessage.CommitAndRelease()
case hierarchyBuiltMessage := <-hierarchyBuiltConsumer.Channels().Upstream:
// This context will be obtained from the received kafka message in the future
kafkaContext := context.Background()
Expand All @@ -634,7 +658,7 @@ func main() {
}
log.Event(kafkaContext, "updated instance with hierarchy built. committing message", log.INFO, logData)
}
hierarchyBuiltConsumer.CommitAndRelease(hierarchyBuiltMessage)
hierarchyBuiltMessage.CommitAndRelease()

case searchBuiltMessage := <-searchBuiltConsumer.Channels().Upstream:
// This context will be obtained from the received kafka message in the future
Expand Down Expand Up @@ -662,7 +686,7 @@ func main() {
}
log.Event(kafkaContext, "updated instance with search index built. committing message", log.INFO, logData)
}
searchBuiltConsumer.CommitAndRelease(searchBuiltMessage)
searchBuiltMessage.CommitAndRelease()
}

}
Expand Down
7 changes: 5 additions & 2 deletions cmd/input-file-available-producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/ONSdigital/dp-import/events"
kafka "github.com/ONSdigital/dp-kafka"
kafka "github.com/ONSdigital/dp-kafka/v2"
"github.com/ONSdigital/log.go/log"
)

Expand All @@ -23,10 +23,13 @@ func main() {
flag.Parse()
ctx := context.Background()

var pConfig *kafka.ProducerConfig
var pChannels *kafka.ProducerChannels

var brokers []string
brokers = append(brokers, *kafkaHost)

producer, err := kafka.NewProducer(ctx, brokers, *topic, int(2000000), kafka.CreateProducerChannels())
producer, err := kafka.NewProducer(ctx, brokers, *topic, pChannels, pConfig)
if err != nil {
log.Event(ctx, "Error creating Kafka Producer", log.FATAL, log.Error(err))
os.Exit(1)
Expand Down
7 changes: 5 additions & 2 deletions cmd/observations-inserted-producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/ONSdigital/dp-import/events"
kafka "github.com/ONSdigital/dp-kafka"
kafka "github.com/ONSdigital/dp-kafka/v2"
"github.com/ONSdigital/log.go/log"
)

Expand All @@ -22,10 +22,13 @@ func main() {
flag.Parse()
ctx := context.Background()

var pConfig *kafka.ProducerConfig
var pChannels *kafka.ProducerChannels

var brokers []string
brokers = append(brokers, *kafkaHost)

producer, err := kafka.NewProducer(ctx, brokers, *topic, int(2000000), kafka.CreateProducerChannels())
producer, err := kafka.NewProducer(ctx, brokers, *topic, pChannels, pConfig)
if err != nil {
log.Event(ctx, "Error creating Kafka Producer", log.FATAL, log.Error(err))
os.Exit(1)
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
InitialiseListAttempts int `envconfig:"INITIALISE_LIST_ATTEMPTS"`
HealthCheckInterval time.Duration `envconfig:"HEALTHCHECK_INTERVAL"`
HealthCheckCriticalTimeout time.Duration `envconfig:"HEALTHCHECK_CRITICAL_TIMEOUT"`
KafkaVersion string `envconfig:"KAFKA_VERSION"`
KafkaOffsetOldest bool `envconfig:"KAFKA_OFFSET_OLDEST"`
}

// NewConfig creates the config object
Expand All @@ -55,6 +57,8 @@ func NewConfig() (*Config, error) {
InitialiseListAttempts: 20,
HealthCheckInterval: 30 * time.Second,
HealthCheckCriticalTimeout: 90 * time.Second,
KafkaVersion: "1.0.2",
KafkaOffsetOldest: true,
}
if err := envconfig.Process("", &cfg); err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ require (
github.com/ONSdigital/dp-api-clients-go v1.30.0
github.com/ONSdigital/dp-graph/v2 v2.2.2
github.com/ONSdigital/dp-healthcheck v1.0.5
github.com/ONSdigital/dp-hierarchy-api v1.5.0
github.com/ONSdigital/dp-import v0.0.0-20180202121531-d3cc28e452c3
github.com/ONSdigital/dp-kafka v1.1.7
github.com/ONSdigital/dp-kafka/v2 v2.1.1
github.com/ONSdigital/dp-net v1.0.9
github.com/ONSdigital/go-ns v0.0.0-20200205115900-a11716f93bad // indirect
github.com/ONSdigital/log.go v1.0.1
Expand Down
Loading

0 comments on commit 4556224

Please sign in to comment.