
Experiment Migration Management Command #1721


Merged
merged 19 commits into from
Jun 13, 2025
@@ -0,0 +1,223 @@
from uuid import uuid4

from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import Q

from apps.experiments.models import Experiment
from apps.pipelines.flow import FlowNode, FlowNodeData
from apps.pipelines.models import Pipeline
from apps.pipelines.nodes.nodes import AssistantNode, LLMResponseWithPrompt
from apps.teams.models import Flag


class Command(BaseCommand):
help = "Convert assistant and LLM experiments to pipeline experiments"

def add_arguments(self, parser):
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be converted without making changes",
)
parser.add_argument(
"--team-slug",
type=str,
help="Only convert experiments for a specific team (by slug)",
)
parser.add_argument(
"--experiment-id",
type=int,
help="Convert only a specific experiment by ID",
)
parser.add_argument(
"--chatbots-flag-only",
action="store_true",
help='Only convert experiments for teams that have the "flag_chatbots" feature flag enabled',
)

def handle(self, *args, **options):
dry_run = options["dry_run"]
team_slug = options.get("team_slug")
experiment_id = options.get("experiment_id")
chatbots_flag_only = options["chatbots_flag_only"]

query = Q(pipeline__isnull=True) & (Q(assistant__isnull=False) | Q(llm_provider__isnull=False))

if team_slug:
query &= Q(team__slug=team_slug)

if chatbots_flag_only:
chatbots_flag_team_ids = self._get_chatbots_flag_team_ids()
if not chatbots_flag_team_ids:
self.stdout.write(self.style.WARNING('No teams found with the "flag_chatbots" feature flag enabled.'))
return
query &= Q(team_id__in=chatbots_flag_team_ids)
self.stdout.write(f"Filtering to teams with 'flag_chatbots' FF ({len(chatbots_flag_team_ids)} teams)")

if experiment_id:
query &= Q(id=experiment_id)
experiment = (
Experiment.objects.filter(query)
.select_related("team", "assistant", "llm_provider", "llm_provider_model")
.first()
)
if not experiment:
self.stdout.write(
self.style.WARNING(f"Experiment {experiment_id} not found or does not need migration.")
)
return

if not (experiment.is_default_version or experiment.is_working_version):
self.stdout.write(
self.style.WARNING(
f"Experiment {experiment_id} is not a published or unreleased version so does not\
require migration."
)
)
return

experiments_to_convert = experiment
experiment_count = 1
else:
default_experiments = Experiment.objects.filter(query & Q(is_default_version=True))
default_working_version_ids = default_experiments.exclude(working_version__isnull=True).values_list(
"working_version_id", flat=True
)
working_experiments = Experiment.objects.filter(query & Q(working_version__isnull=True)).exclude(
id__in=default_working_version_ids
)
combined_ids = list(default_experiments.union(working_experiments).values_list("id", flat=True))
experiments_to_convert = Experiment.objects.filter(id__in=combined_ids).select_related(
"team", "assistant", "llm_provider", "llm_provider_model"
)
Comment on lines +91 to +93
Contributor

It may be that loading all of these into memory at once could be an issue, so it would be prudent to use an iterator:

Suggested change
experiments_to_convert = Experiment.objects.filter(id__in=combined_ids).select_related(
"team", "assistant", "llm_provider", "llm_provider_model"
)
experiments_to_convert_queryset = Experiment.objects.filter(id__in=combined_ids).select_related(
"team", "assistant", "llm_provider", "llm_provider_model"
)
# do this here since you can't do a count on an iterator
experiment_count = experiments_to_convert_queryset.count()
experiments_to_convert = experiments_to_convert_queryset.iterator(20)

experiment_count = experiments_to_convert.count()

if not experiment_count:
self.stdout.write(self.style.WARNING("No matching experiments found."))
return

self.stdout.write(f"Found {experiment_count} experiments to migrate:")

if dry_run:
if experiment_count == 1:
self._log_experiment_info(experiments_to_convert)
else:
for experiment in experiments_to_convert.iterator(20):
self._log_experiment_info(experiment)
self.stdout.write(self.style.WARNING("\nDry run - no changes will be made."))
return

confirm = input("\nContinue? (y/N): ")
if confirm.lower() != "y":
self.stdout.write("Cancelled.")
return

converted_count = 0
failed_count = 0
if experiment_count == 1:
converted_count, failed_count = self._process_experiment(
experiments_to_convert, converted_count, failed_count
)
else:
for experiment in experiments_to_convert.iterator(20):
converted_count, failed_count = self._process_experiment(experiment, converted_count, failed_count)
self.stdout.write(
self.style.SUCCESS(f"\nMigration is complete!: {converted_count} succeeded, {failed_count} failed")
)

def _process_experiment(self, experiment, converted_count, failed_count):
self._log_experiment_info(experiment)
try:
with transaction.atomic():
self._convert_experiment(experiment)
converted_count += 1
self.stdout.write(self.style.SUCCESS(f"Success: {experiment.name}"))
except Exception as e:
failed_count += 1
self.stdout.write(self.style.ERROR(f"FAILED {experiment.name}: {str(e)}"))
return converted_count, failed_count

def _log_experiment_info(self, experiment):
experiment_type = "Assistant" if experiment.assistant else "LLM" if experiment.llm_provider else "Unknown"
team_info = f"{experiment.team.name} ({experiment.team.slug})"
self.stdout.write(f"{experiment.name} (ID: {experiment.id}) - Type: {experiment_type} - Team: {team_info}")

def _convert_experiment(self, experiment):
if experiment.assistant:
pipeline = self._create_assistant_pipeline(experiment)
elif experiment.llm_provider:
pipeline = self._create_llm_pipeline(experiment)
else:
raise ValueError(f"Unknown experiment type for experiment {experiment.id}")

experiment.pipeline = pipeline
experiment.assistant = None
experiment.llm_provider = None
experiment.llm_provider_model = None

experiment.save()

def _get_chatbots_flag_team_ids(self):
chatbots_flag = Flag.objects.get(name="flag_chatbots")
return list(chatbots_flag.teams.values_list("id", flat=True))

def _create_pipeline_with_node(self, experiment, node_type, node_label, node_params):
"""Create a pipeline with start -> custom_node -> end structure."""
pipeline_name = f"{experiment.name} Pipeline"
node_id = str(uuid4())
node = FlowNode(
id=node_id,
type="pipelineNode",
position={"x": 400, "y": 200},
data=FlowNodeData(
id=node_id,
type=node_type,
label=node_label,
params=node_params,
),
)

return Pipeline._create_pipeline_with_nodes(team=experiment.team, name=pipeline_name, middle_node=node)

def _create_llm_pipeline(self, experiment):
"""Create a start -> LLMResponseWithPrompt -> end nodes pipeline for an LLM experiment."""
llm_params = {
"name": "llm",
"llm_provider_id": experiment.llm_provider.id,
"llm_provider_model_id": experiment.llm_provider_model.id,
"llm_temperature": experiment.temperature,
"history_type": "global",
"history_name": None,
"history_mode": "summarize",
"user_max_token_limit": experiment.llm_provider_model.max_token_limit,
"max_history_length": 10,
"source_material_id": experiment.source_material.id if experiment.source_material else None,
"prompt": experiment.prompt_text or "",
"tools": list(experiment.tools) if experiment.tools else [],
"custom_actions": [
op.get_model_id(False)
for op in experiment.custom_action_operations.select_related("custom_action").all()
],
"built_in_tools": [],
"tool_config": {},
}
return self._create_pipeline_with_node(
experiment=experiment, node_type=LLMResponseWithPrompt.__name__, node_label="LLM", node_params=llm_params
)

def _create_assistant_pipeline(self, experiment):
"""Create a start -> AssistantNode -> end nodes pipeline for an assistant experiment."""
assistant_params = {
"name": "assistant",
"assistant_id": str(experiment.assistant.id),
"citations_enabled": experiment.citations_enabled,
"input_formatter": experiment.input_formatter or "",
}

return self._create_pipeline_with_node(
experiment=experiment,
node_type=AssistantNode.__name__,
node_label="OpenAI Assistant",
node_params=assistant_params,
)
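
For reference, the new command can be exercised from a Python shell with Django's call_command (the equivalent CLI form would be python manage.py <command> --dry-run --team-slug my-team). The command name used below is an assumption; this diff does not show the file name under management/commands/, so substitute the actual module name.

from django.core.management import call_command

# The command name is an assumption; substitute the module name of the file
# added in this PR. Keyword arguments map to the argparse dests defined in
# add_arguments() above.
call_command("migrate_experiments_to_pipelines", dry_run=True, team_slug="my-team")
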
109 changes: 59 additions & 50 deletions apps/pipelines/models.py
@@ -115,36 +115,72 @@ def version_display(self) -> str:
return f"v{self.version_number}"

@classmethod
def create_default_pipeline_with_name(cls, team, name, llm_provider_id=None, llm_provider_model=None):
return cls.create_default(team, name, llm_provider_id, llm_provider_model)

@classmethod
def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_model=None):
def _create_pipeline_with_nodes(cls, team, name, middle_node=None):
"""
Create a pipeline with start -> middle node -> end structure.
"""
from apps.pipelines.nodes.nodes import EndNode, StartNode

default_name = "New Pipeline" if name is None else name
existing_pipeline_count = cls.objects.filter(team=team, name__startswith=default_name).count()

start_id = str(uuid4())
start_node_id = str(uuid4())
end_node_id = str(uuid4())
start_node = FlowNode(
id=start_id,
id=start_node_id,
type="startNode",
position={
"x": -200,
"y": 200,
},
data=FlowNodeData(id=start_id, type=StartNode.__name__, params={"name": "start"}),
position={"x": 100, "y": 200},
data=FlowNodeData(
id=start_node_id,
type=StartNode.__name__,
label="",
params={"name": "start"},
),
)
end_id = str(uuid4())
end_node = FlowNode(
id=end_id,
id=end_node_id,
type="endNode",
position={"x": 1000, "y": 200},
data=FlowNodeData(id=end_id, type=EndNode.__name__, params={"name": "end"}),
position={"x": 800, "y": 200},
data=FlowNodeData(
id=end_node_id,
type=EndNode.__name__,
label="",
params={"name": "end"},
),
)
all_flow_nodes = [start_node]
if middle_node:
all_flow_nodes.append(middle_node)
all_flow_nodes.append(end_node)
edges = []
if middle_node:
for i in range(len(all_flow_nodes) - 1):
current_node = all_flow_nodes[i]
next_node = all_flow_nodes[i + 1]
edge = {
"id": f"edge-{current_node.id}-{next_node.id}",
"source": current_node.id,
"target": next_node.id,
"sourceHandle": "output",
"targetHandle": "input",
}
edges.append(edge)
pipeline = cls.objects.create(
team=team, name=name, data={"nodes": [node.model_dump() for node in all_flow_nodes], "edges": edges}
)
pipeline.update_nodes_from_data()
return pipeline

@classmethod
def create_default_pipeline_with_name(cls, team, name, llm_provider_id=None, llm_provider_model=None):
return cls.create_default(team, name, llm_provider_id, llm_provider_model)

@classmethod
def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_model=None):
default_name = "New Pipeline" if name is None else name
existing_pipeline_count = cls.objects.filter(team=team, name__startswith=default_name).count()

node = None
if llm_provider_id and llm_provider_model:
llm_id = f"LLMResponseWithPrompt-{uuid4().hex[:5]}"
llm_node = FlowNode(
node = FlowNode(
id=llm_id,
type="pipelineNode",
position={"x": 300, "y": 0},
@@ -170,36 +206,9 @@ def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_model=None):
},
),
)
edges = [
{
"id": f"edge-{start_id}-{llm_id}",
"source": start_id,
"target": llm_id,
"sourceHandle": "output",
"targetHandle": "input",
},
{
"id": f"edge-{llm_id}-{end_id}",
"source": llm_id,
"target": end_id,
"sourceHandle": "output",
"targetHandle": "input",
},
]
else:
llm_node = None
edges = []
default_nodes = [start_node.model_dump()]
if llm_node:
default_nodes.append(llm_node.model_dump())
default_nodes.append(end_node.model_dump())
new_pipeline = cls.objects.create(
team=team,
data={"nodes": default_nodes, "edges": edges},
name=default_name if name else f"New Pipeline {existing_pipeline_count + 1}",
)
new_pipeline.update_nodes_from_data()
return new_pipeline

final_name = default_name if name else f"New Pipeline {existing_pipeline_count + 1}"
return cls._create_pipeline_with_nodes(team=team, name=final_name, middle_node=node)

def get_absolute_url(self):
return reverse("pipelines:edit", args=[self.team.slug, self.id])