From a12cad7bc0e28469bd171388d1b0d5e52e18bf1b Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Wed, 10 Jun 2026 16:50:52 +0800 Subject: [PATCH 1/4] opt automation logs --- .../automations/automations_pipeline.py | 83 +++++++++++++++---- 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index b8f78169..2a532f2a 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -141,7 +141,11 @@ def get_automation_rule(self, db_session, event_data): return AutomationRule(event_data, rule.trigger, rule.actions, options) def receive(self): - auto_rule_logger.info(f"Start to receive automation event from redis, window seconds {self.rate_limiter.window_secs} limit percent {self.rate_limiter.percent}") + auto_rule_logger.info( + "Start consuming automation events from Redis: window_secs=%s limit_percent=%s", + self.rate_limiter.window_secs, + self.rate_limiter.percent, + ) subscriber = self._redis_client.get_subscriber(self.per_update_channel) last_pubsub_message_time = time.time() while True: @@ -153,7 +157,15 @@ def receive(self): continue last_pubsub_message_time = time.time() event = json.loads(message['data']) - auto_rule_logger.info(f"subscribe event {event}") + auto_rule_logger.info( + "Received automation event: rule_id=%s dtable_uuid=%s op_type=%s table_id=%s row_id=%s updated_column_keys=%s", + event.get('automation_rule_id'), + event.get('dtable_uuid'), + event.get('op_type'), + event.get('table_id'), + event.get('row_id'), + event.get('updated_column_keys'), + ) db_session = self._db_session_class() try: @@ -164,7 +176,12 @@ def receive(self): if not automation_rule: continue if not self.rate_limiter.is_allowed(owner_info['owner'], owner_info['org_id'], self.workers): - auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} rate limit exceed, event {event} will not trigger") + auto_rule_logger.info( + "Skip automation event: rate limited owner=%s org_id=%s rule_id=%s", + owner_info['owner'], + owner_info['org_id'], + automation_rule.rule_id, + ) automation_rule.append_warning({'type': 'exceed_system_resource_limit'}) self.results_queue.put(AutomationResult( rule_id=automation_rule.rule_id, @@ -182,10 +199,20 @@ def receive(self): self.add_exceed_system_resource_limit_entity(automation_rule.owner, automation_rule.org_id) continue if self.automations_stats_manager.is_exceed(db_session, owner_info['owner'], owner_info['org_id']): - auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} trigger count limit exceed, {event} will not trigger") + auto_rule_logger.info( + "Skip automation event: trigger quota exceeded owner=%s org_id=%s rule_id=%s", + owner_info['owner'], + owner_info['org_id'], + automation_rule.rule_id, + ) continue if not automation_rule.can_do_actions(): - auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} trigger run condition missed, {event} will not trigger") + auto_rule_logger.info( + "Skip automation event: trigger conditions not met rule_id=%s owner=%s org_id=%s", + automation_rule.rule_id, + owner_info['owner'], + owner_info['org_id'], + ) continue self.automations_queue.put(automation_rule) self.realtime_trigger_count += 1 @@ -195,27 +222,49 @@ def receive(self): db_session.close() else: if time.time() - last_pubsub_message_time >= self._pubsub_no_message_timeout: - auto_rule_logger.info('no automation message for %ss', self._pubsub_no_message_timeout) + auto_rule_logger.info( + 'No automation events received for %ss, refreshing subscriber', + self._pubsub_no_message_timeout, + ) subscriber = self._redis_client.refresh_subscriber( subscriber, self.per_update_channel, 'no message timeout') last_pubsub_message_time = time.time() continue time.sleep(0.5) except Exception as e: - auto_rule_logger.error('redis pubsub receive error: %s', e) + auto_rule_logger.error('Redis pubsub receive failed: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self.per_update_channel, str(e)) last_pubsub_message_time = time.time() def worker(self): while True: automation = self.automations_queue.get() - auto_rule_logger.info(f"Automation {automation.rule_id} with data {automation.data} triggering") + row_id = automation.data.get('row_id') if isinstance(automation.data, dict) else None + updated_column_keys = automation.data.get('updated_column_keys') if isinstance(automation.data, dict) else None + auto_rule_logger.info( + 'Automation started: rule_id=%s dtable_uuid=%s run_condition=%s row_id=%s updated_column_keys=%s', + automation.rule_id, + automation.dtable_uuid, + automation.run_condition, + row_id, + updated_column_keys, + ) db_session = self._db_session_class() try: start_time = time.time() result = automation.do_actions(db_session) run_time = time.time() - start_time - auto_rule_logger.info(f"Automation {automation.rule_id} with data {automation.data} result is {result} run for {run_time}") + auto_rule_logger.info( + 'Automation finished: rule_id=%s rule_name=%s success=%s run_time=%.3fs exceed_limit=%s warnings=%s row_id=%s updated_column_keys=%s', + automation.rule_id, + automation.rule_name, + result.success if result else None, + run_time, + result.is_exceed_system_resource_limit if result else None, + len(result.warnings) if result else 0, + row_id, + updated_column_keys, + ) if result: result.run_time = run_time self.results_queue.put(result) @@ -245,13 +294,14 @@ def scan_rules(self): per_month_check_time = datetime.utcnow() - timedelta(days=27) # consider the least month-days 28 in February (the 2nd month) in common years db_session = self._db_session_class() try: + auto_rule_logger.info('Scanning scheduled automation rules...') rules = db_session.execute(text(sql), { 'per_day_check_time': per_day_check_time, 'per_week_check_time': per_week_check_time, 'per_month_check_time': per_month_check_time }) except Exception as e: - auto_rule_logger.exception('query regular automation rules error: %s', e) + auto_rule_logger.exception('Failed to query scheduled automation rules: %s', e) db_session.close() return @@ -292,17 +342,15 @@ def scheduled_scan(self): # fire at every hour in every day of week @sched.scheduled_job('cron', day_of_week='*', hour='*', misfire_grace_time=600) def timed_job(): - auto_rule_logger.info('Starts to scan scheduled automation rules...') - try: self.scan_rules() except Exception as e: - auto_rule_logger.exception('error when scanning scheduled automation rules: %s', e) + auto_rule_logger.exception('Failed to scan scheduled automation rules: %s', e) sched.start() def stats(self): - auto_rule_logger.info("Start to stats thread") + auto_rule_logger.info("Start stats thread") while True: result = self.results_queue.get() if result.run_condition == 'per_update' and not result.is_exceed_system_resource_limit: @@ -310,7 +358,12 @@ def stats(self): org_id = result.org_id run_time = result.run_time self.rate_limiter.record_time(owner, org_id, run_time) - auto_rule_logger.debug(f"owner {owner} org_id {org_id} usage percent {self.rate_limiter.get_percent(owner, org_id, self.workers)}") + auto_rule_logger.debug( + 'Automation usage percent owner=%s org_id=%s percent=%s', + owner, + org_id, + self.rate_limiter.get_percent(owner, org_id, self.workers), + ) db_session = self._db_session_class() try: self.automations_stats_manager.update_stats(db_session, result) From ecd18925d5f345ab1e41a7cd3366a65f77f275ce Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Thu, 11 Jun 2026 10:19:54 +0800 Subject: [PATCH 2/4] update --- dtable_events/automations/automations_pipeline.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index 2a532f2a..809d38f3 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -222,10 +222,7 @@ def receive(self): db_session.close() else: if time.time() - last_pubsub_message_time >= self._pubsub_no_message_timeout: - auto_rule_logger.info( - 'No automation events received for %ss, refreshing subscriber', - self._pubsub_no_message_timeout, - ) + auto_rule_logger.info('no automation message for %ss', self._pubsub_no_message_timeout) subscriber = self._redis_client.refresh_subscriber( subscriber, self.per_update_channel, 'no message timeout') last_pubsub_message_time = time.time() From 8d7edee5be831328e2e97965c03bb4fcf8e759da Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Thu, 11 Jun 2026 10:24:17 +0800 Subject: [PATCH 3/4] update --- dtable_events/automations/automations_pipeline.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index 809d38f3..c4633538 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -252,7 +252,7 @@ def worker(self): result = automation.do_actions(db_session) run_time = time.time() - start_time auto_rule_logger.info( - 'Automation finished: rule_id=%s rule_name=%s success=%s run_time=%.3fs exceed_limit=%s warnings=%s row_id=%s updated_column_keys=%s', + 'Automation finished: rule_id=%s rule_name=%s success=%s run_time=%.3fs exceed_limit=%s warnings=%s row_id=%s updated_column_keys=%s is_valid=%s valid_type=%s', automation.rule_id, automation.rule_name, result.success if result else None, @@ -261,6 +261,8 @@ def worker(self): len(result.warnings) if result else 0, row_id, updated_column_keys, + result.is_valid if result else None, + result.invalid_type if result else None ) if result: result.run_time = run_time From 3e78da9f15f384078419d607126688f51993655a Mon Sep 17 00:00:00 2001 From: Alex Happy <1223408988@qq.com> Date: Thu, 11 Jun 2026 13:34:15 +0800 Subject: [PATCH 4/4] update --- dtable_events/automations/actions.py | 2 -- dtable_events/automations/automations_pipeline.py | 12 ++++-------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/dtable_events/automations/actions.py b/dtable_events/automations/actions.py index c4e0fdb8..6f94df22 100644 --- a/dtable_events/automations/actions.py +++ b/dtable_events/automations/actions.py @@ -4922,8 +4922,6 @@ def can_condition_trigger_action(self, action): def do_actions(self, db_session, with_test=False): if with_test: auto_rule_logger.info('rule: %s run_condition: %s trigger_condition: %s start, a test run', self.rule_id, self.run_condition, self.trigger.get('condition')) - else: - auto_rule_logger.info('rule: %s run_condition: %s trigger_condition: %s start', self.rule_id, self.run_condition, self.trigger.get('condition')) if (not self.can_do_actions()) and (not with_test): auto_rule_logger.info('rule: %s can not do actions', self.rule_id) return diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index c4633538..3a351e00 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -239,8 +239,9 @@ def worker(self): row_id = automation.data.get('row_id') if isinstance(automation.data, dict) else None updated_column_keys = automation.data.get('updated_column_keys') if isinstance(automation.data, dict) else None auto_rule_logger.info( - 'Automation started: rule_id=%s dtable_uuid=%s run_condition=%s row_id=%s updated_column_keys=%s', + 'Automation started: rule_id=%s rule_name=%s dtable_uuid=%s run_condition=%s row_id=%s updated_column_keys=%s', automation.rule_id, + automation.rule_name, automation.dtable_uuid, automation.run_condition, row_id, @@ -252,17 +253,12 @@ def worker(self): result = automation.do_actions(db_session) run_time = time.time() - start_time auto_rule_logger.info( - 'Automation finished: rule_id=%s rule_name=%s success=%s run_time=%.3fs exceed_limit=%s warnings=%s row_id=%s updated_column_keys=%s is_valid=%s valid_type=%s', + 'Automation finished: rule_id=%s success=%s run_time=%.3fs exceed_limit=%s warnings=%s', automation.rule_id, - automation.rule_name, result.success if result else None, run_time, result.is_exceed_system_resource_limit if result else None, - len(result.warnings) if result else 0, - row_id, - updated_column_keys, - result.is_valid if result else None, - result.invalid_type if result else None + len(result.warnings) if result else 0 ) if result: result.run_time = run_time