diff --git a/src/pfs_design_tool/pointing_utils/dbutils.py b/src/pfs_design_tool/pointing_utils/dbutils.py index 2cf2d6b..bf0c3af 100644 --- a/src/pfs_design_tool/pointing_utils/dbutils.py +++ b/src/pfs_design_tool/pointing_utils/dbutils.py @@ -51,115 +51,117 @@ def generate_targets_from_targetdb( max_priority=None, ): db = connect_targetdb(conf) + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. + try: + search_radius = fp_radius_degree * fp_fudge_factor - search_radius = fp_radius_degree * fp_fudge_factor - - if extra_where is None: - extra_where = "" + if extra_where is None: + extra_where = "" - if "m" in arms: - if "r" not in arms: - extra_where = "AND is_medium_resolution IS TRUE" - else: - if "r" in arms: - extra_where = "AND is_medium_resolution IS FALSE" + if "m" in arms: + if "r" not in arms: + extra_where = "AND is_medium_resolution IS TRUE" + else: + if "r" in arms: + extra_where = "AND is_medium_resolution IS FALSE" - query_string = f"""SELECT ob_code,obj_id,c.input_catalog_id,ra,dec,epoch,priority,pmra,pmdec,parallax,effective_exptime,single_exptime,qa_reference_arm,is_medium_resolution,proposal.proposal_id,rank,grade,allocated_time_lr+allocated_time_mr as \"allocated_time\",allocated_time_lr,allocated_time_mr,filter_g,filter_r,filter_i,filter_z,filter_y,psf_flux_g,psf_flux_r,psf_flux_i,psf_flux_z,psf_flux_y,psf_flux_error_g,psf_flux_error_r,psf_flux_error_i,psf_flux_error_z,psf_flux_error_y,total_flux_g,total_flux_r,total_flux_i,total_flux_z,total_flux_y,total_flux_error_g,total_flux_error_r,total_flux_error_i,total_flux_error_z,total_flux_error_y + query_string = f"""SELECT ob_code,obj_id,c.input_catalog_id,ra,dec,epoch,priority,pmra,pmdec,parallax,effective_exptime,single_exptime,qa_reference_arm,is_medium_resolution,proposal.proposal_id,rank,grade,allocated_time_lr+allocated_time_mr as \"allocated_time\",allocated_time_lr,allocated_time_mr,filter_g,filter_r,filter_i,filter_z,filter_y,psf_flux_g,psf_flux_r,psf_flux_i,psf_flux_z,psf_flux_y,psf_flux_error_g,psf_flux_error_r,psf_flux_error_i,psf_flux_error_z,psf_flux_error_y,total_flux_g,total_flux_r,total_flux_i,total_flux_z,total_flux_y,total_flux_error_g,total_flux_error_r,total_flux_error_i,total_flux_error_z,total_flux_error_y FROM {tablename} JOIN input_catalog AS c ON {tablename}.input_catalog_id = c.input_catalog_id JOIN proposal ON {tablename}.proposal_id=proposal.proposal_id WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) AND c.active """ - if extra_where is not None: - query_string += extra_where + if extra_where is not None: + query_string += extra_where + + if input_catalog is not None: + query_string += ( + " AND (" + + "OR".join([f" input_catalog_id={v} " for v in input_catalog]) + + ")" + ) - if input_catalog is not None: - query_string += ( - " AND (" - + "OR".join([f" input_catalog_id={v} " for v in input_catalog]) - + ")" - ) + if proposal_id is not None: + query_string += ( + " AND (" + + "OR".join([f" {tablename}.proposal_id='{v}' " for v in proposal_id]) + + ")" + ) - if proposal_id is not None: - query_string += ( - " AND (" - + "OR".join([f" {tablename}.proposal_id='{v}' " for v in proposal_id]) - + ")" - ) + if max_priority is not None: + query_string += f" AND priority <= {max_priority}" - if max_priority is not None: - query_string += f" AND priority <= {max_priority}" + query_string += ";" - query_string += ";" + # logger.info(f"Query string for targets:\n{query_string}") - # logger.info(f"Query string for targets:\n{query_string}") + df = pd.DataFrame() - df = pd.DataFrame() + t_begin = time.time() + df = db.fetch_query(query_string) + t_end = time.time() + # logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") - t_begin = time.time() - df = db.fetch_query(query_string) - t_end = time.time() - # logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") - - # keep user fillers (grade BCF) for queue only; - if conf["ppp"]["mode"] == "classic": - mask_keep = (df["proposal_id"].str.startswith("S25A")) & (df["grade"].isin(["G"])) - else: - mask_keep = ( - ((df["proposal_id"].str.startswith("S26A")) & (df["grade"].isin(["C", "F"]))) - | ((df["proposal_id"].str.startswith("S25A")) & (df["grade"].isin(["G"]))) - ) - - df = df.loc[mask_keep].reset_index(drop=True) + # keep user fillers (grade BCF) for queue only; + if conf["ppp"]["mode"] == "classic": + mask_keep = (df["proposal_id"].str.startswith("S25A")) & (df["grade"].isin(["G"])) + else: + mask_keep = ( + ((df["proposal_id"].str.startswith("S26A")) & (df["grade"].isin(["C", "F"]))) + | ((df["proposal_id"].str.startswith("S25A")) & (df["grade"].isin(["G"]))) + ) - df.loc[df["pmra"].isna(), "pmra"] = 0.0 - df.loc[df["pmdec"].isna(), "pmdec"] = 0.0 - df.loc[df["parallax"].isna(), "parallax"] = 1.0e-7 - df.loc[df["rank"] < 0, "rank"] = 10.0 # give highest rank to classic targets - # logger.info(f"Fetched target DataFrame: \n{df}") + df = df.loc[mask_keep].reset_index(drop=True) - if force_priority is not None: - df["priority"] = force_priority + df.loc[df["pmra"].isna(), "pmra"] = 0.0 + df.loc[df["pmdec"].isna(), "pmdec"] = 0.0 + df.loc[df["parallax"].isna(), "parallax"] = 1.0e-7 + df.loc[df["rank"] < 0, "rank"] = 10.0 # give highest rank to classic targets + # logger.info(f"Fetched target DataFrame: \n{df}") - # convert mag limits to flux (nJy) - flux_max = (mag_min * u.ABmag).to(u.nJy).value - flux_min = (mag_max * u.ABmag).to(u.nJy).value - flux_limit_17mag = (17.0 * u.ABmag).to(u.nJy).value - - # --- build mask --- - # case 1: grade == "G" → flux in desired range - mask_g = (df["grade"] == "G") & df[mag_filter].between(flux_min, flux_max) - - # case 2: grade != "G" → none of the bands brighter than 17 mag - flux_cols = [ - "total_flux_g", - "total_flux_r", - "total_flux_i", - "total_flux_z", - "total_flux_y", - ] - # --- case 2: grade != "G" --- - # we build a per-row mask that depends on proposal_id - mask_not_g = np.zeros(len(df), dtype=bool) - - for i, (_, row) in enumerate(df.iterrows()): - if row["grade"] in ["B", "C", "F"]: - # interpret as fluxes → keep if all bands ≤ flux_limit_17mag - if not np.any( - [ - (row[col] is not None) - and np.isfinite(row[col]) - and (row[col] > flux_limit_17mag) - for col in flux_cols - ] - ): - mask_not_g[i] = True - else: - continue + if force_priority is not None: + df["priority"] = force_priority - # --- combine both --- - df = df[mask_g | mask_not_g].reset_index(drop=True) + # convert mag limits to flux (nJy) + flux_max = (mag_min * u.ABmag).to(u.nJy).value + flux_min = (mag_max * u.ABmag).to(u.nJy).value + flux_limit_17mag = (17.0 * u.ABmag).to(u.nJy).value - db.close() + # --- build mask --- + # case 1: grade == "G" → flux in desired range + mask_g = (df["grade"] == "G") & df[mag_filter].between(flux_min, flux_max) + + # case 2: grade != "G" → none of the bands brighter than 17 mag + flux_cols = [ + "total_flux_g", + "total_flux_r", + "total_flux_i", + "total_flux_z", + "total_flux_y", + ] + # --- case 2: grade != "G" --- + # we build a per-row mask that depends on proposal_id + mask_not_g = np.zeros(len(df), dtype=bool) + + for i, (_, row) in enumerate(df.iterrows()): + if row["grade"] in ["B", "C", "F"]: + # interpret as fluxes → keep if all bands ≤ flux_limit_17mag + if not np.any( + [ + (row[col] is not None) + and np.isfinite(row[col]) + and (row[col] > flux_limit_17mag) + for col in flux_cols + ] + ): + mask_not_g[i] = True + else: + continue + + # --- combine both --- + df = df[mask_g | mask_not_g].reset_index(drop=True) + finally: + db.close() return df @@ -197,112 +199,103 @@ def generate_fluxstds_from_targetdb( fluxstd_versions = None db = connect_targetdb(conf) + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. + try: + search_radius = fp_radius_degree * fp_fudge_factor - search_radius = fp_radius_degree * fp_fudge_factor - - query_string = f"""SELECT * + query_string = f"""SELECT * FROM {tablename} WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) """ - if extra_where is None: - extra_where = "" + if extra_where is None: + extra_where = "" - filters = [] + filters = [] - if not ignore_prob_f_star: - filters.append(f"AND prob_f_star BETWEEN {min_prob_f_star} AND 1.0") + if not ignore_prob_f_star: + filters.append(f"AND prob_f_star BETWEEN {min_prob_f_star} AND 1.0") - if good_fluxstd: - filters.append("AND flags_dist IS FALSE") - filters.append("AND flags_ebv IS FALSE") - else: - if flags_dist: + if good_fluxstd: filters.append("AND flags_dist IS FALSE") - if flags_ebv: filters.append("AND flags_ebv IS FALSE") + else: + if flags_dist: + filters.append("AND flags_dist IS FALSE") + if flags_ebv: + filters.append("AND flags_ebv IS FALSE") - if select_by_flux: - filters.append(f"AND psf_flux_{mag_filter} BETWEEN {flux_min} AND {flux_max}") - else: - filters.append(f"AND psf_mag_{mag_filter} BETWEEN {mag_min} AND {mag_max}") - - if select_from_gaia: - filters.append(f"AND teff_gspphot BETWEEN {min_teff} AND {max_teff}") - else: - filters.append(f"AND teff_brutus BETWEEN {min_teff} AND {max_teff}") - - if fluxstd_versions is not None: - if isinstance(fluxstd_versions, (list, tuple, set, np.ndarray)): - fluxstd_version_list = [str(version) for version in fluxstd_versions] + if select_by_flux: + filters.append(f"AND psf_flux_{mag_filter} BETWEEN {flux_min} AND {flux_max}") else: - fluxstd_version_list = [str(fluxstd_versions)] + filters.append(f"AND psf_mag_{mag_filter} BETWEEN {mag_min} AND {mag_max}") - if len(fluxstd_version_list) == 1: - filters.append(f"AND (version = '{fluxstd_version_list[0]}')") + if select_from_gaia: + filters.append(f"AND teff_gspphot BETWEEN {min_teff} AND {max_teff}") else: - version_values = ", ".join( - f"'{version}'" for version in fluxstd_version_list - ) - filters.append(f"AND (version IN ({version_values}))") + filters.append(f"AND teff_brutus BETWEEN {min_teff} AND {max_teff}") - try: - numeric_versions = [float(version) for version in fluxstd_version_list] - if max(numeric_versions) >= 3.5: - filters.append("AND (is_gc_neighbor = False)") - filters.append("AND (is_dense_region = False)") - except (TypeError, ValueError): - pass + if fluxstd_versions is not None: + fluxstd_version = str(fluxstd_versions) + filters.append(f"AND (version = '{fluxstd_version}')") - query_string += extra_where - if filters: - query_string += "\n" + "\n".join(filters) + try: + if float(fluxstd_version) >= 3.5: + filters.append("AND (is_gc_neighbor = False)") + filters.append("AND (is_dense_region = False)") + except (TypeError, ValueError): + pass - query_string += ";" + query_string += extra_where + if filters: + query_string += "\n" + "\n".join(filters) - logger.info(f"Query string for fluxstd: \n{query_string}") + query_string += ";" - t_begin = time.time() - df = db.fetch_query(query_string) + logger.info(f"Query string for fluxstd: \n{query_string}") - if len(df) == 0 or dec < -25: - # select gaia fstar when no PS1 fstar is selected - flux_max = (mag_min * u.ABmag).to(u.nJy).value - flux_min = (mag_max * u.ABmag).to(u.nJy).value + t_begin = time.time() + df = db.fetch_query(query_string) - #query_string = f"""SELECT * - # FROM {tablename} - # WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) - # AND (is_gc_neighbor=False) - # AND (is_dense_region=False) - # AND filter_r= 'g_gaia' - # AND is_fstar_gaia - # AND teff_gspphot BETWEEN {min_teff} AND {max_teff} - # AND psf_flux_r BETWEEN {flux_min} AND {flux_max} - # AND prob_f_star::text = 'NaN'; - # """ - query_string = f"""SELECT * - FROM {tablename} - WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) - AND filter_r= 'g_gaia' - AND is_fstar_gaia - AND teff_gspphot BETWEEN {min_teff} AND {max_teff} - AND psf_flux_r BETWEEN {flux_min} AND {flux_max} - AND prob_f_star::text = 'NaN'; + if len(df) == 0 or dec < -25: + # select gaia fstar when no PS1 fstar is selected + flux_max = (mag_min * u.ABmag).to(u.nJy).value + flux_min = (mag_max * u.ABmag).to(u.nJy).value + + #query_string = f"""SELECT * + # FROM {tablename} + # WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) + # AND (is_gc_neighbor=False) + # AND (is_dense_region=False) + # AND filter_r= 'g_gaia' + # AND is_fstar_gaia + # AND teff_gspphot BETWEEN {min_teff} AND {max_teff} + # AND psf_flux_r BETWEEN {flux_min} AND {flux_max} + # AND prob_f_star::text = 'NaN'; + # """ + query_string = f"""SELECT * + FROM {tablename} + WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) + AND filter_r= 'g_gaia' + AND is_fstar_gaia + AND teff_gspphot BETWEEN {min_teff} AND {max_teff} + AND psf_flux_r BETWEEN {flux_min} AND {flux_max} + AND prob_f_star::text = 'NaN'; """ - logger.info(f"Query string for fluxstd (Gaia): \n{query_string}") + logger.info(f"Query string for fluxstd (Gaia): \n{query_string}") - df = db.fetch_query(query_string) - - t_end = time.time() - logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") + df = db.fetch_query(query_string) - df.loc[df["pmra"].isna(), "pmra"] = 0.0 - df.loc[df["pmdec"].isna(), "pmdec"] = 0.0 - df.loc[df["parallax"].isna(), "parallax"] = 1.0e-7 - logger.info(f"Fetched target DataFrame: \n{df}") + t_end = time.time() + logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") - db.close() + df.loc[df["pmra"].isna(), "pmra"] = 0.0 + df.loc[df["pmdec"].isna(), "pmdec"] = 0.0 + df.loc[df["parallax"].isna(), "parallax"] = 1.0e-7 + logger.info(f"Fetched target DataFrame: \n{df}") + finally: + db.close() """ #Check if there are clusters of flux standards, and if so, keep only a representative subset to avoid over-representation in certain regions. @@ -380,66 +373,68 @@ def generate_skyobjects_from_targetdb( # extra_where=None, ): db = connect_targetdb(conf) - - search_radius = fp_radius_degree * fp_fudge_factor - + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. try: - sky_versions = conf["targetdb"]["sky"]["version"] - except Exception: - sky_versions = None - - where_condition = f"WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius})" + search_radius = fp_radius_degree * fp_fudge_factor - if sky_versions is not None: - version_condition = "(" - first_condition = True - for sky_version in sky_versions: - if first_condition: - first_condition = False - else: - version_condition += " OR " - if sky_version == "20220915": - # use only HSC sky catalog in the older version - version_condition += ( - f"(version = '{sky_version}' AND input_catalog_id=1001)" - ) - else: - version_condition += f"version = '{sky_version}'" - version_condition += ")" + try: + sky_versions = conf["targetdb"]["sky"]["version"] + except Exception: + sky_versions = None + + where_condition = f"WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius})" + + if sky_versions is not None: + version_condition = "(" + first_condition = True + for sky_version in sky_versions: + if first_condition: + first_condition = False + else: + version_condition += " OR " + if sky_version == "20220915": + # use only HSC sky catalog in the older version + version_condition += ( + f"(version = '{sky_version}' AND input_catalog_id=1001)" + ) + else: + version_condition += f"version = '{sky_version}'" + version_condition += ")" - where_condition += f" AND {version_condition}" + where_condition += f" AND {version_condition}" - query_string = f"""SELECT * + query_string = f"""SELECT * FROM {tablename} {where_condition} """ - query_string += ";" - - logger.info(f"Query string for sky: \n{query_string}") + query_string += ";" - t_begin = time.time() - df = db.fetch_query(query_string) - t_end = time.time() - logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") + logger.info(f"Query string for sky: \n{query_string}") - df["pmra"] = np.zeros(df.index.size, dtype=float) - df["pmdec"] = np.zeros(df.index.size, dtype=float) - df["parallax"] = np.full(df.index.size, 1.0e-7) - logger.info(f"Fetched target DataFrame: \n{df}") + t_begin = time.time() + df = db.fetch_query(query_string) + t_end = time.time() + logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") - # Replacing obj_id with sky_id as currently (obj_id, cat_id) pairs can be duplicated for sky. - # In the version 20220915, obj_ids are not unique and sometimes not integer. + df["pmra"] = np.zeros(df.index.size, dtype=float) + df["pmdec"] = np.zeros(df.index.size, dtype=float) + df["parallax"] = np.full(df.index.size, 1.0e-7) + logger.info(f"Fetched target DataFrame: \n{df}") - is_old_version = df["version"] == "20220915" + # Replacing obj_id with sky_id as currently (obj_id, cat_id) pairs can be duplicated for sky. + # In the version 20220915, obj_ids are not unique and sometimes not integer. - if np.any(is_old_version): - df.loc[is_old_version, "obj_id"] = df.loc[is_old_version, "sky_id"] - logger.warning( - "obj_id is forced to be replaced to sky_id for sky objects with version=20220915" - ) + is_old_version = df["version"] == "20220915" - db.close() + if np.any(is_old_version): + df.loc[is_old_version, "obj_id"] = df.loc[is_old_version, "sky_id"] + logger.warning( + "obj_id is forced to be replaced to sky_id for sky objects with version=20220915" + ) + finally: + db.close() return df @@ -485,16 +480,17 @@ def generate_targets_from_gaiadb( write_csv=False, ): conn = connect_subaru_gaiadb(conf) - cur = conn.cursor() - - if search_radius is None: - search_radius = fp_radius_degree * fp_fudge_factor + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. + try: + if search_radius is None: + search_radius = fp_radius_degree * fp_fudge_factor - # Query for fillers: - # astrometric_excess_noise_sig (D) < 2 - # 12 <= phot_g_mean_mag <=20 + # Query for fillers: + # astrometric_excess_noise_sig (D) < 2 + # 12 <= phot_g_mean_mag <=20 - query_string = f"""SELECT + query_string = f"""SELECT source_id,ref_epoch,ra,dec,pmra,pmdec,parallax, phot_g_mean_mag,phot_bp_mean_mag,phot_rp_mean_mag, phot_g_mean_flux_over_error, phot_bp_mean_flux_over_error, phot_rp_mean_flux_over_error @@ -503,36 +499,38 @@ def generate_targets_from_gaiadb( AND {band_select} BETWEEN {mag_min} AND {mag_max} """ - if good_astrometry: - query_string += "AND astrometric_excess_noise < 1.0" - - query_string += ";" - - # logger.info(query_string) - - cur.execute(query_string) - - df_res = pd.DataFrame( - cur.fetchall(), - columns=[ - "source_id", - "ref_epoch", - "ra", - "dec", - "pmra", - "pmdec", - "parallax", - "phot_g_mean_mag", - "phot_bp_mean_mag", - "phot_rp_mean_mag", - "phot_g_mean_flux_over_error", - "phot_bp_mean_flux_over_error", - "phot_rp_mean_flux_over_error", - ], - ) + if good_astrometry: + query_string += "AND astrometric_excess_noise < 1.0" - cur.close() - conn.close() + query_string += ";" + + # logger.info(query_string) + + # psycopg2 cursor used as context manager: cur.close() is called automatically on exit + with conn.cursor() as cur: + cur.execute(query_string) + + df_res = pd.DataFrame( + cur.fetchall(), + columns=[ + "source_id", + "ref_epoch", + "ra", + "dec", + "pmra", + "pmdec", + "parallax", + "phot_g_mean_mag", + "phot_bp_mean_mag", + "phot_rp_mean_mag", + "phot_g_mean_flux_over_error", + "phot_bp_mean_flux_over_error", + "phot_rp_mean_flux_over_error", + ], + ) + finally: + # conn.close() releases the TCP connection back to the server; called even on exception + conn.close() # logger.info(df_res) if write_csv: @@ -669,115 +667,104 @@ def generate_fillers_from_targetdb( write_csv=False, ): db = connect_targetdb(conf) + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. + try: + if search_radius is None: + search_radius = fp_radius_degree * fp_fudge_factor + + query_string = f"""SELECT + ob_code,obj_id,epoch,ra,dec,pmra,pmdec,parallax,priority,effective_exptime,qa_reference_arm, + psf_flux_g,psf_flux_r,psf_flux_i,psf_flux_z,psf_flux_y, + psf_flux_error_g, psf_flux_error_r, psf_flux_error_i, psf_flux_error_z, psf_flux_error_y, + total_flux_g,total_flux_r,total_flux_i,total_flux_z,total_flux_y, + total_flux_error_g, total_flux_error_r, total_flux_error_i, total_flux_error_z, total_flux_error_y, + filter_g, filter_r, filter_i, filter_z, filter_y, + proposal.proposal_id, proposal.grade, c.input_catalog_id, is_medium_resolution + FROM target JOIN proposal ON target.proposal_id=proposal.proposal_id JOIN input_catalog AS c ON target.input_catalog_id = c.input_catalog_id + WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) + AND c.active + """ + + query_string += ";" + + logger.info(query_string) + + df_res = pd.DataFrame( + db.fetch_query(query_string), + columns=[ + "ob_code", + "obj_id", + "epoch", + "ra", + "dec", + "pmra", + "pmdec", + "parallax", + "priority", + "effective_exptime", + "qa_reference_arm", + "psf_flux_g", + "psf_flux_r", + "psf_flux_i", + "psf_flux_z", + "psf_flux_y", + "psf_flux_error_g", + "psf_flux_error_r", + "psf_flux_error_i", + "psf_flux_error_z", + "psf_flux_error_y", + "total_flux_g", + "total_flux_r", + "total_flux_i", + "total_flux_z", + "total_flux_y", + ], + ) - if search_radius is None: - search_radius = fp_radius_degree * fp_fudge_factor + # df_res = df_res[df_res["grade"] != "G"] # do not include obs. fillers - query_string = f"""SELECT - ob_code,obj_id,epoch,ra,dec,pmra,pmdec,parallax,priority,effective_exptime,qa_reference_arm, - psf_flux_g,psf_flux_r,psf_flux_i,psf_flux_z,psf_flux_y, - psf_flux_error_g, psf_flux_error_r, psf_flux_error_i, psf_flux_error_z, psf_flux_error_y, - total_flux_g,total_flux_r,total_flux_i,total_flux_z,total_flux_y, - total_flux_error_g, total_flux_error_r, total_flux_error_i, total_flux_error_z, total_flux_error_y, - filter_g, filter_r, filter_i, filter_z, filter_y, - proposal.proposal_id, proposal.grade, c.input_catalog_id, is_medium_resolution - FROM target JOIN proposal ON target.proposal_id=proposal.proposal_id JOIN input_catalog AS c ON target.input_catalog_id = c.input_catalog_id - WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) - AND c.active - """ + # convert mag limits to flux (nJy) + flux_max = (mag_min * u.ABmag).to(u.nJy).value + flux_min = (mag_max * u.ABmag).to(u.nJy).value + flux_limit_17mag = (17.0 * u.ABmag).to(u.nJy).value + + # --- build mask --- + # case 1: grade == "G" → flux in desired range + mask_g = (df_res["grade"] == "G") & df_res[band_select].between(flux_min, flux_max) - query_string += ";" - - logger.info(query_string) - - df_res = pd.DataFrame( - db.fetch_query(query_string), - columns=[ - "ob_code", - "obj_id", - "epoch", - "ra", - "dec", - "pmra", - "pmdec", - "parallax", - "priority", - "effective_exptime", - "qa_reference_arm", - "psf_flux_g", - "psf_flux_r", - "psf_flux_i", - "psf_flux_z", - "psf_flux_y", - "psf_flux_error_g", - "psf_flux_error_r", - "psf_flux_error_i", - "psf_flux_error_z", - "psf_flux_error_y", + # case 2: grade != "G" → none of the bands brighter than 17 mag + flux_cols = [ "total_flux_g", "total_flux_r", "total_flux_i", "total_flux_z", "total_flux_y", - "total_flux_error_g", - "total_flux_error_r", - "total_flux_error_i", - "total_flux_error_z", - "total_flux_error_y", - "filter_g", - "filter_r", - "filter_i", - "filter_z", - "filter_y", - "proposal_id", - "grade", - "input_catalog_id", - "is_medium_resolution", - ], - ) - - # df_res = df_res[df_res["grade"] != "G"] # do not include obs. fillers + ] - # convert mag limits to flux (nJy) - flux_max = (mag_min * u.ABmag).to(u.nJy).value - flux_min = (mag_max * u.ABmag).to(u.nJy).value - flux_limit_17mag = (17.0 * u.ABmag).to(u.nJy).value - - # --- build mask --- - # case 1: grade == "G" → flux in desired range - mask_g = (df_res["grade"] == "G") & df_res[band_select].between(flux_min, flux_max) - - # case 2: grade != "G" → none of the bands brighter than 17 mag - flux_cols = [ - "total_flux_g", - "total_flux_r", - "total_flux_i", - "total_flux_z", - "total_flux_y", - ] + # --- case 2: grade != "G" --- + # we build a per-row mask that depends on proposal_id + mask_not_g = np.zeros(len(df_res), dtype=bool) - # --- case 2: grade != "G" --- - # we build a per-row mask that depends on proposal_id - mask_not_g = np.zeros(len(df_res), dtype=bool) + for i, (_, row) in enumerate(df_res.iterrows()): + if row["grade"] == "G": + continue + else: + # interpret as fluxes → keep if all bands ≤ flux_limit_17mag + if not np.any( + [ + (row[col] is not None) + and np.isfinite(row[col]) + and (row[col] > flux_limit_17mag) + for col in flux_cols + ] + ): + mask_not_g[i] = True - for i, (_, row) in enumerate(df_res.iterrows()): - if row["grade"] == "G": continue - else: - # interpret as fluxes → keep if all bands ≤ flux_limit_17mag - if not np.any( - [ - (row[col] is not None) - and np.isfinite(row[col]) - and (row[col] > flux_limit_17mag) - for col in flux_cols - ] - ): - mask_not_g[i] = True - - # --- combine both --- - df_res_magcut = df_res[mask_g | mask_not_g].reset_index(drop=True) - - db.close() + # --- combine both --- + df_res_magcut = df_res[mask_g | mask_not_g].reset_index(drop=True) + finally: + db.close() # logger.info(df_res) if write_csv: @@ -876,41 +863,43 @@ def format_epoch(x): except FileNotFoundError: # query qaDB to get executed pfsdesign conn = connect_qadb(conf) - cur = conn.cursor() - - sql = """ + # Use try/finally to guarantee the connection is closed on any exit path. + try: + sql = """ SELECT * - FROM exposure_time - JOIN pfs_visit ON exposure_time.pfs_visit_id = pfs_visit.pfs_visit_id + FROM exposure_time + JOIN pfs_visit ON exposure_time.pfs_visit_id = pfs_visit.pfs_visit_id JOIN onsite_processing_status ON onsite_processing_status.pfs_visit_id = pfs_visit.pfs_visit_id WHERE pfs_visit.pfs_visit_id >=129587 ORDER BY pfs_visit.pfs_visit_id DESC; """ - cur.execute(sql) - - df_design_done = pd.DataFrame( - cur.fetchall(), - columns=[ - "pfs_visit_id", - "nominal_exposure_time", - "effective_exposure_time_r", - "effective_exposure_time_b", - "effective_exposure_time_n", - "effective_exposure_time_m", - "pfs_visit_id", - "pfs_visit_description", - "pfs_design_id", - "issued_at", - "pfs_visit_id", - "status", - "started_at", - "updated_at", - ], - ) - - cur.close() - conn.close() + # psycopg2 cursor used as context manager: cur.close() is called automatically on exit + with conn.cursor() as cur: + cur.execute(sql) + + df_design_done = pd.DataFrame( + cur.fetchall(), + columns=[ + "pfs_visit_id", + "nominal_exposure_time", + "effective_exposure_time_r", + "effective_exposure_time_b", + "effective_exposure_time_n", + "effective_exposure_time_m", + "pfs_visit_id", + "pfs_visit_description", + "pfs_design_id", + "issued_at", + "pfs_visit_id", + "status", + "started_at", + "updated_at", + ], + ) + finally: + # conn.close() releases the TCP connection back to the server; called even on exception + conn.close() # search for design files under /work/wanqqq/ and make a df of observed obs filler base_dir = "/work/wanqqq/"