Coverage for django_view_manager/utils/management/commands/makeviewmigration.py: 99%
265 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-21 15:44 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-21 15:44 +0000
1import contextlib
2import decimal
3import importlib
4import io
5import os
7from django.apps import apps
8from django.core.management import BaseCommand
9from django.core.management import call_command
10from django.db import migrations
11from django.db.transaction import atomic
# Version of django-view-manager that gets stamped into generated SQL files and modified migrations.
VERSION = "2.0.0"

# Header prepended to the `latest` SQL file once its previous contents have been copied
# to a historical SQL file (only when the file does not already carry a generated header).
COPIED_SQL_VIEW_CONTENT = f"""/*
    This file was generated using django-view-manager {VERSION}.
    Modify the SQL for this view and then commit the changes.
    You can remove this comment before committing.

    When you have changes to make to this sql, you need to run the makeviewmigration command
    before altering the sql, so the historical sql file is created with the correct contents.
*/
"""

# Contents of a brand new `latest` SQL file. `{view_name}` is left as a `str.format` placeholder
# (hence the doubled braces inside the f-string) and is filled in with the view's db_table name.
# The example uses `UNION SELECT` so that it is valid SQL when copied out of the comment.
INITIAL_SQL_VIEW_CONTENT = f"""/*
    This file was generated using django-view-manager {VERSION}.
    Add the SQL for this view and then commit the changes.
    You can remove this comment before committing.

    When you have changes to make to this sql, you need to run the makeviewmigration command
    before altering the sql, so the historical sql file is created with the correct contents.

    eg.
    DROP VIEW IF EXISTS {{view_name}};
    CREATE VIEW
        {{view_name}} AS
        SELECT
            1 AS id,
            42 AS employee_id,
            'Kittens' AS name
        UNION
        SELECT
            2 AS id,
            314 AS employee_id,
            'Puppies' AS name
*/
"""

# The `latest` SQL file has no migration number; it sorts after every numbered file.
LATEST_VIEW_NAME = "latest"
LATEST_VIEW_NUMBER = decimal.Decimal("Infinity")

# Add a comment right after the Django generated comment, to help find our modified migrations.
MIGRATION_MODIFIED_COMMENT = f"# Modified using django-view-manager {VERSION}. Please do not delete this comment.\n"
class Command(BaseCommand):
    """Create the migration and SQL files needed to add or change an unmanaged (view) model.

    The current SQL for a view lives in `sql/view-<db_table>-latest.sql`. Each run creates a new
    empty migration that reads that file. If a previous migration was already reading the `latest`
    file, its SQL is first copied to a numbered historical file, and that older migration is
    rewritten to read the historical copy instead.
    """

    help = (
        "In the appropriate app, two files will get created. "
        "`sql/view-view_name-0000.sql` - contains the SQL for the view. "
        "`migrations/0000_view_name.py` - a migration that reads the appropriate files in the sql folder. "
        "If the `migrations` and `sql` folder do not exist, they will be created, along with the apps initial "
        "migration, and an empty migration for the view."
    )

    def get_model(self, db_table_name):
        """Return the model whose `_meta.db_table` equals `db_table_name`, or None if there is none.

        When several models share the table name, the last one yielded by `apps.get_models` wins.
        """
        matching_model = None
        for model in apps.get_models(include_auto_created=True, include_swapped=True):
            if getattr(model._meta, "db_table", "") == db_table_name:
                matching_model = model

        return matching_model

    def get_choices(self):
        """Return the sorted, de-duplicated db_table names of every unmanaged model."""
        return sorted(
            {
                x._meta.db_table
                for x in apps.get_models(include_auto_created=True, include_swapped=True)
                if getattr(x._meta, "managed", True) is False
            }
        )

    def add_arguments(self, parser):
        """Register the two positional arguments: the view's db_table name and the migration name."""
        choices = self.get_choices()

        parser.add_argument(
            "db_table_name",
            action="store",
            choices=choices,
            help="The view you want to modify.",
        )

        parser.add_argument(
            "migration_name",
            action="store",
            help="The name of the migration that will be created.",
        )

    def _call_command(self, *args):
        """Run a management command with stdout/stderr captured.

        Returns the captured stdout as a list of lines, or False when anything was written to
        stderr (in which case the stderr text is echoed to this command's stdout as an error).
        """
        err = io.StringIO()
        out = io.StringIO()

        # If we don't do this, sometimes we can't import a newly created migration.
        # Do it here, so we don't need to know which calls require it, and which don't.
        importlib.invalidate_caches()
        with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err):
            call_command(*args)

        # Did an error occur?
        if err.tell():
            err.seek(0)
            self.stdout.write(self.style.ERROR(err.read()))
            return False

        # Return the results.
        out.seek(0)
        return out.readlines()

    def _create_migration_folder_and_initial_migration_if_needed(self, migrations_path, app_label):
        """Make sure the app has a `migrations` package and at least one migration.

        Returns True when an initial migration was created, False when one already existed,
        and None when one of the underlying management commands failed.
        """
        created_initial_migration = False
        if not os.path.exists(migrations_path):
            # No, create one with an __init__.py file.
            os.mkdir(migrations_path)
            with open(os.path.join(migrations_path, "__init__.py"), "w", encoding="utf-8") as f:
                f.write("")
            self.stdout.write(self.style.SUCCESS(f"\nCreated 'migrations' folder in app '{app_label}'."))

        # Are there any migrations?
        results = self._call_command("showmigrations", app_label)
        if results is False:  # Erred.
            return None

        if any(line.strip() == "(no migrations)" for line in results):
            # Create the initial migration for the app.
            self.stdout.write(f"\nCreating initial migration for app '{app_label}'.")
            results = self._call_command("makemigrations", app_label, "--noinput")
            if not results:  # Erred.
                return None

            for result in results:
                self.stdout.write(result)
            created_initial_migration = True

        return created_initial_migration

    def _create_sql_folder_if_needed(self, sql_path, app_label):
        """Create the app's `sql` folder if it is missing. Returns True when it was created."""
        created_sql_folder = False
        if not os.path.exists(sql_path):
            # No, create one.
            os.mkdir(sql_path)
            self.stdout.write(self.style.SUCCESS(f"\nCreated 'sql' folder in app '{app_label}'."))
            created_sql_folder = True
        return created_sql_folder

    @staticmethod
    def _parse_migration_number_from_show_migrations(line):
        """Return the first all-digit word of a `showmigrations` output line, or None.

        Words are separated by spaces or underscores, so ` [X] 0003_add_view` yields `"0003"`.
        """
        # Find the first word in the line that only contains numbers.
        # The beginning words should either be '[X]', '[', or ']'.
        for word in line.replace(" ", "_").split("_"):
            if word.isdigit():
                return word
        return None

    @staticmethod
    def _is_migration_modified(db_table_name, migrations_path, migration_name, num):
        """Return True when the migration file references this view's `latest` or `num` SQL file.

        If our "Modified using django-view-manager" comment has not been seen by the time we are
        past line index 20, the file is assumed to be unrelated and scanning stops early.
        """
        latest_marker = f"view-{db_table_name}-{LATEST_VIEW_NAME}"
        numbered_marker = f"view-{db_table_name}-{num}"

        with open(os.path.join(migrations_path, f"{migration_name}.py"), encoding="utf-8") as f:
            found_modified_comment = False
            for migration_line_no, migration_line in enumerate(f):
                if "Modified using django-view-manager" in migration_line:
                    found_modified_comment = True

                # Did we modify this migration? Give up once we are past the top of the file.
                if not found_modified_comment and migration_line_no > 20:
                    break

                if latest_marker in migration_line or numbered_marker in migration_line:
                    return True

        return False

    def _get_migration_numbers_and_names(self, db_table_name, app_label, migrations_path, *, only_latest=False):
        """Parse `showmigrations` output for the app.

        With `only_latest=False`, returns a dict of `Decimal(number) -> migration name` holding only
        the migrations that reference this view's SQL files. With `only_latest=True`, returns the
        name of the last numbered migration listed (or None when there is none).
        Returns None in either mode when `showmigrations` failed or printed nothing.
        """
        show_migration_results = self._call_command("showmigrations", app_label)
        if not show_migration_results:  # Erred. The reason will be printed to the console via the command.
            return None

        # Parse the migration numbers that `showmigrations` returns
        # and get the migration numbers, or new migration number.
        migration_numbers_and_names = {}
        migration_name = None

        for line in show_migration_results:
            num = self._parse_migration_number_from_show_migrations(line)
            if not num:
                continue

            migration_num = decimal.Decimal(num)
            migration_name = line.replace("[X]", "").replace("[-]", "").replace("[ ]", "").strip()
            if "squashed migrations)" in migration_name:
                # Drop the trailing " (N squashed migrations)" note.
                migration_name = migration_name[: migration_name.find(" (")]

            # When looking for the latest, we haven't modified the migration yet, so we can't match by comment.
            if not only_latest and self._is_migration_modified(db_table_name, migrations_path, migration_name, num):
                migration_numbers_and_names[migration_num] = migration_name

        if only_latest:
            # This is None if for some reason we can't find the latest migration.
            return migration_name

        return migration_numbers_and_names

    @staticmethod
    def _get_sql_numbers_and_names(sql_path, db_table_name):
        """Index this view's SQL files found in `sql_path`.

        Returns a dict of `(number, name) -> filename`, where `number` is a Decimal
        (`LATEST_VIEW_NUMBER` for the `latest` file) and `name` is the optional text after the
        number in `view-<table>-<number>-<name>.sql`, or None when absent.
        """
        sql_numbers_and_names = {}

        view_name_start = f"view-{db_table_name}-"
        view_name_end = ".sql"

        for filename in os.listdir(sql_path):
            if not (filename.startswith(view_name_start) and filename.endswith(view_name_end)):
                continue

            # Slice off the prefix/suffix rather than `str.replace`, which would also remove
            # any further occurrences of those strings from the middle of the filename.
            stem = filename[len(view_name_start) : -len(view_name_end)]

            # Only split on the first dash, so a name part that itself contains dashes is kept whole.
            sql_file_num, has_name, remainder = stem.partition("-")
            sql_file_name = remainder if has_name else None

            # Convert the number to a Decimal, so we can compare them.
            if sql_file_num == LATEST_VIEW_NAME:
                sql_numbers_and_names[(LATEST_VIEW_NUMBER, sql_file_name)] = filename

            elif sql_file_num.isdigit():
                sql_numbers_and_names[(decimal.Decimal(sql_file_num), sql_file_name)] = filename

        return sql_numbers_and_names

    @staticmethod
    def _get_latest_migration_number_and_name(migration_numbers_and_names, sql_numbers_and_names):
        """Return `(number, name)` of the most recent migration for this view.

        The most recent migration is the last entry in `migration_numbers_and_names`. It is only
        returned when a `latest` SQL file exists as well; otherwise `(None, None)` is returned.
        """
        if not migration_numbers_and_names:
            return None, None

        has_latest_sql = any(sql_number == LATEST_VIEW_NUMBER for sql_number, _ in sql_numbers_and_names)
        if not has_latest_sql:
            return None, None

        return list(migration_numbers_and_names.items())[-1]

    def _create_empty_migration(self, app_label, migration_name, created_initial_migration, created_sql_folder):
        """Run `makemigrations --empty` so the new migration holds a placeholder RunSQL operation.

        Returns the command's output lines, or None when the command failed.
        """
        if created_initial_migration or created_sql_folder:
            self.stdout.write("\nCreating empty migration for the new SQL view.")
        else:
            self.stdout.write("\nCreating empty migration for the SQL changes.")

        # Force the migration to have a RunSQL operation with text we can easily find/replace.
        original_operations = migrations.Migration.operations
        migrations.Migration.operations = [
            migrations.RunSQL("SELECT 'replace_forwards';", reverse_sql="SELECT 'replace_reverse';")
        ]
        try:
            results = self._call_command(
                "makemigrations", app_label, "--empty", f"--name={migration_name}", "--noinput"
            )
        finally:
            # Always undo the class-level patch, even if the command raised.
            migrations.Migration.operations = original_operations

        if not results:  # Erred.
            return None

        for result in results:
            self.stdout.write(result)

        return results

    def _copy_latest_sql_view(self, sql_path, latest_sql_filename, historical_sql_filename):
        """Copy the `latest` SQL file to the historical filename.

        If the `latest` file does not already contain a generated header, `COPIED_SQL_VIEW_CONTENT`
        is prepended to it afterwards. The historical copy receives the untouched contents.
        """
        with open(os.path.join(sql_path, latest_sql_filename), "r+", encoding="utf-8") as f_in:
            content = f_in.read()
            with open(os.path.join(sql_path, historical_sql_filename), "w", encoding="utf-8") as f_out:
                f_out.write(content)

            if "This file was generated using django-view-manager" not in content:
                f_in.seek(0)
                f_in.truncate()
                f_in.write(COPIED_SQL_VIEW_CONTENT)
                f_in.write(content)

        self.stdout.write(self.style.SUCCESS(f"\nCreated historical SQL view file - '{historical_sql_filename}'."))

    def _create_latest_sql_file(self, sql_path, db_table_name):
        """Write a new `latest` SQL file for the view, containing only the instructional comment."""
        latest_sql_filename = f"view-{db_table_name}-{LATEST_VIEW_NAME}.sql"

        with open(os.path.join(sql_path, latest_sql_filename), "w", encoding="utf-8") as f:
            f.write(INITIAL_SQL_VIEW_CONTENT.format(view_name=db_table_name))

        self.stdout.write(self.style.SUCCESS(f"\nCreated new SQL view file - '{latest_sql_filename}'."))

    def _find_and_rewrite_migrations_containing_latest(
        self,
        migration_numbers_and_names,
        migrations_path,
        latest_sql_filename,
        historical_sql_filename,
    ):
        """Point any migration still referencing the `latest` SQL file at the historical file.

        Only the first line mentioning `latest_sql_filename` in each migration is rewritten.
        Migrations that do not mention it are left untouched.
        """
        for migration_name in migration_numbers_and_names.values():
            with open(os.path.join(migrations_path, f"{migration_name}.py"), "r+", encoding="utf-8") as f:
                lines = f.readlines()
                sql_line_no = next(
                    (line_no for line_no, line in enumerate(lines) if latest_sql_filename in line),
                    None,
                )
                if sql_line_no is None:
                    continue

                lines[sql_line_no] = lines[sql_line_no].replace(latest_sql_filename, historical_sql_filename)
                self.stdout.write(
                    self.style.SUCCESS(
                        f"\nModified migration '{migration_name}' to read from '{historical_sql_filename}'."
                    )
                )
                f.seek(0)
                f.truncate(0)
                f.writelines(lines)

    def _rewrite_latest_migration(self, migrations_path, migration_name, latest_sql_filename, historical_sql_filename):
        """Switch a previously generated migration from the `latest` SQL file to the historical one.

        Also inserts `MIGRATION_MODIFIED_COMMENT` right after the "Generated by Django" line when
        that comment is not already there.
        """
        with open(os.path.join(migrations_path, migration_name + ".py"), "r+", encoding="utf-8") as f:
            lines = f.readlines()
            generated_line_no = 0
            latest_sql_line_no = None
            add_modified_message = False
            for line_no, line in enumerate(lines):
                if "Generated by Django" in line:
                    # Should be the first line, but we shouldn't assume that.
                    generated_line_no = line_no
                    next_line = lines[line_no + 1] if line_no + 1 < len(lines) else ""
                    if "Modified using django-view-manager" not in next_line:
                        add_modified_message = True
                elif latest_sql_filename in line:
                    latest_sql_line_no = line_no

            # We write lines starting from the bottom to the top, so our line numbers are correct through the process.
            if latest_sql_line_no is not None:
                lines[latest_sql_line_no] = lines[latest_sql_line_no].replace(
                    latest_sql_filename, historical_sql_filename
                )
            if add_modified_message:
                # Insert the modified comment after the Django generated comment.
                # It must be wrapped in a list: assigning a bare str to a slice inserts it char by char.
                lines[generated_line_no + 1 : generated_line_no + 1] = [MIGRATION_MODIFIED_COMMENT]
            f.seek(0)
            f.truncate(0)
            f.writelines(lines)

    def _rewrite_migration(
        self,
        migrations_path,
        sql_path,
        db_table_name,
        migration_name,
        forward_sql_filename,
        *,
        reverse_sql_filename=None,
    ):
        """Turn the freshly created empty migration into one that reads its SQL from files.

        The placeholder RunSQL strings are replaced with `forwards_sql` / `reverse_sql` variables,
        code that loads those variables from the SQL files is inserted above the Migration class,
        `import os` is added, and `MIGRATION_MODIFIED_COMMENT` is inserted after the generated comment.
        Without `reverse_sql_filename`, the reverse SQL is a literal `DROP VIEW IF EXISTS` statement.
        """
        with open(os.path.join(migrations_path, migration_name + ".py"), "r+", encoding="utf-8") as f:
            generated_line_no = imports_line_no = class_line_no = replace_forwards_line_no = replace_reverse_line_no = 0
            lines = f.readlines()
            for line_no, line in enumerate(lines):
                if "Generated by Django" in line:
                    # Should be the first line, but we shouldn't assume that.
                    generated_line_no = line_no
                elif "import" in line and not imports_line_no:
                    imports_line_no = line_no
                elif line.startswith("class Migration"):
                    class_line_no = line_no
                elif "replace_forwards" in line:
                    replace_forwards_line_no = line_no
                elif "replace_reverse" in line:
                    replace_reverse_line_no = line_no

            # We write lines starting from the bottom to the top, so our line numbers are correct through the process.
            lines[replace_reverse_line_no] = lines[replace_reverse_line_no].replace(
                '''"SELECT 'replace_reverse';"''',
                "reverse_sql" if reverse_sql_filename else f'"DROP VIEW IF EXISTS {db_table_name};"',
            )
            lines[replace_forwards_line_no] = lines[replace_forwards_line_no].replace(
                '''"SELECT 'replace_forwards';"''', "forwards_sql"
            )

            # Code that loads the SQL; it replaces the line just above `class Migration`.
            sql_loading_lines = [
                f'\nsql_path = "{os.path.relpath(sql_path)}"\n',
                f'forward_sql_filename = "{forward_sql_filename}"\n',
            ]
            if reverse_sql_filename:
                sql_loading_lines.append(f'reverse_sql_filename = "{reverse_sql_filename}"\n')
            sql_loading_lines += [
                "\n",
                "with open(os.path.join(sql_path, forward_sql_filename)) as f:\n",
                "    forwards_sql = f.read()\n",
                "\n",
            ]
            if reverse_sql_filename:
                sql_loading_lines += [
                    "with open(os.path.join(sql_path, reverse_sql_filename)) as f:\n",
                    "    reverse_sql = f.read()\n",
                    "\n",
                ]
            lines[class_line_no - 1 : class_line_no] = sql_loading_lines

            # `import os` replaces the line just above the first import.
            lines[imports_line_no - 1 : imports_line_no] = [
                "import os\n",
                "\n",
            ]
            # Insert the modified comment right after the generated comment at the top.
            lines[generated_line_no + 1 : generated_line_no + 1] = [MIGRATION_MODIFIED_COMMENT]
            f.seek(0)
            f.truncate(0)
            f.writelines(lines)

    def _get_historical_sql_filename(
        self, db_table_name, latest_migration_number, latest_migration_name, sql_numbers_and_names
    ):
        """Return the filename the current `latest` SQL should be archived under.

        Normally `view-<table>-<NNNN>.sql`. If a file with that name is already recorded in
        `sql_numbers_and_names`, the migration name (minus its number) is appended to keep it unique.
        """
        padded_number = str(latest_migration_number).zfill(4)
        historical_sql_filename = f"view-{db_table_name}-{padded_number}.sql"
        # Do multiple migrations with the same number exist?
        # If so, we need to include the migration name in the sql view name.
        if historical_sql_filename in sql_numbers_and_names.values():
            latest_migration_name = latest_migration_name.split("_", 1)[1]  # Remove the migration number.
            historical_sql_filename = f"view-{db_table_name}-{padded_number}-{latest_migration_name}.sql"

        return historical_sql_filename

    @atomic
    def handle(self, *args, **options):
        """Create the new migration and create or rotate the view's SQL files.

        Returns early (after the failing step has printed its error) when an underlying
        management command fails. Raises RuntimeError when no model matches the table name
        or when the newly created migration cannot be found.
        """
        # Get passed in args.
        db_table_name = options["db_table_name"]
        migration_name = options["migration_name"]

        # Get paths we need.
        model = self.get_model(db_table_name)
        if model is None:
            raise RuntimeError(f"Unable to find a model with the db_table '{db_table_name}'.")
        model_meta = model._meta
        app_config = model_meta.app_config
        app_label = model_meta.app_label
        path = app_config.path
        sql_path = os.path.join(path, "sql")
        migrations_path = os.path.join(path, "migrations")
        latest_sql_filename = f"view-{db_table_name}-{LATEST_VIEW_NAME}.sql"

        # Does this app have a `migrations` folder, and if so, any migrations?
        created_initial_migration = self._create_migration_folder_and_initial_migration_if_needed(
            migrations_path, app_label
        )
        if created_initial_migration is None:  # Erred.
            return

        # Does this app have an `sql` folder?
        created_sql_folder = self._create_sql_folder_if_needed(sql_path, app_label)

        # Get the migration numbers and names.
        migration_numbers_and_names = self._get_migration_numbers_and_names(db_table_name, app_label, migrations_path)
        if migration_numbers_and_names is None:  # Erred.
            return

        # Get any existing migrations and sql view names and numbers.
        sql_numbers_and_names = self._get_sql_numbers_and_names(sql_path, db_table_name)

        # Figure out if we have a `latest` sql file and which migration is associated with it.
        latest_migration_number, latest_migration_name = self._get_latest_migration_number_and_name(
            migration_numbers_and_names, sql_numbers_and_names
        )

        # Create the empty migration for the SQL view.
        results = self._create_empty_migration(app_label, migration_name, created_initial_migration, created_sql_folder)
        if results is None:  # Erred
            return

        # Get the new migration number and name.
        new_migration_name = self._get_migration_numbers_and_names(
            db_table_name, app_label, migrations_path, only_latest=True
        )

        if new_migration_name is None:
            raise RuntimeError("Unable to find the name and number of the newly created migration.")

        # Is there a `latest` SQL view and migration?
        if latest_migration_number is not None and latest_migration_name is not None:
            historical_sql_filename = self._get_historical_sql_filename(
                db_table_name, latest_migration_number, latest_migration_name, sql_numbers_and_names
            )

            # Copy the `latest` SQL view to match the latest migration number.
            self._copy_latest_sql_view(sql_path, latest_sql_filename, historical_sql_filename)

            # Update the historical migration to use the new historical sql view filename.
            self._rewrite_latest_migration(
                migrations_path, latest_migration_name, latest_sql_filename, historical_sql_filename
            )
            self.stdout.write(
                self.style.SUCCESS(
                    f"\nModified migration '{latest_migration_name}' to read from '{historical_sql_filename}'."
                )
            )

            # Find any additional migrations which should be switched to use the historical sql view filename.
            self._find_and_rewrite_migrations_containing_latest(
                migration_numbers_and_names,
                migrations_path,
                latest_sql_filename,
                historical_sql_filename,
            )

            # Update the empty migration to use the `latest` sql view filename.
            self._rewrite_migration(
                migrations_path,
                sql_path,
                db_table_name,
                new_migration_name,
                forward_sql_filename=latest_sql_filename,
                reverse_sql_filename=historical_sql_filename,
            )
            self.stdout.write(
                self.style.SUCCESS(
                    f"\nModified migration '{new_migration_name}' to read from "
                    f"'{latest_sql_filename}' and '{historical_sql_filename}'."
                )
            )

        else:
            # Create the `latest` SQL view.
            self._create_latest_sql_file(sql_path, db_table_name)

            # Update the empty migration to use the `latest` sql view filename.
            self._rewrite_migration(
                migrations_path,
                sql_path,
                db_table_name,
                new_migration_name,
                forward_sql_filename=latest_sql_filename,
            )
            self.stdout.write(
                self.style.SUCCESS(f"\nModified migration '{new_migration_name}' to read from '{latest_sql_filename}'.")
            )

        self.stdout.write(f"\nDone - You can now edit '{latest_sql_filename}'.\n\n")