Commit 18f3325

tools: Hash defrag script
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent afb3928 commit 18f3325

File tree: 1 file changed (+100 −0)


tools/defrag_all_hashes.py

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
import argparse
import asyncio

import redis.asyncio as aioredis

"""
This script iterates over all hash keys and "recycles" them.
Recycling is done by DUMPing each hash and then re-creating it in place with
RESTORE ... REPLACE, re-applying its original TTL via PEXPIRE.
This triggers re-allocation of the internal hash data structures in order to
reduce memory fragmentation.
"""

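# Conceptually, the per-key recycle performed by the Lua script below is the
# following sequence, sketched here as plain redis-py calls for illustration
# only (the real work is batched server-side, one EVALSHA per batch of keys):
#
#     ttl = await r.pttl(key)         # remaining TTL in ms (-1 if none)
#     dumped = await r.dump(key)      # serialized value, None if the key vanished
#     if dumped is not None:
#         await r.restore(key, 0, dumped, replace=True)  # re-allocates the value
#         if ttl > 0:
#             await r.pexpire(key, ttl)                  # re-apply the TTL
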
SCRIPT = """
local recycled = 0
for _, key in ipairs(KEYS) do
    local ttl = redis.call('PTTL', key)
    local dumpedData = redis.call('DUMP', key)

    if dumpedData then
        redis.call('RESTORE', key, 0, dumpedData, 'REPLACE')
        if ttl > 0 then
            redis.call('PEXPIRE', key, ttl)
        end
        recycled = recycled + 1
    end
end
return recycled
"""

# Total number of keys recycled so far, updated by the workers.
total_recycled = 0


async def workerfn(client_supplier, sha, queue):
    global total_recycled

    r = client_supplier()
    while True:
        keys = await queue.get()

        try:
            recycled = await r.evalsha(sha, len(keys), *keys)
        except Exception as e:
            # SystemExit propagates out of the event loop and aborts the run.
            raise SystemExit(e)

        if isinstance(recycled, int):
            total_recycled += recycled
        else:
            print("Error recycling", recycled)

        queue.task_done()


async def infofn():
    # Report progress every half second while the workers run.
    while True:
        await asyncio.sleep(0.5)
        print("Keys processed:", total_recycled)


async def main(client_supplier, num_workers, queue_size, batch_size):
    r = client_supplier()
    sha = await r.script_load(SCRIPT)
    queue = asyncio.Queue(maxsize=queue_size)

    workers = [
        asyncio.create_task(workerfn(client_supplier, sha, queue)) for _ in range(num_workers)
    ]
    info_worker = asyncio.create_task(infofn())

    # Scan all hash keys and hand them to the workers in batches.
    keys = []
    async for key in r.scan_iter("*", count=batch_size * 2, type="hash"):
        keys.append(key)
        if len(keys) >= batch_size:
            await queue.put(keys)
            keys = []

    # Flush the final (possibly partial) batch and wait for the queue to drain.
    await queue.put(keys)
    await queue.join()

    info_worker.cancel()
    for w in workers:
        w.cancel()

    await asyncio.gather(*workers, info_worker, return_exceptions=True)
    print("Recycled in total:", total_recycled)


arg_parser = argparse.ArgumentParser()
arg_parser.add_argument("--workers", type=int, default=8)
arg_parser.add_argument("--batch", type=int, default=20)

arg_parser.add_argument("--db", type=int, default=0)
arg_parser.add_argument("--port", type=int, default=6379)
arg_parser.add_argument("--host", type=str, default="localhost")
args = arg_parser.parse_args()


def client_supplier():
    # Every worker opens its own connection; the script cache is server-wide.
    return aioredis.StrictRedis(db=args.db, port=args.port, host=args.host)


asyncio.run(main(client_supplier, args.workers, args.workers * 2, args.batch))
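A typical invocation against a local instance might look like the following (host, port, and db values are placeholders for whatever your deployment uses; the redis.asyncio client ships with the redis-py package, version 4.2 or later):

    pip install redis
    python tools/defrag_all_hashes.py --host localhost --port 6379 --db 0 --workers 8 --batch 20

--workers controls how many EVALSHA consumers run concurrently, and --batch sets how many keys each script invocation recycles, so throughput can be tuned with both.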
