Zdravím,
tentokrát jsem se inspiroval mým minulým projektem, kde jsem tvořil program na získání a sečtení hmotnosti filamentů za určitou dobu. Jelikož jsem již přišel na to jak získávat data z API, řekl jsem si, že by nebylo od věci převést tyto data do přehledného CSV.
Pomocí stejného procesu, jako minule, jsem spojil files s print-jobs. Pak jsem pomocí csv a os knihovny přidal funkce na zapsání dat do souboru ve stejné lokaci jako je uložen python soubor a unikátně je pojmenoval.
Po prohlédnutí vytvořeného csv jsem vybral položky, které mi přišli jako užitečné pro většinu uživatelů a přidal jsem funkci, která je filtruje a formátuje datumy a časy. Pár dalšími funkcemi jsem odstranil chyby, které se v API vyskytovaly, jako jsou například chybějící soubory (pole v tabulce zůstane prázdné).
import subprocess
import json
import csv
import os
from datetime import datetime
def get_print_jobs(page):
curl_command = [
"curl", "-k", "-H", "Authorization: Token <SEM-API-KLÍČ>",
f"https://backend.next.karmen.tech/api/2/groups/<SEM-GROUP-ID>/print-jobs/?page={page}"
]
result = subprocess.run(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
return None
return json.loads(result.stdout.decode())
def get_files(page):
curl_command = [
"curl", "-k", "-H", "Authorization: Token <SEM-API-KLÍČ>",
f"https://backend.next.karmen.tech/api/2/groups/<SEM-GROUP-ID>/files/?page={page}"
]
result = subprocess.run(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
return None
return json.loads(result.stdout.decode())
def fetch_all_print_jobs():
all_print_jobs = []
page = 1
while True:
print_jobs_data = get_print_jobs(page)
if not print_jobs_data or 'results' not in print_jobs_data:
break
all_print_jobs.extend(print_jobs_data['results'])
if not print_jobs_data.get('next'):
break
page += 1
return all_print_jobs
def fetch_all_files():
all_files = []
page = 1
while True:
files_data = get_files(page)
if not files_data or 'results' not in files_data:
break
all_files.extend(files_data['results'])
if not files_data.get('next'):
break
page += 1
return all_files
def format_date(date_string):
try:
date = datetime.fromisoformat(date_string[:-1])
return date.strftime("%d/%m/%Y")
except ValueError:
return date_string
def format_filament_used(used_g):
if used_g is not None:
used_g = float(used_g)
if used_g >= 1000:
return f"{used_g / 1000:.2f} kg"
if used_g == -1:
return ""
else:
return f"{used_g:.2f} g"
return ""
def format_printing_time(time_in_seconds):
time_in_seconds = abs(time_in_seconds)
if time_in_seconds is not None:
hours = time_in_seconds // 3600
minutes = (time_in_seconds % 3600) // 60
return f"{hours}h {minutes}m"
return ""
def combine_print_jobs_and_files(print_jobs, files):
combined_data = []
files_lookup = {file['id']: file for file in files}
for job in print_jobs:
job_file_id = job.get('file_id')
if job_file_id and job_file_id in files_lookup:
combined_row = {
'name': files_lookup[job_file_id].get('name', ''),
'uploaded on': format_date(files_lookup[job_file_id].get('uploadedOn', '')),
'printed on': format_date(job.get('started_on', '')),
'filament type': files_lookup[job_file_id].get('filament_type', ''),
'filament used g': format_filament_used(files_lookup[job_file_id].get('filament_used_g', 0)),
'printing time': format_printing_time(files_lookup[job_file_id].get('printing_time_sec', 0)),
'printed by': job.get('username', '')
}
combined_data.append(combined_row)
return combined_data
def write_to_csv(data):
filename = "combined_data.csv"
counter = 1
while os.path.exists(filename):
filename = f"combined_data_{counter}.csv"
counter += 1
fieldnames = data[0].keys() if data else []
with open(filename, mode='w', newline='') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
print(f"Data written to {filename}")
print(f"file://{os.path.abspath(filename)}")
def main():
print_jobs = fetch_all_print_jobs()
files = fetch_all_files()
combined_data = combine_print_jobs_and_files(print_jobs, files)
if combined_data:
write_to_csv(combined_data)
else:
print("No combined data available.")
if __name__ == '__main__':
main()
Ve výsledné formě program zapíše do CSV:
- název souboru
- den kdy byl soubor nahrán
- den kdy byl soubor vytištěn
- typ filamentu
- hmotnost použitého filamentu
- čas tisku
- název účtu který tisk spustil
Celý proces komunikace s API zabere několik sekund, položky které vám zobrazí lze samozřejmě modifikovat, buď přepsáním kódu (nejdřív je potřeba zjistit jak je požadovaná položka pojmenována), nebo mi sem obratem napište a pošlu vám kód určený k potřebnému účelu. Každopádně ocením jakoukoli zpětnou vazbu.
Patrik