Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions dtable_events/automations/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4922,8 +4922,6 @@ def can_condition_trigger_action(self, action):
def do_actions(self, db_session, with_test=False):
if with_test:
auto_rule_logger.info('rule: %s run_condition: %s trigger_condition: %s start, a test run', self.rule_id, self.run_condition, self.trigger.get('condition'))
else:
auto_rule_logger.info('rule: %s run_condition: %s trigger_condition: %s start', self.rule_id, self.run_condition, self.trigger.get('condition'))
if (not self.can_do_actions()) and (not with_test):
auto_rule_logger.info('rule: %s can not do actions', self.rule_id)
return
Expand Down
76 changes: 62 additions & 14 deletions dtable_events/automations/automations_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ def get_automation_rule(self, db_session, event_data):
return AutomationRule(event_data, rule.trigger, rule.actions, options)

def receive(self):
auto_rule_logger.info(f"Start to receive automation event from redis, window seconds {self.rate_limiter.window_secs} limit percent {self.rate_limiter.percent}")
auto_rule_logger.info(
"Start consuming automation events from Redis: window_secs=%s limit_percent=%s",
self.rate_limiter.window_secs,
self.rate_limiter.percent,
)
subscriber = self._redis_client.get_subscriber(self.per_update_channel)
last_pubsub_message_time = time.time()
while True:
Expand All @@ -153,7 +157,15 @@ def receive(self):
continue
last_pubsub_message_time = time.time()
event = json.loads(message['data'])
auto_rule_logger.info(f"subscribe event {event}")
auto_rule_logger.info(
"Received automation event: rule_id=%s dtable_uuid=%s op_type=%s table_id=%s row_id=%s updated_column_keys=%s",
event.get('automation_rule_id'),
event.get('dtable_uuid'),
event.get('op_type'),
event.get('table_id'),
event.get('row_id'),
event.get('updated_column_keys'),
)

db_session = self._db_session_class()
try:
Expand All @@ -164,7 +176,12 @@ def receive(self):
if not automation_rule:
continue
if not self.rate_limiter.is_allowed(owner_info['owner'], owner_info['org_id'], self.workers):
auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} rate limit exceed, event {event} will not trigger")
auto_rule_logger.info(
"Skip automation event: rate limited owner=%s org_id=%s rule_id=%s",
owner_info['owner'],
owner_info['org_id'],
automation_rule.rule_id,
)
automation_rule.append_warning({'type': 'exceed_system_resource_limit'})
self.results_queue.put(AutomationResult(
rule_id=automation_rule.rule_id,
Expand All @@ -182,10 +199,20 @@ def receive(self):
self.add_exceed_system_resource_limit_entity(automation_rule.owner, automation_rule.org_id)
continue
if self.automations_stats_manager.is_exceed(db_session, owner_info['owner'], owner_info['org_id']):
auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} trigger count limit exceed, {event} will not trigger")
auto_rule_logger.info(
"Skip automation event: trigger quota exceeded owner=%s org_id=%s rule_id=%s",
owner_info['owner'],
owner_info['org_id'],
automation_rule.rule_id,
)
continue
if not automation_rule.can_do_actions():
auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} trigger run condition missed, {event} will not trigger")
auto_rule_logger.info(
"Skip automation event: trigger conditions not met rule_id=%s owner=%s org_id=%s",
automation_rule.rule_id,
owner_info['owner'],
owner_info['org_id'],
)
continue
self.automations_queue.put(automation_rule)
self.realtime_trigger_count += 1
Expand All @@ -202,20 +229,37 @@ def receive(self):
continue
time.sleep(0.5)
except Exception as e:
auto_rule_logger.error('redis pubsub receive error: %s', e)
auto_rule_logger.error('Redis pubsub receive failed: %s', e)
subscriber = self._redis_client.refresh_subscriber(subscriber, self.per_update_channel, str(e))
last_pubsub_message_time = time.time()

def worker(self):
while True:
automation = self.automations_queue.get()
auto_rule_logger.info(f"Automation {automation.rule_id} with data {automation.data} triggering")
row_id = automation.data.get('row_id') if isinstance(automation.data, dict) else None
updated_column_keys = automation.data.get('updated_column_keys') if isinstance(automation.data, dict) else None
auto_rule_logger.info(
'Automation started: rule_id=%s rule_name=%s dtable_uuid=%s run_condition=%s row_id=%s updated_column_keys=%s',
automation.rule_id,
automation.rule_name,
automation.dtable_uuid,
automation.run_condition,
row_id,
updated_column_keys,
)
db_session = self._db_session_class()
try:
start_time = time.time()
result = automation.do_actions(db_session)
run_time = time.time() - start_time
auto_rule_logger.info(f"Automation {automation.rule_id} with data {automation.data} result is {result} run for {run_time}")
auto_rule_logger.info(
'Automation finished: rule_id=%s success=%s run_time=%.3fs exceed_limit=%s warnings=%s',
automation.rule_id,
result.success if result else None,
run_time,
result.is_exceed_system_resource_limit if result else None,
len(result.warnings) if result else 0
)
if result:
result.run_time = run_time
self.results_queue.put(result)
Expand Down Expand Up @@ -245,13 +289,14 @@ def scan_rules(self):
per_month_check_time = datetime.utcnow() - timedelta(days=27) # consider the least month-days 28 in February (the 2nd month) in common years
db_session = self._db_session_class()
try:
auto_rule_logger.info('Scanning scheduled automation rules...')
rules = db_session.execute(text(sql), {
'per_day_check_time': per_day_check_time,
'per_week_check_time': per_week_check_time,
'per_month_check_time': per_month_check_time
})
except Exception as e:
auto_rule_logger.exception('query regular automation rules error: %s', e)
auto_rule_logger.exception('Failed to query scheduled automation rules: %s', e)
db_session.close()
return

Expand Down Expand Up @@ -292,25 +337,28 @@ def scheduled_scan(self):
# fire at every hour in every day of week
@sched.scheduled_job('cron', day_of_week='*', hour='*', misfire_grace_time=600)
def timed_job():
auto_rule_logger.info('Starts to scan scheduled automation rules...')

try:
self.scan_rules()
except Exception as e:
auto_rule_logger.exception('error when scanning scheduled automation rules: %s', e)
auto_rule_logger.exception('Failed to scan scheduled automation rules: %s', e)

sched.start()

def stats(self):
auto_rule_logger.info("Start to stats thread")
auto_rule_logger.info("Start stats thread")
while True:
result = self.results_queue.get()
if result.run_condition == 'per_update' and not result.is_exceed_system_resource_limit:
owner = result.owner
org_id = result.org_id
run_time = result.run_time
self.rate_limiter.record_time(owner, org_id, run_time)
auto_rule_logger.debug(f"owner {owner} org_id {org_id} usage percent {self.rate_limiter.get_percent(owner, org_id, self.workers)}")
auto_rule_logger.debug(
'Automation usage percent owner=%s org_id=%s percent=%s',
owner,
org_id,
self.rate_limiter.get_percent(owner, org_id, self.workers),
)
db_session = self._db_session_class()
try:
self.automations_stats_manager.update_stats(db_session, result)
Expand Down
Loading