Data z API do CSV

Zdravím,
tentokrát jsem se inspiroval mým minulým projektem, kde jsem tvořil program na získání a sečtení hmotnosti filamentů za určitou dobu. Jelikož jsem již přišel na to jak získávat data z API, řekl jsem si, že by nebylo od věci převést tyto data do přehledného CSV.

Pomocí stejného procesu, jako minule, jsem spojil files s print-jobs. Pak jsem pomocí csv a os knihovny přidal funkce na zapsání dat do souboru ve stejné lokaci jako je uložen python soubor a unikátně je pojmenoval.

Po prohlédnutí vytvořeného csv jsem vybral položky, které mi přišli jako užitečné pro většinu uživatelů a přidal jsem funkci, která je filtruje a formátuje datumy a časy. Pár dalšími funkcemi jsem odstranil chyby, které se v API vyskytovaly, jako jsou například chybějící soubory (pole v tabulce zůstane prázdné).

import subprocess
import json
import csv
import os
from datetime import datetime

def get_print_jobs(page):
    curl_command = [
        "curl", "-k", "-H", "Authorization: Token <SEM-API-KLÍČ>", 
        f"https://backend.next.karmen.tech/api/2/groups/<SEM-GROUP-ID>/print-jobs/?page={page}"
    ]
    result = subprocess.run(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    
    if result.returncode != 0:
        return None
    
    return json.loads(result.stdout.decode())

def get_files(page):
    curl_command = [
        "curl", "-k", "-H", "Authorization: Token <SEM-API-KLÍČ>", 
        f"https://backend.next.karmen.tech/api/2/groups/<SEM-GROUP-ID>/files/?page={page}"
    ]
    result = subprocess.run(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    
    if result.returncode != 0:
        return None
    
    return json.loads(result.stdout.decode())

def fetch_all_print_jobs():
    all_print_jobs = []
    page = 1

    while True:
        print_jobs_data = get_print_jobs(page)
        if not print_jobs_data or 'results' not in print_jobs_data:
            break

        all_print_jobs.extend(print_jobs_data['results'])

        if not print_jobs_data.get('next'):
            break
        
        page += 1

    return all_print_jobs

def fetch_all_files():
    all_files = []
    page = 1

    while True:
        files_data = get_files(page)
        if not files_data or 'results' not in files_data:
            break

        all_files.extend(files_data['results'])

        if not files_data.get('next'):
            break
        
        page += 1

    return all_files

def format_date(date_string):
    try:
        date = datetime.fromisoformat(date_string[:-1]) 
        return date.strftime("%d/%m/%Y")
    except ValueError:
        return date_string

def format_filament_used(used_g):
    if used_g is not None:
        used_g = float(used_g)
        if used_g >= 1000:
            return f"{used_g / 1000:.2f} kg" 
        if used_g == -1:
            return ""
        else:
            return f"{used_g:.2f} g" 
    return ""

def format_printing_time(time_in_seconds):
    time_in_seconds = abs(time_in_seconds)
    
    if time_in_seconds is not None:
        hours = time_in_seconds // 3600
        minutes = (time_in_seconds % 3600) // 60
        return f"{hours}h {minutes}m"
    return ""

def combine_print_jobs_and_files(print_jobs, files):
    combined_data = []
    
    files_lookup = {file['id']: file for file in files}

    for job in print_jobs:
        job_file_id = job.get('file_id')
        
        if job_file_id and job_file_id in files_lookup:
            combined_row = {
                'name': files_lookup[job_file_id].get('name', ''),
                'uploaded on': format_date(files_lookup[job_file_id].get('uploadedOn', '')),
                'printed on': format_date(job.get('started_on', '')),
                'filament type': files_lookup[job_file_id].get('filament_type', ''),
                'filament used g': format_filament_used(files_lookup[job_file_id].get('filament_used_g', 0)),
                'printing time': format_printing_time(files_lookup[job_file_id].get('printing_time_sec', 0)),
                'printed by': job.get('username', '')
            }
            combined_data.append(combined_row)

    return combined_data

def write_to_csv(data):
    filename = "combined_data.csv"
    counter = 1

    while os.path.exists(filename):
        filename = f"combined_data_{counter}.csv"
        counter += 1

    fieldnames = data[0].keys() if data else []

    with open(filename, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

    print(f"Data written to {filename}")
    print(f"file://{os.path.abspath(filename)}") 

def main():
    print_jobs = fetch_all_print_jobs()
    files = fetch_all_files()

    combined_data = combine_print_jobs_and_files(print_jobs, files)

    if combined_data:
        write_to_csv(combined_data)
    else:
        print("No combined data available.")

if __name__ == '__main__':
    main()

Ve výsledné formě program zapíše do CSV:

  • název souboru
  • den kdy byl soubor nahrán
  • den kdy byl soubor vytištěn
  • typ filamentu
  • hmotnost použitého filamentu
  • čas tisku
  • název účtu který tisk spustil

Celý proces komunikace s API zabere několik sekund, položky které vám zobrazí lze samozřejmě modifikovat, buď přepsáním kódu (nejdřív je potřeba zjistit jak je požadovaná položka pojmenována), nebo mi sem obratem napište a pošlu vám kód určený k potřebnému účelu. Každopádně ocením jakoukoli zpětnou vazbu.

Patrik