
Commit 98c4a91

tools: Hash defrag script
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent afb3928 commit 98c4a91

File tree

1 file changed (+104, -0)

tools/defrag_db.py

Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
"""
This script iterates over all keys and "recycles" them.
Recycling is done by DUMPing each key and re-creating it in place with
RESTORE ... REPLACE, re-applying its remaining TTL with PEXPIRE. This
triggers re-allocation of the internal data structures in order to reduce
memory fragmentation.
"""

import argparse
import asyncio

import redis.asyncio as aioredis

# Server-side Lua script: recycles every key in KEYS and returns how many
# were actually recycled. Keys that vanished between SCAN and EVALSHA
# (DUMP returns nil) are skipped.
SCRIPT = """
local recycled = 0
for _, key in ipairs(KEYS) do
    local ttl = redis.call('PTTL', key)
    local dumpedData = redis.call('DUMP', key)

    if dumpedData then
        redis.call('RESTORE', key, 0, dumpedData, 'REPLACE')
        if ttl > 0 then
            redis.call('PEXPIRE', key, ttl)
        end
        recycled = recycled + 1
    end
end
return recycled
"""

total_recycled = 0


async def workerfn(client_supplier, sha, queue):
    """Drain batches of keys from the queue and recycle them via EVALSHA."""
    global total_recycled

    r = client_supplier()
    while True:
        keys = await queue.get()

        try:
            recycled = await r.evalsha(sha, len(keys), *keys)
        except Exception as e:
            raise SystemExit(e)

        if isinstance(recycled, int):
            total_recycled += recycled
        else:
            print("Error recycling", recycled)

        queue.task_done()


async def infofn():
    """Print progress every half second."""
    while True:
        await asyncio.sleep(0.5)
        print("Keys processed:", total_recycled)


async def main(client_supplier, scan_type, num_workers, queue_size, batch_size):
    r = client_supplier()
    sha = await r.script_load(SCRIPT)
    queue = asyncio.Queue(maxsize=queue_size)

    workers = [
        asyncio.create_task(workerfn(client_supplier, sha, queue)) for _ in range(num_workers)
    ]
    info_worker = asyncio.create_task(infofn())

    # Scan all keys (optionally filtered by type) and feed them to the workers
    # in batches; the bounded queue provides backpressure against the scan.
    keys = []
    async for key in r.scan_iter("*", count=batch_size * 2, _type=scan_type):
        keys.append(key)
        if len(keys) >= batch_size:
            await queue.put(keys)
            keys = []

    # Flush the last, possibly partial, batch and wait for the workers to drain.
    await queue.put(keys)
    await queue.join()

    info_worker.cancel()
    for w in workers:
        w.cancel()

    await asyncio.gather(*workers, info_worker, return_exceptions=True)
    print("Recycled in total:", total_recycled)


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("--workers", type=int, default=8)
arg_parser.add_argument("--batch", type=int, default=20)

arg_parser.add_argument(
    "--type", type=str, default=None, help="Process keys only of specified type"
)

arg_parser.add_argument("--db", type=int)
arg_parser.add_argument("--port", type=int, default=6379)
arg_parser.add_argument("--host", type=str, default="localhost")
args = arg_parser.parse_args()


def client_supplier():
    return aioredis.StrictRedis(db=args.db, port=args.port, host=args.host)


asyncio.run(main(client_supplier, args.type, args.workers, args.workers * 2, args.batch))
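
For reference, below is a minimal standalone sketch of the recycle step the Lua script performs, written directly against redis.asyncio. The local connection and the key name "mykey" are illustrative assumptions, not part of the commit:

import asyncio

import redis.asyncio as aioredis


async def recycle_one(key):
    # One-key version of the server-side recycle loop, for illustration only.
    r = aioredis.StrictRedis()  # assumes a local instance on the default port
    ttl = await r.pttl(key)  # remaining TTL in milliseconds
    dumped = await r.dump(key)  # serialized value; None if the key is gone
    if dumped is not None:
        await r.restore(key, 0, dumped, replace=True)  # re-create the key in place
        if ttl > 0:
            await r.pexpire(key, ttl)  # re-apply the original TTL


asyncio.run(recycle_one("mykey"))

Batching this round-trip into a Lua script, as the tool does, keeps each DUMP payload on the server instead of shipping it to the client and back. Assuming the defaults above, a run that recycles only hash keys might look like:

python3 tools/defrag_db.py --host localhost --port 6379 --type hash

--workers and --batch trade client-side concurrency against server load, and the queue size of workers * 2 bounds how far the scanner can run ahead of the workers.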
