diff --git a/dtable_events/automations/actions.py b/dtable_events/automations/actions.py index c4e0fdb8..6f94df22 100644 --- a/dtable_events/automations/actions.py +++ b/dtable_events/automations/actions.py @@ -4922,8 +4922,6 @@ def can_condition_trigger_action(self, action): def do_actions(self, db_session, with_test=False): if with_test: auto_rule_logger.info('rule: %s run_condition: %s trigger_condition: %s start, a test run', self.rule_id, self.run_condition, self.trigger.get('condition')) - else: - auto_rule_logger.info('rule: %s run_condition: %s trigger_condition: %s start', self.rule_id, self.run_condition, self.trigger.get('condition')) if (not self.can_do_actions()) and (not with_test): auto_rule_logger.info('rule: %s can not do actions', self.rule_id) return diff --git a/dtable_events/automations/automations_pipeline.py b/dtable_events/automations/automations_pipeline.py index b8f78169..3a351e00 100644 --- a/dtable_events/automations/automations_pipeline.py +++ b/dtable_events/automations/automations_pipeline.py @@ -141,7 +141,11 @@ def get_automation_rule(self, db_session, event_data): return AutomationRule(event_data, rule.trigger, rule.actions, options) def receive(self): - auto_rule_logger.info(f"Start to receive automation event from redis, window seconds {self.rate_limiter.window_secs} limit percent {self.rate_limiter.percent}") + auto_rule_logger.info( + "Start consuming automation events from Redis: window_secs=%s limit_percent=%s", + self.rate_limiter.window_secs, + self.rate_limiter.percent, + ) subscriber = self._redis_client.get_subscriber(self.per_update_channel) last_pubsub_message_time = time.time() while True: @@ -153,7 +157,15 @@ def receive(self): continue last_pubsub_message_time = time.time() event = json.loads(message['data']) - auto_rule_logger.info(f"subscribe event {event}") + auto_rule_logger.info( + "Received automation event: rule_id=%s dtable_uuid=%s op_type=%s table_id=%s row_id=%s updated_column_keys=%s", + event.get('automation_rule_id'), + event.get('dtable_uuid'), + event.get('op_type'), + event.get('table_id'), + event.get('row_id'), + event.get('updated_column_keys'), + ) db_session = self._db_session_class() try: @@ -164,7 +176,12 @@ def receive(self): if not automation_rule: continue if not self.rate_limiter.is_allowed(owner_info['owner'], owner_info['org_id'], self.workers): - auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} rate limit exceed, event {event} will not trigger") + auto_rule_logger.info( + "Skip automation event: rate limited owner=%s org_id=%s rule_id=%s", + owner_info['owner'], + owner_info['org_id'], + automation_rule.rule_id, + ) automation_rule.append_warning({'type': 'exceed_system_resource_limit'}) self.results_queue.put(AutomationResult( rule_id=automation_rule.rule_id, @@ -182,10 +199,20 @@ def receive(self): self.add_exceed_system_resource_limit_entity(automation_rule.owner, automation_rule.org_id) continue if self.automations_stats_manager.is_exceed(db_session, owner_info['owner'], owner_info['org_id']): - auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} trigger count limit exceed, {event} will not trigger") + auto_rule_logger.info( + "Skip automation event: trigger quota exceeded owner=%s org_id=%s rule_id=%s", + owner_info['owner'], + owner_info['org_id'], + automation_rule.rule_id, + ) continue if not automation_rule.can_do_actions(): - auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} trigger run condition missed, {event} will not trigger") + auto_rule_logger.info( + "Skip automation event: trigger conditions not met rule_id=%s owner=%s org_id=%s", + automation_rule.rule_id, + owner_info['owner'], + owner_info['org_id'], + ) continue self.automations_queue.put(automation_rule) self.realtime_trigger_count += 1 @@ -202,20 +229,37 @@ def receive(self): continue time.sleep(0.5) except Exception as e: - auto_rule_logger.error('redis pubsub receive error: %s', e) + auto_rule_logger.error('Redis pubsub receive failed: %s', e) subscriber = self._redis_client.refresh_subscriber(subscriber, self.per_update_channel, str(e)) last_pubsub_message_time = time.time() def worker(self): while True: automation = self.automations_queue.get() - auto_rule_logger.info(f"Automation {automation.rule_id} with data {automation.data} triggering") + row_id = automation.data.get('row_id') if isinstance(automation.data, dict) else None + updated_column_keys = automation.data.get('updated_column_keys') if isinstance(automation.data, dict) else None + auto_rule_logger.info( + 'Automation started: rule_id=%s rule_name=%s dtable_uuid=%s run_condition=%s row_id=%s updated_column_keys=%s', + automation.rule_id, + automation.rule_name, + automation.dtable_uuid, + automation.run_condition, + row_id, + updated_column_keys, + ) db_session = self._db_session_class() try: start_time = time.time() result = automation.do_actions(db_session) run_time = time.time() - start_time - auto_rule_logger.info(f"Automation {automation.rule_id} with data {automation.data} result is {result} run for {run_time}") + auto_rule_logger.info( + 'Automation finished: rule_id=%s success=%s run_time=%.3fs exceed_limit=%s warnings=%s', + automation.rule_id, + result.success if result else None, + run_time, + result.is_exceed_system_resource_limit if result else None, + len(result.warnings) if result else 0 + ) if result: result.run_time = run_time self.results_queue.put(result) @@ -245,13 +289,14 @@ def scan_rules(self): per_month_check_time = datetime.utcnow() - timedelta(days=27) # consider the least month-days 28 in February (the 2nd month) in common years db_session = self._db_session_class() try: + auto_rule_logger.info('Scanning scheduled automation rules...') rules = db_session.execute(text(sql), { 'per_day_check_time': per_day_check_time, 'per_week_check_time': per_week_check_time, 'per_month_check_time': per_month_check_time }) except Exception as e: - auto_rule_logger.exception('query regular automation rules error: %s', e) + auto_rule_logger.exception('Failed to query scheduled automation rules: %s', e) db_session.close() return @@ -292,17 +337,15 @@ def scheduled_scan(self): # fire at every hour in every day of week @sched.scheduled_job('cron', day_of_week='*', hour='*', misfire_grace_time=600) def timed_job(): - auto_rule_logger.info('Starts to scan scheduled automation rules...') - try: self.scan_rules() except Exception as e: - auto_rule_logger.exception('error when scanning scheduled automation rules: %s', e) + auto_rule_logger.exception('Failed to scan scheduled automation rules: %s', e) sched.start() def stats(self): - auto_rule_logger.info("Start to stats thread") + auto_rule_logger.info("Start stats thread") while True: result = self.results_queue.get() if result.run_condition == 'per_update' and not result.is_exceed_system_resource_limit: @@ -310,7 +353,12 @@ def stats(self): org_id = result.org_id run_time = result.run_time self.rate_limiter.record_time(owner, org_id, run_time) - auto_rule_logger.debug(f"owner {owner} org_id {org_id} usage percent {self.rate_limiter.get_percent(owner, org_id, self.workers)}") + auto_rule_logger.debug( + 'Automation usage percent owner=%s org_id=%s percent=%s', + owner, + org_id, + self.rate_limiter.get_percent(owner, org_id, self.workers), + ) db_session = self._db_session_class() try: self.automations_stats_manager.update_stats(db_session, result)