forked from balena-labs-projects/dashboard
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathget_schema.py
More file actions
47 lines (39 loc) · 1.4 KB
/
get_schema.py
File metadata and controls
47 lines (39 loc) · 1.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import time
import urllib.request
import urllib.error
import json
class GetSchema():
influx_url = "influxdb:8086"
influx_db_name = "balena"
def __init__(self):
while self.check_api() == False:
print('Waiting for Influx API')
time.sleep(5)
# Check the InfluxDB API is available
# This is done by checking for a 204 response code on the /ping endpoint
# Returns boolean true/false
def check_api(self):
req = urllib.request.Request('http://' + self.influx_url + '/ping')
try:
res = urllib.request.urlopen(req, timeout=5)
if res.getcode() == 204:
return True
except (urllib.error.HTTPError, urllib.error.URLError):
pass
return False
# Request the schema we need from InfluxDB and return the JSON
def get_influx_schema(self):
values = {
'db' : self.influx_db_name,
'q' : 'show field keys'
}
data = urllib.parse.urlencode(values)
req = urllib.request.Request('http://' + self.influx_url + '/query?' + data)
try:
res = urllib.request.urlopen(req, timeout=5).read()
data = json.loads(res.decode())
return data['results'][0]['series']
except (urllib.error.HTTPError, urllib.error.URLError, KeyError):
# TODO handle error
pass
return False