Skip to content

Create a script to take get posthog info #985

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
kevkevinpal opened this issue Jan 24, 2025 · 2 comments
Open

Create a script to take get posthog info #985

kevkevinpal opened this issue Jan 24, 2025 · 2 comments
Assignees

Comments

@kevkevinpal
Copy link
Collaborator

Description

We want a python script which calls posthog's endpoint which gets all activity entries by $session_id

Acceptance criteria

If the Event Type is backend_api_call then
we want to decode the elements_chain using a command similar to this xxd -r -p test.txt | gunzip -c > testuncompressed.txt
then we want to save the output to a file

you can see how we compress and encode elements_chain here

If you need the api key please message me directly on sphinx 03a2bbbed97ca00f80445872fb65ad6711f7c4de1b41c7097143e3aafa28a644c2_034bcc332390470cc4f9ef7491af1da2ffceefccd39ceb6acd87c83920543013d7_529771090576408576

ideally the script would work like such

./posthog_script.py <session_id> <file_path>

Below is a screenshot of the info we want to get

Image

@aliraza556
Copy link
Contributor

aliraza556 commented Jan 24, 2025

@kevkevinpal Please assign me

@kevkevinpal
Copy link
Collaborator Author

this is the code that was written by @aliraza556 that solves this issue

@tomsmith8 whenever you have the chance to run this please do, we can modify it to make it easier for you to work with

import sys
import os
import requests
import binascii
import gzip
import json
from datetime import datetime, timedelta


def decode_hex_gzip(hex_string):
    try:


        compressed_data = binascii.unhexlify(hex_string)
       
        try:


            return gzip.decompress(compressed_data).decode('utf-8')
        except:


            gzip_header = b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x00'
            full_gzip_data = gzip_header + compressed_data
            return gzip.decompress(full_gzip_data).decode('utf-8')
           
    except Exception as e:
        return f"Error decoding: {str(e)}"


def fetch_posthog_events(session_id, output_file):
    api_key = os.getenv('POSTHOG_KEY', 'phx_lnWxpCghsiW20SMUspeZqvwDXKiMCD6FiDzTCj5E9HTZHHw')
    base_url = os.getenv('POSTHOG_HOST', 'https://us.i.posthog.com')
   
    url = f"{base_url}/api/projects/@current/events"
   
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Accept': 'application/json'
    }
   
    params = {
        'properties': json.dumps([{'key': '$session_id', 'value': session_id}]),
        'after': (datetime.now() - timedelta(days=1)).isoformat()
    }


    try:
        print(f"Fetching data from URL: {url}")
        response = requests.get(url, headers=headers, params=params)
        print(f"Response status code: {response.status_code}")
       
        if response.status_code != 200:
            print(f"Error response: {response.text}")
            return
           
        data = response.json()
        results = data.get('results', [])
        print(f"Total results found: {len(results)}")


        with open(output_file, 'w', encoding='utf-8') as f:
            for event in results:


                f.write(f"\n{'='*80}\n")
                f.write(f"Timestamp: {event.get('timestamp')}\n")
                f.write(f"URL: {event.get('properties', {}).get('$current_url')}\n")
                f.write(f"Event Type: {event.get('event')}\n")
                f.write(f"Session ID: {event.get('properties', {}).get('$session_id')}\n")
               
                if event.get('event') == 'backend_api_call':
                    elements_chain = event.get('elements_chain')
                    if elements_chain:
                        f.write(f"Elements Chain: {elements_chain}\n")
                        f.write(f"{'='*80}\n\n")
                       
                        try:
                            decoded_data = decode_hex_gzip(elements_chain)


                            try:
                                json_data = json.loads(decoded_data)
                                f.write("Decoded Data (JSON):\n")
                                f.write(json.dumps(json_data, indent=2))
                            except json.JSONDecodeError:
                                f.write("Decoded Data :\n")
                                f.write(decoded_data)
                            f.write("\n")
                        except Exception as e:
                            f.write(f"Error decoding elements_chain: {str(e)}\n")
               
                f.write(f"\n{'='*80}\n")
       
        print(f"\nResults have been written to {output_file}")
       
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data: {str(e)}")
        sys.exit(1)


def main():
    if len(sys.argv) != 3:
        print("Usage: ./posthog_script.py <session_id> <output_file>")
        sys.exit(1)
   
    session_id = sys.argv[1]
    output_file = sys.argv[2]
   
    fetch_posthog_events(session_id, output_file)


if __name__ == "__main__":
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants