Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate the new crossref relationships endpoint for event creation #155

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
235 changes: 126 additions & 109 deletions app/models/crossref.rb
Original file line number Diff line number Diff line change
@@ -1,152 +1,169 @@
class Crossref < Base
def self.import_by_month_dates(options={})
{
from_date: (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month,
until_date: (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
}
end

def self.import_dates(options={})
{
from_date: options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current - 1.day,
until_date: options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current
}
end

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
dates = import_by_month_dates(options)

# get first day of every month between from_date and until_date
(from_date..until_date).select { |d| d.day == 1 }.each do |m|
CrossrefImportByMonthJob.perform_later(from_date: m.strftime("%F"),
until_date: m.end_of_month.strftime("%F"))
(dates[:from_date]..dates[:until_date]).select { |d| d.day == 1 }.each do |m|
CrossrefImportByMonthJob.perform_later(from_date: m.strftime("%F"), until_date: m.end_of_month.strftime("%F"))
end

"Queued import for DOIs updated from #{from_date.strftime('%F')} until #{until_date.strftime('%F')}."
"Queued import for DOIs updated from #{dates[:from_date].strftime('%F')} until #{dates[:until_date].strftime('%F')}."
end

def self.import(options = {})
from_date = options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current - 1.day
until_date = options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current

dates = import_dates(options)
crossref = Crossref.new
crossref.queue_jobs(crossref.unfreeze(from_date: from_date.strftime("%F"),
until_date: until_date.strftime("%F"), host: true))

crossref.queue_jobs(crossref.unfreeze(
from_date: dates[:from_date].strftime("%F"),
until_date: dates[:until_date].strftime("%F"),
host: true))
end

def source_id
"crossref"
end

def allowed_relationship_types
["cites", "references", "is-supplemented-by"]
end

def get_query_url(options = {})
params = {
source: "crossref",
"from-collected-date" => options[:from_date],
"until-collected-date" => options[:until_date],
mailto: "[email protected]",
scholix: true,
rows: options[:rows],
cursor: options[:cursor],
"not-asserted-by" => "https://ror.org/04wxnsj81",
"object.registration-agency" => "DataCite",
"from-updated-time" => options[:from_date],
"until-updated-time" => options[:until_date],
"cursor" => options[:cursor]
}.compact

"#{ENV['CROSSREF_QUERY_URL']}/v1/events?#{URI.encode_www_form(params)}"
end

def get_total(options = {})
query_url = get_query_url(options.merge(rows: 0))
result = Maremma.get(query_url, options)
message = result.body.dig("data", "message").to_h
[message["total-results"].to_i, message["next-cursor"]]
"#{ENV['CROSSREF_QUERY_URL']}/relationships?#{URI.encode_www_form(params)}"
end

def queue_jobs(options = {})
options[:offset] = options[:offset].to_i || 0
options[:rows] = options[:rows].presence || job_batch_size
options[:from_date] =
options[:from_date].presence || (Time.now.to_date - 1.day).iso8601
options[:until_date] =
options[:until_date].presence || Time.now.to_date.iso8601
options[:content_type] = "json"

total, cursor = get_total(options)

if total.positive?
# walk through results paginated via cursor
total_pages = (total.to_f / job_batch_size).ceil
error_total = 0

(0...total_pages).each do |page|
options[:offset] = page * job_batch_size
options[:total] = total
options[:cursor] = cursor
options[:from_date] = options[:from_date].presence || (Time.now.to_date - 1.day).iso8601
options[:until_date] = options[:until_date].presence || Time.now.to_date.iso8601

count, cursor = process_data(options)
total = count

if count.zero?
text = "No DOIs updated #{options[:from_date]} - #{options[:until_date]}."
else
while count.positive? && cursor.present?
count, cursor = process_data(options)
options[:cursor] = cursor
total += count
end

text = "Queued import for #{total} DOIs updated #{options[:from_date]} - #{options[:until_date]}."
else
text = "No DOIs updated #{options[:from_date]} - #{options[:until_date]}."
end

Rails.logger.info "[Event Data] #{text}"
Rails.logger.info("[Event Data] #{text}")
send_slack_notification(options, text, total)

# send slack notification
options[:level] = if total.zero?
"warning"
elsif error_total.positive?
"danger"
else
"good"
end
options[:title] = "Report for #{source_id}"
if options[:slack_webhook_url].present?
send_notification_to_slack(text,
options)
end

# return number of works queued
total
end

def push_data(result, _options = {})
return result.body.fetch("errors") if result.body.fetch("errors",
nil).present?
errors = result.body.fetch("errors", nil)

items = result.body.dig("data", "message", "events")
# Rails.logger.info "Extracting related identifiers for #{items.size} DOIs updated from #{options[:from_date]} until #{options[:until_date]}."
return errors if errors.present?

Array.wrap(items).map do |item|
CrossrefImportJob.perform_later(item)
end
items = Array.wrap(result
.body
.dig("data", "message", "relationships")
.select { |item| allowed_relationship_types.includes?(item["relationship_type"].to_s.dasherize) })

items.map { |item| CrossrefImportJob.perform_later(item) }

[items.length, result.body.dig("data", "message", "next-cursor")]
end

def self.push_item(item)
subj = cached_crossref_response(item["subj_id"])
obj = cached_datacite_response(item["obj_id"])

if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = ENV["LAGOTTINO_URL"] + "/events/#{item['id']}"

data = {
"data" => {
"id" => item["id"],
"type" => "events",
"attributes" => {
"messageAction" => item["action"],
"subjId" => item["subj_id"],
"objId" => item["obj_id"],
"relationTypeId" => item["relation_type_id"].to_s.dasherize,
"sourceId" => item["source_id"].to_s.dasherize,
"sourceToken" => item["source_token"],
"occurredAt" => item["occurred_at"],
"timestamp" => item["timestamp"],
"license" => item["license"],
"subj" => subj,
"obj" => obj,
},
return if ENV["STAFF_ADMIN_TOKEN"].blank?

uuid = SecureRandom.uuid
subj_id = item.fetch("subject", "id")
obj_id = item.fetch("object", "id")
subj = cached_crossref_response(subj_id)
obj = cached_datacite_response(obj_id)
push_url = ENV["LAGOTTINO_URL"] + "/events/#{uuid}"

data = {
"data" => {
"id" => uuid,
"type" => "events",
"attributes" => {
"messageAction" => "add",
"subjId" => subj_id,
"objId" => obj_id,
"relationTypeId" => item["relationship_type"].to_s.dasherize,
"sourceId" => "crossref",
"sourceToken" => item["source_token"], # well this may very well be null now. it might be used to do a lookup on the crossref side.
"occurredAt" => item["updated_time"],
"timestamp" => Time.now.utc,
"license" => "https://creativecommons.org/publicdomain/zero/1.0/",
"subj" => subj,
"obj" => obj,
},
}

response = Maremma.put(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{item['subj_id']} #{item['relation_type_id']} #{item['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{item['subj_id']} #{item['relation_type_id']} #{item['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{item['subj_id']} #{item['relation_type_id']} #{item['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
},
}

response = Maremma.put(
push_url,
data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{item['subj_id']} #{item['relation_type_id']} #{item['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{item['subj_id']} #{item['relation_type_id']} #{item['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{item['subj_id']} #{item['relation_type_id']} #{item['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end

protected

def self.import_by_month_dates(options={})
{
from_date: (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month,
until_date: (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
}
end

def self.import_dates(options={})
{
from_date: options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current - 1.day,
until_date: options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current
}
end

private

def send_slack_notification(options, text, total)
options[:level] = total.zero? ? "warning" : "good"
options[:title] = "Report for #{source_id}"

if options[:slack_webhook_url].present?
send_notification_to_slack(text, options)
end
end
end
2 changes: 1 addition & 1 deletion config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
ENV["LAGOTTINO_URL"] ||= "https://api.stage.datacite.org"
ENV["SASHIMI_QUERY_URL"] ||= "https://api.stage.datacite.org"
ENV["EVENTDATA_URL"] ||= "https://bus-staging.eventdata.crossref.org"
ENV["CROSSREF_QUERY_URL"] ||= "https://api.eventdata.crossref.org"
ENV["CROSSREF_QUERY_URL"] ||= "https://api.crossref.org/beta"
ENV["TRUSTED_IP"] ||= "10.0.40.1"
ENV["SLACK_WEBHOOK_URL"] ||= ""
ENV["USER_AGENT"] ||= "Mozilla/5.0 (compatible; Maremma/#{Maremma::VERSION}; mailto:[email protected])"
Expand Down
Loading