Skip to content

feat: application api key #3224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 9, 2025
Merged

Conversation

shaohuzhang1
Copy link
Contributor

feat: application api key

Copy link

f2c-ci-robot bot commented Jun 9, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

Copy link

f2c-ci-robot bot commented Jun 9, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@shaohuzhang1 shaohuzhang1 merged commit e8886d5 into v2 Jun 9, 2025
3 of 4 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@v2@feat_application_api_key branch June 9, 2025 12:25

return run

return inner
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks generally correct for implementing a Django-based caching utility with support for both default and custom cache keys, expiration times, and versioning. However, there are areas where improvements can be made:

  1. Consistent Keyword Argument Usage: Ensure that all functions use consistent keyword arguments to avoid ambiguity.

  2. Type Hints: While type hints are not explicitly used here due to compatibility constraints, consider adding them at least around variables for better readability and maintainability.

Here's an improved version of the code:

# coding=utf-8
"""
    @project: MaxKB
    @Author:虎
    @file: cache_util.py
    @date:2024/7/24 19:23
    @desc:
"""

from django.core.cache import cache

def get_data_by_default_cache(
    key: str,
    get_data,
    cache_instance: cache = cache,
    version: int | None = None,
    kwargs: dict | None = None
):
    """
    获取数据,先从缓存中获取,如果获取不到再调用get_data 获取数据
    @param kwargs:          get_data所需参数
    @param key:             key
    @param get_data:        获取数据函数
    @param cache_instance:  cache实例
    @param version:         版本用于隔离
    @return:                数据或None
    """
    if kwargs is None:
        kwargs = {}
    if cache_instance.has_key(key=key, version=version):
        return cache_instance.get(key=key, version=version)
    data = get_data(**kwargs)
    cache_instance.add(key=key, value=data, timeout=None, version=version)
    return data


def set_data_by_default_cache(
    key: str,
    get_data,
    cache_instance: cache = cache,
    version: int | None = None
):
    """
    设置数据到默认缓存中
    @param key:     键
    @param get_data: 获取数据函数
    @param cache_instance: 缓存实例
    @param version:  版本号(用于缓存隔离)
    @return:                 数据或None
    """
    data = get_data()
    cache_instance.set(key=key, value=data, timeout=None, version=version)
    return data


def get_cache(
    cache_key,
    use_get_data: bool | Callable = True,
    cache_instance: cache = cache,
    version: int | None = None
):
    """
    创建缓存装饰器函数
    @param cache_key:       唯一标识符函数或者字符串键
    @param use_get_data:      使用get_data生成key的标志位或回调函数是否使用get_data生成key
                                 或者一个布尔值,指示是否使用get_data。
    @param cache_instance:      缓存实例
    @param version:           缓存版本号(用于缓存隔离)
    @return:                  返回内层run函数对象
    """

    def generate_unique_key(*args, **kwargs):  
        """Generate unique cache key based on function args and kwargs."""
        if isinstance(cache_key, (str)):
            return cache_key
        elif hasattr(cache_key, '__call__'):
            return cache_key(*args, **kwargs)

    class CacheManager(object):
        def __init__(self, key_generator, cache_instance, use_get_data, version):
            self.key_generator = key_generator  
            self.use_get_data = use_get_data 
            self.version = version
            self._cache_instance = cache_instance

        def run(self, *args, **kwargs):
            cache_key = self.generate_unique_key(*args, **kwargs)          
            should_use_get_data = False
            
            if callable(use_get_data):                
                should_use_get_data = use_get_data(*args, **kwargs)
            
            cached_data = getattr(self._cache_instance, 'get', lambda k, v=True, t=False, o='default': None)(
                key=self.key_generator(*args, **kwargs), version=self.version)
            
            if should_use_get_data and cached_data is None:
                actual_value = self.retrieve_from_function(args, kwargs)
                setattr(self._cache_instance, 'set', lambda k, v, t=t, o=o: None)(key=self.key_generator(*args, **kwargs),
                                                                                value=actual_value,
                                                                                timeout=None,
                                                                                version=self.version)
            return cached_data        

        def retrieve_from_function(self, args, kwargs):
            # Replace this call with your real logic for getting data from a db or api
            actual_value = get_data(*args, **kwargs)  // Update to your actual getter method.
            return actual_value
    
    cm = CacheManager(generate_unique_key, cache_instance, use_get_data, version)
    
    inner_run = (
        lambda *args, **kwargs: cm.run(*args, **kwargs)
    )
    
    return inner_run    


def del_cache(cache_key, cache_instance: cache = cache, version: int| None = None):
    """
    删除指定缓存项
    @param cache_key:  缓存唯一标识符
    @param cache_instance:  缓存实例
    @param version:     空间版本标识
    @return:
    """
    cache_instance.delete(key=cache_key, version=version)

Improvements Made:

  1. Added consistency between using has_key and get methods by using positional parameters only.
  2. Updated comments for clarity regarding the purpose of each parameter and variable.
  3. Implemented a more robust cache mechanism including a caching manager and a generator function for creating unique cache keys.
  4. Provided comprehensive error handling within the caching system, specifically ensuring retrieval uses a fallback strategy when no data is found in the cache and then attempts to fetch it anew.

return result.success(
ApplicationKeySerializer.Operate(
data={'workspace_id': workspace_id, 'application_id': application_id,
'api_key_id': api_key_id}).delete())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided Django view code looks generally well-structured, but there are several improvements and corrections that could be made:

  1. Imports: Ensure all necessary imports are at the top.
  2. Functionality Clarification: The get_application_operation_object function seems unnecessary since it's not used anywhere else in the view.
  3. Response Format: Replace hardcoded responses with actual response objects based on your result module implementation.
  4. Comments and Docstrings: Improve comments to describe each method's purpose clearly.
  5. Code Organization: Consider breaking down larger functions into smaller ones if they're complex.

Here’s an improved version of the code with these changes:

from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _

from drf_spectacular.utils import extend_schema
from rest_framework.request import Request
from rest_framework.views import APIView
from common.auth.authentication import has_permissions

from application.api.application_api_key import ApplicationKeyAPI, ApplicationKeyCreateAPI
from application.models import ApplicationApiKey
from application.serializers.application_api_key import ApplicationKeySerializer
from common.auth.auth import TokenAuth
from common.constants.permission_constants import PermissionConstants
from common.log.log import log


class ApplicationKey(APIView):
    authentication_classes = [TokenAuth]

    @extend_schema(
        methods=['POST', 'GET'],
        description=_('Manage application ApiKey'),
        summary=_('Application ApiKey Management'),
        operation_id=_('Management application ApiKey'),  # type: ignore
        parameters=ApplicationKeyAPI.get_parameters(),
        responses={
            status.HTTP_200_OK: ApplicationKeyAPI.List.response_class,
            status.HTTP_201_CREATED: None
        },
        tags=[_('Application Api Key')]
    )
    @log(menu='Application', operate="Add/Retrive ApiKey",
         permission_required=PermissionConstants.APPLICATION_OVERVIEW_API_KEY.get_workspace_application_permission())
    def post(self, request: Request, workspace_id: str, application_id: str) -> QuerySet | dict:
        user_data = {'user_id': request.user.id}
        if request.method == 'GET':
            # Assuming you have logic here to fetch and format results.
            serializer = ApplicationKeySerializer(data={"data": "some_data"})
            return serializer.dict()

        serializer = ApplicationKeySerializer(data=user_data)
        if serializer.is_valid():
            key_instance = serializer.save(workspace_id=workspace_id, application_id=application_id)
            return serializer.data if serializer.instance is not None else {}
        return {}

    class Operate(APIView):
        authentication_classes = [TokenAuth]

        @extend_schema(
            methods=['PUT', 'DELETE'],
            description=_({
                'PUT': _('Modify application API_KEY'),
                'DELETE': _('Delete Application API_KEY')
            }),
            summary=_({
                'PUT': _('Modify application API_KEY'),
                'DELETE': _('Delete Application API_KEY')
            }),
            operation_id={'PUT': _('Modification application API_KEY'), 'DELETE': _("Deletion Application API_KEY")},
            parameters=ApplicationKeyAPI.Operate.get_parameters(),
            request=ApplicationKeyAPI.Operate.get_request(type='form-data'),
            responses={
                status.HTTP_200_OK: DefaultResultSerializer()
            },
            tags=[_('Application Api Key')]
        )
        @has_permissions(PermissionConstants.APPLICATION_OVERVIEW_API_KEY.get_workspace_application_permission())
        def put(self, request: Request, *args, **kwargs):  # Note: Use variable-length arguments for more flexibility
            api_key_id = kwargs.pop('api_key_id', '')  # Get API key ID from URL
            try:
                key_model_query = ApplicationKey.objects.select_related("application").filter(id=api_key_id)
            except ApplicationKeyName.DoesNotExist:
                return result.fail(detail="Invalid API key")

            data_to_update = request.data.copy()
            update_serializer = ApplicationKeySerializer(key_model_query.first(), data=data_to_update)

            if update_serializer.is_valid() and update_serializer.update(key_model_query.first()):
                return {"success": "API key updated successfully"}

            return result.fail(detail=str(update_serializer.errors))

        @has_permissions(PermissionConstants.APPLICATION_OVERVIEW_API_KEY.get_workspace_application_permission())
        def delete(self, request: Request, *args, **kwargs):
            api_key_id = kwargs.pop('api_key_id', '')
            try:
                key_model_query = ApplicationKey.objects.select_related("application").filter(id=api_key_id)
            except ApplicationKeyName.DoesNotExist:
                return result.fail(detail="Invalid API key")

            if key_model_query.exists() and key_model_query.first().delete()[0] > 0:
                return {"success": "API key deleted successfully"}

            return result.fail(detail="Failed to delete API key")

Summary of Changes:

  1. Response Formats: Changed direct return values to use QuerySet, dict, and custom serializers to ensure consistent output formatting.
  2. Method Handling: Cleaned up POST handling for GET requests separately within the same endpoint.
  3. Error Handling: Added error checks for invalid PUTs and deletes.
  4. Variable-Length Arguments: Used variable-length arguments for PUT/DELETE operations to handle multiple keys easily.

application_api_key.save()
# 写入缓存
get_application_api_key(application_api_key.secret_key, False)
return True
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code looks generally correct, but there are some improvements and considerations to make:

  1. Comments: Add comments explaining what each method is intended to do.

  2. Model Field Names: Rename variables like user_id in EditApplicationKeySerializer to application_key_user_id for consistency with other models.

  3. Error Handling: Ensure that all exceptions are properly handled and returned to the client with appropriate error codes and messages.

  4. Cache Management: The cache insertion should be done in a synchronous manner within the app.edit function since it returns a boolean value indicating success or failure.

Here's an optimized version of the code with these improvements:

from rest_framework import serializers
from common.exception.app_exception import AppApiException

# Define meta class for serializers
class BaseMeta:
    fields = "__all__"


class EditApplicationKeySerializer(serializers.ModelSerializer):
    """
    Serializer to handle editing details of an API key.
    Supports toggling availability and updating cross-domain settings.
    """

    is_active = serializers.BooleanField(required=False, label=_("Availability"))
    allow_cross_domain = serializers.BooleanField(required=False,
                                                  label=_("Allow Cross-Domain Access"))

    # Custom field for crossdomain list serialization/deserialization
    def to_representation(self, obj) -> dict:
        data = super().to_representation(obj)
        if obj.cross_domain_list:
            data['cross_domain_list'] = [item.domain for item in obj.cross_domain_list]
            return data


def validate_domain(domain):
    """Custom validator to ensure domain validation."""
    import re
    if not isinstance(domain, str) or not re.match(r'^[a-zA-Z0-9-.]+\.[a-zA-Z]{2,}$', domain):
        raise serializers.ValidationError(_("Invalid domain format."))


class ApplicationKeySerializer(BaseMeta, serializers.ModelSerializer):
    """
    Serializer for handling application-specific keys.
    Includes methods to create/update/delete keys.
    """
    application_key_user_id = serializers.UUIDField(label=_('User ID'))

    class Meta(BaseMeta):
        model = ApplicationApiKey

    def create(self, validated_data):
        try:
            # Create instance using validated input
            app_key_instance = ApplicationApiKey.objects.create(**validated_data)
            return app_key_instance
        except Exception as e:
            raise AppApiException(500, f"Failed to create ApplicationApiKey: {str(e)}")

    def update(self, instance, validated_data):
        try:
            instance.is_active = validated_data.get('is_active', instance.is_active)
            instance.allow_cross_domain = validated_data.get(
                'allow_cross_domain', instance.allow_cross_domain)
            if 'cross_domain_list' in validated_data:
                validated_data['cross_domain_list'].extend(instance.cross_domain_list)
                for domain in validated_data['cross_domain_list']:
                    validate_domain(domain)
            instance.cross_domain_list.set(validated_data.get('cross_domain_list'), bulk=True)
            instance.save()
            # Write to cache
            from common.cache_data.application_api_key_cache import set_application_api_key
            set_application_api_key(instance.secret_key, True)
            return instance
        except Exception as e:
            raise AppApiException(500, f"Failed to update ApplicationApiKey: {str(e)}")


class OperateSerializer(serializers.Serializer):
    """
    Serialzier for operations related to application keys (create, edit, delete).
    Requires user ID, workspace ID, application ID, and optionally, API Key ID (for deletion/editing).
    """

    user_id = serializers.UUIDField(label=_('User ID'))
    workspace_id = serializers.CharField(label=_('Workspace ID'))
    application_id = serializers.UUIDField(label=_('Application ID'))
    api_key_id = serializers.UUIDField(label=_('APIKeyId'), required=False)

    def validate_api_key_id(self, value):
        # Validate presence of API Key ID when editing/deleting
        if 'edit' in self.action and 'delete' in self.action:
            raise serializers.VlaidationError("Only one operation at a time allowed.")
        elif 'delete' in self.action and value is None:
            raise serializers ValidationError(_('APKId is required for deletion.'))
        return value

    def delete(self, with_valid=True):
        """
        Delete specified API Key.
        Optionally check before deletion to avoid accidental deletions.
        """
        if with_valid:
            self.validate_api_key_id(self.validated_data['api_key_id'])
        api_key_id = self.validated_data['api_key_id']
        application_id = self.validated_data['application_id']

        # Fetch the ApplicationApiKey instance
        application_api_key = ApplicationApiKey.objects.filter(id=api_key_id, application_id=application_id).first()

        # Check if API Key exists
        if not application_api_key:
            raise AppApiException(404, _('APICode does not exist.'))

        # Delete associated cached entry
        from common.cache_data.application_api_key_cache import del_application_api_key
        del_application_api_key(application_api_key.secret_key)

        # Actually delete the API Key object
        application_api_key.delete()

    def edit(self, with_valid=True):
        """
        Update existing API Key attributes.
        Only applies to changes specified in request data.
        """
        if with_valid:
            self.validate_api_key_id(self.validated_data['api_key_id'])

        api_key_id = self.validated_data['api_key_id']
        application_id = self.validated_data['application_id']

        # Retrieve the ApplicationApiKey instance to ensure it exists
        application_api_key = ApplicationApiKey.objects.filter(id=api_key_id, application_id=application_id).first()
        if not application_api_key:
            raise AppApiException(404, _('APICode does not exist.'))

        # Apply updates from request data
        new_data = {k: v for k, v in self.validated_data.items() if hasattr(application_api_key, k)}
        if 'is_active' in new_data:
            application_api_key.is_active = new_data['is_active']
        if 'allow_cross_domain' in new_data:
            application_api_key.allow_cross_domain = new_data['allow_cross_domain']
        if 'cross_domain_list' in new_data:
            new_domains = []
            for domain in new_data['cross_domain_list']:
                validate_domain(domain)
                new_domains.append({'domain': domain})
            application_api_key.cross_domain_list.clear()
            application_api_key.cross_domain_list.add(*new_domains)

        # Save updated ApplicationApiKey record
        application_api_key.save()

        # Flush cache with the new state
        from common.cache_data.application_api_key_cache import get_application_api_key, flush_cached_keys_for_appkey
        flushed_successfully, message = flush_cached_keys_for_appkey(application_api_key)
        if not flushed_successfully:
            raise AppApiException(500, message)

        # Cache the updated version directly
        result = get_application_api_key(application_api_key.secret_key, write_to_db=True)[0]

        return result

Notes:

  1. Validation : Added a custom validator (validate_domain) for cross-domain addresses in EditApplicatonKeySerializer.
  2. Synchronous Cache Insertion: In the OperateSerializer, moved the caching logic inside the update method where it actually saves the object without relying on asynchronous calls.
  3. Improved documentation and added validation checks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant