Skip to content
This repository was archived by the owner on May 19, 2025. It is now read-only.
This repository was archived by the owner on May 19, 2025. It is now read-only.

Regarding Parallelizing CatSim #18

Open
@JPGlaser

Description

@JPGlaser

Hey All,

This is Joe again. So I have been working on creating a bulk python script that can compute instance catalogs through an array of observations, hopefully through parallel processing. I have included the current state of the script below.

However, I am running into a problem with the SQL galaxy database portion of my instance catalog writer. When I run the code on two processors, I am returned an error:

Traceback (most recent call last):
  File "genLSSTAGNData_Par.py", line 161, in <module>
    pool.map(CreateCatParallel, Pointing)
  File "/Users/jglaser/lsst/DarwinX86/anaconda/master-g68783b1848/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/jglaser/lsst/DarwinX86/anaconda/master-g68783b1848/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
sqlalchemy.exc.OperationalError: (OperationalError) (20020, 'DB-Lib error message 20020, severity 9:\nBad token from the server: Datastream processing out of sync\n') "EXECUTE [LSST].[dbo].[GalaxySearch2015]\n                   @ApertureStr = 'REGION CIRCLE J2000 25.052981 -26.341448 3.000000', @ColumnNames = 'av_b as internalAvBulge,sedname_agn as sedFilenameAgn,dec as decJ2000,redshift as redshift,galid as galid,AGNID as AGNID,varParamStr as varParamStr,ra as raJ2000,av_d as internalAvDisk,magnorm_bulge as magNormBulge,magnorm_agn as magNormAgn,magnorm_disk as magNormDisk',\n                   @ComponentSubset = 'ALL' , @WhereClause = '(sedname_agn IS NOT NULL) AND (magnorm_agn < 24.0)'" {}

Since my personal experience with SQL is quite limited, I was wondering if any of you guys/gals know what this error is being caused by. A google search is rather inconclusive other than a possibility of too many connections being generated (though the limit should be somewhere around 32K concurrent connections). Do we know if the InstanceCatalog class closes its SQL connection each time it is ran?

~ Joe


import numpy as np
import scipy
import matplotlib.pyplot as plt
import time
import sys

###############################################################################
#                      Set Up Observation Variables                           #
###############################################################################
global AperatureRadius

AperatureType = 'circle'
AperatureRadius = 0.1 #FoV in degrees
#global AperatureRadius = 1.75 #LSST's Actual FoV in degrees
SearchRegionRA = (25.0,30.0)
SearchRegionDec = (-30.0,-25.0)
SearchAirmass = (1.0,1.5)
DesiredFilter = None


###############################################################################
#                     Search DB for the Observations                          #
###############################################################################
import eups
import os
from lsst.sims.catUtils.utils import ObservationMetaDataGenerator
from lsst.sims.catalogs.generation.db import ObservationMetaData
#help(ObservationMetaDataGenerator)

opsimdb = os.path.join(eups.productDir('sims_data'),'OpSimData','enigma_1189_sqlite.db')
gen = ObservationMetaDataGenerator(driver='sqlite', database=opsimdb)
SimObData = gen.getObservationMetaData(boundType=AperatureType, boundLength=AperatureRadius, 
                                       fieldRA=SearchRegionRA, fieldDec=SearchRegionDec,
                                       airmass=SearchAirmass, telescopeFilter=DesiredFilter)
#print SimsObData[0].__dict__


###############################################################################
#                  Output Basic Table of Returned Query                       #
###############################################################################
from prettytable import PrettyTable

NumOfObservations = len(SimObData)
UniquePointings = list({(o.unrefractedRA,o.unrefractedDec) for o in SimObData})
NumOfPointings = len(UniquePointings)
print 'Number of Unique Pointings:', NumOfPointings,'\n'
table2 = PrettyTable(["Pointing RA","Pointing Dec"])
for x in UniquePointings:
    table2.add_row([x[0], x[1]])
print table2


###############################################################################
#              Restructure Returned Query to Correct Format                   #
###############################################################################
ObMetaData = [[] for _ in xrange(NumOfPointings)]
for i in xrange(NumOfPointings):
    for o in SimObData:
        if UniquePointings[i][0] == o.unrefractedRA and UniquePointings[i][1] == o.unrefractedDec:
            ObMetaData[i].append(o)


###############################################################################
#                   Connect to the UoW Galaxy Database                        #
###############################################################################
from lsst.sims.catUtils.baseCatalogModels import GalaxyTileObj
galaxyDB = GalaxyTileObj()
#galaxyDB.show_mapped_columns()


###############################################################################
#                 Define Instance Catalog Class for AGNs                      #
###############################################################################
from lsst.sims.catalogs.measures.instance import InstanceCatalog, compound
from lsst.sims.photUtils import PhotometryGalaxies, VariabilityGalaxies

class FastVarAgnCat(InstanceCatalog, PhotometryGalaxies, VariabilityGalaxies):
    cannot_be_null = ['uAgn'] #Locating only the AGNs in the FoV.
    column_outputs = ['AGNID', 'redshift', 'raJ2000', 'decJ2000', 'ObsAgn', 'sigma_ObsAgn']
    transformations = {'raJ2000':np.degrees, 'decJ2000':np.degrees}

    #Only Append the Column of AGN Magnitudes in the Observed Bandpass
    @compound('ObsAgn', 'sigma_ObsAgn')
    def get_PhotometryCols(self):
        mag = None
        sigma = None
        if self.obs_metadata.bandpass is not None:
            if not hasattr(self.obs_metadata.bandpass, '__iter__'):
                Filter = self.obs_metadata.bandpass
                mag = self.column_by_name(Filter+'Agn')
                sigma = self.column_by_name('sigma_'+Filter+'Agn')
        return np.array([mag, sigma])

    #Don't Calculate Photometry for the Galaxy Bulge or Disk
    @compound('sedFilenameBulge', 'sedFilenameDisk')
    def get_QuickSED(self):
        ra = self.column_by_name('raJ2000') #Finding how many objects are in the column.
        names = []
        for r in ra:
            names.append('None') #Tricking the catalog into thinking these galaxies don't have bulges or disks.
        return np.array([names, names])


###############################################################################
#                  Define the Observation Header Writer                       #
###############################################################################
def ObsHeader(IC, file_handle):
    ObsHeaderTransformations = {'Unrefracted_RA':np.degrees, 'Unrefracted_Dec':np.degrees,
                               'Opsim_moonra':np.degrees, 'Opsim_moondec':np.degrees,
                               'Opsim_rotskypos':np.degrees, 'Opsim_rottelpos':np.degrees,
                               'Opsim_sunalt':np.degrees, 'Opsim_moonalt':np.degrees,
                               'Opsim_dist2moon':np.degrees, 'Opsim_altitude':np.degrees,
                               'Opsim_azimuth':np.degrees
                               }
    md = IC.obs_metadata.phoSimMetaData
    for k in md:
        if k in ObsHeaderTransformations.keys():
            file_handle.write(str(k)+" "+str(ObsHeaderTransformations[k](md[k][0]))+"\n")
        else:
            file_handle.write(str(k)+" "+str(md[k][0])+"\n")


###############################################################################
#            Define the Catalog Writer for Parallel Processes                 #
###############################################################################
def CreateCatParallel(Observation):
    PointingDir = 'RA[%.2f]_DEC[%.2f]' %(Observation.unrefractedRA, Observation.unrefractedDec)
    WorkingDir = PrimaryDir+'/'+CatDir+'/'+PointingDir
    if not os.path.exists(WorkingDir):
        os.makedirs(WorkingDir)
    CatFileName = 'LSSTAGN_%.5f.txt' %(Observation.mjd)
    if os.path.isfile(WorkingDir+'/'+CatFileName) == False:
        SQLRules = '(sedname_agn IS NOT NULL) AND (magnorm_agn < 24.0)'
        variableAgn = FastVarAgnCat(galaxyDB, obs_metadata=Observation, constraint=SQLRules)
        # Writing the Observational Meta-Data Header
        with open(WorkingDir+'/'+CatFileName, "w") as fh:
            ObsHeader(variableAgn, fh)
            fh.write("\n")
            fh.close()
        variableAgn.write_catalog(WorkingDir+'/'+CatFileName, write_mode='a')


###############################################################################
#              Create the Catalogs with Parallel Processing                   #
###############################################################################
from joblib import Parallel, delayed  
import multiprocessing

global PrimaryDir, CatDir
PrimaryDir = 'TestCats/QuickSurvey_R%.2f' %(AperatureRadius)
CatDir = 'Catalogs'

if not os.path.exists(PrimaryDir+'/'+CatDir):
        os.makedirs(PrimaryDir+'/'+CatDir)
start = time.clock()
for Pointing in ObMetaData:
    #NumUsedCores = multiprocessing.cpu_count()
    NumUsedCores = 2
    #Parallel(n_jobs=NumUsedCores, verbose=5)(delayed(CreateCatParallel)(o) for o in Pointing)
    pool = multiprocessing.Pool(processes=NumUsedCores)
    pool.map(CreateCatParallel, Pointing)
    pool.close()
end = time.clock()
print "Creating the catalogs took", end - start, "seconds."

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions