From 7478881ce66efc5c9724808c303e32f9f9047f81 Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Thu, 5 Jun 2025 18:11:55 -0400 Subject: [PATCH 01/18] initial --- ...ate_nonpipeline_to_pipeline_experiments.py | 266 ++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py new file mode 100644 index 000000000..ee7e34ea0 --- /dev/null +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -0,0 +1,266 @@ +from uuid import uuid4 + +from django.core.management.base import BaseCommand +from django.db import transaction +from django.db.models import Q + +from apps.experiments.models import Experiment +from apps.pipelines.flow import FlowNode, FlowNodeData +from apps.pipelines.models import Pipeline +from apps.pipelines.nodes.nodes import AssistantNode, EndNode, LLMResponseWithPrompt, StartNode +from apps.teams.models import Flag + + +class Command(BaseCommand): + help = "Convert assistant and LLM experiments to pipeline experiments" + + def add_arguments(self, parser): + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be converted without making changes", + ) + parser.add_argument( + "--team-slug", + type=str, + help="Only convert experiments for a specific team (by slug)", + ) + parser.add_argument( + "--experiment-id", + type=int, + help="Convert only a specific experiment by ID", + ) + parser.add_argument( + "--chatbots-flag-only", + action="store_true", + help='Only convert experiments for teams that have the "flag_chatbots" feature flag enabled', + ) + + def handle(self, *args, **options): + dry_run = options["dry_run"] + team_slug = options.get("team_slug") + experiment_id = options.get("experiment_id") + chatbots_flag_only = options["chatbots_flag_only"] + + query = Q(pipeline__isnull=True) & (Q(assistant__isnull=False) | Q(llm_provider__isnull=False)) + + if team_slug: + query &= Q(team__slug=team_slug) + + if experiment_id: + query &= Q(id=experiment_id) + + if chatbots_flag_only: + chatbots_flag_team_ids = self._get_chatbots_flag_team_ids() + if not chatbots_flag_team_ids: + self.stdout.write(self.style.WARNING('No teams found with the "flag_chatbots" feature flag enabled.')) + return + query &= Q(team_id__in=chatbots_flag_team_ids) + self.stdout.write(f"Filtering to teams with 'flag_chatbots' FF ({len(chatbots_flag_team_ids)} teams)") + + experiments_to_convert = Experiment.objects.filter(query).select_related( + "team", "assistant", "llm_provider", "llm_provider_model" + ) + + if not experiments_to_convert.exists(): + self.stdout.write(self.style.WARNING("No matching experiments found.")) + return + + self.stdout.write(f"Found {experiments_to_convert.count()} experiments to migrate:") + + for experiment in experiments_to_convert: + experiment_type = self._get_experiment_type(experiment) + team_info = f"{experiment.team.name} ({experiment.team.slug})" + self.stdout.write(f"{experiment.name} (ID: {experiment.id}) - Type: {experiment_type} - Team: {team_info}") + + if dry_run: + self.stdout.write(self.style.WARNING("\nDry run - no changes will be made.")) + return + + confirm = input("\nContinue? (y/N): ") + if confirm.lower() != "y": + self.stdout.write("Cancelled.") + return + + converted_count = 0 + failed_count = 0 + + for experiment in experiments_to_convert: + try: + with transaction.atomic(): + self._convert_experiment(experiment) + converted_count += 1 + self.stdout.write(self.style.SUCCESS(f"Success: {experiment.name}")) + except Exception as e: + failed_count += 1 + self.stdout.write(self.style.ERROR(f"FAILED {experiment.name}: {str(e)}")) + + self.stdout.write( + self.style.SUCCESS(f"\nMigration is complete!: {converted_count} succeeded, {failed_count} failed") + ) + + def _get_experiment_type(self, experiment): + if experiment.assistant: + return "Assistant" + elif experiment.llm_provider: + return "LLM" + else: + return "Unknown" + + def _convert_experiment(self, experiment): + experiment_type = self._get_experiment_type(experiment) + + if experiment_type == "Assistant": + pipeline = self._create_assistant_pipeline(experiment) + elif experiment_type == "LLM": + pipeline = self._create_llm_pipeline(experiment) + else: + raise ValueError(f"Unknown experiment type for experiment {experiment.id}: {experiment_type}") + + experiment.pipeline = pipeline + + # Clear the old type-specific fields + if experiment_type == "Assistant": + experiment.assistant = None + elif experiment_type == "LLM": + experiment.llm_provider = None + experiment.llm_provider_model = None + + experiment.save() + + def _create_assistant_pipeline(self, experiment): + """Create a start -> AssistantNode -> end nodes pipeline for an assistant experiment.""" + pipeline_name = f"{experiment.name} Pipeline" + pipeline = Pipeline.objects.create(team=experiment.team, name=pipeline_name, data={"nodes": [], "edges": []}) + start_id = str(uuid4()) + assistant_id = str(uuid4()) + end_id = str(uuid4()) + + start_node = FlowNode( + id=start_id, + type="startNode", + position={"x": 100, "y": 200}, + data=FlowNodeData(id=start_id, type=StartNode.__name__, params={"name": "start"}), + ) + + assistant_node = FlowNode( + id=assistant_id, + type="pipelineNode", + position={"x": 400, "y": 200}, + data=FlowNodeData( + id=assistant_id, + type=AssistantNode.__name__, + label="OpenAI Assistant", + params={ + "name": "assistant", + "assistant_id": str(experiment.assistant.id), + "citations_enabled": experiment.citations_enabled, + "input_formatter": experiment.input_formatter or "", + }, + ), + ) + + end_node = FlowNode( + id=end_id, + type="endNode", + position={"x": 700, "y": 200}, + data=FlowNodeData(id=end_id, type=EndNode.__name__, params={"name": "end"}), + ) + edges = [ + { + "id": f"edge-{start_id}-{assistant_id}", + "source": start_id, + "target": assistant_id, + "sourceHandle": "output", + "targetHandle": "input", + }, + { + "id": f"edge-{assistant_id}-{end_id}", + "source": assistant_id, + "target": end_id, + "sourceHandle": "output", + "targetHandle": "input", + }, + ] + pipeline.data = { + "nodes": [start_node.model_dump(), assistant_node.model_dump(), end_node.model_dump()], + "edges": edges, + } + pipeline.save() + pipeline.update_nodes_from_data() + + return pipeline + + def _get_chatbots_flag_team_ids(self): + chatbots_flag = Flag.objects.get(name="flag_chatbots") + return list(chatbots_flag.teams.values_list("id", flat=True)) + + def _create_llm_pipeline(self, experiment): + """Create a start -> LLMResponseWithPrompt -> end nodes pipeline for an LLM experiment.""" + pipeline_name = f"{experiment.name} Pipeline" + pipeline = Pipeline.objects.create(team=experiment.team, name=pipeline_name, data={"nodes": [], "edges": []}) + start_id = str(uuid4()) + llm_id = str(uuid4()) + end_id = str(uuid4()) + + start_node = FlowNode( + id=start_id, + type="startNode", + position={"x": 100, "y": 200}, + data=FlowNodeData(id=start_id, type=StartNode.__name__, params={"name": "start"}), + ) + llm_params = { + "name": "llm", + "llm_provider_id": experiment.llm_provider.id, + "llm_provider_model_id": experiment.llm_provider_model.id, + "llm_temperature": experiment.temperature, + "history_type": "none", + "history_name": None, + "history_mode": "summarize", + "user_max_token_limit": experiment.llm_provider_model.max_token_limit, + "max_history_length": 10, + "source_material_id": experiment.source_material.id if experiment.source_material else None, + "prompt": experiment.prompt_text or "", + "tools": list(experiment.tools) if experiment.tools else [], + "custom_actions": [], + "built_in_tools": [], + "tool_config": {}, + } + + llm_node = FlowNode( + id=llm_id, + type="pipelineNode", + position={"x": 400, "y": 200}, + data=FlowNodeData(id=llm_id, type=LLMResponseWithPrompt.__name__, label="LLM", params=llm_params), + ) + + end_node = FlowNode( + id=end_id, + type="endNode", + position={"x": 700, "y": 200}, + data=FlowNodeData(id=end_id, type=EndNode.__name__, params={"name": "end"}), + ) + edges = [ + { + "id": f"edge-{start_id}-{llm_id}", + "source": start_id, + "target": llm_id, + "sourceHandle": "output", + "targetHandle": "input", + }, + { + "id": f"edge-{llm_id}-{end_id}", + "source": llm_id, + "target": end_id, + "sourceHandle": "output", + "targetHandle": "input", + }, + ] + pipeline.data = { + "nodes": [start_node.model_dump(), llm_node.model_dump(), end_node.model_dump()], + "edges": edges, + } + pipeline.save() + pipeline.update_nodes_from_data() + + return pipeline From 81a8618896f14a156a93c52dc4936277c8dcf998 Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Thu, 5 Jun 2025 18:22:40 -0400 Subject: [PATCH 02/18] DRY it up --- ...ate_nonpipeline_to_pipeline_experiments.py | 105 ++++++------------ 1 file changed, 35 insertions(+), 70 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index ee7e34ea0..922f16187 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -128,12 +128,16 @@ def _convert_experiment(self, experiment): experiment.save() - def _create_assistant_pipeline(self, experiment): - """Create a start -> AssistantNode -> end nodes pipeline for an assistant experiment.""" + def _get_chatbots_flag_team_ids(self): + chatbots_flag = Flag.objects.get(name="flag_chatbots") + return list(chatbots_flag.teams.values_list("id", flat=True)) + + def _create_pipeline_with_node(self, experiment, node_type, node_label, node_params): + """Create a pipeline with start -> custom_node -> end structure.""" pipeline_name = f"{experiment.name} Pipeline" pipeline = Pipeline.objects.create(team=experiment.team, name=pipeline_name, data={"nodes": [], "edges": []}) start_id = str(uuid4()) - assistant_id = str(uuid4()) + middle_id = str(uuid4()) end_id = str(uuid4()) start_node = FlowNode( @@ -143,20 +147,15 @@ def _create_assistant_pipeline(self, experiment): data=FlowNodeData(id=start_id, type=StartNode.__name__, params={"name": "start"}), ) - assistant_node = FlowNode( - id=assistant_id, + middle_node = FlowNode( + id=middle_id, type="pipelineNode", position={"x": 400, "y": 200}, data=FlowNodeData( - id=assistant_id, - type=AssistantNode.__name__, - label="OpenAI Assistant", - params={ - "name": "assistant", - "assistant_id": str(experiment.assistant.id), - "citations_enabled": experiment.citations_enabled, - "input_formatter": experiment.input_formatter or "", - }, + id=middle_id, + type=node_type, + label=node_label, + params=node_params, ), ) @@ -168,22 +167,22 @@ def _create_assistant_pipeline(self, experiment): ) edges = [ { - "id": f"edge-{start_id}-{assistant_id}", + "id": f"edge-{start_id}-{middle_id}", "source": start_id, - "target": assistant_id, + "target": middle_id, "sourceHandle": "output", "targetHandle": "input", }, { - "id": f"edge-{assistant_id}-{end_id}", - "source": assistant_id, + "id": f"edge-{middle_id}-{end_id}", + "source": middle_id, "target": end_id, "sourceHandle": "output", "targetHandle": "input", }, ] pipeline.data = { - "nodes": [start_node.model_dump(), assistant_node.model_dump(), end_node.model_dump()], + "nodes": [start_node.model_dump(), middle_node.model_dump(), end_node.model_dump()], "edges": edges, } pipeline.save() @@ -191,24 +190,8 @@ def _create_assistant_pipeline(self, experiment): return pipeline - def _get_chatbots_flag_team_ids(self): - chatbots_flag = Flag.objects.get(name="flag_chatbots") - return list(chatbots_flag.teams.values_list("id", flat=True)) - def _create_llm_pipeline(self, experiment): """Create a start -> LLMResponseWithPrompt -> end nodes pipeline for an LLM experiment.""" - pipeline_name = f"{experiment.name} Pipeline" - pipeline = Pipeline.objects.create(team=experiment.team, name=pipeline_name, data={"nodes": [], "edges": []}) - start_id = str(uuid4()) - llm_id = str(uuid4()) - end_id = str(uuid4()) - - start_node = FlowNode( - id=start_id, - type="startNode", - position={"x": 100, "y": 200}, - data=FlowNodeData(id=start_id, type=StartNode.__name__, params={"name": "start"}), - ) llm_params = { "name": "llm", "llm_provider_id": experiment.llm_provider.id, @@ -227,40 +210,22 @@ def _create_llm_pipeline(self, experiment): "tool_config": {}, } - llm_node = FlowNode( - id=llm_id, - type="pipelineNode", - position={"x": 400, "y": 200}, - data=FlowNodeData(id=llm_id, type=LLMResponseWithPrompt.__name__, label="LLM", params=llm_params), - ) - - end_node = FlowNode( - id=end_id, - type="endNode", - position={"x": 700, "y": 200}, - data=FlowNodeData(id=end_id, type=EndNode.__name__, params={"name": "end"}), + return self._create_pipeline_with_node( + experiment=experiment, node_type=LLMResponseWithPrompt.__name__, node_label="LLM", node_params=llm_params ) - edges = [ - { - "id": f"edge-{start_id}-{llm_id}", - "source": start_id, - "target": llm_id, - "sourceHandle": "output", - "targetHandle": "input", - }, - { - "id": f"edge-{llm_id}-{end_id}", - "source": llm_id, - "target": end_id, - "sourceHandle": "output", - "targetHandle": "input", - }, - ] - pipeline.data = { - "nodes": [start_node.model_dump(), llm_node.model_dump(), end_node.model_dump()], - "edges": edges, - } - pipeline.save() - pipeline.update_nodes_from_data() - return pipeline + def _create_assistant_pipeline(self, experiment): + """Create a start -> AssistantNode -> end nodes pipeline for an assistant experiment.""" + assistant_params = { + "name": "assistant", + "assistant_id": str(experiment.assistant.id), + "citations_enabled": experiment.citations_enabled, + "input_formatter": experiment.input_formatter or "", + } + + return self._create_pipeline_with_node( + experiment=experiment, + node_type=AssistantNode.__name__, + node_label="OpenAI Assistant", + node_params=assistant_params, + ) From b813438b88e987106fc6dc4dbe44d8d4031b8863 Mon Sep 17 00:00:00 2001 From: Steph Herbers <36681924+stephherbers@users.noreply.github.com> Date: Mon, 9 Jun 2025 18:25:02 -0400 Subject: [PATCH 03/18] clear all fields Co-authored-by: Simon Kelly --- .../migrate_nonpipeline_to_pipeline_experiments.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 922f16187..b7bfb61d8 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -120,11 +120,9 @@ def _convert_experiment(self, experiment): experiment.pipeline = pipeline # Clear the old type-specific fields - if experiment_type == "Assistant": - experiment.assistant = None - elif experiment_type == "LLM": - experiment.llm_provider = None - experiment.llm_provider_model = None + experiment.assistant = None + experiment.llm_provider = None + experiment.llm_provider_model = None experiment.save() From 3cc9ee03f3bcf1b6e5418230d337487da9190738 Mon Sep 17 00:00:00 2001 From: Steph Herbers <36681924+stephherbers@users.noreply.github.com> Date: Mon, 9 Jun 2025 18:25:24 -0400 Subject: [PATCH 04/18] add custom actions Co-authored-by: Simon Kelly --- .../commands/migrate_nonpipeline_to_pipeline_experiments.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index b7bfb61d8..da6df7f36 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -203,7 +203,7 @@ def _create_llm_pipeline(self, experiment): "source_material_id": experiment.source_material.id if experiment.source_material else None, "prompt": experiment.prompt_text or "", "tools": list(experiment.tools) if experiment.tools else [], - "custom_actions": [], + "custom_actions": [op.get_model_id(False) for op in experiment.custom_action_operations.select_related("custom_action").all()], "built_in_tools": [], "tool_config": {}, } From bbbdc2637026e0434ab4ea2c593de849617f01d7 Mon Sep 17 00:00:00 2001 From: Steph Herbers <36681924+stephherbers@users.noreply.github.com> Date: Mon, 9 Jun 2025 18:25:41 -0400 Subject: [PATCH 05/18] clean logic Co-authored-by: Simon Kelly --- .../commands/migrate_nonpipeline_to_pipeline_experiments.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index da6df7f36..7c3e7ae1f 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -108,11 +108,9 @@ def _get_experiment_type(self, experiment): return "Unknown" def _convert_experiment(self, experiment): - experiment_type = self._get_experiment_type(experiment) - - if experiment_type == "Assistant": + if experiment.assistant: pipeline = self._create_assistant_pipeline(experiment) - elif experiment_type == "LLM": + elif experiment.llm_provider: pipeline = self._create_llm_pipeline(experiment) else: raise ValueError(f"Unknown experiment type for experiment {experiment.id}: {experiment_type}") From 2f03af3ec4537069e7c76c0d036c7c24e822110e Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Mon, 9 Jun 2025 21:05:19 -0400 Subject: [PATCH 06/18] add more to x position for output node --- .../migrate_nonpipeline_to_pipeline_experiments.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 7c3e7ae1f..a4a4ed0a3 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -113,7 +113,7 @@ def _convert_experiment(self, experiment): elif experiment.llm_provider: pipeline = self._create_llm_pipeline(experiment) else: - raise ValueError(f"Unknown experiment type for experiment {experiment.id}: {experiment_type}") + raise ValueError(f"Unknown experiment type for experiment {experiment.id}") experiment.pipeline = pipeline @@ -158,7 +158,7 @@ def _create_pipeline_with_node(self, experiment, node_type, node_label, node_par end_node = FlowNode( id=end_id, type="endNode", - position={"x": 700, "y": 200}, + position={"x": 800, "y": 200}, data=FlowNodeData(id=end_id, type=EndNode.__name__, params={"name": "end"}), ) edges = [ @@ -201,7 +201,10 @@ def _create_llm_pipeline(self, experiment): "source_material_id": experiment.source_material.id if experiment.source_material else None, "prompt": experiment.prompt_text or "", "tools": list(experiment.tools) if experiment.tools else [], - "custom_actions": [op.get_model_id(False) for op in experiment.custom_action_operations.select_related("custom_action").all()], + "custom_actions": [ + op.get_model_id(False) + for op in experiment.custom_action_operations.select_related("custom_action").all() + ], "built_in_tools": [], "tool_config": {}, } From b8888b5d712798a4b6b4ce818245df8b5473afa3 Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Tue, 10 Jun 2025 16:11:39 -0400 Subject: [PATCH 07/18] refactor pipeline create_default --- apps/pipelines/models.py | 129 ++++++++++++++++++++++----------------- 1 file changed, 72 insertions(+), 57 deletions(-) diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index 77d564da7..60d4e7006 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -114,45 +114,83 @@ def version_display(self) -> str: return "" return f"v{self.version_number}" + @classmethod + def _create_pipeline_with_nodes(cls, team, name, middle_nodes_config=None): + """ + Create a pipeline with start -> middle node -> end structure. + """ + from apps.pipelines.nodes.nodes import EndNode, StartNode + + start_node_config = { + "id": str(uuid4()), + "type": "startNode", + "position": {"x": 100, "y": 200}, + "data": {"type": StartNode.__name__, "params": {"name": "start"}}, + } + end_node_config = { + "id": str(uuid4()), + "type": "endNode", + "position": {"x": 800, "y": 200}, + "data": {"type": EndNode.__name__, "params": {"name": "end"}}, + } + all_nodes_config = [start_node_config] + if middle_nodes_config: + all_nodes_config.append(middle_nodes_config) + all_nodes_config.append(end_node_config) + + flow_nodes = [] + for node_config in all_nodes_config: + flow_node = FlowNode( + id=node_config["id"], + type=node_config["type"], + position=node_config["position"], + data=FlowNodeData( + id=node_config["id"], + type=node_config["data"]["type"], + label=node_config["data"].get("label", ""), + params=node_config["data"]["params"], + ), + ) + flow_nodes.append(flow_node) + edges = [] + if middle_nodes_config: + for i in range(len(flow_nodes) - 1): + current_node = flow_nodes[i] + next_node = flow_nodes[i + 1] + edge = { + "id": f"edge-{current_node.id}-{next_node.id}", + "source": current_node.id, + "target": next_node.id, + "sourceHandle": "output", + "targetHandle": "input", + } + edges.append(edge) + + pipeline = cls.objects.create( + team=team, name=name, data={"nodes": [node.model_dump() for node in flow_nodes], "edges": edges} + ) + pipeline.update_nodes_from_data() + return pipeline + @classmethod def create_default_pipeline_with_name(cls, team, name, llm_provider_id=None, llm_provider_model=None): return cls.create_default(team, name, llm_provider_id, llm_provider_model) @classmethod def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_model=None): - from apps.pipelines.nodes.nodes import EndNode, StartNode - default_name = "New Pipeline" if name is None else name existing_pipeline_count = cls.objects.filter(team=team, name__startswith=default_name).count() - start_id = str(uuid4()) - start_node = FlowNode( - id=start_id, - type="startNode", - position={ - "x": -200, - "y": 200, - }, - data=FlowNodeData(id=start_id, type=StartNode.__name__, params={"name": "start"}), - ) - end_id = str(uuid4()) - end_node = FlowNode( - id=end_id, - type="endNode", - position={"x": 1000, "y": 200}, - data=FlowNodeData(id=end_id, type=EndNode.__name__, params={"name": "end"}), - ) if llm_provider_id and llm_provider_model: llm_id = f"LLMResponseWithPrompt-{uuid4().hex[:5]}" - llm_node = FlowNode( - id=llm_id, - type="pipelineNode", - position={"x": 300, "y": 0}, - data=FlowNodeData( - id=llm_id, - type="LLMResponseWithPrompt", - label="LLM", - params={ + llm_node_config = { + "id": llm_id, + "type": "pipelineNode", + "position": {"x": 300, "y": 0}, + "data": { + "type": "LLMResponseWithPrompt", + "label": "LLM", + "params": { "name": llm_id, "llm_provider_id": llm_provider_id, "llm_provider_model_id": llm_provider_model.id, @@ -168,38 +206,15 @@ def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_mode "custom_actions": None, "keywords": [""], }, - ), - ) - edges = [ - { - "id": f"edge-{start_id}-{llm_id}", - "source": start_id, - "target": llm_id, - "sourceHandle": "output", - "targetHandle": "input", - }, - { - "id": f"edge-{llm_id}-{end_id}", - "source": llm_id, - "target": end_id, - "sourceHandle": "output", - "targetHandle": "input", }, - ] - else: - llm_node = None - edges = [] - default_nodes = [start_node.model_dump()] - if llm_node: - default_nodes.append(llm_node.model_dump()) - default_nodes.append(end_node.model_dump()) - new_pipeline = cls.objects.create( + } + + final_name = default_name if name else f"New Pipeline {existing_pipeline_count + 1}" + return cls._create_pipeline_with_nodes( team=team, - data={"nodes": default_nodes, "edges": edges}, - name=default_name if name else f"New Pipeline {existing_pipeline_count + 1}", + name=final_name, + middle_nodes_config=llm_node_config if llm_provider_id and llm_provider_model else None, ) - new_pipeline.update_nodes_from_data() - return new_pipeline def get_absolute_url(self): return reverse("pipelines:edit", args=[self.team.slug, self.id]) From a013f22b885cf3e06d96e6bba58c839a98c0063c Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Tue, 10 Jun 2025 17:15:02 -0400 Subject: [PATCH 08/18] use _create_pipeline_with_nodes, filter by published vrion and if they don't have one then working version --- ...ate_nonpipeline_to_pipeline_experiments.py | 109 ++++++------------ 1 file changed, 36 insertions(+), 73 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index a4a4ed0a3..34e9243c1 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -5,9 +5,8 @@ from django.db.models import Q from apps.experiments.models import Experiment -from apps.pipelines.flow import FlowNode, FlowNodeData from apps.pipelines.models import Pipeline -from apps.pipelines.nodes.nodes import AssistantNode, EndNode, LLMResponseWithPrompt, StartNode +from apps.pipelines.nodes.nodes import AssistantNode, LLMResponseWithPrompt from apps.teams.models import Flag @@ -58,7 +57,17 @@ def handle(self, *args, **options): query &= Q(team_id__in=chatbots_flag_team_ids) self.stdout.write(f"Filtering to teams with 'flag_chatbots' FF ({len(chatbots_flag_team_ids)} teams)") - experiments_to_convert = Experiment.objects.filter(query).select_related( + default_experiments = Experiment.objects.filter(query & Q(is_default_version=True)) + default_working_version_ids = default_experiments.exclude(working_version__isnull=True).values_list( + "working_version_id", flat=True + ) + + working_experiments = Experiment.objects.filter(query & Q(working_version__isnull=True)).exclude( + id__in=default_working_version_ids + ) + combined_ids = list(default_experiments.union(working_experiments).values_list("id", flat=True)) + + experiments_to_convert = Experiment.objects.filter(id__in=combined_ids).select_related( "team", "assistant", "llm_provider", "llm_provider_model" ) @@ -116,8 +125,6 @@ def _convert_experiment(self, experiment): raise ValueError(f"Unknown experiment type for experiment {experiment.id}") experiment.pipeline = pipeline - - # Clear the old type-specific fields experiment.assistant = None experiment.llm_provider = None experiment.llm_provider_model = None @@ -131,60 +138,16 @@ def _get_chatbots_flag_team_ids(self): def _create_pipeline_with_node(self, experiment, node_type, node_label, node_params): """Create a pipeline with start -> custom_node -> end structure.""" pipeline_name = f"{experiment.name} Pipeline" - pipeline = Pipeline.objects.create(team=experiment.team, name=pipeline_name, data={"nodes": [], "edges": []}) - start_id = str(uuid4()) - middle_id = str(uuid4()) - end_id = str(uuid4()) - - start_node = FlowNode( - id=start_id, - type="startNode", - position={"x": 100, "y": 200}, - data=FlowNodeData(id=start_id, type=StartNode.__name__, params={"name": "start"}), - ) - - middle_node = FlowNode( - id=middle_id, - type="pipelineNode", - position={"x": 400, "y": 200}, - data=FlowNodeData( - id=middle_id, - type=node_type, - label=node_label, - params=node_params, - ), - ) - - end_node = FlowNode( - id=end_id, - type="endNode", - position={"x": 800, "y": 200}, - data=FlowNodeData(id=end_id, type=EndNode.__name__, params={"name": "end"}), - ) - edges = [ - { - "id": f"edge-{start_id}-{middle_id}", - "source": start_id, - "target": middle_id, - "sourceHandle": "output", - "targetHandle": "input", - }, - { - "id": f"edge-{middle_id}-{end_id}", - "source": middle_id, - "target": end_id, - "sourceHandle": "output", - "targetHandle": "input", - }, - ] - pipeline.data = { - "nodes": [start_node.model_dump(), middle_node.model_dump(), end_node.model_dump()], - "edges": edges, + middle_node_config = { + "id": str(uuid4()), + "type": "pipelineNode", + "position": {"x": 400, "y": 200}, + "data": {"type": node_type, "label": node_label, "params": node_params}, } - pipeline.save() - pipeline.update_nodes_from_data() - return pipeline + return Pipeline._create_pipeline_with_nodes( + team=experiment.team, name=pipeline_name, middle_nodes_config=middle_node_config + ) def _create_llm_pipeline(self, experiment): """Create a start -> LLMResponseWithPrompt -> end nodes pipeline for an LLM experiment.""" @@ -193,7 +156,7 @@ def _create_llm_pipeline(self, experiment): "llm_provider_id": experiment.llm_provider.id, "llm_provider_model_id": experiment.llm_provider_model.id, "llm_temperature": experiment.temperature, - "history_type": "none", + "history_type": "global", "history_name": None, "history_mode": "summarize", "user_max_token_limit": experiment.llm_provider_model.max_token_limit, @@ -213,18 +176,18 @@ def _create_llm_pipeline(self, experiment): experiment=experiment, node_type=LLMResponseWithPrompt.__name__, node_label="LLM", node_params=llm_params ) - def _create_assistant_pipeline(self, experiment): - """Create a start -> AssistantNode -> end nodes pipeline for an assistant experiment.""" - assistant_params = { - "name": "assistant", - "assistant_id": str(experiment.assistant.id), - "citations_enabled": experiment.citations_enabled, - "input_formatter": experiment.input_formatter or "", - } - - return self._create_pipeline_with_node( - experiment=experiment, - node_type=AssistantNode.__name__, - node_label="OpenAI Assistant", - node_params=assistant_params, - ) + def _create_assistant_pipeline(self, experiment): + """Createii a start -> AssistantNode -> end nodes pipeline for an assistant experiment.""" + assistant_params = { + "name": "assistant", + "assistant_id": str(experiment.assistant.id), + "citations_enabled": experiment.citations_enabled, + "input_formatter": experiment.input_formatter or "", + } + + return self._create_pipeline_with_node( + experiment=experiment, + node_type=AssistantNode.__name__, + node_label="OpenAI Assistant", + node_params=assistant_params, + ) From 2398aaabe3928e6d93cef5b49b0cc6dc1f59863f Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Wed, 11 Jun 2025 11:17:06 -0400 Subject: [PATCH 09/18] pass in FlowNode not dict --- ...ate_nonpipeline_to_pipeline_experiments.py | 24 +++-- apps/pipelines/models.py | 102 ++++++++---------- 2 files changed, 59 insertions(+), 67 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 34e9243c1..4de60db8d 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -5,6 +5,7 @@ from django.db.models import Q from apps.experiments.models import Experiment +from apps.pipelines.flow import FlowNode, FlowNodeData from apps.pipelines.models import Pipeline from apps.pipelines.nodes.nodes import AssistantNode, LLMResponseWithPrompt from apps.teams.models import Flag @@ -138,17 +139,21 @@ def _get_chatbots_flag_team_ids(self): def _create_pipeline_with_node(self, experiment, node_type, node_label, node_params): """Create a pipeline with start -> custom_node -> end structure.""" pipeline_name = f"{experiment.name} Pipeline" - middle_node_config = { - "id": str(uuid4()), - "type": "pipelineNode", - "position": {"x": 400, "y": 200}, - "data": {"type": node_type, "label": node_label, "params": node_params}, - } - return Pipeline._create_pipeline_with_nodes( - team=experiment.team, name=pipeline_name, middle_nodes_config=middle_node_config + node = FlowNode( + id=str(uuid4()), + type="pipelineNode", + position={"x": 400, "y": 200}, + data=FlowNodeData( + id=str(uuid4()), + type=node_type, + label=node_label, + params=node_params, + ), ) + return Pipeline._create_pipeline_with_nodes(team=experiment.team, name=pipeline_name, middle_node=node) + def _create_llm_pipeline(self, experiment): """Create a start -> LLMResponseWithPrompt -> end nodes pipeline for an LLM experiment.""" llm_params = { @@ -171,13 +176,12 @@ def _create_llm_pipeline(self, experiment): "built_in_tools": [], "tool_config": {}, } - return self._create_pipeline_with_node( experiment=experiment, node_type=LLMResponseWithPrompt.__name__, node_label="LLM", node_params=llm_params ) def _create_assistant_pipeline(self, experiment): - """Createii a start -> AssistantNode -> end nodes pipeline for an assistant experiment.""" + """Create a start -> AssistantNode -> end nodes pipeline for an assistant experiment.""" assistant_params = { "name": "assistant", "assistant_id": str(experiment.assistant.id), diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index 60d4e7006..e635d5dbe 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -115,48 +115,43 @@ def version_display(self) -> str: return f"v{self.version_number}" @classmethod - def _create_pipeline_with_nodes(cls, team, name, middle_nodes_config=None): + def _create_pipeline_with_nodes(cls, team, name, middle_node=None): """ Create a pipeline with start -> middle node -> end structure. """ from apps.pipelines.nodes.nodes import EndNode, StartNode - start_node_config = { - "id": str(uuid4()), - "type": "startNode", - "position": {"x": 100, "y": 200}, - "data": {"type": StartNode.__name__, "params": {"name": "start"}}, - } - end_node_config = { - "id": str(uuid4()), - "type": "endNode", - "position": {"x": 800, "y": 200}, - "data": {"type": EndNode.__name__, "params": {"name": "end"}}, - } - all_nodes_config = [start_node_config] - if middle_nodes_config: - all_nodes_config.append(middle_nodes_config) - all_nodes_config.append(end_node_config) - - flow_nodes = [] - for node_config in all_nodes_config: - flow_node = FlowNode( - id=node_config["id"], - type=node_config["type"], - position=node_config["position"], - data=FlowNodeData( - id=node_config["id"], - type=node_config["data"]["type"], - label=node_config["data"].get("label", ""), - params=node_config["data"]["params"], - ), - ) - flow_nodes.append(flow_node) + start_node = FlowNode( + id=str(uuid4()), + type="startNode", + position={"x": 100, "y": 200}, + data=FlowNodeData( + id=str(uuid4()), + type=StartNode.__name__, + label="", + params={"name": "start"}, + ), + ) + end_node = FlowNode( + id=str(uuid4()), + type="endNode", + position={"x": 800, "y": 200}, + data=FlowNodeData( + id=str(uuid4()), + type=EndNode.__name__, + label="", + params={"name": "end"}, + ), + ) + all_flow_nodes = [start_node] + if middle_node: + all_flow_nodes.append(middle_node) + all_flow_nodes.append(end_node) edges = [] - if middle_nodes_config: - for i in range(len(flow_nodes) - 1): - current_node = flow_nodes[i] - next_node = flow_nodes[i + 1] + if middle_node: + for i in range(len(all_flow_nodes) - 1): + current_node = all_flow_nodes[i] + next_node = all_flow_nodes[i + 1] edge = { "id": f"edge-{current_node.id}-{next_node.id}", "source": current_node.id, @@ -165,32 +160,29 @@ def _create_pipeline_with_nodes(cls, team, name, middle_nodes_config=None): "targetHandle": "input", } edges.append(edge) - pipeline = cls.objects.create( - team=team, name=name, data={"nodes": [node.model_dump() for node in flow_nodes], "edges": edges} + team=team, name=name, data={"nodes": [node.model_dump() for node in all_flow_nodes], "edges": edges} ) pipeline.update_nodes_from_data() return pipeline - @classmethod - def create_default_pipeline_with_name(cls, team, name, llm_provider_id=None, llm_provider_model=None): - return cls.create_default(team, name, llm_provider_id, llm_provider_model) - @classmethod def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_model=None): default_name = "New Pipeline" if name is None else name existing_pipeline_count = cls.objects.filter(team=team, name__startswith=default_name).count() + node = None if llm_provider_id and llm_provider_model: llm_id = f"LLMResponseWithPrompt-{uuid4().hex[:5]}" - llm_node_config = { - "id": llm_id, - "type": "pipelineNode", - "position": {"x": 300, "y": 0}, - "data": { - "type": "LLMResponseWithPrompt", - "label": "LLM", - "params": { + node = FlowNode( + id=llm_id, + type="pipelineNode", + position={"x": 300, "y": 0}, + data=FlowNodeData( + id=llm_id, + type="LLMResponseWithPrompt", + label="LLM", + params={ "name": llm_id, "llm_provider_id": llm_provider_id, "llm_provider_model_id": llm_provider_model.id, @@ -206,15 +198,11 @@ def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_mode "custom_actions": None, "keywords": [""], }, - }, - } + ), + ) final_name = default_name if name else f"New Pipeline {existing_pipeline_count + 1}" - return cls._create_pipeline_with_nodes( - team=team, - name=final_name, - middle_nodes_config=llm_node_config if llm_provider_id and llm_provider_model else None, - ) + return cls._create_pipeline_with_nodes(team=team, name=final_name, middle_node=node) def get_absolute_url(self): return reverse("pipelines:edit", args=[self.team.slug, self.id]) From c82bf07680e0b4fbf16ad1c4dbe82759d97a7726 Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Wed, 11 Jun 2025 11:52:26 -0400 Subject: [PATCH 10/18] add back in function + lint --- ...ate_nonpipeline_to_pipeline_experiments.py | 44 +++++++++++++------ apps/pipelines/models.py | 4 ++ 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 4de60db8d..06bfe0aca 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -47,9 +47,6 @@ def handle(self, *args, **options): if team_slug: query &= Q(team__slug=team_slug) - if experiment_id: - query &= Q(id=experiment_id) - if chatbots_flag_only: chatbots_flag_team_ids = self._get_chatbots_flag_team_ids() if not chatbots_flag_team_ids: @@ -58,19 +55,38 @@ def handle(self, *args, **options): query &= Q(team_id__in=chatbots_flag_team_ids) self.stdout.write(f"Filtering to teams with 'flag_chatbots' FF ({len(chatbots_flag_team_ids)} teams)") - default_experiments = Experiment.objects.filter(query & Q(is_default_version=True)) - default_working_version_ids = default_experiments.exclude(working_version__isnull=True).values_list( - "working_version_id", flat=True - ) + if experiment_id: + query &= Q(id=experiment_id) + experiment = Experiment.objects.filter(query).first() + if not experiment: + self.stdout.write( + self.style.WARNING(f"Experiment {experiment_id} not found or does not need migration.") + ) + return - working_experiments = Experiment.objects.filter(query & Q(working_version__isnull=True)).exclude( - id__in=default_working_version_ids - ) - combined_ids = list(default_experiments.union(working_experiments).values_list("id", flat=True)) + if not (experiment.is_default_version or experiment.is_working_version): + self.stdout.write( + self.style.WARNING( + f"Experiment {experiment_id} is not a published or unreleased version so does not require migration." + ) + ) + return - experiments_to_convert = Experiment.objects.filter(id__in=combined_ids).select_related( - "team", "assistant", "llm_provider", "llm_provider_model" - ) + experiments_to_convert = Experiment.objects.filter(id=experiment.id).select_related( + "team", "assistant", "llm_provider", "llm_provider_model" + ) + else: + default_experiments = Experiment.objects.filter(query & Q(is_default_version=True)) + default_working_version_ids = default_experiments.exclude(working_version__isnull=True).values_list( + "working_version_id", flat=True + ) + working_experiments = Experiment.objects.filter(query & Q(working_version__isnull=True)).exclude( + id__in=default_working_version_ids + ) + combined_ids = list(default_experiments.union(working_experiments).values_list("id", flat=True)) + experiments_to_convert = Experiment.objects.filter(id__in=combined_ids).select_related( + "team", "assistant", "llm_provider", "llm_provider_model" + ) if not experiments_to_convert.exists(): self.stdout.write(self.style.WARNING("No matching experiments found.")) diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index e635d5dbe..5cb78ad42 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -166,6 +166,10 @@ def _create_pipeline_with_nodes(cls, team, name, middle_node=None): pipeline.update_nodes_from_data() return pipeline + @classmethod + def create_default_pipeline_with_name(cls, team, name, llm_provider_id=None, llm_provider_model=None): + return cls.create_default(team, name, llm_provider_id, llm_provider_model) + @classmethod def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_model=None): default_name = "New Pipeline" if name is None else name From 123ed5e4aebb7318e064965cedaba4715fe022fc Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Wed, 11 Jun 2025 11:54:55 -0400 Subject: [PATCH 11/18] lint --- .../commands/migrate_nonpipeline_to_pipeline_experiments.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 06bfe0aca..a8b68557c 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -67,7 +67,8 @@ def handle(self, *args, **options): if not (experiment.is_default_version or experiment.is_working_version): self.stdout.write( self.style.WARNING( - f"Experiment {experiment_id} is not a published or unreleased version so does not require migration." + f"Experiment {experiment_id} is not a published or unreleased version so does not\ + require migration." ) ) return From 4f5256a8f636fb6b287c51253d62797abcd4b17c Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Thu, 12 Jun 2025 11:40:03 -0400 Subject: [PATCH 12/18] fix id logic for start and end nodes --- apps/pipelines/models.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index 5cb78ad42..834a62d58 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -121,23 +121,25 @@ def _create_pipeline_with_nodes(cls, team, name, middle_node=None): """ from apps.pipelines.nodes.nodes import EndNode, StartNode + start_node_id = str(uuid4()) + end_node_id = str(uuid4()) start_node = FlowNode( - id=str(uuid4()), + id=start_node_id, type="startNode", position={"x": 100, "y": 200}, data=FlowNodeData( - id=str(uuid4()), + id=start_node_id, type=StartNode.__name__, label="", params={"name": "start"}, ), ) end_node = FlowNode( - id=str(uuid4()), + id=end_node_id, type="endNode", position={"x": 800, "y": 200}, data=FlowNodeData( - id=str(uuid4()), + id=end_node_id, type=EndNode.__name__, label="", params={"name": "end"}, From 753975c1d6bd3d4e852ca71361936b11634c9e8f Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Thu, 12 Jun 2025 11:40:14 -0400 Subject: [PATCH 13/18] use iterator on experiments_to_convert --- .../migrate_nonpipeline_to_pipeline_experiments.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index a8b68557c..943c3c533 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -76,6 +76,7 @@ def handle(self, *args, **options): experiments_to_convert = Experiment.objects.filter(id=experiment.id).select_related( "team", "assistant", "llm_provider", "llm_provider_model" ) + experiment_count = 1 else: default_experiments = Experiment.objects.filter(query & Q(is_default_version=True)) default_working_version_ids = default_experiments.exclude(working_version__isnull=True).values_list( @@ -88,14 +89,15 @@ def handle(self, *args, **options): experiments_to_convert = Experiment.objects.filter(id__in=combined_ids).select_related( "team", "assistant", "llm_provider", "llm_provider_model" ) + experiment_count = experiments_to_convert.count() if not experiments_to_convert.exists(): self.stdout.write(self.style.WARNING("No matching experiments found.")) return - self.stdout.write(f"Found {experiments_to_convert.count()} experiments to migrate:") + self.stdout.write(f"Found {experiment_count} experiments to migrate:") - for experiment in experiments_to_convert: + for experiment in experiments_to_convert.iterator(20): experiment_type = self._get_experiment_type(experiment) team_info = f"{experiment.team.name} ({experiment.team.slug})" self.stdout.write(f"{experiment.name} (ID: {experiment.id}) - Type: {experiment_type} - Team: {team_info}") @@ -111,8 +113,7 @@ def handle(self, *args, **options): converted_count = 0 failed_count = 0 - - for experiment in experiments_to_convert: + for experiment in experiments_to_convert.iterator(20): try: with transaction.atomic(): self._convert_experiment(experiment) From a7cf808b957e285a51468f87ee14c6e31673ab65 Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Thu, 12 Jun 2025 11:45:54 -0400 Subject: [PATCH 14/18] combine queries --- .../migrate_nonpipeline_to_pipeline_experiments.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 943c3c533..68aa20982 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -57,7 +57,11 @@ def handle(self, *args, **options): if experiment_id: query &= Q(id=experiment_id) - experiment = Experiment.objects.filter(query).first() + experiment = ( + Experiment.objects.filter(query) + .select_related("team", "assistant", "llm_provider", "llm_provider_model") + .first() + ) if not experiment: self.stdout.write( self.style.WARNING(f"Experiment {experiment_id} not found or does not need migration.") @@ -73,9 +77,7 @@ def handle(self, *args, **options): ) return - experiments_to_convert = Experiment.objects.filter(id=experiment.id).select_related( - "team", "assistant", "llm_provider", "llm_provider_model" - ) + experiments_to_convert = [experiment] experiment_count = 1 else: default_experiments = Experiment.objects.filter(query & Q(is_default_version=True)) From b28598caa17df291aa4e2c796fc1b995c02cf078 Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Fri, 13 Jun 2025 09:44:41 -0400 Subject: [PATCH 15/18] have node_id in FlowNodeData match that of FlowNode in migration as done elsewhere --- .../commands/migrate_nonpipeline_to_pipeline_experiments.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 68aa20982..029ea86a2 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -159,13 +159,13 @@ def _get_chatbots_flag_team_ids(self): def _create_pipeline_with_node(self, experiment, node_type, node_label, node_params): """Create a pipeline with start -> custom_node -> end structure.""" pipeline_name = f"{experiment.name} Pipeline" - + node_id = str(uuid4()) node = FlowNode( - id=str(uuid4()), + id=node_id, type="pipelineNode", position={"x": 400, "y": 200}, data=FlowNodeData( - id=str(uuid4()), + id=node_id, type=node_type, label=node_label, params=node_params, From de28dc00b198c1e9387068460c407eabf1a859ca Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Fri, 13 Jun 2025 09:46:38 -0400 Subject: [PATCH 16/18] simplify logic --- .../commands/migrate_nonpipeline_to_pipeline_experiments.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 029ea86a2..5b7cfc393 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -93,7 +93,7 @@ def handle(self, *args, **options): ) experiment_count = experiments_to_convert.count() - if not experiments_to_convert.exists(): + if not experiment_count: self.stdout.write(self.style.WARNING("No matching experiments found.")) return From edfae1ce53a3623152cf113a13e6964ea71b7978 Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Fri, 13 Jun 2025 10:49:10 -0400 Subject: [PATCH 17/18] refactor: move experiment info logging to a function so looping is only done once --- ...ate_nonpipeline_to_pipeline_experiments.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index 5b7cfc393..c5ba5cd0d 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -99,12 +99,9 @@ def handle(self, *args, **options): self.stdout.write(f"Found {experiment_count} experiments to migrate:") - for experiment in experiments_to_convert.iterator(20): - experiment_type = self._get_experiment_type(experiment) - team_info = f"{experiment.team.name} ({experiment.team.slug})" - self.stdout.write(f"{experiment.name} (ID: {experiment.id}) - Type: {experiment_type} - Team: {team_info}") - if dry_run: + for experiment in experiments_to_convert.iterator(20): + self._log_experiment_info(experiment) self.stdout.write(self.style.WARNING("\nDry run - no changes will be made.")) return @@ -116,6 +113,7 @@ def handle(self, *args, **options): converted_count = 0 failed_count = 0 for experiment in experiments_to_convert.iterator(20): + self._log_experiment_info(experiment) try: with transaction.atomic(): self._convert_experiment(experiment) @@ -129,13 +127,10 @@ def handle(self, *args, **options): self.style.SUCCESS(f"\nMigration is complete!: {converted_count} succeeded, {failed_count} failed") ) - def _get_experiment_type(self, experiment): - if experiment.assistant: - return "Assistant" - elif experiment.llm_provider: - return "LLM" - else: - return "Unknown" + def _log_experiment_info(self, experiment): + experiment_type = "Assistant" if experiment.assistant else "LLM" if experiment.llm_provider else "Unknown" + team_info = f"{experiment.team.name} ({experiment.team.slug})" + self.stdout.write(f"{experiment.name} (ID: {experiment.id}) - Type: {experiment_type} - Team: {team_info}") def _convert_experiment(self, experiment): if experiment.assistant: From a0219388961cf789a151dedfc66e497469370372 Mon Sep 17 00:00:00 2001 From: Steph Herbers Date: Fri, 13 Jun 2025 11:01:51 -0400 Subject: [PATCH 18/18] check if one exeriment, and then don't use an iterator --- ...ate_nonpipeline_to_pipeline_experiments.py | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py index c5ba5cd0d..4d8d91b00 100644 --- a/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py +++ b/apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py @@ -77,7 +77,7 @@ def handle(self, *args, **options): ) return - experiments_to_convert = [experiment] + experiments_to_convert = experiment experiment_count = 1 else: default_experiments = Experiment.objects.filter(query & Q(is_default_version=True)) @@ -100,8 +100,11 @@ def handle(self, *args, **options): self.stdout.write(f"Found {experiment_count} experiments to migrate:") if dry_run: - for experiment in experiments_to_convert.iterator(20): - self._log_experiment_info(experiment) + if experiment_count == 1: + self._log_experiment_info(experiments_to_convert) + else: + for experiment in experiments_to_convert.iterator(20): + self._log_experiment_info(experiment) self.stdout.write(self.style.WARNING("\nDry run - no changes will be made.")) return @@ -112,21 +115,29 @@ def handle(self, *args, **options): converted_count = 0 failed_count = 0 - for experiment in experiments_to_convert.iterator(20): - self._log_experiment_info(experiment) - try: - with transaction.atomic(): - self._convert_experiment(experiment) - converted_count += 1 - self.stdout.write(self.style.SUCCESS(f"Success: {experiment.name}")) - except Exception as e: - failed_count += 1 - self.stdout.write(self.style.ERROR(f"FAILED {experiment.name}: {str(e)}")) - + if experiment_count == 1: + converted_count, failed_count = self._process_expriment( + experiments_to_convert, converted_count, failed_count + ) + else: + for experiment in experiments_to_convert.iterator(20): + converted_count, failed_count = self._process_expriment(experiment, converted_count, failed_count) self.stdout.write( self.style.SUCCESS(f"\nMigration is complete!: {converted_count} succeeded, {failed_count} failed") ) + def _process_expriment(self, experiment, converted_count, failed_count): + self._log_experiment_info(experiment) + try: + with transaction.atomic(): + self._convert_experiment(experiment) + converted_count += 1 + self.stdout.write(self.style.SUCCESS(f"Success: {experiment.name}")) + except Exception as e: + failed_count += 1 + self.stdout.write(self.style.ERROR(f"FAILED {experiment.name}: {str(e)}")) + return converted_count, failed_count + def _log_experiment_info(self, experiment): experiment_type = "Assistant" if experiment.assistant else "LLM" if experiment.llm_provider else "Unknown" team_info = f"{experiment.team.name} ({experiment.team.slug})"