Skip to content

Commit c5ecca7

Browse files
fix: recoverdata support load disk table (#3888)
1 parent 7269141 commit c5ecca7

6 files changed

Lines changed: 111 additions & 54 deletions

File tree

docs/en/maintain/cli.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ $ ./openmldb --endpoint=172.27.2.52:9520 --role=client
401401

402402
### loadtable
403403

404-
1. Load an existing table
404+
Load an existing table. Only memory tables are supported.
405405

406406
Command format: `loadtable table_name tid pid ttl segment_cnt`
407407

docs/zh/maintain/cli.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ $ ./openmldb --endpoint=172.27.2.52:9520 --role=client
395395

396396
### loadtable
397397

398-
1、加载已有表
398+
加载已有表,只支持内存表
399399

400400
命令格式: loadtable table\_name tid pid ttl segment\_cnt
401401

src/cmd/openmldb.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3260,7 +3260,6 @@ void HandleClientLoadTable(const std::vector<std::string> parts, ::openmldb::cli
32603260
return;
32613261
}
32623262
}
3263-
// TODO(): get status msg
32643263
auto st = client->LoadTable(parts[1], boost::lexical_cast<uint32_t>(parts[2]),
32653264
boost::lexical_cast<uint32_t>(parts[3]), ttl, is_leader, seg_cnt);
32663265
if (st.OK()) {

src/tablet/tablet_impl.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3039,6 +3039,7 @@ void TabletImpl::LoadTable(RpcController* controller, const ::openmldb::api::Loa
30393039
break;
30403040
}
30413041
std::string root_path;
3042+
// The tablet cannot tell whether the table is a memory or a disk table, so the caller must set the correct storage_mode in the request message.
30423043
bool ok = ChooseDBRootPath(tid, pid, table_meta.storage_mode(), root_path);
30433044
if (!ok) {
30443045
response->set_code(::openmldb::base::ReturnCode::kFailToGetDbRootPath);

tools/openmldb_ops.py

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -97,41 +97,43 @@ def CheckTable(executor, db, table_name):
9797
return Status(-1, "role is not match")
9898
return Status()
9999

100-
def RecoverPartition(executor, db, partitions, endpoint_status):
100+
def RecoverPartition(executor, db, replicas, endpoint_status, storage):
101+
"""recover all replicas of one partition"""
101102
leader_pos = -1
102103
max_offset = 0
103-
table_name = partitions[0].GetName()
104-
pid = partitions[0].GetPid()
105-
for pos in range(len(partitions)):
106-
partition = partitions[pos]
107-
if partition.IsLeader() and partition.GetOffset() >= max_offset:
104+
table_name = replicas[0].GetName()
105+
pid = replicas[0].GetPid()
106+
tid = replicas[0].GetTid()
107+
for pos in range(len(replicas)):
108+
replica = replicas[pos]
109+
if replica.IsLeader() and replica.GetOffset() >= max_offset:
108110
leader_pos = pos
109111
if leader_pos < 0:
110-
log.error("cannot find leader partition. db {db} name {table_name} partition {pid}".format(
111-
db=db, table_name=table_name, pid=pid))
112-
return Status(-1, "recover partition failed")
113-
tid = partitions[0].GetTid()
114-
leader_endpoint = partitions[leader_pos].GetEndpoint()
112+
msg = "cannot find leader replica. db {db} name {table_name} partition {pid}".format(
113+
db=db, table_name=table_name, pid=pid)
114+
log.error(msg)
115+
return Status(-1, "recover partition failed: {msg}".format(msg=msg))
116+
leader_endpoint = replicas[leader_pos].GetEndpoint()
115117
# recover leader
116118
if "{tid}_{pid}".format(tid=tid, pid=pid) not in endpoint_status[leader_endpoint]:
117-
log.info("leader partition is not in tablet, db {db} name {table_name} pid {pid} endpoint {leader_endpoint}. start loading data...".format(
119+
log.info("leader replica is not in tablet, db {db} name {table_name} pid {pid} endpoint {leader_endpoint}. start loading data...".format(
118120
db=db, table_name=table_name, pid=pid, leader_endpoint=leader_endpoint))
119-
status = executor.LoadTable(leader_endpoint, table_name, tid, pid)
121+
status = executor.LoadTableHTTP(leader_endpoint, table_name, tid, pid, storage)
120122
if not status.OK():
121123
log.error("load table failed. db {db} name {table_name} tid {tid} pid {pid} endpoint {leader_endpoint} msg {status}".format(
122124
db=db, table_name=table_name, tid=tid, pid=pid, leader_endpoint=leader_endpoint, status=status.GetMsg()))
123-
return Status(-1, "recover partition failed")
124-
if not partitions[leader_pos].IsAlive():
125+
return status
126+
if not replicas[leader_pos].IsAlive():
125127
status = executor.UpdateTableAlive(db, table_name, pid, leader_endpoint, "yes")
126128
if not status.OK():
127129
log.error("update leader alive failed. db {db} name {table_name} pid {pid} endpoint {leader_endpoint}".format(
128130
db=db, table_name=table_name, pid=pid, leader_endpoint=leader_endpoint))
129131
return Status(-1, "recover partition failed")
130132
# recover follower
131-
for pos in range(len(partitions)):
133+
for pos in range(len(replicas)):
132134
if pos == leader_pos:
133135
continue
134-
partition = partitions[pos]
136+
partition = replicas[pos]
135137
endpoint = partition.GetEndpoint()
136138
if partition.IsAlive():
137139
status = executor.UpdateTableAlive(db, table_name, pid, endpoint, "no")
@@ -149,24 +151,31 @@ def RecoverTable(executor, db, table_name):
149151
log.info("{table_name} in {db} is healthy".format(table_name=table_name, db=db))
150152
return Status()
151153
log.info("recover {table_name} in {db}".format(table_name=table_name, db=db))
152-
status, table_info = executor.GetTableInfo(db, table_name)
154+
status, table_info = executor.GetTableInfoHTTP(db, table_name)
153155
if not status.OK():
154-
log.warning("get table info failed. msg is {msg}".format(msg=status.GetMsg()))
155-
return Status(-1, "get table info failed. msg is {msg}".format(msg=status.GetMsg()))
156-
partition_dict = executor.ParseTableInfo(table_info)
156+
log.warning("get table info failed. msg is {msg}".format(msg=status))
157+
return Status(-1, "get table info failed. msg is {msg}".format(msg=status))
158+
if len(table_info) != 1:
159+
log.warning("table info should be 1, {table_info}".format(table_info=table_info))
160+
return Status(-1, "table info should be 1")
161+
table_info = table_info[0]
162+
partition_dict = executor.ParseTableInfoJson(table_info)
163+
storage = "kMemory" if "storage_mode" not in table_info else table_info["storage_mode"]
157164
endpoints = set()
158-
for record in table_info:
159-
endpoints.add(record[3])
165+
for _, reps in partition_dict.items():
166+
# list of replicas
167+
for rep in reps:
168+
endpoints.add(rep.GetEndpoint())
160169
endpoint_status = {}
161170
for endpoint in endpoints:
162171
status, result = executor.GetTableStatus(endpoint)
163172
if not status.OK():
164173
log.warning("get table status failed. msg is {msg}".format(msg=status.GetMsg()))
165174
return Status(-1, "get table status failed. msg is {msg}".format(msg=status.GetMsg()))
166175
endpoint_status[endpoint] = result
167-
max_pid = int(table_info[-1][2])
168-
for pid in range(max_pid + 1):
169-
RecoverPartition(executor, db, partition_dict[str(pid)], endpoint_status)
176+
177+
for _, part in partition_dict.items():
178+
RecoverPartition(executor, db, part, endpoint_status, storage)
170179
# wait op
171180
time.sleep(1)
172181
while True:

tools/tool.py

Lines changed: 73 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@
1616
import subprocess
1717
import sys
1818
import time
19+
# http lib for python2 or 3
20+
import json
21+
try:
22+
import httplib
23+
import urllib
24+
except ImportError:
25+
import http.client as httplib
26+
import urllib.parse as urllib
27+
1928
# for Python 2, don't use f-string
2029
log = logging.getLogger(__name__)
2130
logging.basicConfig(level=logging.INFO, format = '%(levelname)s: %(message)s')
@@ -35,6 +44,9 @@ def GetMsg(self):
3544
def GetCode(self):
3645
return self.code
3746

47+
def __str__(self):
48+
return "code: {code}, msg: {msg}".format(code = self.code, msg = self.msg)
49+
3850
class Partition:
3951
def __init__(self, name, tid, pid, endpoint, is_leader, is_alive, offset):
4052
self.name = name
@@ -202,17 +214,48 @@ def GetTableInfo(self, database, table_name = ''):
202214
continue
203215
result.append(record)
204216
return Status(), result
217+
def GetTableInfoHTTP(self, database, table_name = ''):
    """Fetch table metadata from the nameserver leader with an HTTP POST.

    A JSON body ``{"db": database, "name": table_name}`` is posted to
    ``/NameServer/ShowTable`` on the address ``self.endpoint_map[self.ns_leader]``.

    Args:
        database: value sent as the ``db`` field of the request.
        table_name: value sent as the ``name`` field (default ``''``).
            NOTE(review): presumably an empty name selects every table in the
            database -- confirm against the nameserver's ShowTable handler.

    Returns:
        Always a ``(Status, table_info)`` pair, because callers unpack two values.
        On success ``table_info`` is the ``table_info`` entry of the decoded
        response (an empty list when the response carries no such entry).
        On an HTTP status other than 200, or a non-zero ``code`` in the
        response, ``table_info`` is ``None`` and the Status carries the error.
    """
    ns = self.endpoint_map[self.ns_leader]
    conn = httplib.HTTPConnection(ns)
    try:
        param = {"db": database, "name": table_name}
        headers = {"Content-type": "application/json"}
        conn.request("POST", "/NameServer/ShowTable", json.dumps(param), headers)
        response = conn.getresponse()
        if response.status != 200:
            return Status(response.status, response.reason), None
        result = json.loads(response.read())
    finally:
        # release the socket on every path: success, HTTP error, or exception
        conn.close()
    # check resp
    if result["code"] != 0:
        # the failure path must also return a pair, otherwise the caller's
        # `status, table_info = ...` unpacking raises instead of reporting the error
        msg = "get table info failed: {msg}".format(msg=result.get("msg", ""))
        return Status(result["code"], msg), None
    # an absent "table_info" entry is reported as "no tables" rather than a KeyError
    return Status(), result.get("table_info", [])
205233

206234
def ParseTableInfo(self, table_info):
    """Group table-info rows into Partition objects keyed by the row's pid column.

    Each row is indexed positionally: columns 0-3 and 6 are handed to
    ``Partition`` unchanged as name, tid, pid, endpoint and offset; column 4 is
    compared with ``"leader"`` and column 5 with ``"yes"`` to obtain the two
    boolean flags.

    Returns:
        dict mapping the pid value (column 2, used as-is) to the list of
        Partition objects built from the rows carrying that pid, in input order.
    """
    grouped = {}
    for row in table_info:
        pid = row[2]
        replica = Partition(row[0], row[1], pid, row[3],
                            row[4] == "leader", row[5] == "yes", row[6])
        grouped.setdefault(pid, []).append(replica)
    return grouped
215243

244+
def ParseTableInfoJson(self, table_info):
    """Build a pid -> [Partition, ...] mapping from one table's JSON description.

    ``table_info`` is read as a dict with ``name``, ``tid`` and a
    ``table_partition`` list; every entry of that list supplies ``pid`` and a
    ``partition_meta`` list with one item per replica (``endpoint``,
    ``is_leader``, ``offset`` and optionally ``is_alive``).

    Returns:
        dict keyed by each partition's ``pid`` value (used as-is) whose values
        are the Partition objects of that partition's replicas, in input order.
    """
    by_pid = {}
    for part in table_info["table_partition"]:
        # one partition: a leader plus its followers
        for rep in part["partition_meta"]:
            leader = rep["is_leader"]
            # a replica without an "is_alive" entry counts as alive
            alive = rep.get("is_alive", True)
            # each object describes a replica; the Partition class name is kept for compatibility
            entry = Partition(table_info["name"], table_info["tid"], part["pid"],
                              rep["endpoint"], leader, alive, rep["offset"])
            by_pid.setdefault(part["pid"], []).append(entry)
    return by_pid
258+
216259
def GetTablePartition(self, database, table_name):
217260
status, result = self.GetTableInfo(database, table_name)
218261
if not status.OK:
@@ -274,30 +317,35 @@ def ShowTableStatus(self, pattern = '%'):
274317

275318
return Status(), output_processed
276319

277-
def LoadTable(self, endpoint, name, tid, pid, sync = True):
278-
cmd = list(self.tablet_base_cmd)
279-
cmd.append("--endpoint=" + self.endpoint_map[endpoint])
280-
cmd.append("--cmd=loadtable {} {} {} 0 8".format(name, tid, pid))
281-
log.info("run {cmd}".format(cmd = cmd))
282-
status, output = self.RunWithRetuncode(cmd)
283-
time.sleep(1)
284-
if status.OK() and output.find("LoadTable ok") != -1:
285-
if not sync:
286-
return Status()
287-
while True:
288-
status, result = self.GetTableStatus(endpoint, tid, pid)
289-
key = "{}_{}".format(tid, pid)
290-
if status.OK() and key in result:
291-
table_stat = result[key][4]
292-
if table_stat == "kTableNormal":
293-
return Status()
294-
elif table_stat == "kTableLoading" or table_stat == "kTableUndefined":
295-
log.info("table is loading... tid {tid} pid {pid}".format(tid = tid, pid = pid))
296-
else:
297-
return Status(-1, "table stat is {table_stat}".format(table_stat = table_stat))
298-
time.sleep(2)
299-
300-
return Status(-1, "execute load table failed, status {msg}, output {output}".format(msg = status.GetMsg(), output = output))
320+
def LoadTableHTTP(self, endpoint, name, tid, pid, storage):
    """Ask a tablet to load a table partition over HTTP, then wait until it is loaded.

    A JSON ``table_meta`` is posted to ``/TabletServer/LoadTable`` on
    ``endpoint``; ``storage`` is forwarded as ``storage_mode``, which is how
    every storage mode is supported. After the tablet accepts the request,
    ``self.GetTableStatus`` is polled every 2 seconds until the partition
    leaves the loading state.

    Args:
        endpoint: address handed directly to ``HTTPConnection`` and to
            ``GetTableStatus``.
            NOTE(review): ``GetTableInfoHTTP`` resolves its address through
            ``self.endpoint_map`` while this method uses ``endpoint`` as-is --
            confirm this is intended.
        name: table name.
        tid: table id.
        pid: partition id.
        storage: storage mode string placed in ``table_meta.storage_mode``.

    Returns:
        ``Status()`` once the table state is ``kTableNormal``; a Status carrying
        the HTTP status/reason or the tablet's code/msg when the request fails;
        ``Status(-1, ...)`` when the table ends up in any other state.
    """
    conn = httplib.HTTPConnection(endpoint)
    try:
        # ttl is sent as 0 (the original author notes it has no effect here),
        # seg_cnt is always 8, and no leader/follower role is sent
        param = {"table_meta": {"name": name, "tid": tid, "pid": pid, "ttl":0, "seg_cnt":8, "storage_mode": storage}}
        headers = {"Content-type": "application/json"}
        conn.request("POST", "/TabletServer/LoadTable", json.dumps(param), headers)
        response = conn.getresponse()
        if response.status != 200:
            return Status(response.status, response.reason)
        resp = json.loads(response.read())
    finally:
        # release the socket on every path: success, HTTP error, or exception
        conn.close()
    if resp["code"] != 0:
        return Status(resp["code"], resp.get("msg", ""))
    # wait for success TODO(hw): refactor
    # NOTE(review): the wait has no upper bound; if GetTableStatus keeps failing
    # or never reports this partition, the loop does not terminate.
    key = "{}_{}".format(tid, pid)
    while True:
        status, result = self.GetTableStatus(endpoint, str(tid), str(pid))
        if status.OK() and key in result:
            # index 4 of the status row is the table state string
            table_stat = result[key][4]
            if table_stat == "kTableNormal":
                return Status()
            elif table_stat == "kTableLoading" or table_stat == "kTableUndefined":
                log.info("table is loading... tid {tid} pid {pid}".format(tid = tid, pid = pid))
            else:
                return Status(-1, "table stat is {table_stat}".format(table_stat = table_stat))
        time.sleep(2)
301349

302350
def GetLeaderFollowerOffset(self, endpoint, tid, pid):
303351
cmd = list(self.tablet_base_cmd)

0 commit comments

Comments
 (0)