Python modules
Note
This page shows literal source listings only — Sphinx does not import
or execute these modules. Paths are relative to this file
(``docs/source/python.rst``), so ``../../app.py`` points to the repo root.
If you later switch to autodoc (e.g. ``.. automodule:: app``), ensure your
modules have no import-time side effects (no network calls, file I/O, or
starting servers). Put run logic under:
if __name__ == "__main__":
main()
Warning
Don’t expose secrets. Keep .env
out of the repo and out of docs builds.
In conf.py
you can guard with:
exclude_patterns += ["**/.env", "**/*.env"]
Tip
Long files? Show a slice with ``:lines:``, or highlight parts with
``:emphasize-lines:``:
.. literalinclude:: ../../app.py
:language: python
:lines: 1-120
:emphasize-lines: 12-18,45-47
app.py
1#!/usr/bin/env python
2
3"""
4Copyright (C) 2025 Mukharbek Organokov
5Website: www.circassiandna.com
6
7This program is free software: you can redistribute it and/or modify
8it under the terms of the GNU General Public License as published by
9the Free Software Foundation, either version 3 of the License, or
10(at your option) any later version.
11
12This program is distributed in the hope that it will be useful,
13but WITHOUT ANY WARRANTY; without even the implied warranty of
14MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15GNU General Public License for more details.
16
17You should have received a copy of the GNU General Public License
18along with this program. If not, see <https://www.gnu.org/licenses/>.
19"""
20
21import json
22import os
23from typing import Any, Dict, List, Optional, Tuple, Union
24
25from flask import (
26 Flask,
27 Response,
28 jsonify,
29 render_template,
30 request,
31 send_from_directory,
32)
33from flask_cors import CORS
34from openai import OpenAI, OpenAIError
35from pinecone import Pinecone # ServerlessSpec
36from pinecone.exceptions import PineconeException
37
38from utils import get_module_logger
39
LOGGER = get_module_logger(__name__)

# -------------------
# Environment
# -------------------
KB = "circassiandna-knowledgebase"
OPENAI_API_KEY: Optional[str] = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY: Optional[str] = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX: str = os.getenv("PINECONE_INDEX", KB)
PINECONE_ENVIRONMENT: Optional[str] = os.getenv("PINECONE_ENVIRONMENT")
PINECONE_CLOUD: Optional[str] = os.getenv("PINECONE_CLOUD")
PINECONE_REGION: Optional[str] = os.getenv("PINECONE_REGION")
PINECONE_NAMESPACE: Optional[str] = os.getenv("PINECONE_NAMESPACE")
PORT_STR: Optional[str] = os.getenv("PORT")
# Default to 8080 when the PORT environment variable is not set.
PORT: int = 8080 if PORT_STR is None else int(PORT_STR)

LOGGER.info("Starting app on port %d", PORT)

# -------------------
# Config
# -------------------
MODEL = "gpt-4o-mini"  # chat-completion model
TOP_N = 3  # number of KB entries handed to the model
BATCH_SIZE = 50  # vectors per Pinecone upsert batch


# The OpenAI key is mandatory; fail fast at import time.
if not OPENAI_API_KEY:
    LOGGER.critical("OPENAI_API_KEY is missing!")
    raise RuntimeError("OPENAI_API_KEY is required")

client = OpenAI(api_key=OPENAI_API_KEY)
LOGGER.info("OpenAI client initialized.")


# -------------------
# Load Knowledge Base
# -------------------
# The KB is loaded once at startup; a missing/corrupt file is fatal.
try:
    with open("knowledgebase.json", "r", encoding="utf-8") as f:
        KNOWLEDGEBASE: Dict[str, str] = json.load(f)
    LOGGER.info("Knowledge base loaded: %d entries", len(KNOWLEDGEBASE))
except (OSError, json.JSONDecodeError) as err:
    LOGGER.critical("Failed to load knowledgebase.json: %s", err)
    raise RuntimeError(f"Failed to load knowledgebase.json: {err}") from err
84
85
86# -------------------
87# Embedding
88# -------------------
def embed_text(text: str) -> List[float]:
    """
    Generate an embedding vector for *text* with OpenAI.

    Uses the `text-embedding-3-small` model; its 1536-dim output matches
    the Pinecone index configuration.
    :param text: The input text to be converted into a vector representation.
    :return: A 1536-dim embedding vector corresponding to the input text.
    """
    try:
        embedding_model = "text-embedding-3-small"
        reply = client.embeddings.create(model=embedding_model, input=text)
        LOGGER.debug("Embedding generated for text of length %d", len(text))
        return reply.data[0].embedding
    except OpenAIError as err:
        # Log the offending text so failed entries can be traced.
        LOGGER.error(
            "Embedding generation failed for text='%s': %s", text, err
        )
        raise
    except Exception:
        LOGGER.exception("Unexpected error during embedding generation.")
        raise
110
111
112# -------------------
113# Build indexes
114# -------------------
def build_index(pc, index_name: str = PINECONE_INDEX) -> None:
    """
    Build or rebuild the Pinecone index with knowledgebase data.

    Creates the index if it does not already exist, embeds every entry of
    the knowledge base via OpenAI, and upserts the vectors to Pinecone in
    fixed-size batches.
    :param pc: An initialized Pinecone client instance.
    :param index_name: Name of the index to create or update.
    :return: None.
    """
    try:
        existing = pc.list_indexes().names()
        LOGGER.info("Current Pinecone indexes: %s", existing)

        if index_name not in existing:
            LOGGER.info("Creating Pinecone index: %s", index_name)
            if not PINECONE_CLOUD or not PINECONE_REGION:
                LOGGER.error("PINECONE_CLOUD or PINECONE_REGION missing!")
                raise RuntimeError(
                    "PINECONE_CLOUD and PINECONE_REGION are not found."
                )
            # NOTE: a serverless spec is deliberately not passed here, e.g.
            # spec=ServerlessSpec(cloud=PINECONE_CLOUD, region=PINECONE_REGION)
            pc.create_index(
                name=index_name,
                dimension=1536,  # matches text-embedding-3-small
                metric="cosine",
            )
            LOGGER.info("Created new Pinecone index: %s", index_name)

        index = pc.Index(index_name)
        LOGGER.info("Indexes stats: %s", index.describe_index_stats())

        # Embed every KB entry; entries whose embedding fails are skipped
        # so one bad entry cannot abort the whole build.
        vectors = []
        for pos, (title, body) in enumerate(KNOWLEDGEBASE.items()):
            try:
                embedding = embed_text(f"{title}\n{body}")
                vectors.append(
                    (str(pos), embedding, {"title": title, "text": body})
                )
            except OpenAIError:
                LOGGER.warning(
                    "Skipping entry %r due to embedding failure", title
                )
                continue

        # Upload in batches of BATCH_SIZE vectors.
        for start in range(0, len(vectors), BATCH_SIZE):
            batch = vectors[start : start + BATCH_SIZE]
            LOGGER.info(
                "Uploading batch %s to %s: (%s vectors)",
                start,
                start + len(batch),
                len(batch),
            )
            index.upsert(vectors=batch)
            LOGGER.debug("Uploaded batch %d-%d", start, start + len(batch))

        LOGGER.info("Index build complete with %d vectors.", len(vectors))
    except PineconeException:
        LOGGER.exception("Pinecone operation failed during index build.")
        raise
    except Exception:
        LOGGER.exception("Unexpected error while building Pinecone index.")
        raise
183
184
185# -------------------
186# Query
187# -------------------
def query_index(query: str, top_k: int = 3) -> List[Dict[str, Any]]:
    """
    Query the Pinecone index, falling back to the local knowledge base.
    :param query: Search query.
    :param top_k: Max number of results.
    :return: List of matched documents with
        - entry title;
        - entry text;
        - similarity score (1.0 for keyword matches).
    :raises ValueError: If ``top_k`` is less than 1.
    """
    LOGGER.debug("Query received: %s (top_k=%d)", query, top_k)
    if top_k < 1:
        raise ValueError("top_k must be >= 1")

    def _fallback_search() -> List[Dict[str, Union[str, float]]]:
        """
        Perform a simple keyword search over the local knowledge base.

        Titles (keys) and content (values) are matched case-insensitively;
        every match gets a fixed score of 1.0.
        :return: A list of matching entries, capped at ``top_k``.
        """
        needle = query.lower()
        found = [
            {"title": title, "text": body, "score": 1.0}
            for title, body in KNOWLEDGEBASE.items()
            if needle in title.lower() or needle in body.lower()
        ]
        LOGGER.debug("Fallback search found %d results", len(found))
        return found[:top_k]

    try:
        if not PINECONE_API_KEY:
            LOGGER.warning(
                "PINECONE_API_KEY not set. Using fallback KB search."
            )
            return _fallback_search()

        # Initialize the Pinecone client.
        pc = Pinecone(
            api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT
        )

        if PINECONE_INDEX not in pc.list_indexes().names():
            LOGGER.warning(
                "Pinecone index not found: %s. Using fallback KB search",
                PINECONE_INDEX,
            )
            return _fallback_search()

        idx = pc.Index(PINECONE_INDEX)

        try:
            query_embedding = embed_text(query)
        except OpenAIError as err:
            LOGGER.error("OpenAI embedding generation failed: %s", err)
            return _fallback_search()

        # Query the Pinecone API with the embedded query vector.
        response = idx.query(
            vector=query_embedding,
            top_k=top_k,
            namespace=PINECONE_NAMESPACE,
            include_metadata=True,
        )

        # match.metadata.get() keeps missing keys from raising.
        hits = []
        for match in response.matches:
            hits.append(
                {
                    "title": match.metadata.get("title"),
                    "text": match.metadata.get("text"),
                    "score": match.score,
                }
            )
        LOGGER.debug("Pinecone query returned %d matches", len(hits))
        return hits
    except PineconeException as err:
        LOGGER.error("Pinecone query failed: %s", err)
        return _fallback_search()

    except Exception as err:
        LOGGER.exception("Unexpected error during query_index(): %s", err)
        return _fallback_search()
271
272
273# -------------------
274# Flask App
275# -------------------
app = Flask(__name__, static_folder="static", template_folder="templates")

# Browser origins allowed to call the /api/* endpoints.
_API_ORIGINS = [
    "https://www.circassiandna.com",
    "https://circassiandna.com",
    "http://localhost:5000",
    "http://localhost:8000",
    "http://localhost:8080",
]
CORS(app, resources={r"/api/*": {"origins": _API_ORIGINS}})

LOGGER.info("Flask app initialized.")
297
298
299# -------------------
300# Context Retrieval
301# -------------------
def retrieve_context(
    question: str, top_n: int = TOP_N
) -> List[Dict[str, Any]]:
    """
    Retrieve the most relevant knowledge base entries for a question.
    :param question: The user's input question.
    :param top_n: Maximum number of relevant entries to return.
    :return: A list of dicts, each with keys
        "q" (question/title from the knowledge base),
        "a" (corresponding answer/text),
        "score" (relevance score from Pinecone, if available).
    """
    LOGGER.debug("Retrieving context for question: %s", question)
    try:
        hits = query_index(question, top_k=top_n)
        LOGGER.info("Context retrieval found %d hits", len(hits))
        return [
            {"q": hit["title"], "a": hit["text"], "score": hit.get("score")}
            for hit in hits
        ]
    except PineconeException as err:
        LOGGER.exception("Pinecone query failed. %s", err)
        return []
    except Exception as err:
        # Any failure degrades gracefully to "no context".
        LOGGER.exception("Unexpected error during context retrieval. %s", err)
        return []
328
329
330# -------------------
331# Serve Static Files
332# -------------------
@app.route("/static/<path:filename>")
def serve_static(filename) -> Response:
    """
    Serve files from the 'static' directory.

    Under Gunicorn in Docker, Flask does not serve static assets unless a
    route is explicitly added (in debug mode it does), so requests such as
    /static/chat-widget.js need this handler.
    :param filename: The relative path of the file within the 'static' folder.
    :return: Flask response containing the requested static file.
    """
    LOGGER.debug("Serving static file: %s", filename)
    static_dir = os.path.join(app.root_path, "static")
    return send_from_directory(static_dir, filename)
345
346
347# -------------------
348# Main UI
349# -------------------
@app.route("/")
def index() -> str:
    """
    Render the chatbot's main UI page.
    :return: Rendered HTML for the chatbot interface.
    """
    LOGGER.info("Main UI requested")
    return render_template("index.html")
358
359
360# -------------------
361# Health Check
362# -------------------
@app.route("/healthz")
def healthz() -> Tuple[str, int]:
    """
    Liveness probe endpoint.
    :return: A ("OK", 200) message / HTTP status code tuple.
    """
    LOGGER.debug("Health check requested")
    return ("OK", 200)
371
372
373# -------------------
374# Chat API Endpoint
375# -------------------
@app.route("/api/chat", methods=["POST", "OPTIONS"])
def chat() -> Response:
    """
    Handle chat requests from the client.

    This endpoint:
    - Receives a JSON payload with a "question" field.
    - Retrieves relevant context from Pinecone or a fallback knowledge base.
    - Constructs a prompt for the OpenAI model.
    - Returns the model's answer as JSON.

    Request JSON:
        {"question": "<user's question>"}

    Response JSON:
        {"answer": "<generated answer>"}
    :return: Flask response as JSON with generated answer or an error msg.
    """
    LOGGER.info("Chat API.")
    if request.method == "OPTIONS":
        # CORS preflight request: reply immediately with no body.
        return "", 204

    def _error(message: str, status: int) -> Tuple[Response, int]:
        """Format error responses consistently."""
        LOGGER.warning("Error response: %s", message)
        return jsonify({"error": message}), status

    # Parse the request body; malformed JSON yields a 400.
    try:
        payload = request.json or {}
        LOGGER.debug("Received request data: %s", payload)
    except Exception as err:
        LOGGER.exception("Invalid JSON in request. %s", err)
        return _error("Invalid JSON payload", 400)

    question = payload.get("question")
    if not question:
        return _error("No question provided", 400)

    # Pull relevant KB entries for grounding the answer.
    try:
        contexts = retrieve_context(question)
        LOGGER.info("Contexts retrieved: %s", contexts)
    except PineconeException as err:
        LOGGER.exception("Pinecone query failed.")
        return _error(f"Pinecone error: {str(err)}", 500)
    except Exception as err:
        LOGGER.exception("Unexpected error during context retrieval.")
        return _error(f"Context retrieval error: {str(err)}", 500)

    LOGGER.info("Pinecone results: %s", contexts)
    combined_context = "\n\n".join(
        f"Q: {c['q']}\nA: {c['a']}" for c in contexts
    )

    prompt = (
        "You are a helpful assistant for Circassian DNA.\n"
        "First, check the knowledge base entries below.\n"
        "If you find a relevant answer, use it directly.\n"
        "If the knowledge base does not have a clear answer, "
        "you may use your own knowledge.\n"
        "Always prefer the knowledge base if there is a match.\n\n"
        f"Knowledge base: {combined_context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )

    # Ask the chat model for an answer grounded in the KB context.
    try:
        completion = client.chat.completions.create(
            model=MODEL,
            messages=[{"role": "user", "content": prompt}],
        )
        answer = completion.choices[0].message.content
    except OpenAIError as err:
        LOGGER.exception("OpenAI API request failed.")
        return _error(f"OpenAI API error: {str(err)}", 500)
    except Exception as err:
        LOGGER.exception("Unexpected error during OpenAI request.")
        return _error(f"Answer generation error: {str(err)}", 500)

    return jsonify({"answer": answer}), 200
479
480
481# -------------------
482# Entrypoint
483# -------------------
if __name__ == "__main__":

    import sys

    # "python app.py build" (re)builds the Pinecone index;
    # any other invocation starts the Flask development server.
    if len(sys.argv) > 1 and sys.argv[1] == "build":
        if not PINECONE_API_KEY:
            raise RuntimeError("PINECONE_API_KEY is required for index build.")
        build_index(Pinecone(api_key=PINECONE_API_KEY))
    else:
        app.run(host="0.0.0.0", port=int(os.environ.get("PORT", PORT)))
combine_jsons.py
1#!/usr/bin/env python3
2
3"""
4Copyright (C) 2025 Mukharbek Organokov
5Website: www.circassiandna.com
6
7This program is free software: you can redistribute it and/or modify
8it under the terms of the GNU General Public License as published by
9the Free Software Foundation, either version 3 of the License, or
10(at your option) any later version.
11
12This program is distributed in the hope that it will be useful,
13but WITHOUT ANY WARRANTY; without even the implied warranty of
14MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15GNU General Public License for more details.
16
17You should have received a copy of the GNU General Public License
18along with this program. If not, see <https://www.gnu.org/licenses/>.
19"""
20
21import json
22from pathlib import Path
23from typing import List
24
25from utils import get_module_logger, remove_trailing_commas
26
27LOGGER = get_module_logger(__name__)
28
29
def check_subdirs(input_dir: str, required_subdirs: List[str]) -> bool:
    """
    Check that the expected directory structure exists.
    :param input_dir: The input directory (e.g. "data").
    :param required_subdirs: Names of subdirectories that must all exist
        inside ``input_dir``.
    :return: True if ``input_dir`` and every required subdirectory exist,
        False otherwise.
    """
    dirpath = Path(input_dir)

    if not dirpath.is_dir():
        LOGGER.error("Directory does not exist: %s", input_dir)
        return False

    # Collect every missing subdir first so all problems are reported at once.
    missing_subdirs = [
        dirpath / sub
        for sub in required_subdirs
        if not (dirpath / sub).is_dir()
    ]

    if missing_subdirs:
        for missing in missing_subdirs:
            LOGGER.error("Missing required subdir: %s", missing)
        return False

    return True
55
56
def combine_json_files(
    input_dir: str,
    required_subdirs: List[str],
    input_filenames: List[str],
    output_file: str,
) -> None:
    """
    Combine multiple JSON dict files from a directory into one JSON file.

    Each input file is expected at ``<input_dir>/<sub>/<name>_<sub>.json``
    and must contain a JSON object at the top level. Missing files and
    invalid JSON are logged and skipped rather than aborting the merge.
    :param input_dir: Directory containing the language subdirectories.
    :param required_subdirs: Subdirectory names (e.g. ["en", "ru"]) that
        must all exist before combining starts.
    :param input_filenames: Base names of the JSON files to combine.
    :param output_file: Path to the output combined JSON file.
    """
    combined_data = {}
    if check_subdirs(input_dir=input_dir, required_subdirs=required_subdirs):
        LOGGER.info("All required subdirectories exist!")

        dirpath = Path(input_dir)
        for sub in required_subdirs:
            for filename in input_filenames:
                # BUG FIX: the path must be built from the loop variable
                # `filename` (it was previously unused, so every iteration
                # probed the same nonexistent placeholder file).
                filepath = dirpath / sub / f"{filename}_{sub}.json"
                if not filepath.is_file():
                    LOGGER.warning("File not found, skipping: %s", filepath)
                    continue

                try:
                    raw_text = filepath.read_text(encoding="utf-8")
                    # Tolerate trailing commas, which strict JSON rejects.
                    data = json.loads(remove_trailing_commas(raw_text))
                    if not isinstance(data, dict):
                        LOGGER.warning(
                            "Skipping %s: not a JSON object at top level",
                            filepath,
                        )
                        continue
                    combined_data.update(data)
                    LOGGER.info("Added %s", filepath)
                except json.JSONDecodeError as err:
                    LOGGER.error("Invalid JSON in %s: %s", filepath, err)
                except Exception as err:
                    LOGGER.error("Error reading %s: %s", filepath, err)

        if combined_data:
            try:
                with open(output_file, "w", encoding="utf-8") as f:
                    json.dump(
                        combined_data, f, indent=2, ensure_ascii=False
                    )
                LOGGER.info("Combined JSON saved to %s", output_file)
            except Exception as err:
                LOGGER.error(
                    "Failed to write output file %s: %s", output_file, err
                )
        else:
            LOGGER.warning("No valid JSON data to write.")

    else:
        LOGGER.error(
            "Aborting combine: required subdirectories %s not found in %s",
            required_subdirs,
            input_dir,
        )
119
120
def main() -> None:
    """
    Entry point: merge the per-language knowledge base JSON files.

    - Looks for an explicit list of `.json` files in the INPUT_DIR directory.
    - Expects each file to contain a JSON object (dict at top level).
    - Handles missing files and invalid JSON gracefully.
    - Merges all dicts into one combined dict.
    - Saves the result into OUTPUT_FILE.
    """

    categories = [
        "base",
        "ftdna",
        "order",
        "hg",
        "archeology",
        "history",
        "projects",
        "tree",
        "genealogy",
        "general",
        "miscellaneous",
    ]
    combine_json_files(
        input_dir="data",
        required_subdirs=["en", "ru"],
        input_filenames=categories,
        output_file="knowledgebase.json",
    )
150
151
# Script entry point: build the combined knowledge base JSON.
if __name__ == "__main__":
    main()
lambda_handler.py
1#!/usr/bin/env python
2
3"""
4Copyright (C) 2025 Mukharbek Organokov
5Website: www.circassiandna.com
6
7This program is free software: you can redistribute it and/or modify
8it under the terms of the GNU General Public License as published by
9the Free Software Foundation, either version 3 of the License, or
10(at your option) any later version.
11
12This program is distributed in the hope that it will be useful,
13but WITHOUT ANY WARRANTY; without even the implied warranty of
14MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15GNU General Public License for more details.
16
17You should have received a copy of the GNU General Public License
18along with this program. If not, see <https://www.gnu.org/licenses/>.
19"""
20
21from apig_wsgi import make_lambda_handler
22
23from app import app
24
# AWS Lambda entry point: apig_wsgi wraps the Flask WSGI app into a
# callable with signature (event, context) -> dict, handling both
# API Gateway REST (v1) and HTTP API (v2) events.
handler = make_lambda_handler(app)
29
30# Note: The above handler is a simplified version using apig_wsgi.
31# Custom handler with awsgi2; TODO: debug
32
33# import json
34# from typing import Any, Dict, Mapping
35
36# import awsgi2
37
38# from app import app
39# from utils import get_module_logger
40
41# LOGGER = get_module_logger(__name__)
42
43
44# def handler(event: Dict[str, Any], context: Any) -> Mapping[str, Any]:
45# """
46# AWS Lambda entry point for the Flask application.
47# This function acts as a bridge between AWS Lambda and the Flask app
48# using the `awsgi2.response` function to handle API Gateway events.
49# :param event: The API Gateway event dictionary.
50# :param context: The Lambda context object, which contains runtime info.
51# :return: A response dictionary formatted for API Gateway.
52# """
53# try:
54# LOGGER.info("Received event: %s", event)
55
56# # # Detect API Gateway version
57# # if "version" in event and event["version"] == "2.0":
58# # is_v2 = True # HTTP API
59# # elif "httpMethod" in event:
60# # is_v2 = False # REST API
61# # else:
62# # raise ValueError("Unknown API Gateway event type")
63
64# # Let awsgi2 auto-detect API Gateway version
65# return awsgi2.response(app, event, context)
66
67# except Exception as exc:
68# # Exception captures stack trace in CloudWatch.
69# LOGGER.exception("Unhandled exception in Lambda handler.")
70
71# # Return a JSON-formatted 500 error for API Gateway
72# err_msg = "An unexpected error occurred."
73# return {
74# "statusCode": 500,
75# "headers": {"Content-Type": "application/json"},
76# "body": json.dumps(
77# {
78# "error": "Internal Server Error",
79# "message": (
80# str(exc) if app.debug else err_msg
81# ),
82# }
83# ),
84# }
utils.py
1#!/usr/bin/env python
2
3"""
4Copyright (C) 2025 Mukharbek Organokov
5Website: www.circassiandna.com
6
7This program is free software: you can redistribute it and/or modify
8it under the terms of the GNU General Public License as published by
9the Free Software Foundation, either version 3 of the License, or
10(at your option) any later version.
11
12This program is distributed in the hope that it will be useful,
13but WITHOUT ANY WARRANTY; without even the implied warranty of
14MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15GNU General Public License for more details.
16
17You should have received a copy of the GNU General Public License
18along with this program. If not, see <https://www.gnu.org/licenses/>.
19"""
20
21import logging
22import re
23from typing import Optional
24
25
def remove_trailing_commas(text: str) -> str:
    """
    Remove trailing commas that precede a closing ``}`` or ``]``.
    :param text: Raw JSON text, possibly containing trailing commas.
    :return: The cleaned JSON text.
    """
    # One pass handles both object ('}') and array (']') closers.
    return re.sub(r",\s*([}\]])", r"\1", text)
35
36
def get_module_logger(mod_name: Optional[str] = None) -> logging.Logger:
    """
    Return a logger configured for multi-module usage.

    The stream handler, formatter, and DEBUG level are applied only on
    first use (when the logger has no handlers yet), so repeated calls do
    not stack duplicate handlers.
    :param mod_name: The name of the logger. If None, the root logger is used.
    :return: A configured logger instance.
    """
    logger = logging.getLogger(mod_name)
    if logger.handlers:
        return logger

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
        )
    )
    logger.addHandler(stream_handler)
    logger.setLevel(logging.DEBUG)

    return logger