# apps/experiments/management/commands/migrate_nonpipeline_to_pipeline_experiments.py
from uuid import uuid4

from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import Q

from apps.experiments.models import Experiment
from apps.pipelines.flow import FlowNode, FlowNodeData
from apps.pipelines.models import Pipeline
from apps.pipelines.nodes.nodes import AssistantNode, LLMResponseWithPrompt
from apps.teams.models import Flag


class Command(BaseCommand):
    """Migrate legacy assistant/LLM experiments to equivalent pipeline experiments.

    For each matching experiment a new pipeline is created with a
    start -> (AssistantNode | LLMResponseWithPrompt) -> end layout, the
    experiment is pointed at the pipeline, and the legacy assistant / LLM
    provider fields are cleared. Each experiment is converted inside its own
    transaction so one failure does not roll back the rest.
    """

    help = "Convert assistant and LLM experiments to pipeline experiments"

    def add_arguments(self, parser):
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Show what would be converted without making changes",
        )
        parser.add_argument(
            "--team-slug",
            type=str,
            help="Only convert experiments for a specific team (by slug)",
        )
        parser.add_argument(
            "--experiment-id",
            type=int,
            help="Convert only a specific experiment by ID",
        )
        parser.add_argument(
            "--chatbots-flag-only",
            action="store_true",
            help='Only convert experiments for teams that have the "flag_chatbots" feature flag enabled',
        )

    def handle(self, *args, **options):
        dry_run = options["dry_run"]
        team_slug = options.get("team_slug")
        experiment_id = options.get("experiment_id")
        chatbots_flag_only = options["chatbots_flag_only"]

        # Experiments that still run directly off an assistant or an LLM
        # provider, i.e. have not yet been migrated to a pipeline.
        query = Q(pipeline__isnull=True) & (Q(assistant__isnull=False) | Q(llm_provider__isnull=False))

        if team_slug:
            query &= Q(team__slug=team_slug)

        if chatbots_flag_only:
            chatbots_flag_team_ids = self._get_chatbots_flag_team_ids()
            if not chatbots_flag_team_ids:
                self.stdout.write(self.style.WARNING('No teams found with the "flag_chatbots" feature flag enabled.'))
                return
            query &= Q(team_id__in=chatbots_flag_team_ids)
            self.stdout.write(f"Filtering to teams with 'flag_chatbots' FF ({len(chatbots_flag_team_ids)} teams)")

        if experiment_id:
            experiments_to_convert, experiment_count = self._resolve_single_experiment(query, experiment_id)
            if experiments_to_convert is None:
                # A warning explaining why was already written.
                return
        else:
            experiments_to_convert = self._resolve_migratable_experiments(query)
            experiment_count = experiments_to_convert.count()

        if not experiment_count:
            self.stdout.write(self.style.WARNING("No matching experiments found."))
            return

        self.stdout.write(f"Found {experiment_count} experiments to migrate:")

        if dry_run:
            for experiment in self._iter_experiments(experiments_to_convert):
                self._log_experiment_info(experiment)
            self.stdout.write(self.style.WARNING("\nDry run - no changes will be made."))
            return

        confirm = input("\nContinue? (y/N): ")
        if confirm.lower() != "y":
            self.stdout.write("Cancelled.")
            return

        converted_count = 0
        failed_count = 0
        for experiment in self._iter_experiments(experiments_to_convert):
            converted_count, failed_count = self._process_experiment(experiment, converted_count, failed_count)
        self.stdout.write(
            self.style.SUCCESS(f"\nMigration is complete!: {converted_count} succeeded, {failed_count} failed")
        )

    def _resolve_single_experiment(self, query, experiment_id):
        """Fetch one migratable experiment by id.

        Returns ``(experiment, 1)`` on success, or ``(None, 0)`` after writing
        a warning when the experiment is missing, already migrated, or is
        neither a published (default) nor an unreleased (working) version.
        """
        experiment = (
            Experiment.objects.filter(query & Q(id=experiment_id))
            .select_related("team", "assistant", "llm_provider", "llm_provider_model")
            .first()
        )
        if not experiment:
            self.stdout.write(
                self.style.WARNING(f"Experiment {experiment_id} not found or does not need migration.")
            )
            return None, 0

        if not (experiment.is_default_version or experiment.is_working_version):
            # The original message used a backslash continuation inside the
            # f-string, which embedded a long run of indentation whitespace
            # into the output; implicit concatenation fixes that.
            self.stdout.write(
                self.style.WARNING(
                    f"Experiment {experiment_id} is not a published or unreleased version "
                    "so does not require migration."
                )
            )
            return None, 0

        return experiment, 1

    def _resolve_migratable_experiments(self, query):
        """Return a queryset of experiments to convert.

        Includes every default (published) version plus any working version
        that is not already represented by one of those default versions.
        """
        default_experiments = Experiment.objects.filter(query & Q(is_default_version=True))
        default_working_version_ids = default_experiments.exclude(working_version__isnull=True).values_list(
            "working_version_id", flat=True
        )
        working_experiments = Experiment.objects.filter(query & Q(working_version__isnull=True)).exclude(
            id__in=default_working_version_ids
        )
        combined_ids = list(default_experiments.union(working_experiments).values_list("id", flat=True))
        return Experiment.objects.filter(id__in=combined_ids).select_related(
            "team", "assistant", "llm_provider", "llm_provider_model"
        )

    def _iter_experiments(self, experiments_to_convert):
        """Yield experiments one at a time, streaming querysets in small chunks.

        Accepts either a single ``Experiment`` (the --experiment-id path) or a
        queryset, so callers don't need to special-case the two shapes.
        """
        if isinstance(experiments_to_convert, Experiment):
            yield experiments_to_convert
        else:
            yield from experiments_to_convert.iterator(chunk_size=20)

    def _process_experiment(self, experiment, converted_count, failed_count):
        """Convert one experiment, logging the outcome.

        Returns the updated ``(converted_count, failed_count)`` tally. A
        failure is reported but never aborts the run.
        """
        self._log_experiment_info(experiment)
        try:
            with transaction.atomic():
                self._convert_experiment(experiment)
            converted_count += 1
            self.stdout.write(self.style.SUCCESS(f"Success: {experiment.name}"))
        except Exception as e:
            failed_count += 1
            self.stdout.write(self.style.ERROR(f"FAILED {experiment.name}: {str(e)}"))
        return converted_count, failed_count

    def _log_experiment_info(self, experiment):
        """Write a one-line summary (name, id, legacy type, team) for an experiment."""
        experiment_type = "Assistant" if experiment.assistant else "LLM" if experiment.llm_provider else "Unknown"
        team_info = f"{experiment.team.name} ({experiment.team.slug})"
        self.stdout.write(f"{experiment.name} (ID: {experiment.id}) - Type: {experiment_type} - Team: {team_info}")

    def _convert_experiment(self, experiment):
        """Attach a freshly built pipeline to the experiment and clear legacy fields."""
        if experiment.assistant:
            pipeline = self._create_assistant_pipeline(experiment)
        elif experiment.llm_provider:
            pipeline = self._create_llm_pipeline(experiment)
        else:
            raise ValueError(f"Unknown experiment type for experiment {experiment.id}")

        experiment.pipeline = pipeline
        # The pipeline node now owns this configuration; clear the legacy fields.
        experiment.assistant = None
        experiment.llm_provider = None
        experiment.llm_provider_model = None

        experiment.save()

    def _get_chatbots_flag_team_ids(self):
        """Return ids of teams with the "flag_chatbots" feature flag.

        Returns an empty list when the flag does not exist, so the caller's
        "no teams found" warning path is taken instead of an unhandled
        ``Flag.DoesNotExist`` traceback.
        """
        chatbots_flag = Flag.objects.filter(name="flag_chatbots").first()
        if chatbots_flag is None:
            return []
        return list(chatbots_flag.teams.values_list("id", flat=True))

    def _create_pipeline_with_node(self, experiment, node_type, node_label, node_params):
        """Create a pipeline with start -> custom_node -> end structure."""
        pipeline_name = f"{experiment.name} Pipeline"
        node_id = str(uuid4())
        node = FlowNode(
            id=node_id,
            type="pipelineNode",
            position={"x": 400, "y": 200},
            data=FlowNodeData(
                id=node_id,
                type=node_type,
                label=node_label,
                params=node_params,
            ),
        )

        return Pipeline._create_pipeline_with_nodes(team=experiment.team, name=pipeline_name, middle_node=node)

    def _create_llm_pipeline(self, experiment):
        """Create a start -> LLMResponseWithPrompt -> end nodes pipeline for an LLM experiment."""
        llm_params = {
            "name": "llm",
            "llm_provider_id": experiment.llm_provider.id,
            "llm_provider_model_id": experiment.llm_provider_model.id,
            "llm_temperature": experiment.temperature,
            "history_type": "global",
            "history_name": None,
            "history_mode": "summarize",
            "user_max_token_limit": experiment.llm_provider_model.max_token_limit,
            "max_history_length": 10,
            "source_material_id": experiment.source_material.id if experiment.source_material else None,
            "prompt": experiment.prompt_text or "",
            "tools": list(experiment.tools) if experiment.tools else [],
            "custom_actions": [
                op.get_model_id(False)
                for op in experiment.custom_action_operations.select_related("custom_action").all()
            ],
            "built_in_tools": [],
            "tool_config": {},
        }
        return self._create_pipeline_with_node(
            experiment=experiment, node_type=LLMResponseWithPrompt.__name__, node_label="LLM", node_params=llm_params
        )

    def _create_assistant_pipeline(self, experiment):
        """Create a start -> AssistantNode -> end nodes pipeline for an assistant experiment."""
        assistant_params = {
            "name": "assistant",
            "assistant_id": str(experiment.assistant.id),
            "citations_enabled": experiment.citations_enabled,
            "input_formatter": experiment.input_formatter or "",
        }

        return self._create_pipeline_with_node(
            experiment=experiment,
            node_type=AssistantNode.__name__,
            node_label="OpenAI Assistant",
            node_params=assistant_params,
        )
# apps/pipelines/models.py — classmethods of the Pipeline model.
# NOTE(review): written at module indent because the surrounding diff text is
# mangled; these definitions belong inside `class Pipeline` at one indent level.

@classmethod
def _create_pipeline_with_nodes(cls, team, name, middle_node=None):
    """
    Create and save a pipeline laid out as start -> [middle_node] -> end.

    When ``middle_node`` is None only the start and end nodes are created and
    no edges are added — the two nodes are deliberately left unconnected,
    matching the previous ``create_default`` behaviour for pipelines built
    without an LLM node.
    """
    # Local import mirrors the original: avoids a circular import at module load.
    from apps.pipelines.nodes.nodes import EndNode, StartNode

    start_node_id = str(uuid4())
    end_node_id = str(uuid4())
    start_node = FlowNode(
        id=start_node_id,
        type="startNode",
        position={"x": 100, "y": 200},
        data=FlowNodeData(
            id=start_node_id,
            type=StartNode.__name__,
            label="",
            params={"name": "start"},
        ),
    )
    end_node = FlowNode(
        id=end_node_id,
        type="endNode",
        position={"x": 800, "y": 200},
        data=FlowNodeData(
            id=end_node_id,
            type=EndNode.__name__,
            label="",
            params={"name": "end"},
        ),
    )

    all_flow_nodes = [start_node]
    if middle_node:
        all_flow_nodes.append(middle_node)
    all_flow_nodes.append(end_node)

    edges = []
    if middle_node:
        # Chain each consecutive pair: start -> middle -> end.
        for current_node, next_node in zip(all_flow_nodes, all_flow_nodes[1:]):
            edges.append(
                {
                    "id": f"edge-{current_node.id}-{next_node.id}",
                    "source": current_node.id,
                    "target": next_node.id,
                    "sourceHandle": "output",
                    "targetHandle": "input",
                }
            )

    pipeline = cls.objects.create(
        team=team, name=name, data={"nodes": [node.model_dump() for node in all_flow_nodes], "edges": edges}
    )
    # Materialise Node rows from the flow data so the pipeline is runnable.
    pipeline.update_nodes_from_data()
    return pipeline


@classmethod
def create_default_pipeline_with_name(cls, team, name, llm_provider_id=None, llm_provider_model=None):
    """Thin alias for :meth:`create_default`, kept for backwards compatibility."""
    return cls.create_default(team, name, llm_provider_id, llm_provider_model)


@classmethod
def create_default(cls, team, name=None, llm_provider_id=None, llm_provider_model=None):
    """Create a default start -> [LLM] -> end pipeline.

    When ``name`` is None the pipeline is auto-numbered ("New Pipeline N");
    the LLM node is only added when both a provider id and a provider model
    are supplied.
    """
    default_name = "New Pipeline" if name is None else name

    node = None
    if llm_provider_id and llm_provider_model:
        llm_id = f"LLMResponseWithPrompt-{uuid4().hex[:5]}"
        node = FlowNode(
            id=llm_id,
            type="pipelineNode",
            position={"x": 300, "y": 0},
            # NOTE(review): the FlowNodeData payload below was unchanged
            # context elided from the source diff and is NOT fully visible
            # here — keep the existing `data=FlowNodeData(...)` block from
            # models.py verbatim; the params written here are a best-guess
            # reconstruction and must be verified against the current file.
            data=FlowNodeData(
                id=llm_id,
                type="LLMResponseWithPrompt",
                label="LLM",
                params={
                    "name": "llm",
                    "llm_provider_id": llm_provider_id,
                    "llm_provider_model_id": llm_provider_model.id,
                },
            ),
        )

    if name:
        final_name = default_name
    else:
        # Only pay for the COUNT query when we actually need an auto-number;
        # the original ran it unconditionally even when a name was supplied.
        existing_pipeline_count = cls.objects.filter(team=team, name__startswith=default_name).count()
        final_name = f"New Pipeline {existing_pipeline_count + 1}"
    return cls._create_pipeline_with_nodes(team=team, name=final_name, middle_node=node)