|
6 | 6 | from sqlalchemy import text |
7 | 7 | from data_server.database.session import get_sync_session |
8 | 8 |
|
| 9 | +# Initialization data version - Update this version when modifying SQL files in Initialization_data directory |
| 10 | +INIT_DATA_VERSION = "1.0.0" |
| 11 | + |
| 12 | +def get_current_db_version() -> str: |
| 13 | + """ |
| 14 | + Get the current initialization data version stored in database. |
| 15 | + Returns the latest version (ordered by id DESC) or 'None' if no version is found. |
| 16 | + """ |
| 17 | + ensure_deletion_status_table() |
| 18 | + with get_sync_session() as session: |
| 19 | + try: |
| 20 | + result = session.execute( |
| 21 | + text("SELECT version FROM deletion_status ORDER BY id DESC LIMIT 1") |
| 22 | + ) |
| 23 | + version = result.scalar_one_or_none() |
| 24 | + return version if version else "None" |
| 25 | + except Exception as e: |
| 26 | + logger.warning(f"Error getting current database version: {e}") |
| 27 | + return "Error" |
| 28 | + |
9 | 29 | def ensure_deletion_status_table(): |
10 | 30 | """ |
11 | 31 | Create deletion_status table if it doesn't exist. |
| 32 | + Simplified table structure with only necessary fields: |
| 33 | + - id: Primary key |
| 34 | + - version: Initialization data version |
| 35 | + - description: Optional description |
| 36 | + - created_at: Timestamp when this version was initialized (for version history tracking) |
| 37 | + |
| 38 | + Also handles migration from old table structure (removes operation_name and executed_at). |
12 | 39 | """ |
13 | 40 | with get_sync_session() as session: |
14 | 41 | try: |
| 42 | + # Create table if not exists (new simplified structure with created_at) |
| 43 | + logger.info("Ensuring deletion_status table exists...") |
15 | 44 | session.execute(text(""" |
16 | 45 | CREATE TABLE IF NOT EXISTS deletion_status ( |
17 | 46 | id SERIAL PRIMARY KEY, |
18 | | - operation_name VARCHAR(255) UNIQUE NOT NULL, |
19 | | - executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, |
20 | | - description TEXT |
| 47 | + version VARCHAR(50), |
| 48 | + description TEXT, |
| 49 | + created_at TIMESTAMP DEFAULT (now() AT TIME ZONE 'Asia/Shanghai') |
21 | 50 | ) |
22 | 51 | """)) |
23 | 52 | session.commit() |
| 53 | + logger.info("deletion_status table ensured") |
| 54 | + |
| 55 | + # Add version column if it doesn't exist (for backward compatibility) |
| 56 | + try: |
| 57 | + logger.info("Ensuring version column exists...") |
| 58 | + session.execute(text(""" |
| 59 | + ALTER TABLE deletion_status |
| 60 | + ADD COLUMN IF NOT EXISTS version VARCHAR(50) |
| 61 | + """)) |
| 62 | + session.commit() |
| 63 | + logger.info("version column ensured") |
| 64 | + except Exception as e: |
| 65 | + logger.debug(f"Could not add version column: {e}") |
| 66 | + session.rollback() |
| 67 | + |
| 68 | + # Add created_at column if it doesn't exist (for backward compatibility) |
| 69 | + try: |
| 70 | + logger.info("Ensuring created_at column exists...") |
| 71 | + session.execute(text(""" |
| 72 | + ALTER TABLE deletion_status |
| 73 | + ADD COLUMN IF NOT EXISTS created_at TIMESTAMP DEFAULT (now() AT TIME ZONE 'Asia/Shanghai') |
| 74 | + """)) |
| 75 | + session.commit() |
| 76 | + logger.info("created_at column ensured") |
| 77 | + except Exception as e: |
| 78 | + logger.debug(f"Could not add created_at column: {e}") |
| 79 | + session.rollback() |
| 80 | + |
| 81 | + # Migration: Remove old columns if they exist (for backward compatibility) |
| 82 | + # This ensures smooth upgrade from old version |
| 83 | + logger.info("Checking for old columns to remove...") |
| 84 | + |
| 85 | + # Check and drop operation_name column |
| 86 | + try: |
| 87 | + session.execute(text(""" |
| 88 | + ALTER TABLE deletion_status |
| 89 | + DROP COLUMN IF EXISTS operation_name |
| 90 | + """)) |
| 91 | + session.commit() |
| 92 | + logger.info("Old column 'operation_name' removed (if existed)") |
| 93 | + except Exception as e: |
| 94 | + logger.debug(f"Could not drop operation_name: {e}") |
| 95 | + session.rollback() |
| 96 | + |
| 97 | + # Check and drop executed_at column |
| 98 | + try: |
| 99 | + session.execute(text(""" |
| 100 | + ALTER TABLE deletion_status |
| 101 | + DROP COLUMN IF EXISTS executed_at |
| 102 | + """)) |
| 103 | + session.commit() |
| 104 | + logger.info("Old column 'executed_at' removed (if existed)") |
| 105 | + except Exception as e: |
| 106 | + logger.debug(f"Could not drop executed_at: {e}") |
| 107 | + session.rollback() |
| 108 | + |
| 109 | + # Clean up old records with NULL version (from previous schema) |
| 110 | + # This removes legacy records that don't have version information |
| 111 | + try: |
| 112 | + result = session.execute(text("DELETE FROM deletion_status WHERE version IS NULL")) |
| 113 | + deleted_count = result.rowcount |
| 114 | + session.commit() |
| 115 | + if deleted_count > 0: |
| 116 | + logger.info(f"Cleaned up {deleted_count} old record(s) with NULL version") |
| 117 | + except Exception as e: |
| 118 | + logger.debug(f"Could not clean up old records: {e}") |
| 119 | + session.rollback() |
| 120 | + |
| 121 | + logger.info("Table structure migration completed") |
| 122 | + |
24 | 123 | except Exception as e: |
25 | | - logger.error(f"Error creating deletion_status table: {e}") |
| 124 | + logger.error(f"Error creating/updating deletion_status table: {e}") |
26 | 125 | session.rollback() |
27 | 126 |
|
28 | 127 | def has_deletion_been_executed() -> bool: |
29 | 128 | """ |
30 | | - Check if the one-time deletion has already been executed by checking database. |
| 129 | + Check if the one-time deletion has already been executed by checking database version. |
| 130 | + Returns False if: |
| 131 | + - No record exists in deletion_status table |
| 132 | + - Version field is NULL (old data without version) |
| 133 | + - Version doesn't match current INIT_DATA_VERSION |
| 134 | + This ensures re-initialization when SQL files are updated. |
| 135 | + |
| 136 | + Execution order guarantee: |
| 137 | + 1. First call ensure_deletion_status_table() to ensure correct table structure |
| 138 | + 2. Then query the version field value |
| 139 | + This way even old users (with old fields in table) can migrate correctly |
31 | 140 | """ |
| 141 | + # Step 1: Ensure deletion_status table structure is correct |
| 142 | + # For old users, automatically removes operation_name and executed_at fields, and adds created_at |
32 | 143 | ensure_deletion_status_table() |
| 144 | + |
| 145 | + # Step 2: Query version number (get the latest version from deletion_status table) |
33 | 146 | with get_sync_session() as session: |
34 | 147 | try: |
35 | 148 | result = session.execute( |
36 | | - text("SELECT 1 FROM deletion_status WHERE operation_name = :name"), |
37 | | - {"name": "init_data_deletion"} |
| 149 | + text("SELECT version FROM deletion_status ORDER BY id DESC LIMIT 1") |
38 | 150 | ) |
39 | | - executed = result.scalar_one_or_none() is not None |
40 | | - if executed: |
41 | | - logger.info("Deletion has already been executed (found record in deletion_status table)") |
| 151 | + db_version = result.scalar_one_or_none() |
| 152 | + |
| 153 | + # scalar_one_or_none() returns None in these cases: |
| 154 | + # 1. No records in the table |
| 155 | + # 2. Has record but version field is NULL (old users after upgrade) |
| 156 | + if db_version is None: |
| 157 | + logger.info(f"No version record found or version is NULL in database. Current code version: {INIT_DATA_VERSION}. Will execute re-initialization.") |
| 158 | + return False |
| 159 | + elif db_version != INIT_DATA_VERSION: |
| 160 | + logger.info(f"Version mismatch detected! Database version: {db_version}, Code version: {INIT_DATA_VERSION}. Will execute re-initialization.") |
| 161 | + return False |
42 | 162 | else: |
43 | | - logger.info("Deletion has not been executed yet (no record in deletion_status table)") |
44 | | - return executed |
| 163 | + logger.info(f"Version matched: {db_version}. Initialization already executed for this version.") |
| 164 | + return True |
45 | 165 | except Exception as e: |
| 166 | + # Any exception returns False, triggering initialization (safety strategy) |
46 | 167 | logger.warning(f"Error checking deletion status: {e}") |
| 168 | + logger.warning("Will trigger re-initialization as a safety measure.") |
47 | 169 | return False |
48 | 170 |
|
49 | 171 | def mark_deletion_as_executed(): |
50 | 172 | """ |
51 | | - Mark the one-time deletion as executed in database. |
| 173 | + Mark the one-time deletion as executed in database with current version. |
| 174 | + Appends a new record to keep version history (does not delete old records). |
| 175 | + Each initialization creates a new record with timestamp. |
52 | 176 | """ |
53 | 177 | with get_sync_session() as session: |
54 | 178 | try: |
| 179 | + # Insert new record with current version (append, not replace) |
| 180 | + # created_at will be automatically set to CURRENT_TIMESTAMP |
55 | 181 | session.execute( |
56 | 182 | text(""" |
57 | | - INSERT INTO deletion_status (operation_name, description) |
58 | | - VALUES (:name, :desc) |
59 | | - ON CONFLICT (operation_name) DO NOTHING |
| 183 | + INSERT INTO deletion_status (version, description) |
| 184 | + VALUES (:version, :desc) |
60 | 185 | """), |
61 | 186 | { |
62 | | - "name": "init_data_deletion", |
63 | | - "desc": "Initial data deletion from Initialization_data SQL files" |
| 187 | + "version": INIT_DATA_VERSION, |
| 188 | + "desc": f"Initialization data version {INIT_DATA_VERSION} from Initialization_data SQL files" |
64 | 189 | } |
65 | 190 | ) |
66 | 191 | session.commit() |
67 | | - logger.info("Marked deletion as executed in deletion_status table") |
| 192 | + logger.info(f"Marked deletion as executed with version {INIT_DATA_VERSION} in deletion_status table (appended to history)") |
68 | 193 | except Exception as e: |
69 | 194 | logger.error(f"Error marking deletion as executed: {e}") |
70 | 195 | session.rollback() |
71 | 196 |
|
72 | 197 | def has_table_alteration_been_executed() -> bool: |
73 | 198 | """ |
74 | | - Check if the one-time table alteration has already been executed by checking database. |
| 199 | + Check if the one-time table alteration has already been executed. |
| 200 | + Since we use ALTER TABLE ... ADD COLUMN IF NOT EXISTS, we can check column existence directly. |
| 201 | + Returns True if columns already exist (no need to execute again). |
75 | 202 | """ |
76 | | - ensure_deletion_status_table() |
77 | 203 | with get_sync_session() as session: |
78 | 204 | try: |
79 | | - result = session.execute( |
80 | | - text("SELECT 1 FROM deletion_status WHERE operation_name = :name"), |
81 | | - {"name": "table_alteration_add_description"} |
82 | | - ) |
83 | | - executed = result.scalar_one_or_none() is not None |
84 | | - if executed: |
85 | | - logger.info("Table alteration has already been executed (found record in deletion_status table)") |
| 205 | + # Check if operator_description column exists in operator_info table |
| 206 | + result = session.execute(text(""" |
| 207 | + SELECT column_name |
| 208 | + FROM information_schema.columns |
| 209 | + WHERE table_name = 'operator_info' |
| 210 | + AND column_name = 'operator_description' |
| 211 | + """)) |
| 212 | + operator_col_exists = result.scalar_one_or_none() is not None |
| 213 | + |
| 214 | + # Check if config_description column exists in operator_config table |
| 215 | + result = session.execute(text(""" |
| 216 | + SELECT column_name |
| 217 | + FROM information_schema.columns |
| 218 | + WHERE table_name = 'operator_config' |
| 219 | + AND column_name = 'config_description' |
| 220 | + """)) |
| 221 | + config_col_exists = result.scalar_one_or_none() is not None |
| 222 | + |
| 223 | + both_exist = operator_col_exists and config_col_exists |
| 224 | + |
| 225 | + if both_exist: |
| 226 | + logger.info("Table alteration columns already exist, skipping...") |
86 | 227 | else: |
87 | | - logger.info("Table alteration has not been executed yet (no record in deletion_status table)") |
88 | | - return executed |
| 228 | + logger.info("Table alteration columns do not exist, will execute...") |
| 229 | + |
| 230 | + return both_exist |
89 | 231 | except Exception as e: |
90 | 232 | logger.warning(f"Error checking table alteration status: {e}") |
91 | 233 | return False |
92 | 234 |
|
93 | 235 | def mark_table_alteration_as_executed(): |
94 | 236 | """ |
95 | | - Mark the one-time table alteration as executed in database. |
| 237 | + Mark the one-time table alteration as executed. |
| 238 | + Since we check column existence directly, this function is now a no-op. |
| 239 | + Kept for compatibility. |
96 | 240 | """ |
97 | | - with get_sync_session() as session: |
98 | | - try: |
99 | | - session.execute( |
100 | | - text(""" |
101 | | - INSERT INTO deletion_status (operation_name, description) |
102 | | - VALUES (:name, :desc) |
103 | | - ON CONFLICT (operation_name) DO NOTHING |
104 | | - """), |
105 | | - { |
106 | | - "name": "table_alteration_add_description", |
107 | | - "desc": "Added operator_description and config_description columns" |
108 | | - } |
109 | | - ) |
110 | | - session.commit() |
111 | | - logger.info("Marked table alteration as executed in deletion_status table") |
112 | | - except Exception as e: |
113 | | - logger.error(f"Error marking table alteration as executed: {e}") |
114 | | - session.rollback() |
| 241 | + logger.info("Table alteration completed (no status record needed)") |
| 242 | + pass |
115 | 243 |
|
116 | 244 | def alter_tables_add_description_columns(): |
117 | 245 | """ |
|
0 commit comments