Skip to content

Commit

Permalink
refactor CSV parser to use foreach instead of reading whole file into…
Browse files Browse the repository at this point in the history
… memory
  • Loading branch information
Julie Allinson committed Jul 9, 2019
1 parent ade5183 commit db167e5
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 45 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ gem "blacklight", "~> 6.20.0"

group :development, :test do
gem 'solr_wrapper', '>= 0.3'
gem 'database_cleaner'
end

gem 'rsolr', '>= 1.0'
Expand Down
4 changes: 3 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ GEM
concurrent-ruby (1.1.5)
connection_pool (2.2.2)
crass (1.0.4)
database_cleaner (1.7.0)
declarative (0.0.10)
declarative-option (0.1.0)
deprecation (1.0.0)
Expand Down Expand Up @@ -710,6 +711,7 @@ DEPENDENCIES
bootstrap-sass (~> 3.0)
bulkrax!
byebug
database_cleaner
factory_bot_rails
hyrax (~> 2.3)
oai
Expand All @@ -721,4 +723,4 @@ DEPENDENCIES
twitter-typeahead-rails (= 0.11.1.pre.corejavascript)

BUNDLED WITH
1.17.2
1.17.3
23 changes: 12 additions & 11 deletions app/factories/bulkrax/object_factory.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# TODO require 'importer/log_subscriber'
# TODO: require 'importer/log_subscriber'
module Bulkrax
class ObjectFactory
extend ActiveModel::Callbacks
Expand All @@ -16,9 +16,9 @@ def initialize(attributes, files_dir = nil, files = [], user = nil)
def run
arg_hash = { id: attributes[:id], name: 'UPDATE', klass: klass }
@object = find
if @object
@object.reindex_extent = Hyrax::Adapters::NestingIndexAdapter::LIMITED_REINDEX
ActiveSupport::Notifications.instrument('import.importer', arg_hash) { update }
if @object
@object.reindex_extent = Hyrax::Adapters::NestingIndexAdapter::LIMITED_REINDEX
ActiveSupport::Notifications.instrument('import.importer', arg_hash) { update }
else
ActiveSupport::Notifications.instrument('import.importer', arg_hash.merge(name: 'CREATE')) { create }
end
Expand All @@ -28,6 +28,7 @@ def run

def update
raise "Object doesn't exist" unless object

run_callbacks(:save) do
work_actor.update(environment(update_attributes))
end
Expand Down Expand Up @@ -106,7 +107,7 @@ def create_collection(attrs)
# a way that is compatible with how the factory needs them.
def transform_attributes
attributes.slice(*permitted_attributes)
.merge(file_attributes)
.merge(file_attributes)
end

# Find existing files or upload new files. This assumes a Work will have unique file titles;
Expand All @@ -117,7 +118,7 @@ def transform_attributes
# support multiple files; ensure attributes[:file] is an Array
def upload_ids
attributes[:file] = Array.wrap(attributes[:file])
work_files_titles = object.file_sets.map(&:title).map(&:first) if object.present? && object.file_sets.present?
work_files_titles = object.file_sets.map { |t| t.title.to_a }.flatten if object.present? && object.file_sets.present?
work_files_titles && (work_files_titles & attributes[:file]).present? ? [] : import_files
end

Expand All @@ -130,21 +131,21 @@ def file_attributes

def new_remote_files
@new_remote_files ||= if attributes[:remote_files].present? && object.present? && object.file_sets.present?
attributes[:remote_files].select do |file|
existing = object.file_sets.detect {|f| f.import_url && f.import_url == file[:url]}
!existing
attributes[:remote_files].reject do |file|
existing = object.file_sets.detect { |f| f.import_url && f.import_url == file[:url] }
existing
end
elsif attributes[:remote_files].present?
attributes[:remote_files]
end
end

def file_paths
attributes[:file].map { |file_name| File.join(files_directory, file_name) } if attributes[:file]
attributes[:file]&.map { |file_name| File.join(files_directory, file_name) }
end

def import_files
file_paths.map { | path | import_file(path) }
file_paths.map { |path| import_file(path) }
end

def import_file(path)
Expand Down
37 changes: 15 additions & 22 deletions app/parsers/bulkrax/csv_parser.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Bulkrax
class CsvParser < ApplicationParser
delegate :errors, to: :importer
delegate :errors, :increment_counters, :parser_fields, to: :importer

def self.parser_fields
{
Expand All @@ -10,44 +10,35 @@ def self.parser_fields
}
end

def initialize(importer)
super
def run
create_works
end

def records(_opts = {})
csv = CSV.open(
importer.parser_fields['csv_path'],
CSV.foreach(
parser_fields['csv_path'],
headers: true,
header_converters: :symbol,
encoding: 'utf-8'
)
csv_data = csv.read
raise StandardError, 'Identifier column is required' if csv_data.headers.include?(:identifier) == false

# skip rows without an identifier; remove any nil values with compact
csv_data.map { |row| row.to_h.compact! unless row[:identifier].blank? }.reject(&:blank?)
rescue StandardError => e
errors.add(:base, e.class.to_s.to_sym, message: e.message)
[]
end

def run
create_works
end

def create_works
records.each_with_index do |record, index|
records.with_index(0) do |record, index|
next if record[:identifier].blank?
break if !limit.nil? && index >= limit

seen[record[:identifier]] = true
new_entry = entry_class.where(importer: importer, identifier: record[:identifier], raw_metadata: record).first_or_create!
new_entry = entry_class.where(importer: importer, identifier: record[:identifier], raw_metadata: record.to_h.compact).first_or_create!
ImportWorkJob.perform_later(new_entry.id, importer.current_importer_run.id)
importer.increment_counters(index)
increment_counters(index)
end
rescue StandardError => e
errors.add(:base, e.class.to_s.to_sym, message: e.message)
end

def files_path
arr = importer.parser_fields['csv_path'].split('/')
arr = parser_fields['csv_path'].split('/')
arr.pop
arr << 'files'
arr.join('/')
Expand All @@ -57,8 +48,10 @@ def entry_class
CsvEntry
end

# See https://stackoverflow.com/questions/2650517/count-the-number-of-lines-in-a-file-without-reading-entire-file-into-memory
# (old but thorough analysis)
def total
@total ||= records.count
@total ||= `wc -l #{importer.parser_fields['csv_path']}`.to_i
rescue StandardError
@total = 0
end
Expand Down
2 changes: 1 addition & 1 deletion spec/factories/bulkrax/entries.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FactoryBot.define do
factory :bulkrax_entry, class: 'Entry' do
factory :bulkrax_entry, class: 'Bulkrax::Entry' do
identifier { "MyString" }
type { "" }
importer { nil }
Expand Down
3 changes: 2 additions & 1 deletion spec/fixtures/csv/good.csv
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
identifier,title,
1,Lovely Title
1,Lovely Title
2,Another Title
25 changes: 16 additions & 9 deletions spec/parsers/bulkrax/csv_parser_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,24 @@
module Bulkrax
RSpec.describe CsvParser do
describe '#create_works' do
let(:importer) { FactoryBot.build(:bulkrax_importer_csv) }
let(:importer) { FactoryBot.create(:bulkrax_importer_csv) }
let(:entry) { FactoryBot.create(:bulkrax_entry, importer: importer) }
subject { described_class.new(importer) }

before(:each) do
allow(Bulkrax::CsvEntry).to receive_message_chain(:where, :first_or_create!).and_return(entry)
allow(entry).to receive(:id)
allow(Bulkrax::ImportWorkJob).to receive(:perform_later)
end

context 'with malformed CSV' do
before(:each) do
importer.parser_fields = { csv_path: './spec/fixtures/csv/malformed.csv' }
end

it 'returns and empty array, and records the error on the importer' do
subject.records
subject.create_works
expect(importer.errors.details[:base].first[:error]).to eq('CSV::MalformedCSVError'.to_sym)
expect(subject.records).to eq([])
end
end

Expand All @@ -23,10 +29,9 @@ module Bulkrax
importer.parser_fields = { csv_path: './spec/fixtures/csv/bad.csv' }
end

it 'returns and empty array, and records the error on the importer' do
subject.records
expect(importer.errors[:base].first).to eq('Identifier column is required')
expect(subject.records).to eq([])
it 'skips all of the lines' do
expect(subject.importer).not_to receive(:increment_counters)
subject.create_works
end
end

Expand All @@ -36,7 +41,8 @@ module Bulkrax
end

it 'skips the bad line' do
expect(subject.records).to eq([{ identifier: '2', title: 'Another Title' }])
expect(subject).to receive(:increment_counters).once
subject.create_works
end
end

Expand All @@ -46,7 +52,8 @@ module Bulkrax
end

it 'processes the line' do
expect(subject.records).to eq([{ identifier: '1', title: 'Lovely Title' }])
expect(subject).to receive(:increment_counters).twice
subject.create_works
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions spec/rails_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
RSpec.configure do |config|
config.include FactoryBot::Syntax::Methods

config.after(:each) do
DatabaseCleaner.clean
end

# Remove this line if you're not using ActiveRecord or ActiveRecord fixtures
config.fixture_path = "#{::Rails.root}/spec/fixtures"

Expand Down

0 comments on commit db167e5

Please sign in to comment.