From c93802a04b4a9a70e29b76b56f7faad515f8df2a Mon Sep 17 00:00:00 2001 From: Masato Onodera Date: Thu, 21 May 2026 17:04:18 -1000 Subject: [PATCH] fix: guarantee DB connections are closed on any exit path All DB-querying functions in dbutils.py opened a connection at the start and called close() at the end, but without try/finally. An exception raised during query execution or data processing would skip the close() call, leaving connections open on the server. - Wrap all four generate_*_from_targetdb() functions in try/finally so that db.close() is guaranteed even on exception. - For psycopg2 connections (gaiadb, qadb), use try/finally for conn.close() and "with conn.cursor() as cur:" for cursor cleanup. Co-Authored-By: Claude Sonnet 4.6 --- src/pfs_design_tool/pointing_utils/dbutils.py | 704 +++++++++--------- 1 file changed, 359 insertions(+), 345 deletions(-) diff --git a/src/pfs_design_tool/pointing_utils/dbutils.py b/src/pfs_design_tool/pointing_utils/dbutils.py index f98662f..215e7fb 100644 --- a/src/pfs_design_tool/pointing_utils/dbutils.py +++ b/src/pfs_design_tool/pointing_utils/dbutils.py @@ -51,115 +51,117 @@ def generate_targets_from_targetdb( max_priority=None, ): db = connect_targetdb(conf) + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. + try: + search_radius = fp_radius_degree * fp_fudge_factor - search_radius = fp_radius_degree * fp_fudge_factor - - if extra_where is None: - extra_where = "" + if extra_where is None: + extra_where = "" - if "m" in arms: - if "r" not in arms: - extra_where = "AND is_medium_resolution IS TRUE" - else: - if "r" in arms: - extra_where = "AND is_medium_resolution IS FALSE" + if "m" in arms: + if "r" not in arms: + extra_where = "AND is_medium_resolution IS TRUE" + else: + if "r" in arms: + extra_where = "AND is_medium_resolution IS FALSE" - query_string = f"""SELECT ob_code,obj_id,c.input_catalog_id,ra,dec,epoch,priority,pmra,pmdec,parallax,effective_exptime,single_exptime,qa_reference_arm,is_medium_resolution,proposal.proposal_id,rank,grade,allocated_time_lr+allocated_time_mr as \"allocated_time\",allocated_time_lr,allocated_time_mr,filter_g,filter_r,filter_i,filter_z,filter_y,psf_flux_g,psf_flux_r,psf_flux_i,psf_flux_z,psf_flux_y,psf_flux_error_g,psf_flux_error_r,psf_flux_error_i,psf_flux_error_z,psf_flux_error_y,total_flux_g,total_flux_r,total_flux_i,total_flux_z,total_flux_y,total_flux_error_g,total_flux_error_r,total_flux_error_i,total_flux_error_z,total_flux_error_y + query_string = f"""SELECT ob_code,obj_id,c.input_catalog_id,ra,dec,epoch,priority,pmra,pmdec,parallax,effective_exptime,single_exptime,qa_reference_arm,is_medium_resolution,proposal.proposal_id,rank,grade,allocated_time_lr+allocated_time_mr as \"allocated_time\",allocated_time_lr,allocated_time_mr,filter_g,filter_r,filter_i,filter_z,filter_y,psf_flux_g,psf_flux_r,psf_flux_i,psf_flux_z,psf_flux_y,psf_flux_error_g,psf_flux_error_r,psf_flux_error_i,psf_flux_error_z,psf_flux_error_y,total_flux_g,total_flux_r,total_flux_i,total_flux_z,total_flux_y,total_flux_error_g,total_flux_error_r,total_flux_error_i,total_flux_error_z,total_flux_error_y FROM {tablename} JOIN input_catalog AS c ON {tablename}.input_catalog_id = c.input_catalog_id JOIN proposal ON {tablename}.proposal_id=proposal.proposal_id WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) AND c.active """ - if extra_where is not None: - query_string += extra_where - - if input_catalog is not None: - query_string += ( - " AND (" - + "OR".join([f" input_catalog_id={v} " for v in input_catalog]) - + ")" - ) + if extra_where is not None: + query_string += extra_where + + if input_catalog is not None: + query_string += ( + " AND (" + + "OR".join([f" input_catalog_id={v} " for v in input_catalog]) + + ")" + ) - if proposal_id is not None: - query_string += ( - " AND (" - + "OR".join([f" {tablename}.proposal_id='{v}' " for v in proposal_id]) - + ")" - ) + if proposal_id is not None: + query_string += ( + " AND (" + + "OR".join([f" {tablename}.proposal_id='{v}' " for v in proposal_id]) + + ")" + ) - if max_priority is not None: - query_string += f" AND priority <= {max_priority}" + if max_priority is not None: + query_string += f" AND priority <= {max_priority}" - query_string += ";" + query_string += ";" - # logger.info(f"Query string for targets:\n{query_string}") + # logger.info(f"Query string for targets:\n{query_string}") - df = pd.DataFrame() + df = pd.DataFrame() - t_begin = time.time() - df = db.fetch_query(query_string) - t_end = time.time() - # logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") - - # keep user fillers (grade BCF) for queue only; - if conf["ppp"]["mode"] == "classic": - mask_keep = (df["proposal_id"].str.startswith("S25A")) & (df["grade"].isin(["G"])) - else: - mask_keep = ( - ((df["proposal_id"].str.startswith("S26A")) & (df["grade"].isin(["C", "F"]))) - | ((df["proposal_id"].str.startswith("S25A")) & (df["grade"].isin(["G"]))) - ) - - df = df.loc[mask_keep].reset_index(drop=True) + t_begin = time.time() + df = db.fetch_query(query_string) + t_end = time.time() + # logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") - df.loc[df["pmra"].isna(), "pmra"] = 0.0 - df.loc[df["pmdec"].isna(), "pmdec"] = 0.0 - df.loc[df["parallax"].isna(), "parallax"] = 1.0e-7 - df.loc[df["rank"] < 0, "rank"] = 10.0 # give highest rank to classic targets - # logger.info(f"Fetched target DataFrame: \n{df}") - - if force_priority is not None: - df["priority"] = force_priority - - # convert mag limits to flux (nJy) - flux_max = (mag_min * u.ABmag).to(u.nJy).value - flux_min = (mag_max * u.ABmag).to(u.nJy).value - flux_limit_17mag = (17.0 * u.ABmag).to(u.nJy).value - - # --- build mask --- - # case 1: grade == "G" → flux in desired range - mask_g = (df["grade"] == "G") & df[mag_filter].between(flux_min, flux_max) - - # case 2: grade != "G" → none of the bands brighter than 17 mag - flux_cols = [ - "total_flux_g", - "total_flux_r", - "total_flux_i", - "total_flux_z", - "total_flux_y", - ] - # --- case 2: grade != "G" --- - # we build a per-row mask that depends on proposal_id - mask_not_g = np.zeros(len(df), dtype=bool) - - for i, (_, row) in enumerate(df.iterrows()): - if row["grade"] in ["B", "C", "F"]: - # interpret as fluxes → keep if all bands ≤ flux_limit_17mag - if not np.any( - [ - (row[col] is not None) - and np.isfinite(row[col]) - and (row[col] > flux_limit_17mag) - for col in flux_cols - ] - ): - mask_not_g[i] = True + # keep user fillers (grade BCF) for queue only; + if conf["ppp"]["mode"] == "classic": + mask_keep = (df["proposal_id"].str.startswith("S25A")) & (df["grade"].isin(["G"])) else: - continue + mask_keep = ( + ((df["proposal_id"].str.startswith("S26A")) & (df["grade"].isin(["C", "F"]))) + | ((df["proposal_id"].str.startswith("S25A")) & (df["grade"].isin(["G"]))) + ) + + df = df.loc[mask_keep].reset_index(drop=True) - # --- combine both --- - df = df[mask_g | mask_not_g].reset_index(drop=True) + df.loc[df["pmra"].isna(), "pmra"] = 0.0 + df.loc[df["pmdec"].isna(), "pmdec"] = 0.0 + df.loc[df["parallax"].isna(), "parallax"] = 1.0e-7 + df.loc[df["rank"] < 0, "rank"] = 10.0 # give highest rank to classic targets + # logger.info(f"Fetched target DataFrame: \n{df}") - db.close() + if force_priority is not None: + df["priority"] = force_priority + + # convert mag limits to flux (nJy) + flux_max = (mag_min * u.ABmag).to(u.nJy).value + flux_min = (mag_max * u.ABmag).to(u.nJy).value + flux_limit_17mag = (17.0 * u.ABmag).to(u.nJy).value + + # --- build mask --- + # case 1: grade == "G" → flux in desired range + mask_g = (df["grade"] == "G") & df[mag_filter].between(flux_min, flux_max) + + # case 2: grade != "G" → none of the bands brighter than 17 mag + flux_cols = [ + "total_flux_g", + "total_flux_r", + "total_flux_i", + "total_flux_z", + "total_flux_y", + ] + # --- case 2: grade != "G" --- + # we build a per-row mask that depends on proposal_id + mask_not_g = np.zeros(len(df), dtype=bool) + + for i, (_, row) in enumerate(df.iterrows()): + if row["grade"] in ["B", "C", "F"]: + # interpret as fluxes → keep if all bands ≤ flux_limit_17mag + if not np.any( + [ + (row[col] is not None) + and np.isfinite(row[col]) + and (row[col] > flux_limit_17mag) + for col in flux_cols + ] + ): + mask_not_g[i] = True + else: + continue + + # --- combine both --- + df = df[mask_g | mask_not_g].reset_index(drop=True) + finally: + db.close() return df @@ -194,88 +196,90 @@ def generate_fluxstds_from_targetdb( fluxstd_versions = None db = connect_targetdb(conf) + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. + try: + search_radius = fp_radius_degree * fp_fudge_factor - search_radius = fp_radius_degree * fp_fudge_factor - - query_string = f"""SELECT * + query_string = f"""SELECT * FROM {tablename} WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) """ - if extra_where is None: - extra_where = "" + if extra_where is None: + extra_where = "" - filters = [] + filters = [] - if not ignore_prob_f_star: - filters.append(f"AND prob_f_star BETWEEN {min_prob_f_star} AND 1.0") + if not ignore_prob_f_star: + filters.append(f"AND prob_f_star BETWEEN {min_prob_f_star} AND 1.0") - if good_fluxstd: - filters.append("AND flags_dist IS FALSE") - filters.append("AND flags_ebv IS FALSE") - else: - if flags_dist: + if good_fluxstd: filters.append("AND flags_dist IS FALSE") - if flags_ebv: filters.append("AND flags_ebv IS FALSE") + else: + if flags_dist: + filters.append("AND flags_dist IS FALSE") + if flags_ebv: + filters.append("AND flags_ebv IS FALSE") - if select_by_flux: - filters.append(f"AND psf_flux_{mag_filter} BETWEEN {flux_min} AND {flux_max}") - else: - filters.append(f"AND psf_mag_{mag_filter} BETWEEN {mag_min} AND {mag_max}") + if select_by_flux: + filters.append(f"AND psf_flux_{mag_filter} BETWEEN {flux_min} AND {flux_max}") + else: + filters.append(f"AND psf_mag_{mag_filter} BETWEEN {mag_min} AND {mag_max}") - if select_from_gaia: - filters.append(f"AND teff_gspphot BETWEEN {min_teff} AND {max_teff}") - else: - filters.append(f"AND teff_brutus BETWEEN {min_teff} AND {max_teff}") + if select_from_gaia: + filters.append(f"AND teff_gspphot BETWEEN {min_teff} AND {max_teff}") + else: + filters.append(f"AND teff_brutus BETWEEN {min_teff} AND {max_teff}") - if fluxstd_versions is not None: - fluxstd_version = str(fluxstd_versions) - filters.append(f"AND (version = '{fluxstd_version}')") + if fluxstd_versions is not None: + fluxstd_version = str(fluxstd_versions) + filters.append(f"AND (version = '{fluxstd_version}')") - try: - if float(fluxstd_version) >= 3.5: - filters.append("AND (is_gc_neighbor = False)") - filters.append("AND (is_dense_region = False)") - except (TypeError, ValueError): - pass + try: + if float(fluxstd_version) >= 3.5: + filters.append("AND (is_gc_neighbor = False)") + filters.append("AND (is_dense_region = False)") + except (TypeError, ValueError): + pass - query_string += extra_where - if filters: - query_string += "\n" + "\n".join(filters) + query_string += extra_where + if filters: + query_string += "\n" + "\n".join(filters) - query_string += ";" + query_string += ";" - logger.info(f"Query string for fluxstd: \n{query_string}") + logger.info(f"Query string for fluxstd: \n{query_string}") - t_begin = time.time() - df = db.fetch_query(query_string) + t_begin = time.time() + df = db.fetch_query(query_string) - if len(df) == 0: - # select gaia fstar when no PS1 fstar is selected - flux_max = (mag_min * u.ABmag).to(u.nJy).value - flux_min = (mag_max * u.ABmag).to(u.nJy).value + if len(df) == 0: + # select gaia fstar when no PS1 fstar is selected + flux_max = (mag_min * u.ABmag).to(u.nJy).value + flux_min = (mag_max * u.ABmag).to(u.nJy).value - query_string = f"""SELECT * + query_string = f"""SELECT * FROM {tablename} WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius}) AND is_fstar_gaia AND teff_gspphot BETWEEN {min_teff} AND {max_teff} AND psf_flux_r BETWEEN {flux_min} AND {flux_max}; """ - logger.info(f"Query string for fluxstd (Gaia): \n{query_string}") + logger.info(f"Query string for fluxstd (Gaia): \n{query_string}") - df = db.fetch_query(query_string) + df = db.fetch_query(query_string) - t_end = time.time() - logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") + t_end = time.time() + logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") - df.loc[df["pmra"].isna(), "pmra"] = 0.0 - df.loc[df["pmdec"].isna(), "pmdec"] = 0.0 - df.loc[df["parallax"].isna(), "parallax"] = 1.0e-7 - logger.info(f"Fetched target DataFrame: \n{df}") - - db.close() + df.loc[df["pmra"].isna(), "pmra"] = 0.0 + df.loc[df["pmdec"].isna(), "pmdec"] = 0.0 + df.loc[df["parallax"].isna(), "parallax"] = 1.0e-7 + logger.info(f"Fetched target DataFrame: \n{df}") + finally: + db.close() """ #Check if there are clusters of flux standards, and if so, keep only a representative subset to avoid over-representation in certain regions. @@ -353,66 +357,68 @@ def generate_skyobjects_from_targetdb( # extra_where=None, ): db = connect_targetdb(conf) - - search_radius = fp_radius_degree * fp_fudge_factor - + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. try: - sky_versions = conf["targetdb"]["sky"]["version"] - except Exception: - sky_versions = None - - where_condition = f"WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius})" + search_radius = fp_radius_degree * fp_fudge_factor - if sky_versions is not None: - version_condition = "(" - first_condition = True - for sky_version in sky_versions: - if first_condition: - first_condition = False - else: - version_condition += " OR " - if sky_version == "20220915": - # use only HSC sky catalog in the older version - version_condition += ( - f"(version = '{sky_version}' AND input_catalog_id=1001)" - ) - else: - version_condition += f"version = '{sky_version}'" - version_condition += ")" + try: + sky_versions = conf["targetdb"]["sky"]["version"] + except Exception: + sky_versions = None + + where_condition = f"WHERE q3c_radial_query(ra, dec, {ra}, {dec}, {search_radius})" + + if sky_versions is not None: + version_condition = "(" + first_condition = True + for sky_version in sky_versions: + if first_condition: + first_condition = False + else: + version_condition += " OR " + if sky_version == "20220915": + # use only HSC sky catalog in the older version + version_condition += ( + f"(version = '{sky_version}' AND input_catalog_id=1001)" + ) + else: + version_condition += f"version = '{sky_version}'" + version_condition += ")" - where_condition += f" AND {version_condition}" + where_condition += f" AND {version_condition}" - query_string = f"""SELECT * + query_string = f"""SELECT * FROM {tablename} {where_condition} """ - query_string += ";" - - logger.info(f"Query string for sky: \n{query_string}") + query_string += ";" - t_begin = time.time() - df = db.fetch_query(query_string) - t_end = time.time() - logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") + logger.info(f"Query string for sky: \n{query_string}") - df["pmra"] = np.zeros(df.index.size, dtype=float) - df["pmdec"] = np.zeros(df.index.size, dtype=float) - df["parallax"] = np.full(df.index.size, 1.0e-7) - logger.info(f"Fetched target DataFrame: \n{df}") + t_begin = time.time() + df = db.fetch_query(query_string) + t_end = time.time() + logger.info(f"Time spent for querying (s): {t_end - t_begin:.3f}") - # Replacing obj_id with sky_id as currently (obj_id, cat_id) pairs can be duplicated for sky. - # In the version 20220915, obj_ids are not unique and sometimes not integer. + df["pmra"] = np.zeros(df.index.size, dtype=float) + df["pmdec"] = np.zeros(df.index.size, dtype=float) + df["parallax"] = np.full(df.index.size, 1.0e-7) + logger.info(f"Fetched target DataFrame: \n{df}") - is_old_version = df["version"] == "20220915" + # Replacing obj_id with sky_id as currently (obj_id, cat_id) pairs can be duplicated for sky. + # In the version 20220915, obj_ids are not unique and sometimes not integer. - if np.any(is_old_version): - df.loc[is_old_version, "obj_id"] = df.loc[is_old_version, "sky_id"] - logger.warning( - "obj_id is forced to be replaced to sky_id for sky objects with version=20220915" - ) + is_old_version = df["version"] == "20220915" - db.close() + if np.any(is_old_version): + df.loc[is_old_version, "obj_id"] = df.loc[is_old_version, "sky_id"] + logger.warning( + "obj_id is forced to be replaced to sky_id for sky objects with version=20220915" + ) + finally: + db.close() return df @@ -458,16 +464,17 @@ def generate_targets_from_gaiadb( write_csv=False, ): conn = connect_subaru_gaiadb(conf) - cur = conn.cursor() - - if search_radius is None: - search_radius = fp_radius_degree * fp_fudge_factor + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. + try: + if search_radius is None: + search_radius = fp_radius_degree * fp_fudge_factor - # Query for fillers: - # astrometric_excess_noise_sig (D) < 2 - # 12 <= phot_g_mean_mag <=20 + # Query for fillers: + # astrometric_excess_noise_sig (D) < 2 + # 12 <= phot_g_mean_mag <=20 - query_string = f"""SELECT + query_string = f"""SELECT source_id,ref_epoch,ra,dec,pmra,pmdec,parallax, phot_g_mean_mag,phot_bp_mean_mag,phot_rp_mean_mag, phot_g_mean_flux_over_error, phot_bp_mean_flux_over_error, phot_rp_mean_flux_over_error @@ -476,36 +483,38 @@ def generate_targets_from_gaiadb( AND {band_select} BETWEEN {mag_min} AND {mag_max} """ - if good_astrometry: - query_string += "AND astrometric_excess_noise < 1.0" - - query_string += ";" - - # logger.info(query_string) - - cur.execute(query_string) - - df_res = pd.DataFrame( - cur.fetchall(), - columns=[ - "source_id", - "ref_epoch", - "ra", - "dec", - "pmra", - "pmdec", - "parallax", - "phot_g_mean_mag", - "phot_bp_mean_mag", - "phot_rp_mean_mag", - "phot_g_mean_flux_over_error", - "phot_bp_mean_flux_over_error", - "phot_rp_mean_flux_over_error", - ], - ) + if good_astrometry: + query_string += "AND astrometric_excess_noise < 1.0" + + query_string += ";" - cur.close() - conn.close() + # logger.info(query_string) + + # psycopg2 cursor used as context manager: cur.close() is called automatically on exit + with conn.cursor() as cur: + cur.execute(query_string) + + df_res = pd.DataFrame( + cur.fetchall(), + columns=[ + "source_id", + "ref_epoch", + "ra", + "dec", + "pmra", + "pmdec", + "parallax", + "phot_g_mean_mag", + "phot_bp_mean_mag", + "phot_rp_mean_mag", + "phot_g_mean_flux_over_error", + "phot_bp_mean_flux_over_error", + "phot_rp_mean_flux_over_error", + ], + ) + finally: + # conn.close() releases the TCP connection back to the server; called even on exception + conn.close() # logger.info(df_res) if write_csv: @@ -642,16 +651,18 @@ def generate_fillers_from_targetdb( write_csv=False, ): db = connect_targetdb(conf) + # Use try/finally to guarantee the connection is closed on any exit path, + # including exceptions raised during query execution or data processing. + try: + if search_radius is None: + search_radius = fp_radius_degree * fp_fudge_factor - if search_radius is None: - search_radius = fp_radius_degree * fp_fudge_factor - - query_string = f"""SELECT + query_string = f"""SELECT ob_code,obj_id,epoch,ra,dec,pmra,pmdec,parallax,qa_reference_arm, psf_flux_g,psf_flux_r,psf_flux_i,psf_flux_z,psf_flux_y, - psf_flux_error_g, psf_flux_error_r, psf_flux_error_i, psf_flux_error_z, psf_flux_error_y, + psf_flux_error_g, psf_flux_error_r, psf_flux_error_i, psf_flux_error_z, psf_flux_error_y, total_flux_g,total_flux_r,total_flux_i,total_flux_z,total_flux_y, - total_flux_error_g, total_flux_error_r, total_flux_error_i, total_flux_error_z, total_flux_error_y, + total_flux_error_g, total_flux_error_r, total_flux_error_i, total_flux_error_z, total_flux_error_y, filter_g, filter_r, filter_i, filter_z, filter_y, proposal.proposal_id, proposal.grade, c.input_catalog_id, is_medium_resolution FROM target JOIN proposal ON target.proposal_id=proposal.proposal_id JOIN input_catalog AS c ON target.input_catalog_id = c.input_catalog_id @@ -659,96 +670,97 @@ def generate_fillers_from_targetdb( AND c.active """ - query_string += ";" - - logger.info(query_string) - - df_res = pd.DataFrame( - db.fetch_query(query_string), - columns=[ - "ob_code", - "obj_id", - "epoch", - "ra", - "dec", - "pmra", - "pmdec", - "parallax", - "qa_reference_arm", - "psf_flux_g", - "psf_flux_r", - "psf_flux_i", - "psf_flux_z", - "psf_flux_y", - "psf_flux_error_g", - "psf_flux_error_r", - "psf_flux_error_i", - "psf_flux_error_z", - "psf_flux_error_y", + query_string += ";" + + logger.info(query_string) + + df_res = pd.DataFrame( + db.fetch_query(query_string), + columns=[ + "ob_code", + "obj_id", + "epoch", + "ra", + "dec", + "pmra", + "pmdec", + "parallax", + "qa_reference_arm", + "psf_flux_g", + "psf_flux_r", + "psf_flux_i", + "psf_flux_z", + "psf_flux_y", + "psf_flux_error_g", + "psf_flux_error_r", + "psf_flux_error_i", + "psf_flux_error_z", + "psf_flux_error_y", + "total_flux_g", + "total_flux_r", + "total_flux_i", + "total_flux_z", + "total_flux_y", + "total_flux_error_g", + "total_flux_error_r", + "total_flux_error_i", + "total_flux_error_z", + "total_flux_error_y", + "filter_g", + "filter_r", + "filter_i", + "filter_z", + "filter_y", + "proposal_id", + "grade", + "input_catalog_id", + "is_medium_resolution", + ], + ) + + # df_res = df_res[df_res["grade"] != "G"] # do not include obs. fillers + + # convert mag limits to flux (nJy) + flux_max = (mag_min * u.ABmag).to(u.nJy).value + flux_min = (mag_max * u.ABmag).to(u.nJy).value + flux_limit_17mag = (17.0 * u.ABmag).to(u.nJy).value + + # --- build mask --- + # case 1: grade == "G" → flux in desired range + mask_g = (df_res["grade"] == "G") & df_res[band_select].between(flux_min, flux_max) + + # case 2: grade != "G" → none of the bands brighter than 17 mag + flux_cols = [ "total_flux_g", "total_flux_r", "total_flux_i", "total_flux_z", "total_flux_y", - "total_flux_error_g", - "total_flux_error_r", - "total_flux_error_i", - "total_flux_error_z", - "total_flux_error_y", - "filter_g", - "filter_r", - "filter_i", - "filter_z", - "filter_y", - "proposal_id", - "grade", - "input_catalog_id", - "is_medium_resolution", - ], - ) - - # df_res = df_res[df_res["grade"] != "G"] # do not include obs. fillers - - # convert mag limits to flux (nJy) - flux_max = (mag_min * u.ABmag).to(u.nJy).value - flux_min = (mag_max * u.ABmag).to(u.nJy).value - flux_limit_17mag = (17.0 * u.ABmag).to(u.nJy).value + ] - # --- build mask --- - # case 1: grade == "G" → flux in desired range - mask_g = (df_res["grade"] == "G") & df_res[band_select].between(flux_min, flux_max) + # --- case 2: grade != "G" --- + # we build a per-row mask that depends on proposal_id + mask_not_g = np.zeros(len(df_res), dtype=bool) - # case 2: grade != "G" → none of the bands brighter than 17 mag - flux_cols = [ - "total_flux_g", - "total_flux_r", - "total_flux_i", - "total_flux_z", - "total_flux_y", - ] + for i, (_, row) in enumerate(df_res.iterrows()): + if row["grade"] == "G": + continue + else: + # interpret as fluxes → keep if all bands ≤ flux_limit_17mag + if not np.any( + [ + (row[col] is not None) + and np.isfinite(row[col]) + and (row[col] > flux_limit_17mag) + for col in flux_cols + ] + ): + mask_not_g[i] = True - # --- case 2: grade != "G" --- - # we build a per-row mask that depends on proposal_id - mask_not_g = np.zeros(len(df_res), dtype=bool) - - for i, (_, row) in enumerate(df_res.iterrows()): - if row["grade"] == "G": continue - else: - # interpret as fluxes → keep if all bands ≤ flux_limit_17mag - if not np.any( - [ - (row[col] is not None) - and np.isfinite(row[col]) - and (row[col] > flux_limit_17mag) - for col in flux_cols - ] - ): - mask_not_g[i] = True - - # --- combine both --- - df_res_magcut = df_res[mask_g | mask_not_g].reset_index(drop=True) - - db.close() + # --- combine both --- + df_res_magcut = df_res[mask_g | mask_not_g].reset_index(drop=True) + finally: + db.close() # logger.info(df_res) if write_csv: @@ -841,41 +853,43 @@ def format_epoch(x): except FileNotFoundError: # query qaDB to get executed pfsdesign conn = connect_qadb(conf) - cur = conn.cursor() - - sql = """ + # Use try/finally to guarantee the connection is closed on any exit path. + try: + sql = """ SELECT * - FROM exposure_time - JOIN pfs_visit ON exposure_time.pfs_visit_id = pfs_visit.pfs_visit_id + FROM exposure_time + JOIN pfs_visit ON exposure_time.pfs_visit_id = pfs_visit.pfs_visit_id JOIN onsite_processing_status ON onsite_processing_status.pfs_visit_id = pfs_visit.pfs_visit_id WHERE pfs_visit.pfs_visit_id >=129587 ORDER BY pfs_visit.pfs_visit_id DESC; """ - cur.execute(sql) - - df_design_done = pd.DataFrame( - cur.fetchall(), - columns=[ - "pfs_visit_id", - "nominal_exposure_time", - "effective_exposure_time_r", - "effective_exposure_time_b", - "effective_exposure_time_n", - "effective_exposure_time_m", - "pfs_visit_id", - "pfs_visit_description", - "pfs_design_id", - "issued_at", - "pfs_visit_id", - "status", - "started_at", - "updated_at", - ], - ) - - cur.close() - conn.close() + # psycopg2 cursor used as context manager: cur.close() is called automatically on exit + with conn.cursor() as cur: + cur.execute(sql) + + df_design_done = pd.DataFrame( + cur.fetchall(), + columns=[ + "pfs_visit_id", + "nominal_exposure_time", + "effective_exposure_time_r", + "effective_exposure_time_b", + "effective_exposure_time_n", + "effective_exposure_time_m", + "pfs_visit_id", + "pfs_visit_description", + "pfs_design_id", + "issued_at", + "pfs_visit_id", + "status", + "started_at", + "updated_at", + ], + ) + finally: + # conn.close() releases the TCP connection back to the server; called even on exception + conn.close() # search for design files under /work/wanqqq/ and make a df of observed obs filler base_dir = "/work/wanqqq/"