-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgspread_updater.py
More file actions
201 lines (168 loc) · 6.2 KB
/
gspread_updater.py
File metadata and controls
201 lines (168 loc) · 6.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from typing import List, Optional
import gspread
from datetime import datetime
from job_store import PrintJob
CREDENTIALS_PATH = 'printer-monitoring-474822-bdfc6f0da109.json'
SPREADSHEET_NAME = "print-records"
class SheetClient:
def __init__(self, worksheet_name):
self.credentials_path = CREDENTIALS_PATH
self.spreadsheet_name = SPREADSHEET_NAME
self.worksheet_name = worksheet_name
self._client = None
self._spreadsheet = None
self._sheet = None
def _connect(self):
"""Lazy initialization of gspread client and worksheet."""
if self._client is None:
self._client = gspread.service_account(filename=self.credentials_path)
if self._spreadsheet is None:
self._spreadsheet = self._client.open(self.spreadsheet_name)
if self._sheet is None:
self._sheet = self._spreadsheet.worksheet(self.worksheet_name)
return self._sheet
def find_job_row(self, name: str, date: datetime):
"""
Checks the sheet for a job with the given name and date.
If found, returns the row number and row.
If not found, returns the next available empty row number.
"""
ws = self._connect()
all_values = ws.get_all_values()
for i, row in enumerate(all_values, start=1):
if len(row) < 3:
continue
row_name = row[0]
row_date_str = row[2].strip()
try:
row_date = datetime.strptime(row_date_str, "%m/%d/%Y %H:%M")
except Exception:
continue
if row_name == name and row_date == date:
return i, row
return len(all_values) + 1, ""
def update_job(self, job):
"""
Finds the row for the given job by name and date.
Overwrites that row’s data, or writes it to the next empty row if not found.
"""
ws = self._connect()
i, row = self.find_job_row(job.name, job.date)
if i > ws.row_count:
ws.add_rows(i - ws.row_count)
values = self.map_job_to_row(job, row)
ws.update(f"A{i}:H{i}", [values])
def map_job_to_row(self, job, row = None):
"""
Maps PrintJob object to literals for data entry
"""
errors = row[7] if row and len(row) > 7 else ""
errors += (" " + job.errors) if job.errors not in errors else ""
values = [
job.name,
job.status,
job.date.strftime("%m/%d/%Y %H:%M"),
job.duration,
job.machine,
job.weight,
", ".join(job.materials),
errors
]
return values
def get_oldest_in_progress_job(self) -> Optional[PrintJob]:
"""
Returns the oldest PrintJob that is still in progress.
"""
ws = self._connect()
rows = ws.get_all_values()
if not rows:
return None
# Convert and filter
jobs = [
self.row_to_printjob(r)
for r in rows
if len(r) >= 2 and r[1].strip().lower() in ("printing", "printing paused")
]
if not jobs:
return None
return min(jobs, key=lambda j: j.date)
def get_most_recent_job(self) -> Optional[PrintJob]:
"""
Returns the most recent PrintJob based on the date column.
"""
ws = self._connect()
rows = ws.get_all_values()
if not rows:
return None
jobs = [self.row_to_printjob(r) for r in rows if len(r) >= 3 and r[2].strip()]
if not jobs:
return None
return max(jobs, key=lambda j: j.date)
def row_to_printjob(self, row: List[str]) -> PrintJob:
"""
Convert a Google Sheet row into a PrintJob instance.
name, status, date, duration, machine, weight, materials, errors
"""
return PrintJob(
name=row[0],
status=row[1],
date=datetime.strptime(row[2], "%m/%d/%Y %H:%M"),
duration=float(row[3]) if row[3] else 0.0,
machine=row[4],
weight=float(row[5]) if row[5] else 0.0,
materials=[m.strip() for m in row[6].split(",")] if row[6] else [],
errors= row[7] if len(row) > 7 else ""
)
def get_printer_config(self, max_rows: int = 32):
"""
Reads up to max_rows of column A for printer configuration.
Returns a list of (row_number, printer_name) tuples for non-empty rows.
"""
ws = self._connect()
data = ws.get(f"A1:A{max_rows}")
printers = []
for idx, row in enumerate(data, start=1):
if not row:
continue
printer = row[0].strip()
if printer:
printers.append((idx, printer))
return printers
def set_mfa_display_info(self, row_number: int, row_data: dict):
"""
Tracks the current status and remaining time on a device.
This is specifically for the heads-up display on the mfa monitor
"""
ws = self._connect()
try:
values = [
row_data.get("Status", ""),
row_data.get("Completion", ""),
row_data.get("Time", "")
]
ws.update(f'B{row_number}:D{row_number}', [values])
print(f"[SheetClient] Row {row_number} updated (columns B-D): {values}")
except Exception as e:
print(f"[SheetClient Error] Failed to update row {row_number}: {e}")
def get_mfa_display_info(self):
"""
Expects columns A: Printer, B: Status, C: Completion, D: Time.
"""
sheet = self._connect()
data = sheet.get('A1:D32')
results = []
for row in data:
if len(row) < 4:
continue
printer, status, completion, time_left = row
try:
completion = float(completion)
except (TypeError, ValueError):
completion = None
results.append({
"printer": printer,
"status": status,
"completion": completion,
"time_left": time_left,
})
return results