Coverage for django_view_manager/utils/management/commands/makeviewmigration.py: 99%

265 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-21 15:45 +0000

1import contextlib 

2import decimal 

3import importlib 

4import io 

5import os 

6 

7from django.apps import apps 

8from django.core.management import BaseCommand 

9from django.core.management import call_command 

10from django.db import migrations 

11from django.db.transaction import atomic 

12 

13 

# Version of django-view-manager stamped into generated SQL files and modified migrations.
VERSION = "2.0.0"

# NOTE(review): leading whitespace inside the two SQL templates below is reproduced as it
# appears in this listing, which may have collapsed runs of spaces - confirm against the
# checked-in file.

# Header comment written at the top of the `latest` SQL file after its contents have been
# copied to a historical SQL file.
COPIED_SQL_VIEW_CONTENT = f"""/*
 This file was generated using django-view-manager {VERSION}.
 Modify the SQL for this view and then commit the changes.
 You can remove this comment before committing.

 When you have changes to make to this sql, you need to run the makeviewmigration command
 before altering the sql, so the historical sql file is created with the correct contents.
*/
"""

# Initial contents of a brand new `latest` SQL file. `{{view_name}}` survives the f-string as
# `{view_name}` and is filled in later with `str.format(view_name=...)`.
# The example query repeats SELECT after UNION so that it is valid SQL when copied as-is.
INITIAL_SQL_VIEW_CONTENT = f"""/*
 This file was generated using django-view-manager {VERSION}.
 Add the SQL for this view and then commit the changes.
 You can remove this comment before committing.

 When you have changes to make to this sql, you need to run the makeviewmigration command
 before altering the sql, so the historical sql file is created with the correct contents.

 eg.
 DROP VIEW IF EXISTS {{view_name}};
 CREATE VIEW
 {{view_name}} AS
 SELECT
 1 AS id,
 42 AS employee_id,
 'Kittens' AS name
 UNION
 SELECT
 2 AS id,
 314 AS employee_id,
 'Puppies' AS name
*/
"""

# The un-numbered, current SQL file is named `view-<table>-latest.sql`; its number is
# represented as infinity, which compares greater than every finite migration number.
LATEST_VIEW_NAME = "latest"
LATEST_VIEW_NUMBER = decimal.Decimal("Infinity")

# Add a comment right after the Django generated comment, to help find our modified migrations.
MIGRATION_MODIFIED_COMMENT = f"# Modified using django-view-manager {VERSION}. Please do not delete this comment.\n"

55 

56 

class Command(BaseCommand):
    """Management command that creates a migration and SQL file pair for a database view.

    Takes the table name of an unmanaged model and a migration name, then creates
    (or rolls forward) the `sql/view-<table>-latest.sql` file and a migration that
    reads its SQL from the app's `sql` folder.
    """

    # Text shown by `manage.py help` for this command.
    help = (
        "In the appropriate app, two files will get created. "
        "`sql/view-view_name-0000.sql` - contains the SQL for the view. "
        "`migrations/0000_view_name.py` - a migration that reads the appropriate files in the sql folder. "
        "If the `migrations` and `sql` folder do not exist, they will be created, along with the apps initial "
        "migration, and an empty migration for the view."
    )

65 

66 def get_model(self, db_table_name): 

67 matching_model = None 

68 for model in apps.get_models(include_auto_created=True, include_swapped=True): 

69 if getattr(model._meta, "db_table", "") == db_table_name: 

70 matching_model = model 

71 

72 return matching_model 

73 

74 def get_choices(self): 

75 return sorted( 

76 { 

77 x._meta.db_table 

78 for x in apps.get_models(include_auto_created=True, include_swapped=True) 

79 if getattr(x._meta, "managed", True) is False 

80 } 

81 ) 

82 

83 def add_arguments(self, parser): 

84 choices = self.get_choices() 

85 

86 parser.add_argument( 

87 "db_table_name", 

88 action="store", 

89 choices=choices, 

90 help='The view you want to modify".', 

91 ) 

92 

93 parser.add_argument( 

94 "migration_name", 

95 action="store", 

96 help="The name of the migration that will be created.", 

97 ) 

98 

99 def _call_command(self, *args): 

100 err = io.StringIO() 

101 out = io.StringIO() 

102 

103 # If we don't do this, sometimes we can't import a newly created migration. 

104 # Do it here, so we don't need to know which calls require it, and which don't. 

105 importlib.invalidate_caches() 

106 with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err): 

107 call_command(*args) 

108 

109 # Did an error occur? 

110 if err.tell(): 

111 err.seek(0) 

112 self.stdout.write(self.style.ERROR(err.read())) 

113 return False 

114 

115 # Return the results. 

116 out.seek(0) 

117 return out.readlines() 

118 

119 def _create_migration_folder_and_initial_migration_if_needed(self, migrations_path, app_label): 

120 created_initial_migration = False 

121 if not os.path.exists(migrations_path): 

122 # No, create one with an __init__.py file. 

123 os.mkdir(migrations_path) 

124 with open(os.path.join(migrations_path, "__init__.py"), "w") as f: 

125 f.write("") 

126 self.stdout.write(self.style.SUCCESS(f"\nCreated 'migrations' folder in app '{app_label}'.")) 

127 

128 # Are there any migrations? 

129 results = self._call_command("showmigrations", app_label) 

130 if results is False: # Erred. 

131 return 

132 

133 results = map(str.strip, results) 

134 if "(no migrations)" in results: 

135 # Create the initial migration for the app. 

136 self.stdout.write(f"\nCreating initial migration for app '{app_label}'.") 

137 results = self._call_command("makemigrations", app_label, "--noinput") 

138 if not results: # Erred. 

139 return 

140 

141 for result in results: 

142 self.stdout.write(result) 

143 created_initial_migration = True 

144 

145 return created_initial_migration 

146 

147 def _create_sql_folder_if_needed(self, sql_path, app_label): 

148 created_sql_folder = False 

149 if not os.path.exists(sql_path): 

150 # No, create one. 

151 os.mkdir(sql_path) 

152 self.stdout.write(self.style.SUCCESS(f"\nCreated 'sql' folder in app '{app_label}'.")) 

153 created_sql_folder = True 

154 return created_sql_folder 

155 

156 @staticmethod 

157 def _parse_migration_number_from_show_migrations(line): 

158 # Find the first word in the line that only contains numbers. 

159 # The beginning words should either be '[X]', '[', or ']'. 

160 for word in line.replace(" ", "_").split("_"): 

161 if word.isdigit(): 

162 return word 

163 

164 @staticmethod 

165 def _is_migration_modified(db_table_name, migrations_path, migration_name, num): 

166 with open(os.path.join(migrations_path, f"{migration_name}.py"), encoding="utf-8") as f: 

167 # Did we modify this migration? Check the first 10 lines for our modified comment. 

168 found_modified_comment = False 

169 for migration_line_no, migration_line in enumerate(f.readlines()): 

170 if migration_line.find("Modified using django-view-manager") != -1: 

171 found_modified_comment = True 

172 

173 if not found_modified_comment and migration_line_no > 20: 

174 break 

175 

176 if ( 

177 migration_line.find(f"view-{db_table_name}-latest") != -1 

178 or migration_line.find(f"view-{db_table_name}-{num}") != -1 

179 ): 

180 return True 

181 

182 return False 

183 

184 def _get_migration_numbers_and_names(self, db_table_name, app_label, migrations_path, *, only_latest=False): 

185 show_migration_results = self._call_command("showmigrations", app_label) 

186 if not show_migration_results: # Erred. The reason will be printed to the console via the command. 

187 if only_latest: 

188 return None 

189 return None 

190 

191 # Parse the migration numbers that `showmigrations` returns 

192 # and get the migration numbers, or new migration number. 

193 migration_numbers_and_names = {} 

194 migration_name = None 

195 

196 for line in show_migration_results: 

197 num = self._parse_migration_number_from_show_migrations(line) 

198 

199 if num: 

200 migration_num = decimal.Decimal(num) 

201 migration_name = line.replace("[X]", "").replace("[-]", "").replace("[ ]", "").strip() 

202 if "squashed migrations)" in migration_name: 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true

203 migration_name = migration_name[: migration_name.find(" (")] 

204 

205 # When looking for the latest, we haven't modified the migration yet, so we can't match by comment. 

206 if not only_latest and self._is_migration_modified(db_table_name, migrations_path, migration_name, num): 

207 migration_numbers_and_names[migration_num] = migration_name 

208 

209 if only_latest: 

210 # If for some reason we can't find the latest migration. 

211 return migration_name 

212 

213 return migration_numbers_and_names 

214 

215 @staticmethod 

216 def _get_sql_numbers_and_names(sql_path, db_table_name): 

217 sql_numbers_and_names = {} 

218 

219 view_name_start = f"view-{db_table_name}-" 

220 view_name_end = ".sql" 

221 

222 for filename in os.listdir(sql_path): 

223 if filename.startswith(view_name_start) and filename.endswith(view_name_end): 

224 sql_file_num = filename.replace(view_name_start, "").replace(view_name_end, "") 

225 

226 sql_file_name = None 

227 if "-" in sql_file_num: 

228 sql_file_num, sql_file_name = sql_file_num.split("-") 

229 

230 # Convert the number to an int, so we can max them. 

231 if sql_file_num == LATEST_VIEW_NAME: 

232 sql_numbers_and_names[(LATEST_VIEW_NUMBER, sql_file_name)] = filename 

233 

234 elif sql_file_num.isdigit(): 

235 sql_numbers_and_names[(decimal.Decimal(sql_file_num), sql_file_name)] = filename 

236 

237 return sql_numbers_and_names 

238 

239 @staticmethod 

240 def _get_latest_migration_number_and_name(migration_numbers_and_names, sql_numbers_and_names): 

241 largest_migration_number = latest_sql_number = None 

242 

243 for migration_number, migration_name in migration_numbers_and_names.items(): 

244 largest_migration_number = (migration_number, migration_name) 

245 

246 for sql_number, sql_name in sql_numbers_and_names: 

247 if sql_number is LATEST_VIEW_NUMBER: 

248 latest_sql_number = (sql_number, sql_name) 

249 

250 if largest_migration_number and latest_sql_number is not None: 

251 return largest_migration_number 

252 

253 return None, None 

254 

255 def _create_empty_migration(self, app_label, migration_name, created_initial_migration, created_sql_folder): 

256 if created_initial_migration or created_sql_folder: 

257 self.stdout.write("\nCreating empty migration for the new SQL view.") 

258 else: 

259 self.stdout.write("\nCreating empty migration for the SQL changes.") 

260 

261 # Force the migration to have a RunSQL operation with text we can easily find/replace. 

262 migrations.Migration.operations = [ 

263 migrations.RunSQL("SELECT 'replace_forwards';", reverse_sql="SELECT 'replace_reverse';") 

264 ] 

265 results = self._call_command("makemigrations", app_label, "--empty", f"--name={migration_name}", "--noinput") 

266 migrations.Migration.operations = [] 

267 if not results: # Erred. 

268 return 

269 

270 for result in results: 

271 self.stdout.write(result) 

272 

273 return results 

274 

275 def _copy_latest_sql_view(self, sql_path, latest_sql_filename, historical_sql_filename): 

276 with open(os.path.join(sql_path, latest_sql_filename), "r+", encoding="utf-8") as f_in: 

277 content = f_in.read() 

278 with open(os.path.join(sql_path, historical_sql_filename), "w", encoding="utf-8") as f_out: 

279 f_out.write(content) 

280 

281 if "This file was generated using django-view-manager" not in content: 

282 f_in.seek(0) 

283 f_in.truncate() 

284 f_in.write(COPIED_SQL_VIEW_CONTENT) 

285 f_in.write(content) 

286 

287 self.stdout.write(self.style.SUCCESS(f"\nCreated historical SQL view file - '{historical_sql_filename}'.")) 

288 

289 def _create_latest_sql_file(self, sql_path, db_table_name): 

290 latest_sql_filename = f"view-{db_table_name}-{LATEST_VIEW_NAME}.sql" 

291 

292 with open(os.path.join(sql_path, latest_sql_filename), "w", encoding="utf-8") as f: 

293 f.write(INITIAL_SQL_VIEW_CONTENT.format(view_name=db_table_name)) 

294 

295 self.stdout.write(self.style.SUCCESS(f"\nCreated new SQL view file - '{latest_sql_filename}'.")) 

296 

297 def _find_and_rewrite_migrations_containing_latest( 

298 self, 

299 migration_numbers_and_names, 

300 migrations_path, 

301 latest_sql_filename, 

302 historical_sql_filename, 

303 ): 

304 for migration_name in migration_numbers_and_names.values(): 

305 with open(os.path.join(migrations_path, f"{migration_name}.py"), "r+", encoding="utf-8") as f: 

306 lines = f.readlines() 

307 modified_migration = False 

308 sql_line_no = 0 

309 for line_no, line in enumerate(lines): 

310 if line.find(latest_sql_filename) != -1: 

311 sql_line_no = line_no 

312 break 

313 

314 if sql_line_no: 

315 lines[sql_line_no] = lines[sql_line_no].replace(latest_sql_filename, historical_sql_filename) 

316 modified_migration = True 

317 

318 if modified_migration: 

319 self.stdout.write( 

320 self.style.SUCCESS( 

321 f"\nModified migration '{migration_name}' to read from '{historical_sql_filename}'." 

322 ) 

323 ) 

324 f.seek(0) 

325 f.truncate(0) 

326 f.writelines(lines) 

327 

328 def _rewrite_latest_migration(self, migrations_path, migration_name, latest_sql_filename, historical_sql_filename): 

329 with open(os.path.join(migrations_path, migration_name + ".py"), "r+", encoding="utf-8") as f: 

330 lines = f.readlines() 

331 generated_line_no = latest_sql_line_no = 0 

332 add_modified_message = False 

333 for line_no, line in enumerate(lines): 

334 if line.find("Generated by Django") != -1: 

335 # Should be the first line, but we shouldn't assume that. 

336 generated_line_no = line_no 

337 if lines[line_no + 1].find("Modified using django-view-manager") == -1: 

338 add_modified_message = True 

339 elif line.find(latest_sql_filename) != -1: 

340 latest_sql_line_no = line_no 

341 # We write lines starting from the bottom to the top, so our line numbers are correct through the process. 

342 lines[latest_sql_line_no] = lines[latest_sql_line_no].replace(latest_sql_filename, historical_sql_filename) 

343 if add_modified_message: 

344 # Insert the modified comment after the Django generated comment. 

345 lines[generated_line_no + 1 : generated_line_no + 1] = MIGRATION_MODIFIED_COMMENT 

346 f.seek(0) 

347 f.truncate(0) 

348 f.writelines(lines) 

349 

350 def _rewrite_migration( 

351 self, 

352 migrations_path, 

353 sql_path, 

354 db_table_name, 

355 migration_name, 

356 forward_sql_filename, 

357 *, 

358 reverse_sql_filename=None, 

359 ): 

360 with open(os.path.join(migrations_path, migration_name + ".py"), "r+", encoding="utf-8") as f: 

361 generated_line_no = imports_line_no = class_line_no = replace_forwards_line_no = replace_reverse_line_no = 0 

362 lines = f.readlines() 

363 for line_no, line in enumerate(lines): 

364 if line.find("Generated by Django") != -1: 

365 # Should be the first line, but we shouldn't assume that. 

366 generated_line_no = line_no 

367 elif line.find("import") != -1 and not imports_line_no: 

368 imports_line_no = line_no 

369 elif line.startswith("class Migration"): 

370 class_line_no = line_no 

371 elif line.find("replace_forwards") != -1: 

372 replace_forwards_line_no = line_no 

373 elif line.find("replace_reverse") != -1: 

374 replace_reverse_line_no = line_no 

375 # We write lines starting from the bottom to the top, so our line numbers are correct through the process. 

376 lines[replace_reverse_line_no] = lines[replace_reverse_line_no].replace( 

377 '''"SELECT 'replace_reverse';"''', 

378 "reverse_sql" if reverse_sql_filename else f'"DROP VIEW IF EXISTS {db_table_name};"', 

379 ) 

380 lines[replace_forwards_line_no] = lines[replace_forwards_line_no].replace( 

381 '''"SELECT 'replace_forwards';"''', "forwards_sql" 

382 ) 

383 lines[class_line_no - 1 : class_line_no] = [ 

384 f'\nsql_path = "{os.path.relpath(sql_path)}"\n', 

385 f'forward_sql_filename = "{forward_sql_filename}"\n', 

386 f'reverse_sql_filename = "{reverse_sql_filename}"\n' if reverse_sql_filename else "", 

387 "\n", 

388 "with open(os.path.join(sql_path, forward_sql_filename)) as f:\n", 

389 " forwards_sql = f.read()\n", 

390 "\n", 

391 "with open(os.path.join(sql_path, reverse_sql_filename)) as f:\n" if reverse_sql_filename else "", 

392 " reverse_sql = f.read()\n" if reverse_sql_filename else "", 

393 "\n" if reverse_sql_filename else "", 

394 ] 

395 lines[imports_line_no - 1 : imports_line_no] = [ 

396 "import os\n", 

397 "\n", 

398 ] 

399 # Insert the generated comment at the top. 

400 lines[generated_line_no + 1 : generated_line_no + 1] = [MIGRATION_MODIFIED_COMMENT] 

401 f.seek(0) 

402 f.writelines(lines) 

403 

404 def _get_historical_sql_filename( 

405 self, db_table_name, latest_migration_number, latest_migration_name, sql_numbers_and_names 

406 ): 

407 historical_sql_filename = f"view-{db_table_name}-{str(latest_migration_number).zfill(4)}.sql" 

408 # Do multiple migrations with the same number exist? 

409 # If so, we need to include the migration name in the sql view name. 

410 if historical_sql_filename in sql_numbers_and_names.values(): 

411 latest_migration_name = latest_migration_name.split("_", 1)[1] # Remove the migration number. 

412 historical_sql_filename = ( 

413 f"view-{db_table_name}-{str(latest_migration_number).zfill(4)}-{latest_migration_name}.sql" 

414 ) 

415 

416 return historical_sql_filename 

417 

418 @atomic 

419 def handle(self, *args, **options): 

420 # Get passed in args. 

421 db_table_name = options["db_table_name"] 

422 migration_name = options["migration_name"] 

423 

424 # Get paths we need. 

425 model = self.get_model(db_table_name) 

426 model_meta = model._meta 

427 app_config = model._meta.app_config 

428 app_label = model_meta.app_label 

429 path = app_config.path 

430 sql_path = os.path.join(path, "sql") 

431 migrations_path = os.path.join(path, "migrations") 

432 

433 # Does this app have a `migrations` folder, and if so, any migrations? 

434 created_initial_migration = self._create_migration_folder_and_initial_migration_if_needed( 

435 migrations_path, app_label 

436 ) 

437 if created_initial_migration is None: # Erred. 

438 return 

439 

440 # Does this app have an `sql` folder? 

441 created_sql_folder = self._create_sql_folder_if_needed(sql_path, app_label) 

442 

443 # Get the migration numbers and names. 

444 migration_numbers_and_names = self._get_migration_numbers_and_names(db_table_name, app_label, migrations_path) 

445 if migration_numbers_and_names is None: # Erred. 

446 return 

447 

448 # Get any existing migrations and sql view names and numbers. 

449 sql_numbers_and_names = self._get_sql_numbers_and_names(sql_path, db_table_name) 

450 

451 # Figure out if we have a `latest` sql file and which migration is associates to it. 

452 latest_migration_number, latest_migration_name = self._get_latest_migration_number_and_name( 

453 migration_numbers_and_names, sql_numbers_and_names 

454 ) 

455 

456 # Create the empty migration for the SQL view. 

457 results = self._create_empty_migration(app_label, migration_name, created_initial_migration, created_sql_folder) 

458 if results is None: # Erred 

459 return 

460 

461 # Get the new migration number and name. 

462 new_migration_name = self._get_migration_numbers_and_names( 

463 db_table_name, app_label, migrations_path, only_latest=True 

464 ) 

465 

466 if new_migration_name is None: 

467 raise RuntimeError("Unable to find the name and number of the newly created migration.") 

468 

469 # Is there a `latest` SQL view and migration? 

470 if latest_migration_number is not None and latest_migration_name is not None: 

471 latest_sql_filename = f"view-{db_table_name}-{LATEST_VIEW_NAME}.sql" 

472 historical_sql_filename = self._get_historical_sql_filename( 

473 db_table_name, latest_migration_number, latest_migration_name, sql_numbers_and_names 

474 ) 

475 

476 # Copy the `latest` SQL view to match the latest migration number. 

477 self._copy_latest_sql_view(sql_path, latest_sql_filename, historical_sql_filename) 

478 

479 # Update the historical migration to use the new historical sql view filename. 

480 self._rewrite_latest_migration( 

481 migrations_path, latest_migration_name, latest_sql_filename, historical_sql_filename 

482 ) 

483 self.stdout.write( 

484 self.style.SUCCESS( 

485 f"\nModified migration '{latest_migration_name}' to read from '{historical_sql_filename}'." 

486 ) 

487 ) 

488 

489 # Find any additional migrations which should be switched to use the historical sql view filename. 

490 self._find_and_rewrite_migrations_containing_latest( 

491 migration_numbers_and_names, 

492 migrations_path, 

493 latest_sql_filename, 

494 historical_sql_filename, 

495 ) 

496 

497 # Update the empty migration to use the `latest` sql view filename. 

498 self._rewrite_migration( 

499 migrations_path, 

500 sql_path, 

501 db_table_name, 

502 new_migration_name, 

503 forward_sql_filename=latest_sql_filename, 

504 reverse_sql_filename=historical_sql_filename, 

505 ) 

506 self.stdout.write( 

507 self.style.SUCCESS( 

508 f"\nModified migration '{new_migration_name}' to read from " 

509 f"'{latest_sql_filename}' and '{historical_sql_filename}'." 

510 ) 

511 ) 

512 

513 else: 

514 latest_sql_filename = f"view-{db_table_name}-{LATEST_VIEW_NAME}.sql" 

515 

516 # Create the `latest` SQL view. 

517 self._create_latest_sql_file(sql_path, db_table_name) 

518 

519 # Update the emtpy migration to use the `latest` sql view filename. 

520 self._rewrite_migration( 

521 migrations_path, 

522 sql_path, 

523 db_table_name, 

524 new_migration_name, 

525 forward_sql_filename=latest_sql_filename, 

526 ) 

527 self.stdout.write( 

528 self.style.SUCCESS(f"\nModified migration '{new_migration_name}' to read from '{latest_sql_filename}'.") 

529 ) 

530 

531 self.stdout.write(f"\nDone - You can now edit '{latest_sql_filename}'.\n\n")