Skip to content

Commit

Permalink
fix: optimize migration
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-Shen committed Jul 8, 2024
1 parent 118d791 commit f3f5df4
Showing 1 changed file with 108 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,120 +43,127 @@ def upgrade() -> None:
Session = sessionmaker(bind=bind)
session = Session()

with context.begin_transaction():
# Create a new table task_pre_annotation
if not alembic_labelu_tools.table_exist("task_pre_annotation"):
op.create_table(
"task_pre_annotation",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True, index=True),
sa.Column("task_id", sa.Integer, sa.ForeignKey("task.id"), index=True),
sa.Column("file_id", sa.Integer, sa.ForeignKey("task_attachment.id"), index=True),
# json 字符串
sa.Column("data", sa.Text, comment="task sample pre annotation result"),
sa.Column("created_by", sa.Integer, sa.ForeignKey("user.id"), index=True),
sa.Column("updated_by", sa.Integer, sa.ForeignKey("user.id")),
sa.Column(
"created_at",
sa.DateTime,
default=sa.func.now(),
comment="Time a task sample result was created",
),
sa.Column(
"updated_at",
sa.DateTime,
default=sa.func.now(),
onupdate=sa.func.now(),
comment="Last time a task sample result was updated",
),
sa.Column(
"deleted_at",
sa.DateTime,
index=True,
comment="Task delete time",
),
)
# Update the task_sample table
if not alembic_labelu_tools.column_exist_in_table(
"task_sample", "file_id"
):
with op.batch_alter_table('task_sample', recreate="always") as batch_op:
batch_op.add_column(
try:

with context.begin_transaction():
# Create a new table task_pre_annotation
if not alembic_labelu_tools.table_exist("task_pre_annotation"):
op.create_table(
"task_pre_annotation",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True, index=True),
sa.Column("task_id", sa.Integer, sa.ForeignKey("task.id"), index=True),
sa.Column("file_id", sa.Integer, sa.ForeignKey("task_attachment.id"), index=True),
# json 字符串
sa.Column("data", sa.Text, comment="task sample pre annotation result"),
sa.Column("created_by", sa.Integer, sa.ForeignKey("user.id"), index=True),
sa.Column("updated_by", sa.Integer, sa.ForeignKey("user.id")),
sa.Column(
"file_id",
sa.Integer(),
sa.ForeignKey("task_attachment.id", name="fk_file_id"),
index=True,
comment="file id",
"created_at",
sa.DateTime,
default=sa.func.now(),
comment="Time a task sample result was created",
),
)

# Update the task_attachment table
if not alembic_labelu_tools.column_exist_in_table("task_attachment", "filename"):
with op.batch_alter_table("task_attachment", recreate="always") as batch_op_task_attachment:
batch_op_task_attachment.add_column(
sa.Column(
"filename",
sa.String(256),
comment="file name",
"updated_at",
sa.DateTime,
default=sa.func.now(),
onupdate=sa.func.now(),
comment="Last time a task sample result was updated",
),
)
batch_op_task_attachment.add_column(
sa.Column(
"url",
sa.String(256),
comment="file url",
"deleted_at",
sa.DateTime,
index=True,
comment="Task delete time",
),
)

# Update existing data in the task_sample table
task_items = session.execute(
'SELECT id, config FROM task'
)

# Update the task_attachment table
attachments = session.execute(
'SELECT id, path FROM task_attachment'
)

for attachment in attachments:
attachment_id = attachment[0]
attachment_path = attachment[1]
filename = os.path.basename(attachment_path)
url = f"{settings.API_V1_STR}/tasks/attachment/{attachment_path}"
# Update the task_sample table
if not alembic_labelu_tools.column_exist_in_table(
"task_sample", "file_id"
):
with op.batch_alter_table('task_sample', recreate="always") as batch_op:
batch_op.add_column(
sa.Column(
"file_id",
sa.Integer(),
sa.ForeignKey("task_attachment.id", name="fk_file_id"),
index=True,
comment="file id",
),
)

if filename:
session.execute(
f"UPDATE task_attachment SET filename='{filename}', url='{url}' WHERE id={attachment_id}"
)

for task_item in task_items:
task_id = task_item[0]
task_samples = session.execute(
f"SELECT id, task_attachment_ids FROM task_sample WHERE task_id={task_id}"
# Update the task_attachment table
if not alembic_labelu_tools.column_exist_in_table("task_attachment", "filename"):
with op.batch_alter_table("task_attachment", recreate="always") as batch_op_task_attachment:
batch_op_task_attachment.add_column(
sa.Column(
"filename",
sa.String(256),
comment="file name",
),
)
batch_op_task_attachment.add_column(
sa.Column(
"url",
sa.String(256),
comment="file url",
),
)

# Update existing data in the task_sample table
task_items = session.execute(
'SELECT id, config FROM task'
)

for task_sample in task_samples:
task_sample_id = task_sample[0]
attachment_ids = json.loads(task_sample[1])
# attachment_ids 存储的是字符串[id1, id2, id3],需要转换成数组
file_id = attachment_ids[0]

if not file_id:
continue

attachment = session.execute(
f"SELECT id, path FROM task_attachment WHERE id={file_id}"
)
attachment_path = list(attachment)[0][1]

# Update the task_attachment table
attachments = session.execute(
'SELECT id, path FROM task_attachment'
)

for attachment in attachments:
attachment_id = attachment[0]
attachment_path = attachment[1]
filename = os.path.basename(attachment_path)
url = f"{settings.API_V1_STR}/tasks/attachment/{attachment_path}"

if attachment_path:
# Update the task_sample table
if filename:
session.execute(
f"UPDATE task_sample SET file_id={file_id} WHERE id={task_sample_id}"
f"UPDATE task_attachment SET filename='{filename}', url='{url}' WHERE id={attachment_id}"
)

for task_item in task_items:
task_id = task_item[0]
task_samples = session.execute(
f"SELECT id, task_attachment_ids FROM task_sample WHERE task_id={task_id}"
)

for task_sample in task_samples:
task_sample_id = task_sample[0]
attachment_ids = json.loads(task_sample[1])
# attachment_ids 存储的是字符串[id1, id2, id3],需要转换成数组
file_id = attachment_ids[0]

if not file_id:
continue

attachment = session.execute(
f"SELECT id, path FROM task_attachment WHERE id={file_id}"
)

session.commit()
attachment_path = list(attachment)[0][1]

if attachment_path:
# Update the task_sample table
session.execute(
f"UPDATE task_sample SET file_id={file_id} WHERE id={task_sample_id}"
)

session.commit()

except Exception as e:
session.rollback()
raise e
finally:
session.close()

def downgrade() -> None:
op.drop_table("task_pre_annotation")
Expand Down

0 comments on commit f3f5df4

Please sign in to comment.