Hopefully this resolves itself, but while running the `load_reference_data` script to import test data into my local Postgres instance, I hit a 503 from https://geonames.usgs.gov/docs/federalcodes/NationalFedCodes.zip:

```
urllib.error.HTTPError: HTTP Error 503: Service Unavailable
```

The same happens in a browser, with the message: "The requested service is temporarily unavailable. Please try later."
I will certainly try again later. But in the meantime, I am curious, is there another source for this data that will work as a backup or alternative? Has the resource moved perhaps? And does anyone have a copy of this zip they would be able to share so I can slot it in?
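In the meantime, transient 503s like this can sometimes be ridden out with a small retry-with-backoff wrapper around the download call. This is a generic sketch, not part of the codebase; the `retry_on_503` name, attempt count, and delays are all made up here:

```python
import time
import urllib.error


def retry_on_503(func, attempts=3, base_delay=1.0):
    """Call func(), retrying with exponential backoff when it raises HTTP 503."""
    for attempt in range(attempts):
        try:
            return func()
        except urllib.error.HTTPError as exc:
            # Only 503 is worth retrying; re-raise anything else, or the last failure
            if exc.code != 503 or attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```

You would wrap whatever actually fetches the file, e.g. `retry_on_503(lambda: urllib.request.urlretrieve(url, dest))`.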
Edit:
It seems the resource may have moved here. I will try this out with FedCodes_National_Text.zip when I clock back in tomorrow.
I am able to get the above file to work, but only with the following code changes:

In `usaspending_api/references/management/commands/load_city_county_state_code.py`, add the following three module-level functions. They don't reference any class members or methods, so they only need to live somewhere the `Command` class can reach them:
```python
def translate_column(column):
    column_map = {
        "primary_latitude": "prim_lat_dec",
        "primary_longitude": "prim_long_dec",
        "state_alpha": "state_name",
    }
    if column in column_map:
        return column_map[column]
    else:
        return column


def add_to_existing(to_add, existing) -> dict:
    feature_id = to_add.get(translate_column("feature_id"))
    state_alpha = to_add.get(translate_column("state_alpha"))
    county_sequence = to_add.get(translate_column("county_sequence"))
    county_numeric = to_add.get(translate_column("county_numeric"))
    if state_alpha not in existing:
        existing[state_alpha] = {}
    if county_sequence not in existing[state_alpha]:
        existing[state_alpha][county_sequence] = {}
    if county_numeric not in existing[state_alpha][county_sequence]:
        existing[state_alpha][county_sequence][county_numeric] = {}
    if feature_id not in existing[state_alpha][county_sequence][county_numeric]:
        existing[state_alpha][county_sequence][county_numeric][feature_id] = True
    return existing


def check_existing(to_test, existing) -> bool:
    feature_id = to_test.get(translate_column("feature_id"))
    state_alpha = to_test.get(translate_column("state_alpha"))
    county_sequence = to_test.get(translate_column("county_sequence"))
    county_numeric = to_test.get(translate_column("county_numeric"))
    return (
        existing.get(state_alpha, {})
        .get(county_sequence, {})
        .get(county_numeric, {})
        .get(feature_id, False)
    )
```

(Note the membership check in `add_to_existing` has to be against `existing`, not `to_add`, or the nested lookups below it raise `KeyError`.)
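For what it's worth, the `add_to_existing`/`check_existing` pair is just a nested dict keyed state → county sequence → county numeric → feature id. A standalone toy version of the same dedup idea (the `seen`/`record` names and the sample rows are illustrative only):

```python
def seen(existing, state, seq, num, feature_id):
    """Check whether this (state, seq, num, feature_id) combination was already recorded."""
    return existing.get(state, {}).get(seq, {}).get(num, {}).get(feature_id, False)


def record(existing, state, seq, num, feature_id):
    """Record the combination; setdefault creates each nested level as needed."""
    existing.setdefault(state, {}).setdefault(seq, {}).setdefault(num, {})[feature_id] = True
    return existing


rows = [("WA", "1", "033", "f1"), ("WA", "1", "033", "f1"), ("OR", "2", "005", "f2")]
existing = {}
unique = []
for row in rows:
    if not seen(existing, *row):
        record(existing, *row)
        unique.append(row)
```

Using `setdefault` collapses the four `if ... not in` checks into one line, but the data structure is identical.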
Then add the following function to the end of the Command class:
```python
@staticmethod
@contextmanager
def _filter_columns(csv_file, columns):
    with Timer("Filtering CSV file to only needed columns"):
        with TemporaryDirectory() as temp_dir:
            temp_file_path = str(Path(temp_dir) / "local_file_copy_2")
            with open(temp_file_path, "w", newline="\n") as new_csv:
                writer = csv.writer(new_csv, delimiter="|")
                with open(csv_file, encoding="utf-8-sig", newline="") as original:
                    reader = csv.DictReader(original, delimiter="|")
                    # DictReader consumes the header row itself; the parsed names are on
                    # reader.fieldnames (next(reader) would return the first data row)
                    header = reader.fieldnames
                    for column in columns:
                        # parentheses required, or tcol is bound to the boolean
                        if (tcol := translate_column(column)) not in header:
                            context = ""
                            if column != tcol:
                                context = f" or '{tcol}'"
                            raise RuntimeError(f"Column '{column}'{context} not found in csv")
                    # the COPY import runs with the "header" option, which skips the
                    # first line, so the filtered file needs a header row of its own
                    writer.writerow(columns)
                    existing_data = {}
                    for row in reader:
                        if check_existing(row, existing_data):
                            continue
                        existing_data = add_to_existing(row, existing_data)
                        new_row_values = []
                        for column in columns:
                            translated_column = translate_column(column)
                            new_row_values.append(row.get(translated_column, ""))
                        writer.writerow(new_row_values)
            yield temp_file_path
```
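As a sanity check of the filtering approach, here is a toy run outside the command using only the stdlib `csv` module (the sample data and variable names are made up). Note that `csv.DictReader` consumes the header row itself, so `reader.fieldnames` is where to look for the parsed column names:

```python
import csv
import io

source = "feature_id|state_name|county_numeric|extra\n1|WA|033|x\n1|WA|033|x\n2|OR|005|y\n"
wanted = ["feature_id", "state_name", "county_numeric"]

reader = csv.DictReader(io.StringIO(source), delimiter="|")
header = reader.fieldnames  # DictReader already consumed the header row
missing = [c for c in wanted if c not in header]

out = io.StringIO()
writer = csv.writer(out, delimiter="|", lineterminator="\n")
writer.writerow(wanted)  # header line, so a COPY with the header option doesn't eat data
seen_rows = set()
for row in reader:
    key = tuple(row[c] for c in wanted)
    if key in seen_rows:  # crude stand-in for the nested-dict dedup
        continue
    seen_rows.add(key)
    writer.writerow([row.get(c, "") for c in wanted])
```

The `extra` column is dropped and the duplicate row is skipped; only the wanted columns reach the output.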
The `csv` package will also need to be imported at the top of the file. Then replace the existing `_import_input_file` method in `Command` with:
```python
def _import_input_file(self):
    import_command = (
        f'copy "{self.staging_table_name}" (feature_id, feature_name, feature_class, census_code, '
        f"census_class_code, gsa_code, opm_code, state_numeric, state_alpha, county_sequence, county_numeric, "
        f"county_name, primary_latitude, primary_longitude, date_created, date_edited) from stdin with "
        f"(format csv, header, delimiter '|')"
    )
    cols_to_filter = [
        "feature_id", "feature_name", "feature_class", "census_code",
        "census_class_code", "gsa_code", "opm_code", "state_numeric",
        "state_alpha", "county_sequence", "county_numeric", "county_name",
        "primary_latitude", "primary_longitude", "date_created", "date_edited",
    ]
    with self._filter_columns(self.working_file, cols_to_filter) as filtered_csv:
        with Timer("Importing file to staging table"):
            with connection.cursor() as cursor:
                with open(filtered_csv, encoding="utf-8-sig") as csv_file:
                    csv_file.seek(0)
                    cursor.cursor.copy_expert(import_command, csv_file, size=10485760)  # 10MB buffer
                    logger.info(f"{cursor.cursor.rowcount:,} rows imported")
```
Then, in `usaspending_api/references/management/commands/load_reference_data.py`, change line 48 to reference the new file: https://prd-tnm.s3.amazonaws.com/StagedProducts/GeographicNames/FederalCodes/FedCodes_National_Text.zip
Then, in `usaspending_api/common/zip.py`, we need to replace the existing function to allow us to target a specific file within a ZIP archive:
```python
def extract_single_file_zip(
    zip_file_path,
    destination_directory_path,
    *,
    target_file=None,
):
    """
    Accepts a zip file path and destination directory path, then extracts a single file
    from the zip file into the destination directory. If target_file is given, that member
    is extracted; otherwise the archive must contain one and only one file.
    Returns the file path of the extracted file.
    """
    with ZipFile(zip_file_path) as zip_file:
        zip_files = zip_file.namelist()
        file_count = len(zip_files)
        file_index = 0
        if target_file:
            try:
                file_index = zip_files.index(target_file)
            except ValueError:
                raise RuntimeError(f"Could not find target file '{target_file}' in zip archive")
        else:
            if file_count < 1:
                raise RuntimeError("No files found in zip archive")
            if file_count > 1:
                raise NotImplementedError("Expected no more than one file in zip archive")
        return zip_file.extract(zip_files[file_index], path=destination_directory_path)
```
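The targeted extraction is stock `zipfile` behavior. A self-contained sketch (the archive contents here are made up) showing that `ZipFile.extract` keeps the member's internal path under the destination directory, which matters when wiring up the caller:

```python
import zipfile
from pathlib import Path
from tempfile import TemporaryDirectory

with TemporaryDirectory() as tmp:
    # Build a two-file archive resembling the FedCodes layout
    archive = Path(tmp) / "demo.zip"
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("Text/FederalCodes_National.txt", "payload")
        zf.writestr("readme.txt", "ignore me")

    # Extract only the member we want, by name
    with zipfile.ZipFile(archive) as zf:
        names = zf.namelist()
        target = "Text/FederalCodes_National.txt"
        if target not in names:
            raise RuntimeError(f"Could not find target file '{target}' in zip archive")
        extracted = zf.extract(target, path=tmp)

    content = Path(extracted).read_text()
```

Because the member name includes the `Text/` directory, the extracted file lands in a `Text/` subdirectory of the destination, not at its top level.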
Once that's done, back in `usaspending_api/references/management/commands/load_city_county_state_code.py`, we need to leverage the new keyword argument. The `_unzip_file` method becomes:
```python
@staticmethod
@contextmanager
def _unzip_file(file_path):
    """
    ZIP file context manager. If the file pointed to by file_path is a ZIP file, extracts file to a
    temporary location, yields, and cleans up afterwards. Otherwise, effectively does nothing.
    """
    if zipfile.is_zipfile(file_path):
        with TemporaryDirectory() as temp_dir:
            with Timer("Unzip file"):
                unzipped_file_path = extract_single_file_zip(
                    file_path,
                    temp_dir,
                    target_file="Text/FederalCodes_National.txt",
                )
            yield unzipped_file_path
    else:
        yield file_path
```
I'm adding the code here rather than submitting a PR because I don't presume to know whether the 503 is permanent, or whether this is the best solution for the existing codebase. It does let me continue my work, though, so I hope it helps someone.