This repository was archived by the owner on Apr 2, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmsp.py
More file actions
2199 lines (1958 loc) · 101 KB
/
Copy pathmsp.py
File metadata and controls
2199 lines (1958 loc) · 101 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Common functionality used in the MSP ETL scripts used to prepare and import metadata from
MER guidance, DATIM, iHUB, and related systems.
Update these for each update of MSP:
1. Run get_codelist_collections_formatted_for_display to provide MSP with codelist definitions
"""
import json
import csv
import datetime
import re
import requests
import ocldev.oclcsvtojsonconverter
import ocldev.oclconstants
import ocldev.oclresourcelist
# Constants for OCL mappings: map type names used for the mappings built by the MSP scripts
MSP_MAP_TYPE_REF_INDICATOR_TO_DE = 'Has Data Element'
MSP_MAP_TYPE_REF_INDICATOR_TO_DATIM_INDICATOR = 'Has DATIM Indicator'
MSP_MAP_TYPE_DE_TO_COC = 'Has Option'
MSP_MAP_TYPE_REPLACES = 'Replaces'
MSP_MAP_TYPE_DERIVED_FROM = 'Derived From'
# NOTE(review): this list omits MSP_MAP_TYPE_REF_INDICATOR_TO_DATIM_INDICATOR and
# MSP_MAP_TYPE_DERIVED_FROM -- confirm whether that is intentional.
MSP_MAP_TYPES = [
MSP_MAP_TYPE_REF_INDICATOR_TO_DE,
MSP_MAP_TYPE_DE_TO_COC,
MSP_MAP_TYPE_REPLACES,
]
# Mapping ID templates; each takes two %s substitutions
# (presumably the "from" and "to" concept IDs -- verify against the code that builds mappings)
MSP_MAP_ID_FORMAT_DE_COC = 'MAP_DE_COC_%s_%s'
MSP_MAP_ID_FORMAT_REFIND_DE = 'MAP_REFIND_DE_%s_%s'
MSP_MAP_ID_FORMAT_REFIND_IND = 'MAP_REFIND_IND_%s_%s'
# Constants for iHUB source spreadsheet: column header names
IHUB_COLUMN_INDICATOR = 'indicator'
IHUB_COLUMN_SOURCE_KEY = 'source_srgt_key'
IHUB_COLUMN_DISAGGREGATE = 'disaggregate'
IHUB_COLUMN_STANDARDIZED_DISAGGREGATE = 'standardized_disaggregate'
IHUB_COLUMN_DERIVED_DATA_ELEMENT_UID = 'derived_data_element_uid'
IHUB_COLUMN_DERIVED_DATA_ELEMENT_NAME = 'derived_data_element_name'
IHUB_COLUMN_DERIVED_COC_UID = 'derived_category_option_combo'
IHUB_COLUMN_DERIVED_COC_NAME = 'derived_category_option_combo_name'
IHUB_COLUMN_SOURCE_DATA_ELEMENT_UID = 'source_data_element_uid'
IHUB_COLUMN_SOURCE_DATA_ELEMENT_NAME = 'source_data_element_name'
IHUB_COLUMN_SOURCE_DISAGGREGATE = 'source_disaggregate'
IHUB_COLUMN_SOURCE_COC_UID = 'source_category_option_combo_uid'
IHUB_COLUMN_SOURCE_COC_NAME = 'source_category_option_combo_name'
IHUB_COLUMN_RULE_BEGIN_PERIOD = 'rule_begin_period'
IHUB_COLUMN_RULE_END_PERIOD = 'rule_end_period'
IHUB_COLUMN_ADD_OR_SUBTRACT = 'add_or_subtract'
IHUB_COLUMN_RESULT_TARGET = 'result_target'
IHUB_COLUMN_RUN_SEQUENCE = 'derived_level_run_seq'
IHUB_COLUMN_RULE_ID = 'rule_id'
# All iHUB column names declared above, in declaration order
IHUB_COLUMNS = [
IHUB_COLUMN_INDICATOR,
IHUB_COLUMN_SOURCE_KEY,
IHUB_COLUMN_DISAGGREGATE,
IHUB_COLUMN_STANDARDIZED_DISAGGREGATE,
IHUB_COLUMN_DERIVED_DATA_ELEMENT_UID,
IHUB_COLUMN_DERIVED_DATA_ELEMENT_NAME,
IHUB_COLUMN_DERIVED_COC_UID,
IHUB_COLUMN_DERIVED_COC_NAME,
IHUB_COLUMN_SOURCE_DATA_ELEMENT_UID,
IHUB_COLUMN_SOURCE_DATA_ELEMENT_NAME,
IHUB_COLUMN_SOURCE_DISAGGREGATE,
IHUB_COLUMN_SOURCE_COC_UID,
IHUB_COLUMN_SOURCE_COC_NAME,
IHUB_COLUMN_RULE_BEGIN_PERIOD,
IHUB_COLUMN_RULE_END_PERIOD,
IHUB_COLUMN_ADD_OR_SUBTRACT,
IHUB_COLUMN_RESULT_TARGET,
IHUB_COLUMN_RUN_SEQUENCE,
IHUB_COLUMN_RULE_ID,
]
# String values for the IHUB_COLUMN_SOURCE_KEY column
# (presumably '1' marks a DATIM-sourced row and '2' an iHUB-sourced row -- TODO confirm)
IHUB_COLUMN_SOURCE_KEY_DATIM = '1'
IHUB_COLUMN_SOURCE_KEY_IHUB = '2'
# Constants for DATIM code list columns: zero-based positional column indexes
DATIM_CODELIST_COLUMN_DATASET = 0
DATIM_CODELIST_COLUMN_DATA_ELEMENT_NAME = 1
DATIM_CODELIST_COLUMN_DATA_ELEMENT_SHORT_NAME = 2
DATIM_CODELIST_COLUMN_DATA_ELEMENT_CODE = 3
DATIM_CODELIST_COLUMN_DATA_ELEMENT_UID = 4
DATIM_CODELIST_COLUMN_DATA_ELEMENT_DESCRIPTION = 5
DATIM_CODELIST_COLUMN_COC_NAME = 6
DATIM_CODELIST_COLUMN_COC_CODE = 7
DATIM_CODELIST_COLUMN_COC_UID = 8
# All DATIM code list column indexes declared above, in ascending order
DATIM_CODELIST_COLUMNS = [
DATIM_CODELIST_COLUMN_DATASET,
DATIM_CODELIST_COLUMN_DATA_ELEMENT_NAME,
DATIM_CODELIST_COLUMN_DATA_ELEMENT_SHORT_NAME,
DATIM_CODELIST_COLUMN_DATA_ELEMENT_CODE,
DATIM_CODELIST_COLUMN_DATA_ELEMENT_UID,
DATIM_CODELIST_COLUMN_DATA_ELEMENT_DESCRIPTION,
DATIM_CODELIST_COLUMN_COC_NAME,
DATIM_CODELIST_COLUMN_COC_CODE,
DATIM_CODELIST_COLUMN_COC_UID,
]
# Constants for MSP collections -- %s is replaced by period (eg FY19)
COLLECTION_NAME_MER_REFERENCE_INDICATORS = 'MER_REFERENCE_INDICATORS_%s'
COLLECTION_NAME_MER_FULL = 'MER_%s'
# Support type constants: acronym -> fully specified PEPFAR support type name
SUPPORT_TYPE_CODES = {
'TA': 'Technical Assistance',
'DSD': 'Direct Service Delivery',
'CS': 'Central Support'
}
# Constants for data element custom attributes (keys used in a resource's "extras")
ATTR_APPLICABLE_PERIODS = 'Applicable Periods'
ATTR_PERIOD = 'Period'
ATTR_REPORTING_FREQUENCY = 'Reporting frequency'
ATTR_RESULT_TARGET = 'resultTarget'
ATTR_CODELISTS = 'codelists'
ATTR_PEPFAR_SUPPORT_TYPE = 'pepfarSupportType'
ATTR_NUMERATOR_DENOMINATOR_TYPE = 'numeratorDenominator'
ATTR_DOMAIN_TYPE = 'domainType'
ATTR_VALUE_TYPE = 'valueType'
ATTR_AGGREGATION_TYPE = 'aggregationType'
ATTR_STRUCTURED_DATASET = 'Structured Dataset'
# Mapping between periods and terms that appear in DATIM indicator names (case-insensitive).
# Multi-year terms such as "FY17-20" are listed under every period they span.
MAP_PERIOD_TO_INDICATOR_TERMS = {
"FY22": ["FY22", "COP21", "COP 21"],
"FY21": ["FY21", "2021", "WAD21", "COP20"],
"FY20": ["FY20", "2020", "WAD20", "FY17-20", "COP19"],
"FY19": ["FY19", "2019", "WAD19", "FY17-20", "COP18"],
"FY18": ["FY18", "2018", "WAD18", "FY16-18", "FY17-20", "COP17"],
"FY17": ["FY17", "2017", "WAD17", "FY16-18", "FY17-20", "COP16"],
"FY16": ["FY16", "2016", "WAD16", "FY16-18", "COP15"],
}
def display_resource_list_summaries(resource_list, summary_dict):
    """
    Print one breakdown per custom attribute of a resource list to stdout.

    summary_dict maps a custom attribute key to the title printed above that
    attribute's breakdown. Each breakdown comes from
    resource_list.summarize(custom_attr_key=...) and is printed one
    "value: count" pair per line.
    """
    for attr_key in summary_dict:
        print(' Breakdown by %s:' % summary_dict[attr_key])
        breakdown = resource_list.summarize(custom_attr_key=attr_key)
        for attr_value in breakdown:
            print(' %s: %s' % (attr_value, breakdown[attr_value]))
def display_input_metadata_summary(verbosity=1, input_periods=None, ref_indicator_concepts=None,
                                   sorted_ref_indicator_codes=None, coc_concepts=None,
                                   codelist_collections=None, de_concepts=None,
                                   map_codelist_to_de_to_coc=None, datim_indicator_concepts=None,
                                   ihub_dde_concepts=None, map_ref_indicator_to_de=None,
                                   map_ref_indicator_to_ihub_dde=None,
                                   map_ref_indicator_to_datim_indicator=None,
                                   map_de_to_coc=None, map_ihub_dde_to_coc=None,
                                   de_version_linkages=None, map_de_version_linkages=None,
                                   map_dde_source_linkages=None,
                                   ref_indicator_references=None, codelist_references=None):
    """
    Displays summary of the loaded metadata on stdout.

    All arguments are optional. A section is skipped when its primary input
    (eg ref_indicator_concepts, codelist_collections, de_concepts) is empty or None,
    and any lookup dictionary passed as None is treated as an empty dictionary.
    Set verbosity to 2 or higher to list individual codelists and data element
    version chains. ref_indicator_references and codelist_references are accepted
    but not used by this function.
    """
    # Lookup dictionaries default to None; normalize them so that membership tests
    # and get_dict_child_counts() do not fail when a caller omits them.
    map_codelist_to_de_to_coc = map_codelist_to_de_to_coc or {}
    map_ref_indicator_to_de = map_ref_indicator_to_de or {}
    map_ref_indicator_to_ihub_dde = map_ref_indicator_to_ihub_dde or {}
    map_ref_indicator_to_datim_indicator = map_ref_indicator_to_datim_indicator or {}
    de_version_linkages = de_version_linkages or {}
    map_de_version_linkages = map_de_version_linkages or {}
    map_dde_source_linkages = map_dde_source_linkages or {}

    print('MSP Metadata Statistics %s\n' % datetime.datetime.now().strftime("%Y-%m-%d"))
    print('METADATA SOURCES:')

    # Input periods
    print(' Input Periods:', input_periods)

    # Reference Indicators
    if ref_indicator_concepts and sorted_ref_indicator_codes:
        # sorted_ref_indicator_codes holds the unique codes, whereas
        # ref_indicator_concepts holds one definition per code and period.
        print(' MER Reference Indicators (FY16-20): '
              '%s unique reference indicator codes, %s total definitions' % (
                  len(sorted_ref_indicator_codes), len(ref_indicator_concepts)))
        print(' Breakdown by Indicator Code:')
        for ref_indicator_code in sorted(sorted_ref_indicator_codes):
            print(' %s: ' % ref_indicator_code)
            print(' Periods:', ', '.join(ref_indicator_concepts.get_resources(
                core_attrs={'id': ref_indicator_code}).summarize(
                    custom_attr_key=ATTR_PERIOD).keys()))
            ref_indicator_concept = ref_indicator_concepts.get_resource(
                core_attrs={'type': 'Concept', 'id': ref_indicator_code})
            if ref_indicator_concept:
                ref_indicator_url = ref_indicator_concept['__url']
                if ref_indicator_url in map_ref_indicator_to_de:
                    print(' Mapped DATIM data elements: %s' % (
                        len(map_ref_indicator_to_de[ref_indicator_url])))
                if ref_indicator_url in map_ref_indicator_to_ihub_dde:
                    print(' Mapped iHUB derived data elements: %s' % (
                        len(map_ref_indicator_to_ihub_dde[ref_indicator_url])))
                if ref_indicator_url in map_ref_indicator_to_datim_indicator:
                    print(' Mapped DATIM indicators: %s' % (
                        len(map_ref_indicator_to_datim_indicator[ref_indicator_url])))
        print(' Breakdown by Period:')
        for (period, count) in ref_indicator_concepts.summarize(
                custom_attr_key=ATTR_PERIOD).items():
            print(' %s: %s' % (period, count))
        # Heading and counts are emitted by a single print so they share one line
        # (a trailing comma inside print() does not suppress the newline in Python 3).
        print(' Summary of Reference Indicator Mappings:')
        print(' Mappings to DATIM Data Elements (DE): '
              '%s reference indicators with %s unique DE mappings' % (
                  get_dict_child_counts(map_ref_indicator_to_de)))
        print(' Mappings to iHUB Derived Data Elements (DDE): '
              '%s reference indicators with %s unique DDE mappings' % (
                  get_dict_child_counts(map_ref_indicator_to_ihub_dde)))
        print(' Mappings to DATIM Indicators: '
              '%s reference indicators with %s unique DATIM indicator mappings\n' % (
                  get_dict_child_counts(map_ref_indicator_to_datim_indicator)))

    # Codelist collections
    if codelist_collections:
        print(' DATIM Code Lists (FY16-20):', len(codelist_collections))
        print(' Breakdown by Period: (Note some codelists span multiple periods)')
        for period in (input_periods or []):
            period_codelist_collections = ocldev.oclresourcelist.OclJsonResourceList()
            for codelist in codelist_collections:
                if period in codelist['extras'][ATTR_APPLICABLE_PERIODS]:
                    period_codelist_collections.append(codelist)
            print(' %s Code Lists: %s' % (period, len(period_codelist_collections)))
            if verbosity >= 2:
                for result_target_type in ['Result', 'Target']:
                    filtered_codelists = period_codelist_collections.get_resources(
                        custom_attrs={ATTR_RESULT_TARGET: result_target_type})
                    if filtered_codelists:
                        print(' %s: %s' % (result_target_type, len(filtered_codelists)))
                    else:
                        print(' %s: None' % (result_target_type))
                    for codelist in filtered_codelists:
                        if codelist['external_id'] in map_codelist_to_de_to_coc:
                            print(' %s: %s data elements' % (
                                codelist['name'],
                                len(map_codelist_to_de_to_coc[codelist['external_id']])))
                        else:
                            print(' %s' % codelist['name'])

    # DATIM data element concepts
    if de_concepts:
        de_concepts_summary_dict = {
            ATTR_RESULT_TARGET: 'Result/Target',
            ATTR_DOMAIN_TYPE: 'Domain Type',
            ATTR_NUMERATOR_DENOMINATOR_TYPE: 'Numerator/Denominator',
            ATTR_PEPFAR_SUPPORT_TYPE: 'PEPFAR Support Type',
            ATTR_REPORTING_FREQUENCY: 'Reporting Frequency'
        }
        print(' DATIM Data Elements (All):', len(de_concepts))
        print(' Breakdown by period (via codelists):')
        for (period, count) in summarize_applicable_periods_from_concepts(de_concepts).items():
            print(' %s: %s' % (period, count))
        display_resource_list_summaries(de_concepts, de_concepts_summary_dict)

    # iHUB derived data element concepts
    if ihub_dde_concepts:
        ihub_dde_concepts_summary_dict = {
            ATTR_RESULT_TARGET: 'Result/Target',
            'standardized_disaggregate': 'Standardized Disaggregate',
            ATTR_NUMERATOR_DENOMINATOR_TYPE: 'Numerator/Denominator',
            ATTR_PEPFAR_SUPPORT_TYPE: 'PEPFAR Support Type',
            ATTR_REPORTING_FREQUENCY: 'Reporting Frequency'
        }
        print(' iHUB Derived Data Element (All):', len(ihub_dde_concepts))
        print(' Breakdown by period (via derivation rules):')
        for (period, count) in summarize_applicable_periods_from_concepts(
                ihub_dde_concepts).items():
            print(' %s: %s' % (period, count))
        display_resource_list_summaries(ihub_dde_concepts, ihub_dde_concepts_summary_dict)

    # Summary for DE version linkages
    print('\nRESULTS OF GENERATING LINKAGES BETWEEN DATA ELEMENTS:')
    print(' Data Element Version Links (DATIM and iHUB):')
    print(' %s DEs replaced %s DEs' % get_dict_child_counts(map_de_version_linkages))
    if verbosity >= 2:
        for de_code in de_version_linkages:
            print(' %s' % de_code)
            for de_version in de_version_linkages[de_code]:
                print(' %s: %s (%s)' % (
                    de_version['sort_order'], de_version['code'], de_version['url']))

    # Summary for DE source-derivation linkages
    print('\n iHUB Data Element Source-Derivation Linkages:')
    print(' %s derived data elements linked to %s source data elements' % get_dict_child_counts(
        map_dde_source_linkages))
    print(' NOTE: Source-derivation linkages are defined between data elements only, not COCs')

    # COC concepts
    if coc_concepts:
        print(' DATIM COC concepts (All):', len(coc_concepts))
        if map_de_to_coc:
            print(' %s DATIM data elements with %s unique COC maps' % get_dict_child_counts(
                map_de_to_coc))
        if map_ihub_dde_to_coc:
            print(' %s iHUB DDEs with %s unique COC maps' % get_dict_child_counts(
                map_ihub_dde_to_coc))

    # DATIM indicator concepts
    if datim_indicator_concepts:
        datim_indicator_concepts_summary_dict = {
            ATTR_RESULT_TARGET: 'Result/Target',
            'annualized': 'Annualized',
            'dimensionItemType': 'dimensionItemType',
        }
        print(' DATIM Indicators (All):', len(datim_indicator_concepts))
        print(' Breakdown by period (via keywords in indicator names):')
        for (period, count) in summarize_applicable_periods_from_concepts(
                datim_indicator_concepts).items():
            print(' %s: %s' % (period, count))
        display_resource_list_summaries(
            datim_indicator_concepts, datim_indicator_concepts_summary_dict)

    # Display list of overlapping IDs between iHUB and DATIM data elements.
    # Both lists are required; without DATIM data elements there is nothing to overlap.
    if ihub_dde_concepts and de_concepts:
        overlapping_de_concepts = {}
        for dde_concept in ihub_dde_concepts:
            de_concept = de_concepts.get_resource(core_attrs={'id': dde_concept['id']})
            if de_concept:
                overlapping_de_concepts[dde_concept['id']] = {
                    'ihub': dde_concept,
                    'datim': de_concept
                }
        if overlapping_de_concepts:
            print(' Overlapping DATIM/iHUB Data Elements: %s' % len(overlapping_de_concepts))
            for overlapping_de_concept_id in overlapping_de_concepts:
                overlapping_concept = overlapping_de_concepts[overlapping_de_concept_id]
                print(' [%s]\n DATIM: %s -- %s\n iHUB: %s -- %s' % (
                    overlapping_de_concept_id,
                    overlapping_concept['datim']['names'][0]['name'],
                    overlapping_concept['datim']['extras'].get(ATTR_APPLICABLE_PERIODS),
                    overlapping_concept['ihub']['names'][0]['name'],
                    overlapping_concept['ihub']['extras'].get(ATTR_APPLICABLE_PERIODS)))
def summarize_import_list(import_list):
    """
    Print a breakdown of the final import list to stdout.

    Resources are counted by type. Concepts are further broken down by concept class
    (with a per-source breakdown for data elements and a per-period breakdown for
    reference indicators), mappings by map type, and collections are listed by ID.
    """
    print('\nSUMMARY OF FINAL IMPORT LIST:')
    print(' Breakdown by resource type:')
    type_counts = import_list.summarize(core_attr_key='type')
    for resource_type in type_counts:
        print(' %s: %s' % (resource_type, type_counts[resource_type]))

        if resource_type == 'Concept':
            concepts = import_list.get_resources(core_attrs={'type': 'Concept'})
            class_counts = concepts.summarize(core_attr_key='concept_class')
            for concept_class in class_counts:
                print(' %s: %s' % (concept_class, class_counts[concept_class]))
                # Only these two concept classes get a second-level breakdown
                if concept_class == 'Data Element':
                    class_concepts = concepts.get_resources(
                        core_attrs={'concept_class': 'Data Element'})
                    detail_counts = class_concepts.summarize(custom_attr_key='source')
                elif concept_class == 'Reference Indicator':
                    class_concepts = concepts.get_resources(
                        core_attrs={'concept_class': 'Reference Indicator'})
                    detail_counts = class_concepts.summarize(custom_attr_key='Period')
                else:
                    continue
                for detail_key in detail_counts:
                    print(' %s: %s' % (detail_key, detail_counts[detail_key]))

        elif resource_type == 'Mapping':
            mappings = import_list.get_resources(core_attrs={'type': 'Mapping'})
            map_type_counts = mappings.summarize(core_attr_key='map_type')
            for map_type in map_type_counts:
                print(' %s: %s' % (map_type, map_type_counts[map_type]))

        elif resource_type == 'Collection':
            collections = import_list.get_resources(core_attrs={'type': 'Collection'})
            for collection in collections:
                print(' %s' % collection['id'])
def count_reference_expressions(references):
    """
    Returns the total number of expressions across all of the specified references,
    where each reference stores its expressions under ["data"]["expressions"].
    Used by summary display methods.
    """
    return sum(len(reference["data"]["expressions"]) for reference in references)
def get_dict_child_counts(dict_to_be_counted):
    """
    Returns a (number_of_keys, total_number_of_children) tuple for a dictionary whose
    values are sized containers (eg lists or dicts). An empty or None dictionary
    yields (0, 0). The tuple form allows the result to be used directly as the
    argument of a two-placeholder "%" format string.
    Used by summary display methods.
    """
    # Callers default their lookup dictionaries to None, so tolerate that here
    if not dict_to_be_counted:
        return 0, 0
    count_of_children = sum(len(children) for children in dict_to_be_counted.values())
    return len(dict_to_be_counted), count_of_children
def get_new_org_json(org_id=''):
    """ Builds the OCL-formatted JSON (as a dict) describing the PEPFAR organization """
    return dict(
        website="https://www.pepfar.gov/",
        name="The United States President's Emergency Plan for AIDS Relief",
        public_access="View",
        company="US Government",
        type="Organization",
        id=org_id,
        location="Washington, DC, USA",
    )
def get_primary_source(org_id, source_id, canonical_url):
    """
    Builds the OCL-formatted JSON for the PEPFAR MER source, owned by org_id.
    The source's canonical URL is "<canonical_url>/CodeSystem/MER".
    """
    mer_canonical_url = "%s/CodeSystem/MER" % canonical_url
    return get_new_repo_json(
        owner_id=org_id,
        repo_id=source_id,
        name="MER Source",
        full_name="DATIM Monitoring, Evaluation & Results Metadata",
        canonical_url=mer_canonical_url)
def get_new_repo_json(owner_type='Organization', owner_id='', repo_type='Source', repo_id='',
                      name='', full_name='', repo_sub_type='Dictionary', default_locale='en',
                      public_access='View', supported_locales='en', canonical_url=''):
    """
    Builds the OCL-formatted JSON (as a dict) for a repository such as a source.

    repo_id is used for both "id" and "short_code". The sub type is stored under a key
    derived from the lower-cased repo type (eg "source_type"). "canonical_url" is only
    included when a non-empty value is provided.
    """
    sub_type_key = "%s_type" % repo_type.lower()
    repo_json = {}
    repo_json["name"] = name
    repo_json["default_locale"] = default_locale
    repo_json["short_code"] = repo_id
    repo_json[sub_type_key] = repo_sub_type
    repo_json["full_name"] = full_name
    repo_json["owner"] = owner_id
    repo_json["public_access"] = public_access
    repo_json["owner_type"] = owner_type
    repo_json["type"] = repo_type
    repo_json["id"] = repo_id
    repo_json["supported_locales"] = supported_locales
    if not canonical_url:
        return repo_json
    repo_json['canonical_url'] = canonical_url
    return repo_json
def load_datim_data_elements(filename='', org_id='', source_id='',
                             sorted_ref_indicator_codes=None, codelist_collections=None,
                             ref_indicator_concepts=None):
    """
    Reads a JSON file of raw DHIS2-formatted DATIM data elements (listed under the
    top-level "dataElements" key) and returns them as an OclJsonResourceList of
    OCL-formatted concepts, each built by build_concept_from_datim_de.
    Note that COCs and datasets are included as attributes of each data element.
    """
    # Read the raw DHIS2 export
    with open(filename, 'rb') as de_file:
        datim_export = json.load(de_file)

    # Convert each raw data element into an OCL-formatted concept
    concepts = ocldev.oclresourcelist.OclJsonResourceList()
    for raw_data_element in datim_export['dataElements']:
        concept = build_concept_from_datim_de(
            raw_data_element, org_id, source_id, sorted_ref_indicator_codes,
            codelist_collections, ref_indicator_concepts)
        concepts.append(concept)
    return concepts
def load_datim_coc_concepts(filename='', org_id='', source_id=''):
    """
    Reads a JSON file of raw DHIS2-formatted categoryOptionCombos (listed under the
    top-level "categoryOptionCombos" key) and returns them as an OclJsonResourceList
    of OCL-formatted concepts, each built by build_concept_from_datim_coc.
    """
    # Read the raw DHIS2 export
    with open(filename, 'rb') as coc_file:
        datim_export = json.load(coc_file)

    # Convert every raw COC, then wrap the results in a resource list
    converted = [
        build_concept_from_datim_coc(raw_coc, org_id, source_id)
        for raw_coc in datim_export['categoryOptionCombos']
    ]
    return ocldev.oclresourcelist.OclJsonResourceList(resources=converted)
def load_codelist_collections_with_exports_from_file(filename='', org_id=''):
    """
    Reads previously saved codelist collections (including their exports) from a JSON
    file and returns them as an OclJsonResourceList with every resource's owner set to
    org_id. This returns the same output as msp.load_codelist_collections and is
    designed to be used in conjunction with save_codelists_to_file.py.
    """
    with open(filename) as codelist_file:
        saved_codelists = json.load(codelist_file)
        codelists = ocldev.oclresourcelist.OclJsonResourceList(saved_codelists)

    # The saved file may carry a different owner, so re-assign it
    for codelist in codelists:
        codelist['owner'] = org_id
    return codelists
def load_codelist_collections(filename='', org_id='', canonical_url='', verbosity=0,
                              request_timeout=120):
    """
    Load and return codelist_collections as OCL-formatted JSON collections.

    Reads codelist definitions from the CSV file at filename and, for every row with a
    non-empty "resource_type", downloads the full codelist from the DHIS2 URL found in
    the row's "ZenDesk: JSON Link" column (with "&paging=false" appended). The URL and
    the downloaded JSON are stored on the row as the "attr:dhis2_codelist_url" and
    "attr:dhis2_codelist" columns before the rows are converted to OCL-formatted JSON.
    This method retrieves all of the full codelists from DATIM directly, which takes
    a long time to process.

    request_timeout is the number of seconds passed as the "timeout" of each HTTP
    request, so that an unresponsive server cannot block the import indefinitely.
    Raises requests.exceptions.HTTPError on an unsuccessful response and
    requests.exceptions.Timeout if the server does not respond in time.
    """
    # Load the codelist definitions into a resource list
    csv_codelists = []
    with open(filename) as ifile:
        reader = csv.DictReader(ifile)
        for row in reader:
            # Skip rows that are not set to be imported
            if not row['resource_type']:
                continue
            if verbosity:
                print('Retrieving codelist: %s' % row['id'])
            row['owner_id'] = org_id
            # The link column is removed from the row and replaced by the attr columns below
            dhis2_codelist_url = row.pop('ZenDesk: JSON Link')
            dhis2_codelist_url += '&paging=false'
            if verbosity:
                print(' DHIS2 URL: %s' % dhis2_codelist_url)
                print(' Canonical URL:', "%s/ValueSet/%s" % (canonical_url, row['id']))
            row['attr:dhis2_codelist_url'] = dhis2_codelist_url

            # Fetch the codelist from DHIS2; without a timeout requests waits forever
            dhis2_codelist_response = requests.get(dhis2_codelist_url, timeout=request_timeout)
            dhis2_codelist_response.raise_for_status()
            row['attr:dhis2_codelist'] = dhis2_codelist_response.json()
            csv_codelists.append(row)

    codelist_csv_resource_list = ocldev.oclresourcelist.OclCsvResourceList(resources=csv_codelists)
    codelist_json_resource_list = codelist_csv_resource_list.convert_to_ocl_formatted_json()

    # Fields not supported in the CSV format get added here
    for codelist in codelist_json_resource_list:
        codelist['canonical_url'] = "%s/ValueSet/%s" % (canonical_url, codelist['id'])
    return codelist_json_resource_list
def load_datim_indicators(filename='', org_id='', source_id='',
                          de_concepts=None, coc_concepts=None,
                          sorted_ref_indicator_codes=None, ref_indicator_concepts=None):
    """
    Reads a JSON file of raw DHIS2-formatted DATIM indicators (listed under the
    top-level "indicators" key) and returns them as an OclJsonResourceList of
    OCL-formatted concepts, each built by build_concept_from_datim_indicator.
    """
    # Read the raw DHIS2 export
    with open(filename, 'rb') as indicator_file:
        datim_export = json.load(indicator_file)

    # Convert each raw indicator into an OCL-formatted concept
    concepts = ocldev.oclresourcelist.OclJsonResourceList()
    for raw_indicator in datim_export['indicators']:
        concept = build_concept_from_datim_indicator(
            raw_indicator,
            org_id=org_id,
            source_id=source_id,
            de_concepts=de_concepts,
            coc_concepts=coc_concepts,
            sorted_ref_indicator_codes=sorted_ref_indicator_codes,
            ref_indicator_concepts=ref_indicator_concepts)
        concepts.append(concept)
    return concepts
def load_ref_indicator_concepts(org_id='', source_id='', filenames=None):
    """
    Loads reference indicators from one or more MER guidance CSV files and returns them
    as OCL-formatted JSON. Every row is assigned org_id as its owner and source_id as
    its source. Returns an empty list if no filenames are provided.
    """
    if not filenames:
        return []

    # Gather the rows of every CSV file into a single list
    csv_rows = []
    for filename in filenames:
        with open(filename) as csv_file:
            for csv_row in csv.DictReader(csv_file):
                csv_row['owner_id'] = org_id
                csv_row['source'] = source_id
                csv_rows.append(csv_row)

    csv_resource_list = ocldev.oclresourcelist.OclCsvResourceList(resources=csv_rows)
    json_resource_list = csv_resource_list.convert_to_ocl_formatted_json()

    # Add throw-away attributes (only used for processing)
    url_template = '/orgs/%s/sources/%s/concepts/%s/'
    for concept in json_resource_list:
        concept['__url'] = url_template % (org_id, source_id, concept['id'])
    return json_resource_list
def load_ihub_dde_concepts(filename='', num_run_sequences=3, org_id='',
                           source_id='', sorted_ref_indicator_codes=None,
                           ref_indicator_concepts=None,
                           ihub_rule_period_end_year=2020):
    """
    Reads an iHUB Derived Data Element CSV extract and returns the derived data elements
    as an OclJsonResourceList of OCL-formatted concepts, as built by
    build_all_ihub_dde_concepts.
    """
    # Some iHUB exports start with a byte order mark, which ends up glued to the first
    # column header. Both spellings seen for that header are checked, in this order.
    bom_indicator_headers = ('\xef\xbb\xbfindicator', '\ufeffindicator')

    rows = []
    with open(filename) as extract_file:
        for row in csv.DictReader(extract_file):
            for bom_header in bom_indicator_headers:
                if bom_header in row:
                    # Expose the value under the expected 'indicator' column name
                    row['indicator'] = row[bom_header]
                    break
            rows.append(row)

    # Transform to OCL-formatted concepts and return
    concepts_by_key = build_all_ihub_dde_concepts(
        rows,
        num_run_sequences=num_run_sequences,
        org_id=org_id,
        source_id=source_id,
        sorted_ref_indicator_codes=sorted_ref_indicator_codes,
        ref_indicator_concepts=ref_indicator_concepts,
        ihub_rule_period_end_year=ihub_rule_period_end_year)
    concepts = list(concepts_by_key.values())
    return ocldev.oclresourcelist.OclJsonResourceList(concepts)
def get_ihub_dde_numerator_or_denominator(de_name):
    """
    Classifies a derived data element name as 'Numerator' or 'Denominator' based on
    whether 'N' or 'D' is the first modifier inside parentheses, ie the name contains
    '(N,' / '(N)' or '(D,' / '(D)'. The numerator is checked first. Returns an empty
    string if neither is present.
    """
    markers = (
        ('Numerator', ('(N,', '(N)')),
        ('Denominator', ('(D,', '(D)')),
    )
    for label, tokens in markers:
        if any(token in de_name for token in tokens):
            return label
    return ''
def get_ihub_dde_support_type(de_name):
    """
    Looks for one of the SUPPORT_TYPE_CODES acronyms, preceded by a comma and a space,
    in the modifiers of an iHUB derived data element name and returns the matching fully
    specified PEPFAR support type (eg 'Technical Assistance' or 'Direct Service
    Delivery'). Returns an empty string if no acronym is found.
    """
    modifiers = get_data_element_name_modifiers(de_name)
    matching_support_types = (
        support_type
        for acronym, support_type in SUPPORT_TYPE_CODES.items()
        if ', %s' % acronym in modifiers
    )
    return next(matching_support_types, '')
def get_data_element_support_type(de_code=''):
    """
    Looks for one of the SUPPORT_TYPE_CODES acronyms, surrounded by underscores, in a
    data element code and returns the matching fully specified PEPFAR support type
    (eg 'Technical Assistance' or 'Direct Service Delivery'). Returns an empty string
    if no acronym is found.
    """
    matching_support_types = (
        support_type
        for acronym, support_type in SUPPORT_TYPE_CODES.items()
        if '_%s_' % acronym in de_code
    )
    return next(matching_support_types, '')
def get_data_element_structured_dataset(de_code=''):
    """
    Returns the structured dataset key for a data element code: 'SIMS' when the code
    starts with 'SIMS', otherwise 'MER'.
    """
    # TODO: Identify 'Other' or None structured dataset values
    return 'SIMS' if de_code.startswith('SIMS') else 'MER'
def get_ihub_dde_version(de_name):
    """
    Returns the data element version code ('v2' through 'v9') if ' v#:' is present in
    the data element name, checking the lowest version first. Returns an empty string
    if no version is present.
    """
    for version_number in range(2, 10):
        version_code = 'v%d' % version_number
        version_marker = ' %s:' % version_code
        if version_marker in de_name:
            return version_code
    return ''
def get_ihub_dde_name_without_version(de_name):
    """
    Returns the name of a derived data element with its ' v#:' version marker replaced
    by ':'. Names without a version are returned unchanged.
    """
    version_code = get_ihub_dde_version(de_name=de_name)
    if not version_code:
        return de_name
    version_marker = ' %s:' % version_code
    return de_name.replace(version_marker, ':')
def get_data_element_numerator_or_denominator(de_code=''):
    """
    Classifies a data element code as 'Numerator' if it contains '_N_' or as
    'Denominator' if it contains '_D_'. The numerator is checked first. Returns an
    empty string if neither is present.
    """
    for marker, label in (('_N_', 'Numerator'), ('_D_', 'Denominator')):
        if marker in de_code:
            return label
    return ''
def get_data_element_result_or_target(de_code=''):
    """
    Returns 'Target' if the data element code contains the text 'target'
    (case-insensitive), otherwise 'Result'.
    """
    is_target = 'target' in de_code.lower()
    return 'Target' if is_target else 'Result'
def lookup_reference_indicator_code(resource_name='', resource_code='',
                                    resource_applicable_periods=None,
                                    sorted_ref_indicator_codes=None, ref_indicator_concepts=None):
    """
    Returns a reference indicator code that matches the resource code or name, or an
    empty string if there is no match (including when no reference indicator codes
    are provided).

    A ref indicator code is matched to the prefix of the resource name or code
    (eg. "TX_CURR_N_DSD_Age_Sex" is a match for "TX_CURR") or embedded in the name
    surrounded by whitespace (eg. "FY19 Results TX_ML Patient Died" is a match for
    "TX_ML"). If "resource_applicable_periods" is provided, a matching reference
    indicator must also be applicable for at least one of the same periods, in which
    case "ref_indicator_concepts" is required to look up the periods of each reference
    indicator. Codes are tried in the order given and the first match wins, so
    sorted_ref_indicator_codes must be ordered such that a longer code comes before
    any shorter code that is a prefix of it (eg. as returned by
    get_sorted_unique_indicator_codes).
    """
    # The default of None means there is nothing to match against
    if not sorted_ref_indicator_codes:
        return ''

    for ref_indicator_code in sorted_ref_indicator_codes:
        is_match = (
            resource_code.startswith(ref_indicator_code) or
            resource_name.startswith(ref_indicator_code) or
            ' %s ' % (ref_indicator_code) in resource_name)
        if not is_match:
            continue

        # Without a period restriction the first matching code is the answer
        if not resource_applicable_periods:
            return ref_indicator_code

        # Otherwise the reference indicator must be defined for one of the periods;
        # if it is not, keep looking at the remaining codes
        for period in reversed(resource_applicable_periods):
            ref_indicator_concept = ref_indicator_concepts.get_resource(
                core_attrs={'id': ref_indicator_code}, custom_attrs={ATTR_PERIOD: period})
            if ref_indicator_concept:
                return ref_indicator_code
    return ''
def get_sorted_unique_indicator_codes(ref_indicator_concepts=None):
    """
    Returns the unique indicator codes (concept IDs) found in a list of OCL-formatted
    reference indicators, sorted in descending order. With that ordering a code always
    comes before any shorter code that is a prefix of it.
    """
    codes = list(ref_indicator_concepts.summarize(core_attr_key='id').keys())
    codes.sort(reverse=True)
    return codes
def get_data_element_version(de_code=''):
    """
    Extracts the version suffix of a data element code, ie an underscore followed by
    'v' or 'V' and a single digit at the end of the code. For example, a de_code of
    'TX_CURR_AgeSex_v3' would return 'v3'. Returns None if there is no version suffix.
    """
    version_match = re.search('_([vV][0-9])$', de_code)
    return version_match.group(1) if version_match else None
def get_data_element_root(de_code=''):
    """
    Returns the root of a data element code, ie the code with its version suffix (as
    detected by get_data_element_version) and the preceding underscore removed. For
    example, a de_code of 'TX_CURR_AgeSex_v3' would return 'TX_CURR_AgeSex' and a
    de_code of 'HTS_TST_AgeSex' would return 'HTS_TST_AgeSex'.
    """
    version = get_data_element_version(de_code=de_code)
    if not version:
        return de_code
    # Drop the version plus the underscore that separates it from the root
    suffix_length = len(version) + 1
    return de_code[:-suffix_length]
def get_de_periods_from_codelist_collections(de_codelists, codelist_collections):
    """
    Returns a list of the unique periods present in a data element's codelists, in the
    order in which they are first encountered.

    codelist_collections must be in the format of msp.load_codelist_collections or
    msp.load_codelist_collections_with_exports_from_file. Each item of de_codelists
    must have an 'id' that is compared with the 'external_id' of the codelist
    collections; only the first collection matching each codelist is used, and its
    applicable periods are read as a ', '-separated string.
    """
    unique_periods = {}
    for de_codelist in de_codelists:
        # Find the first codelist collection that defines this codelist, if any
        matching_defs = (
            codelist_def for codelist_def in codelist_collections
            if de_codelist['id'] == codelist_def['external_id'])
        for codelist_def in matching_defs:
            applicable_periods = codelist_def['extras'][ATTR_APPLICABLE_PERIODS]
            for period in applicable_periods.split(', '):
                unique_periods[period] = True
            break
    return list(unique_periods.keys())
def get_concepts_filtered_by_period(concepts=None, period=None):
    """
    Select the concepts whose period custom attribute matches a period filter.

    The filter may be a single period string (eg 'FY18') or a list of period
    strings (eg ['FY18', 'FY19']); any other filter value yields an empty list.
    concepts may be a dict (its values are examined) or a list.

    For each concept, the ATTR_APPLICABLE_PERIODS entry of concept['extras'] is
    used when present, otherwise the ATTR_PERIOD entry. A list value matches when
    it contains the filter period; a string value matches only when it equals the
    filter period. Concepts with neither attribute (or a None value) are skipped.

    Results are grouped by filter period, in the order the periods were given, so
    a concept matching several filter periods appears once per matching period.

    Raises Exception if concepts is neither a dict nor a list, or if a concept's
    period attribute is neither a list nor a string.
    """
    # Normalize the period filter to a list of strings
    if isinstance(period, str):
        period = [period]
    elif not (isinstance(period, list) and all(isinstance(item, str) for item in period)):
        # Invalid period filter so there is nothing to return
        return []

    # Collect the candidate concepts in their native iteration order
    if isinstance(concepts, dict):
        candidates = [concepts[key] for key in concepts.keys()]
    elif isinstance(concepts, list):
        candidates = concepts
    else:
        raise Exception('Invalid concepts. Expected dict or list')

    matches = []
    for wanted_period in period:
        for candidate in candidates:
            extras = candidate['extras']
            if ATTR_APPLICABLE_PERIODS in extras:
                candidate_period = extras[ATTR_APPLICABLE_PERIODS]
            elif ATTR_PERIOD in extras:
                candidate_period = extras[ATTR_PERIOD]
            else:
                continue

            if candidate_period is None:
                continue
            if isinstance(candidate_period, list):
                is_match = wanted_period in candidate_period
            elif isinstance(candidate_period, str):
                is_match = wanted_period == candidate_period
            else:
                raise Exception('Invalid concept period. Expected list or string: %s <%s>' % (
                    candidate_period, type(candidate_period)))
            if is_match:
                matches.append(candidate)
    return matches
def get_filtered_cocs(de_concepts=None, map_de_to_coc=None, coc_concepts=None):
    """
    Return the COCs mapped to the given data elements.

    Each data element is looked up in map_de_to_coc by its concept URL
    ('/orgs/<owner>/sources/<source>/concepts/<id>/'). The result is a dict
    keyed by COC key whose values are taken from coc_concepts. Data elements
    absent from map_de_to_coc contribute nothing.
    """
    selected_cocs = {}
    for data_element in de_concepts:
        data_element_url = '/orgs/%s/sources/%s/concepts/%s/' % (
            data_element['owner'], data_element['source'], data_element['id'])
        if data_element_url not in map_de_to_coc:
            continue
        for coc_key in map_de_to_coc[data_element_url]:
            selected_cocs[coc_key] = coc_concepts[coc_key]
    return selected_cocs
def get_filtered_codelist_collections(codelist_collections=None, period=None):
    """
    Return the code lists applicable to a single period or to any of a list of periods.

    period may be a string (eg 'FY18') or a list of strings; any other value
    yields an empty list. An empty list of periods applies no filtering and
    returns every code list. Otherwise a code list is kept when its 'extras'
    contain ATTR_APPLICABLE_PERIODS and at least one of the comma-separated
    periods in that value (whitespace-trimmed) is among the requested periods.
    """
    if isinstance(period, str):
        wanted_periods = [period]
    elif isinstance(period, list) and all(isinstance(item, str) for item in period):
        wanted_periods = period
    else:
        return []

    # No periods requested: every code list passes
    if not wanted_periods:
        return list(codelist_collections)

    selected = []
    for codelist in codelist_collections:
        extras = codelist['extras']
        if ATTR_APPLICABLE_PERIODS not in extras:
            continue
        codelist_periods = extras[ATTR_APPLICABLE_PERIODS].split(',')
        if any(entry.strip() in wanted_periods for entry in codelist_periods):
            selected.append(codelist)
    return selected
def generate_mapping_id(id_format='MAP_%s_%s', from_concept_code='', to_concept_code='',
                        from_concept_url='', to_concept_url=''):
    """
    Returns a custom mapping ID according to the specified id_format. id_format must
    have two %s parameters for the from and to concept codes. Examples for id_format:
        MAP_%s_%s
        MAP_DE_COC_%s_%s

    When both from_concept_url and to_concept_url are provided, the concept codes are
    taken from the last path segment of each URL and override from_concept_code and
    to_concept_code. The URLs may be written with or without a trailing slash. If
    either URL is missing, the explicitly passed concept codes are used instead.
    """
    if from_concept_url and to_concept_url:
        # Strip trailing slashes before taking the last segment, so that
        # '.../concepts/HTS_TST/' and '.../concepts/HTS_TST' both give 'HTS_TST'.
        from_concept_code = from_concept_url.rstrip('/').rsplit('/', 1)[-1]
        to_concept_code = to_concept_url.rstrip('/').rsplit('/', 1)[-1]
    return id_format % (from_concept_code, to_concept_code)
def build_ocl_mappings(map_dict=None, filtered_from_concepts=None,
                       owner_type='Organization', owner_id='',
                       source_id='', map_type='',
                       do_generate_mapping_id=False, id_format='MAP_%s_%s'):
    """
    Build OCL-formatted mapping resources from a from-URL -> to-URLs dictionary.

    map_dict is keyed by from-concept URL, with each value an iterable of
    to-concept URLs; one mapping is emitted per (from, to) pair. When
    filtered_from_concepts is non-empty, from-concept URLs not contained in it
    are skipped. When do_generate_mapping_id is set, each mapping also gets an
    'id' produced by generate_mapping_id using id_format.
    """
    mappings = []
    for source_url, target_urls in map_dict.items():
        is_excluded = bool(filtered_from_concepts) and source_url not in filtered_from_concepts
        if is_excluded:
            continue
        for target_url in target_urls:
            mapping = {
                "type": "Mapping",
                'owner': owner_id,
                'owner_type': owner_type,
                'source': source_id,
                'map_type': map_type,
                'from_concept_url': source_url,
                'to_concept_url': target_url,
            }
            if do_generate_mapping_id:
                mapping['id'] = generate_mapping_id(
                    id_format=id_format, from_concept_url=source_url,
                    to_concept_url=target_url)
            mappings.append(mapping)
    return mappings
def build_ref_indicator_references(ref_indicator_concepts, org_id=''):
    """
    Build one OCL-formatted reference per period listing that period's reference indicators.

    Periods are the keys of ref_indicator_concepts.summarize(custom_attr_key=ATTR_PERIOD).
    For each period, the '__url' of every concept returned by
    ref_indicator_concepts.get_resources(custom_attrs={ATTR_PERIOD: period}) is collected
    into the reference's expressions list. Returns a dict keyed by period, eg:
        {"FY18": {"type": "Reference", "owner": "PEPFAR", "owner_type": "Organization",
                  "collection": "MER_REFERENCE_INDICATORS_FY18",
                  "data": {"expressions": ["/orgs/PEPFAR/sources/MER/concepts/HTS_TST/", ...]}}}
    """
    references_by_period = {}
    period_summary = ref_indicator_concepts.summarize(custom_attr_key=ATTR_PERIOD)
    for period in period_summary.keys():
        period_concepts = ref_indicator_concepts.get_resources(
            custom_attrs={ATTR_PERIOD: period})
        concept_urls = []
        for concept in period_concepts:
            concept_urls.append(concept['__url'])
        reference = {
            'type': ocldev.oclconstants.OclConstants.RESOURCE_TYPE_REFERENCE,
            'owner': org_id,
            'owner_type': ocldev.oclconstants.OclConstants.RESOURCE_TYPE_ORGANIZATION,
            'collection': COLLECTION_NAME_MER_REFERENCE_INDICATORS % period,
            'data': {'expressions': concept_urls}
        }
        references_by_period[period] = reference
    return references_by_period
def build_fiscal_year_references(ref_indicator_concepts, datim_indicator_concepts, de_concepts,
ihub_dde_concepts, coc_concepts, map_ref_indicator_to_de,
map_ref_indicator_to_ihub_dde,
map_ref_indicator_to_datim_indicator,
map_de_to_coc, map_ihub_dde_to_coc,
org_id='', source_id=''):
"""
Return a dictionary with period as key and OCL-formatted reference as value representing
all resources that can be associated with that period. Includes everything but reference
indicators (i.e. date elements, DATIM indicators, and COCs). Reference indicators are
excluded because they are simply a copy of the MER_REFERENCE_INDICATOR_FY## collections
and they are processed at a different time than the remaining references defined here.
for each ref indicator in the period...
1. Cascade each Reference Indicator concept version to Indicator concepts using
Has DATIM Indicator mappings where target_concept.extras.Applicable+Periods=FY20
2. Cascade each Reference Indicator concept version to Data Element concepts using
Has Data Element mappings where target_concept.extras.Applicable+Periods=FY20
2a. Cascade each DE concept to Category Option Combo concepts using Has Option mappings
Example output:
{"FY18": {"type": "Reference", "owner": "PEPFAR", "owner_type": "Organization",
"collection": "MER_FY18",
"data": {"expressions": "/orgs/PEPFAR/sources/MER/concepts/XHBL1mOwLWb/", ...}}}
"""
output_references_by_period = {}
ref_indicator_period_counts = ref_indicator_concepts.summarize(custom_attr_key=ATTR_PERIOD)
for period in ref_indicator_period_counts.keys():
expressions = []
ref_indicator_concepts.get_resources(custom_attrs={ATTR_PERIOD: period})
for ref_indicator_concept in ref_indicator_concepts:
# datim indicators
ref_indicator_url = ref_indicator_concept['__url']
if ref_indicator_url in map_ref_indicator_to_datim_indicator:
for datim_indicator_url in map_ref_indicator_to_datim_indicator[ref_indicator_url]:
datim_indicator_concept = datim_indicator_concepts.get_resource_by_url(
datim_indicator_url)
if not datim_indicator_concept:
continue
if ('extras' in datim_indicator_concept and
ATTR_APPLICABLE_PERIODS in datim_indicator_concept['extras'] and
period in datim_indicator_concept["extras"][ATTR_APPLICABLE_PERIODS]):
# add the datim indicator
if datim_indicator_url not in expressions:
expressions.append(datim_indicator_url)
# add the mapping
mapping_id = generate_mapping_id(
from_concept_url=ref_indicator_url, to_concept_url=datim_indicator_url,
id_format=MSP_MAP_ID_FORMAT_REFIND_IND)
mapping_url = '/orgs/%s/sources/%s/mappings/%s/' % (
org_id, source_id, mapping_id)
if mapping_url not in expressions:
expressions.append(mapping_url)
# data elements
if ref_indicator_url in map_ref_indicator_to_de:
for de_url in map_ref_indicator_to_de[ref_indicator_url]:
de_concept = de_concepts.get_resource_by_url(de_url)
if not de_concept:
continue
if ('extras' in de_concept and
ATTR_APPLICABLE_PERIODS in de_concept['extras'] and
period in de_concept["extras"][ATTR_APPLICABLE_PERIODS]):
# add the data element
if de_url not in expressions:
expressions.append(de_url)
# add the mapping
mapping_id = generate_mapping_id(
from_concept_url=ref_indicator_url, to_concept_url=de_url,
id_format=MSP_MAP_ID_FORMAT_REFIND_DE)
mapping_url = '/orgs/%s/sources/%s/mappings/%s/' % (
org_id, source_id, mapping_id)
if mapping_url not in expressions:
expressions.append(mapping_url)
# cascade the COCs
if de_url in map_de_to_coc:
for coc_url in map_de_to_coc[de_url]:
# add the coc
if coc_url not in expressions:
expressions.append(coc_url)
# add the mapping
mapping_id = generate_mapping_id(
from_concept_url=de_url, to_concept_url=coc_url,
id_format=MSP_MAP_ID_FORMAT_DE_COC)
mapping_url = '/orgs/%s/sources/%s/mappings/%s/' % (
org_id, source_id, mapping_id)
if mapping_url not in expressions:
expressions.append(mapping_url)
# iHUB derived data elements
if ref_indicator_url in map_ref_indicator_to_ihub_dde:
for ihub_dde_url in map_ref_indicator_to_ihub_dde[ref_indicator_url]:
ihub_dde_concept = de_concepts.get_resource_by_url(ihub_dde_url)
if not ihub_dde_concept:
continue
if ('extras' in ihub_dde_concept and
ATTR_APPLICABLE_PERIODS in ihub_dde_concept['extras'] and
period in ihub_dde_concept["extras"][ATTR_APPLICABLE_PERIODS]):
# add the ihub derived data element
if ihub_dde_url not in expressions:
expressions.append(ihub_dde_url)
# add the mapping
mapping_id = generate_mapping_id(
from_concept_url=ref_indicator_url, to_concept_url=ihub_dde_url,
id_format=MSP_MAP_ID_FORMAT_REFIND_DE)
mapping_url = '/orgs/%s/sources/%s/mappings/%s/' % (
org_id, source_id, mapping_id)
if mapping_url not in expressions:
expressions.append(mapping_url)
# cascade the COCs
if ihub_dde_url in map_ihub_dde_to_coc:
for coc_url in map_ihub_dde_to_coc[ihub_dde_url]:
# add the coc
if coc_url not in expressions:
expressions.append(coc_url)
# add the mapping
mapping_id = generate_mapping_id(