Worker container crashes when python task passes outputs with unicode escape sequences #7221

Open
rybandrei2014 opened this issue Feb 6, 2025 · 2 comments
Labels
area/backend Needs backend code changes bug Something isn't working

Comments

@rybandrei2014
Contributor

Describe the issue

We have a problem with the following flow:

id: flow1
namespace: tutorial
inputs:
- id: csv
  type: FILE
tasks:
- id: op001
  type: io.kestra.plugin.serdes.csv.CsvToIon
  from: "{{inputs.csv}}"
  fieldSeparator: ;
- id: op01
  type: io.kestra.plugin.serdes.json.IonToJson
  from: "{{outputs.op001.uri}}"
- id: op1
  type: io.kestra.plugin.scripts.python.Script
  taskRunner:
    type: io.kestra.plugin.scripts.runner.docker.Docker
  beforeCommands: []
  outputFiles:
  - output.jsonl
  containerImage: ghcr.io/kestra-io/kestrapy
  script: |-
    import csv, json
    from kestra import Kestra
    file_uri = '{{ outputs.op01.uri }}'
    output_url = 'output.jsonl'
    headers = None
    with open(file_uri, 'rb') as f:
        # Open output file
        with open(output_url, 'w') as outfile:
            # Iterates over input json
            for line in f:
                data = json.loads(line)
                if headers is None:
                    headers = data.keys()
                new_data = dict(zip(headers, map(lambda x: ' ' + str(x) + ' ' if isinstance(x, (int, float)) else x, data.values())))
                outfile.write(json.dumps(new_data)+"\n")
    Kestra.outputs({ 'headers': ','.join(headers), 'headers_expression': ','.join(len(headers) * ['?']) })

We expect a .csv file as input. However, we were able to upload an .xlsx file instead; it was processed by the CsvToIon task and serialized to this broken .ion output:

{'PK\x03\x04-\0\b\0\b\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x13\0\0\0[Content_Types].xml�S�n�0\x10����*6�PU\x15�C\x1f�\x16��\x03\\{�X�%����]\a8�R�':"q�cfgfW�d�q�ZCB\x13|��|�*�*h㻆},^�{Va�^K\x1b<4l\v�f��b\x1b\x01+��ذ>�� \x04�\x1e�D\x1e\"xBڐ��tL��R-e\a�v4�\x13*�\f>׹h���\tZ���z��\x17��\x18�Q2S,���H��\v�\x04v�`o\"�\x10�U�\x1bRٵC(2q��qa9S�\x1b"}
{'PK\x03\x04-\0\b\0\b\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x13\0\0\0[Content_Types].xml�S�n�0\x10����*6�PU\x15�C\x1f�\x16��\x03\\{�X�%����]\a8�R�':"&\x19"}

The problem occurs in the third task (op1), which runs a Python script to extract headers from the JSON output. It fails at the line Kestra.outputs({ 'headers': ','.join(headers), 'headers_expression': ','.join(len(headers) * ['?']) }), because that call sends Kestra header values containing Unicode escape sequences (here \u0000). This crashes the worker container with the following error message:

org.jooq.exception.DataException: SQL [insert into queues ("value", "type", "key") values (cast(? as jsonb), CAST(? AS queue_type), ?)]; ERROR: unsupported Unicode escape sequence
kestra-1    |   Detail: \u0000 cannot be converted to text.
kestra-1    |   Where: JSON data, line 1: ...s":{"vars":{"csv_hlavicka":"PK\u0003\u0004-\u0000...
kestra-1    |   at org.jooq_3.19.10.POSTGRES.debug(Unknown Source)
kestra-1    |   at org.jooq.impl.Tools.translate(Tools.java:3603)
kestra-1    |   at org.jooq.impl.Tools.translate(Tools.java:3595)
kestra-1    |   at org.jooq.impl.DefaultExecuteContext.sqlException(DefaultExecuteContext.java:827)
kestra-1    |   at org.jooq.impl.AbstractQuery.execute(AbstractQuery.java:362)
kestra-1    |   at org.jooq.impl.AbstractDelegatingQuery.execute(AbstractDelegatingQuery.java:115)
kestra-1    |   at io.kestra.jdbc.runner.JdbcQueue.lambda$produce$0(JdbcQueue.java:111)
kestra-1    |   at org.jooq.impl.DefaultDSLContext.lambda$transaction$5(DefaultDSLContext.java:592)
kestra-1    |   at org.jooq.impl.DefaultDSLContext.lambda$transactionResult0$3(DefaultDSLContext.java:530)
kestra-1    |   at org.jooq.impl.Tools$3$1.block(Tools.java:6325)
kestra-1    |   at java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.ForkJoinPool.managedBlock(Unknown Source)
kestra-1    |   at org.jooq.impl.Tools$3.get(Tools.java:6322)
kestra-1    |   at org.jooq.impl.DefaultDSLContext.transactionResult0(DefaultDSLContext.java:578)
kestra-1    |   at org.jooq.impl.DefaultDSLContext.transactionResult(DefaultDSLContext.java:502)
kestra-1    |   at org.jooq.impl.DefaultDSLContext.transaction(DefaultDSLContext.java:591)
kestra-1    |   at io.kestra.jdbc.JooqDSLContextWrapper.lambda$transaction$1(JooqDSLContextWrapper.java:58)
kestra-1    |   at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
kestra-1    |   at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
kestra-1    |   at dev.failsafe.internal.RetryPolicyExecutor.lambda$apply$0(RetryPolicyExecutor.java:74)
kestra-1    |   at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)
kestra-1    |   at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:187)
kestra-1    |   at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:376)
kestra-1    |   at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:112)
kestra-1    |   at io.kestra.core.utils.RetryUtils$Instance.wrap(RetryUtils.java:144)
kestra-1    |   at io.kestra.core.utils.RetryUtils$Instance.runRetryIf(RetryUtils.java:103)
kestra-1    |   at io.kestra.jdbc.JooqDSLContextWrapper.transaction(JooqDSLContextWrapper.java:55)
kestra-1    |   at io.kestra.jdbc.runner.JdbcQueue.produce(JdbcQueue.java:101)
kestra-1    |   at io.kestra.jdbc.runner.JdbcQueue.emit(JdbcQueue.java:121)
kestra-1    |   at io.kestra.core.queues.QueueInterface.emit(QueueInterface.java:11)
kestra-1    |   at io.kestra.core.runners.Worker.run(Worker.java:606)
kestra-1    |   at io.kestra.core.runners.Worker.handleTask(Worker.java:286)
kestra-1    |   at io.kestra.core.runners.Worker.lambda$run$7(Worker.java:241)
kestra-1    |   at io.micrometer.core.instrument.internal.TimedRunnable.run(TimedRunnable.java:49)
kestra-1    |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
kestra-1    |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
kestra-1    |   at java.base/java.lang.Thread.run(Unknown Source)
kestra-1    | Caused by: org.postgresql.util.PSQLException: ERROR: unsupported Unicode escape sequence
kestra-1    |   Detail: \u0000 cannot be converted to text.
kestra-1    |   Where: JSON data, line 1: ...s":{"vars":{"csv_hlavicka":"PK\u0003\u0004-\u0000...
kestra-1    |   at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725)
kestra-1    |   at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412)
kestra-1    |   at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:371)
kestra-1    |   at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:502)
kestra-1    |   at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:419)
kestra-1    |   at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
kestra-1    |   at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:180)
kestra-1    |   at com.zaxxer.hikari.pool.ProxyPreparedStatement.execute(ProxyPreparedStatement.java:44)
kestra-1    |   at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.execute(HikariProxyPreparedStatement.java)
kestra-1    |   at org.jooq.tools.jdbc.DefaultPreparedStatement.execute(DefaultPreparedStatement.java:219)
kestra-1    |   at org.jooq.impl.AbstractQuery.execute(AbstractQuery.java:458)
kestra-1    |   at org.jooq.impl.AbstractDMLQuery.execute(AbstractDMLQuery.java:1068)
kestra-1    |   at org.jooq.impl.AbstractQuery.execute(AbstractQuery.java:348)
kestra-1    |   ... 32 common frames omitted
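
The PostgreSQL detail line in the trace names \u0000 specifically, so one possible script-side workaround is to strip NUL characters from the values before they are handed to Kestra.outputs. The sketch below is only an illustration under that assumption: strip_nul is a hypothetical helper (not part of the kestra package), and the sample header is shortened from the broken output shown earlier.

```python
from typing import Any


def strip_nul(value: Any) -> Any:
    """Recursively remove NUL (U+0000) characters from strings in a JSON-like value.

    Hypothetical helper for illustration; it is not part of the kestra package.
    """
    if isinstance(value, str):
        return value.replace("\x00", "")
    if isinstance(value, dict):
        return {strip_nul(k): strip_nul(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [strip_nul(v) for v in value]
    return value


# Sample header shortened from the broken output in this issue, plus a normal one.
headers = ["PK\x03\x04-\x00\x08\x00", "plain"]

safe_outputs = strip_nul({
    "headers": ",".join(headers),
    "headers_expression": ",".join(len(headers) * ["?"]),
})

# In the flow's script, this dict would be passed on: Kestra.outputs(safe_outputs)
print(repr(safe_outputs["headers"]))
```

This only hides the symptom: the headers are still meaningless bytes from an .xlsx file, so the flow would continue with garbage data rather than fail.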

Environment

  • Kestra Version: 0.20.12
@rybandrei2014 rybandrei2014 added area/backend Needs backend code changes area/frontend Needs frontend code changes bug Something isn't working labels Feb 6, 2025
@github-project-automation github-project-automation bot moved this to Backlog in Issues Feb 6, 2025
@MilosPaunovic MilosPaunovic added area/backend Needs backend code changes and removed area/backend Needs backend code changes area/frontend Needs frontend code changes labels Feb 6, 2025
@anna-geller anna-geller self-assigned this Feb 14, 2025
@anna-geller
Member

anna-geller commented Feb 14, 2025

We expect .csv file as an input. But we managed to upload .xlsx file instead and it has been processed by CsvToIon task and serialized to this broken .ion output

Hmm, this isn't a Kestra issue, right? 😉 The FILE input type makes no assumptions about the file extension: the input would happily accept a PNG file as well. It accepts any file, and it's up to your task to decide how to process the user's file. We have dedicated tasks for working with Excel.

I'd recommend using conditional logic to process either a CSV or an Excel file, depending on which type the user chooses to upload:

id: flow1
namespace: tutorial

inputs:
  - id: fileType
    type: SELECT
    values:
      - CSV
      - XLSX
  - id: file
    type: FILE

tasks:
  - id: process_csv
    runIf: "{{ inputs.fileType == 'CSV' }}"
    type: io.kestra.plugin.serdes.csv.CsvToIon
    from: "{{inputs.file}}"
    fieldSeparator: ;

  - id: process_excel
    runIf: "{{ inputs.fileType == 'XLSX' }}"
    type: io.kestra.plugin.serdes.excel.ExcelToIon
    from: "{{inputs.file}}"
  
  - id: op001
    type: io.kestra.plugin.core.debug.Return
    format: "{{ tasks.process_csv.state != 'SKIPPED' ? outputs.process_csv.uri : outputs.process_excel.uris.Sheet1 }}"

  - id: op01
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{outputs.op001.value}}"
    
  - id: op1
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    beforeCommands: []
    outputFiles:
    - output.jsonl
    containerImage: ghcr.io/kestra-io/kestrapy
    script: |-
      import csv, json
      from kestra import Kestra
      file_uri = '{{ outputs.op01.uri }}'
      output_url = 'output.jsonl'
      headers = None
      with open(file_uri, 'rb') as f:
          # Open output file
          with open(output_url, 'w') as outfile:
              # Iterates over input json
              for line in f:
                  data = json.loads(line)
                  if headers is None:
                      headers = data.keys()
                  new_data = dict(zip(headers, map(lambda x: ' ' + str(x) + ' ' if isinstance(x, (int, float)) else x, data.values())))
                  outfile.write(json.dumps(new_data)+"\n")
      Kestra.outputs({ 'headers': ','.join(headers), 'headers_expression': ','.join(len(headers) * ['?']) })

I'll close the issue for now, as the solution suggested above is preferable to trying to read and parse an incorrect file type. @Skraye, feel free to reopen if you have suggestions on how to handle such user errors, where a mismatched file type leads to encoding issues.
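
As one possible way to catch the mismatched file type early, a script could inspect the first bytes of the uploaded file before parsing it. An .xlsx file is a ZIP archive, and the broken output above indeed starts with the ZIP signature PK\x03\x04. The sketch below assumes that check is sufficient; looks_like_zip and check_csv_input are hypothetical helper names, not Kestra APIs.

```python
ZIP_MAGIC = b"PK\x03\x04"  # local file header signature at the start of ZIP archives


def looks_like_zip(path: str) -> bool:
    """Return True if the file begins with the ZIP signature (as .xlsx files do)."""
    with open(path, "rb") as f:
        return f.read(len(ZIP_MAGIC)) == ZIP_MAGIC


def check_csv_input(path: str) -> None:
    """Raise ValueError if a file that should be CSV is actually a ZIP container.

    Hypothetical guard for illustration: it only rules out ZIP-based formats
    such as .xlsx and does not prove the file is valid CSV.
    """
    if looks_like_zip(path):
        raise ValueError(
            f"{path} looks like a ZIP archive (for example .xlsx), not a CSV file"
        )
```

Calling check_csv_input on the input path at the top of a script would make the task fail with a readable message instead of passing binary data downstream.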

@anna-geller anna-geller closed this as not planned Feb 14, 2025
@github-project-automation github-project-automation bot moved this from Backlog to Done in Issues Feb 14, 2025
@loicmathieu
Member

It crashes the Worker itself, which is not good.
At the very least, we should not crash the Worker, even if we cannot fail the task cleanly.
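
To illustrate the idea of failing the task rather than the Worker, here is a rough, language-agnostic sketch written in Python. It is not Kestra's code: find_nul_paths and emit_or_fail are hypothetical names, and the emit callback stands in for whatever writes to the queue. The payload is checked for NUL characters before the insert is attempted, and a failed result is returned instead of letting the database error propagate.

```python
from typing import Any, Callable, Dict, List


def find_nul_paths(value: Any, path: str = "$") -> List[str]:
    """Return the paths of all strings in a JSON-like value that contain U+0000."""
    found: List[str] = []
    if isinstance(value, str):
        if "\x00" in value:
            found.append(path)
    elif isinstance(value, dict):
        for key, item in value.items():
            if isinstance(key, str) and "\x00" in key:
                found.append(f"{path} (key)")
            found.extend(find_nul_paths(item, f"{path}.{key}"))
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            found.extend(find_nul_paths(item, f"{path}[{index}]"))
    return found


def emit_or_fail(payload: Dict[str, Any], emit: Callable[[Dict[str, Any]], None]) -> Dict[str, Any]:
    """Emit the payload only if it can be stored; otherwise report a failed task.

    Hypothetical wrapper: `emit` stands in for the queue write that raised
    the jsonb error in the stack trace above.
    """
    bad_paths = find_nul_paths(payload)
    if bad_paths:
        return {"state": "FAILED", "reason": f"NUL character in output at {bad_paths}"}
    emit(payload)
    return {"state": "SUCCESS"}


# Example: the output from this issue is rejected before reaching the queue.
emitted: List[Dict[str, Any]] = []
result = emit_or_fail({"vars": {"csv_hlavicka": "PK\x03\x04-\x00"}}, emitted.append)
print(result["state"])
```

The same effect could also be achieved by catching the database exception around the queue write; the pre-check is shown here only because it gives a more specific error message.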

@loicmathieu loicmathieu reopened this Feb 14, 2025
@github-project-automation github-project-automation bot moved this from Done to Backlog in Issues Feb 14, 2025
Projects
Status: Backlog
Development

No branches or pull requests

4 participants