JOIN between MQTT Stream Source and LookUp Tables is not working as expected #3736

Open
@canob

Description

Environment:

  • eKuiper version: 2.1.4 (Docker)
  • Hardware configuration: Intel(R) Xeon(R) CPU E5-2670 v3 @ 2.30GHz
  • OS: Ubuntu 22.04.5 LTS
  • Others: EMQX 5.8.6 as MQTT (Docker)

What happened and what you expected to happen:
I'm trying to JOIN a stream from an MQTT topic with two LookUp Tables (one of the SQL source type and one of the HTTPPull source type).

Stream describe:

Fields
--------------------------------------------------------------------------------

CONF_KEY: TEST_emqx
DATASOURCE: CUST1 - Customer 1/crwd_output
FORMAT: json
SHARED: true
TIMESTAMP: date_updated
TIMESTAMP_FORMAT: YYYY-MM-ddTHH:mmZ
TYPE: mqtt

SQL LookUp Table describe:

Fields
--------------------------------------------------------------------------------

CONF_KEY: postgresql_TEST
DATASOURCE: enrich_machine_user
FORMAT: json
KEY: machine
TYPE: sql

HTTPPull LookUp Table describe:

Fields
--------------------------------------------------------------------------------

CONF_KEY: user_info
DATASOURCE: /testing410/user_info
FORMAT: json
KEY: id_user
TYPE: httppull

Individual output examples:

Stream (some fields removed for brevity):

[
    {
        "cid": "9e250b3eed5344",
        "created_timestamp": "2025-05-29T23:39:55.536722495Z",
        "detection_id": "ldt:054a744b712b14aa1:2170206",
        "device": {
            "bios_manufacturer": "Microsoft Corporation",
            "bios_version": "Hyper-V UEFI Release v4.1",
            "hostname": "DESKTOP-TEST"
        },
        "behaviors": [
            {
                "filename": "powershell.exe"
            }
        ],
        "email_sent": false,
        "date_updated": "2025-05-29T23:43:00Z"
    }
]

SQL LookUp Table:

[Image: SQL LookUp Table contents]

HTTPPull LookUp Table:

[
    {
        "id_user": "testing_user",
        "name_user": "Usuario de Test",
        "machine_user": "DESKTOP-TEST"
    },
    {
        "id_user": "testing_user2",
        "name_user": "Usuario de Test 2",
        "machine_user": "DESKTOP-TEST2"
    }
]

Rule describe:

{
  "id": "CUST1_Crowdstrike_enrich",
  "name": "CUST1_Crowdstrike_enrich",
  "triggered": true,
  "sql": "SELECT *\r\nFROM CUST1_Crowdstrike\r\nLEFT JOIN machine_user\r\nON CUST1_Crowdstrike.device.hostname = machine_user.machine",
  "actions": [
    {
      "file": {
        "bufferLength": 1024,
        "checkInterval": 6000,
        "cleanCacheAtStop": false,
        "enableCache": true,
        "fileType": "lines",
        "format": "json",
        "hasHeader": false,
        "omitIfEmpty": true,
        "path": "/kuiper/output/mqtt_crwd.json",
        "resourceId": "CUST1_crwd",
        "rollingCount": 10000,
        "rollingInterval": 36000,
        "rollingNamePattern": "suffix",
        "runAsync": false,
        "sendSingle": true
      }
    },
    {
      "mqtt": {
        "clientId": "eKuiper_output",
        "insecureSkipVerify": true,
        "protocolVersion": "3.1.1",
        "qos": 2,
        "retained": true,
        "server": "tcp://10.3.6.120:1883",
        "topic": "CUST1 - Customer 1/crwd_output_enriched",
        "omitIfEmpty": false,
        "sendSingle": true,
        "format": "json",
        "bufferLength": 1024,
        "enableCache": true,
        "cleanCacheAtStop": false,
        "runAsync": false
      }
    }
  ],
  "options": {
    "debug": false,
    "isEventTime": false,
    "lateTolerance": "1s",
    "concurrency": 1,
    "bufferLength": 1024,
    "sendMetaToSink": false,
    "sendNilField": false,
    "sendError": false,
    "checkpointInterval": "5m0s",
    "restartStrategy": {
      "delay": "1s",
      "multiplier": 2,
      "maxDelay": "30s",
      "jitterFactor": 0.1
    }
  }
}

I tried with different SQL queries, modifying the previous rule:

SELECT *
FROM CUST1_Crowdstrike
LEFT JOIN machine_user
ON CUST1_Crowdstrike.device.hostname = machine_user.machine

With this query (the one shown in the rule describe above), I get three outputs, one for each entry in the SQL table. I expect a single output, and only when "hostname" = "DESKTOP-TEST" in the stream event.
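To make the expected result concrete, here is a minimal sketch of the same LEFT JOIN run against SQLite rather than eKuiper. It only illustrates standard SQL semantics. The `events` table is a flattened stand-in for the stream (the nested `device.hostname` becomes a plain `hostname` column), and the three `machine_user` rows are hypothetical, since the real table contents are only available as a screenshot.

```python
import sqlite3

# In-memory database used purely to illustrate standard LEFT JOIN semantics.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE events (detection_id TEXT, hostname TEXT);
CREATE TABLE machine_user (machine TEXT, "user" TEXT);

-- One event, taken from the stream example.
INSERT INTO events VALUES ('ldt:054a744b712b14aa1:2170206', 'DESKTOP-TEST');

-- Hypothetical lookup rows (assumption: three entries, one matching host).
INSERT INTO machine_user VALUES
  ('DESKTOP-TEST',  'testing_user'),
  ('DESKTOP-TEST2', 'testing_user2'),
  ('DESKTOP-TEST3', 'testing_user3');
""")

rows = conn.execute("""
SELECT e.detection_id, e.hostname, m.machine, m."user"
FROM events AS e
LEFT JOIN machine_user AS m
  ON e.hostname = m.machine
""").fetchall()

# Only the lookup row whose machine equals the event hostname is joined,
# so a single row comes back instead of one per lookup entry.
print(len(rows), rows)
```

In this sketch the join returns a single row for the event, which is the behavior I expected from the rule without adding a WHERE clause.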

SELECT *
FROM CUST1_Crowdstrike
LEFT JOIN machine_user
ON CUST1_Crowdstrike.device.hostname = machine_user.machine
WHERE CUST1_Crowdstrike.device.hostname = machine_user.machine

With this query, I get one output, only when "hostname" = "DESKTOP-TEST" in the stream event, which is correct. However, as I understand it, the WHERE clause should not be necessary with a LEFT JOIN.

SELECT *
FROM CUST1_Crowdstrike
LEFT JOIN user_info
ON CUST1_Crowdstrike.device.hostname = user_info.machine_user

With this query, I get no output. I expect one, only when "hostname" = "DESKTOP-TEST" in the stream event.
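As a rough illustration of the result I expect here, the sketch below joins the example event with the two HTTPPull rows in plain Python. The helper `left_join_on_hostname` is hypothetical and is not how eKuiper implements lookups. It only mirrors standard LEFT JOIN behavior, where an event with no match is still emitted without lookup fields.

```python
# Trimmed stream event from the example above.
event = {
    "detection_id": "ldt:054a744b712b14aa1:2170206",
    "device": {"hostname": "DESKTOP-TEST"},
    "date_updated": "2025-05-29T23:43:00Z",
}

# Rows returned by the HTTPPull lookup source, as shown above.
user_info = [
    {"id_user": "testing_user", "name_user": "Usuario de Test",
     "machine_user": "DESKTOP-TEST"},
    {"id_user": "testing_user2", "name_user": "Usuario de Test 2",
     "machine_user": "DESKTOP-TEST2"},
]


def left_join_on_hostname(event, rows):
    """Hypothetical helper: LEFT JOIN on device.hostname = machine_user."""
    hostname = event["device"]["hostname"]
    matches = [row for row in rows if row["machine_user"] == hostname]
    if not matches:
        # Standard LEFT JOIN keeps the left-hand row when nothing matches.
        return [dict(event)]
    # One output per matching lookup row, with the lookup fields merged in.
    return [{**event, **row} for row in matches]


joined = left_join_on_hostname(event, user_info)
print(joined)
```

For the example event, this produces one merged record carrying `id_user` "testing_user", whereas the rule produces nothing at all.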

SELECT *
FROM CUST1_Crowdstrike
LEFT JOIN machine_user
ON CUST1_Crowdstrike.device.hostname = machine_user.machine
LEFT JOIN user_info
ON machine_user.user = user_info.id_user

With this query, I get no output. I expect one, only when "hostname" = "DESKTOP-TEST" in the stream event.

How to reproduce it (as minimally and precisely as possible):

  • Use the provided output examples and the different describes of the streams/tables/rules to reproduce the problem.

Anything else we need to know?:
Nothing.

Thanks in advance for your help, and let me know if I can provide more information about this.
