4646 BaseProcessor ,
4747 JobNotFoundError ,
4848 JobResultNotFoundError ,
49+ ProcessorExecuteError ,
4950 UnknownProcessError ,
5051)
5152from pygeoapi .util import (
5253 get_current_datetime ,
54+ is_request_allowed ,
5355 JobStatus ,
5456 ProcessExecutionMode ,
5557 RequestedProcessExecutionMode ,
@@ -105,7 +107,11 @@ def get_processor(self, process_id: str) -> BaseProcessor:
105107 except KeyError as err :
106108 raise UnknownProcessError ('Invalid process identifier' ) from err
107109 else :
108- return load_plugin ('process' , process_conf ['processor' ])
110+ pp = load_plugin ('process' , process_conf ['processor' ])
111+ pp .allow_internal_requests = process_conf .get (
112+ 'allow_internal_requests' , False )
113+
114+ return pp
109115
110116 def get_jobs (self ,
111117 status : JobStatus = None ,
@@ -394,13 +400,13 @@ def execute_process(
394400 """
395401
396402 job_id = str (uuid .uuid1 ())
397- processor = self .get_processor (process_id )
398- processor .set_job_id (job_id )
403+ self . processor = self .get_processor (process_id )
404+ self . processor .set_job_id (job_id )
399405 extra_execute_handler_parameters = {
400406 'requested_response' : requested_response
401407 }
402408
403- job_control_options = processor .metadata .get (
409+ job_control_options = self . processor .metadata .get (
404410 'jobControlOptions' , [])
405411
406412 if execution_mode == RequestedProcessExecutionMode .respond_async :
@@ -473,7 +479,7 @@ def execute_process(
473479 # TODO: handler's response could also be allowed to include more HTTP
474480 # headers
475481 mime_type , outputs , status = handler (
476- processor ,
482+ self . processor ,
477483 job_id ,
478484 data_dict ,
479485 requested_outputs ,
@@ -483,26 +489,37 @@ def execute_process(
483489
484490 def _send_in_progress_notification (self , subscriber : Optional [Subscriber ]):
485491 if subscriber and subscriber .in_progress_uri :
486- response = requests .post (subscriber .in_progress_uri , json = {})
487- LOGGER .debug (
488- f'In progress notification response: { response .status_code } '
489- )
492+ self .__do_subscriber_request (subscriber .in_progress_uri )
490493
491494 def _send_success_notification (
492495 self , subscriber : Optional [Subscriber ], outputs : Any
493496 ):
494- if subscriber :
495- response = requests .post (subscriber .success_uri , json = outputs )
496- LOGGER .debug (
497- f'Success notification response: { response .status_code } '
498- )
497+ if subscriber and subscriber .success_uri :
498+ self .__do_subscriber_request (subscriber .success_uri , outputs )
499499
500500 def _send_failed_notification (self , subscriber : Optional [Subscriber ]):
501501 if subscriber and subscriber .failed_uri :
502- response = requests .post (subscriber .failed_uri , json = {})
503- LOGGER .debug (
504- f'Failed notification response: { response .status_code } '
505- )
502+ self .__do_subscriber_request (subscriber .failed_uri )
503+
504+ def __do_subscriber_request (self , url : str , data : dict = {}) -> None :
505+ """
506+ Helper function to execute a subscriber URL via HTTP POST
507+
508+ :param url: `str` of URL
509+ :param data: `dict` of request payload
510+
511+ :returns: `None`
512+ """
513+
514+ if not is_request_allowed (url , self .processor .allow_internal_requests ):
515+ msg = 'URL not allowed'
516+ LOGGER .error (f'{ msg } : { url } ' )
517+ raise ProcessorExecuteError (msg )
518+
519+ response = requests .post (url , json = data )
520+ LOGGER .debug (
521+ f'Response: { response .status_code } '
522+ )
506523
507524 def __repr__ (self ):
508525 return f'<BaseManager> { self .name } '
0 commit comments