Commit 1b069ae7 authored by Dario Barghini's avatar Dario Barghini
Browse files

First commit

parents
Loading
Loading
Loading
Loading

LICENSE

0 → 100644
+0 −0

File added.

Preview size limit exceeded, changes collapsed.

README.md

0 → 100644
+45 −0
Original line number Diff line number Diff line
# SQM Mark Test Data

A command-line tool to mark or un-mark test data in an SQM (Sky Quality Meter) directory tree.

### Features

Mark Data: Moves data within a specified date range to .test files.
For daily data, it renames files from file.ext to file.test.ext.
For monthly data, it moves relevant rows from file.txt to a corresponding file.test.txt.

Un-mark Data: A --revert flag reverses the operation, moving data from .test files back to their original locations.

The script can be run multiple times without causing data duplication or errors. It correctly integrates new data on subsequent runs.

### Usage

Place the pysqm_mark_test.py script in a directory. To run it, provide a start date and an end date in YYYYMMDD format.

To Mark Data as Test: 
```
python pysqm_mark_test.py 20240115 20240120 --root /path/to/data/root
```

To Un-mark (Revert) Data: 
```
python pysqm_mark_test.py 20240115 20240120 --revert --root /path/to/data/root
```

### Installation

You can install this tool as a command-line utility using pip. After installation, you can run it from anywhere using the pysqm_mark_test command.

Run the installation from the directory containing setup.py:
```
pip install -e .
```

### Installed Command Usage

Once installed, you can use it like this:
```
pysqm_mark_test 20240115 20240120
pysqm_mark_test 20240115 20240120 --revert
```
 No newline at end of file

pysqm_mark_test.py

0 → 100644
+291 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
"""
Mark or un-mark test data in an SQM directory tree.

Features
- Mark: Moves data within a date range to '.test' files.
  - daily: Renames 'file.ext' to 'file.test.ext'.
  - monthly: Moves rows from 'file.txt' to 'file.test.txt'.
- Un-mark: A --revert flag moves data from '.test' files back to their original locations.

Usage
# Mark data as test
python pysqm_mark_test.py 20240115 20240120 [--root C:\\path\\to\\base]

# Un-mark data (revert)
python pysqm_mark_test.py 20240115 20240120 --revert [--root C:\\path\\to\\base]

Notes
- The script can be run multiple times without causing issues.
- Marking skips already marked files and integrates new data into existing monthly .test files.
- Reverting skips files that have no .test version and moves data correctly between existing files.
"""

from __future__ import annotations
import argparse
import csv
import datetime as dt
import os
import re
from pathlib import Path
from typing import Iterable, Tuple, List

# ---------------------------- Helpers ---------------------------------

def julian_window(start_yyyymmdd: str, end_yyyymmdd: str) -> Tuple[dt.datetime, dt.datetime]:
    """Turn two YYYYMMDD strings into a half-open noon-to-noon datetime window.

    The window opens at 12:00 on the start date and closes at 12:00 on the
    day after the end date.

    Raises:
        ValueError: if either string is not exactly eight digits, does not
            name a real calendar date, or the end date precedes the start date.
    """
    noon = dt.time(12, 0)

    days = []
    # The start string is validated before the end string.
    for text in (start_yyyymmdd, end_yyyymmdd):
        if re.fullmatch(r"\d{8}", text) is None:
            raise ValueError(f"Invalid YYYYMMDD: {text}")
        year, month, day = int(text[:4]), int(text[4:6]), int(text[6:8])
        days.append(dt.date(year, month, day))

    first_day, last_day = days
    if last_day < first_day:
        raise ValueError("end date must be >= start date")

    window_open = dt.datetime.combine(first_day, noon)
    window_close = dt.datetime.combine(last_day + dt.timedelta(days=1), noon)
    return window_open, window_close


def months_between(d0: dt.date, d1: dt.date) -> Iterable[str]:
    """Yield a 'YYYYMM' string for every calendar month from d0's month to d1's month, inclusive.

    Only the year and month of each date are used. Nothing is yielded when
    d1's month lies before d0's month.
    """
    # Number months consecutively so that year roll-over is plain arithmetic.
    first_serial = d0.year * 12 + (d0.month - 1)
    last_serial = d1.year * 12 + (d1.month - 1)
    for serial in range(first_serial, last_serial + 1):
        year, month_index = divmod(serial, 12)
        yield f"{year:04d}{month_index + 1:02d}"


def ensure_parent(p: Path) -> None:
    """Create the directory that contains p (and any missing ancestors) if it is absent."""
    containing_dir = p.parent
    containing_dir.mkdir(parents=True, exist_ok=True)


def add_test_suffix(p: Path) -> Path:
    """Return p with '.test' inserted just before its final extension.

    A path without an extension simply gains a '.test' suffix.
    """
    extension = p.suffix
    return p.with_suffix(f".test{extension}")

def remove_test_suffix(p: Path) -> Path:
    """Return p without its '.test' marker suffix.

    Only a whole '.test' suffix component is removed, and when several are
    present the one closest to the end of the name is dropped. This matters
    for names such as 'site.testing.test.txt': a plain substring replacement
    would strip the '.test' out of '.testing' and leave the real marker in
    place.

    The path is returned unchanged when none of its suffixes is '.test'.
    """
    suffixes = p.suffixes
    if ".test" not in suffixes:
        return p

    # Position of the last '.test' component in the suffix list.
    marker_at = len(suffixes) - 1 - suffixes[::-1].index(".test")
    kept = suffixes[:marker_at] + suffixes[marker_at + 1:]

    # The joined suffixes always form the tail of the name, so slicing them
    # off leaves the part of the name that precedes the first suffix.
    base = p.name[: len(p.name) - len("".join(suffixes))]
    return p.with_name(base + "".join(kept))


# Timestamp layout expected in column 2 of the data files (fractional seconds required).
_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"


def parse_local_datetime(s: str) -> dt.datetime | None:
    """Parse a timestamp written in _DATE_FORMAT, ignoring surrounding whitespace.

    Returns the parsed naive datetime, or None when the text does not match
    the format.
    """
    text = s.strip()
    try:
        parsed = dt.datetime.strptime(text, _DATE_FORMAT)
    except (ValueError, TypeError):
        parsed = None
    return parsed

def read_csv_data(path: Path) -> Tuple[List[List[str]], List[List[str]]]:
    """Load a ';'-delimited file and split its rows into (header_rows, data_rows).

    A row counts as data when it has at least two columns and its second
    column parses as a timestamp; every other row, including empty ones, is
    treated as a header row. A missing file yields two empty lists.
    """
    preamble: List[List[str]] = []
    records: List[List[str]] = []
    if not path.exists():
        return preamble, records

    with path.open("r", encoding="utf-8", newline="") as handle:
        for fields in csv.reader(handle, delimiter=";"):
            # The length check comes first so short rows are never indexed.
            is_record = len(fields) >= 2 and parse_local_datetime(fields[1]) is not None
            bucket = records if is_record else preamble
            bucket.append(fields)
    return preamble, records

def write_csv_data(path: Path, header_rows: List[List[str]], data_rows: List[List[str]]) -> None:
    """Write header rows followed by data rows to a ';'-delimited file.

    The content is first written to a sibling '<name>.tmp' file, which is
    then moved over `path` with os.replace, so `path` never holds a
    half-written file. If anything fails along the way the temporary file
    is removed and the original exception is re-raised; an existing `path`
    is left untouched in that case.

    Args:
        path: Destination file; its parent directory is created if missing.
        header_rows: Rows written first, in the given order.
        data_rows: Rows written after the headers, in the given order.
    """
    # Make sure the destination directory exists before creating the temp file.
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8", newline="") as out:
            writer = csv.writer(out, delimiter=";")
            writer.writerows(header_rows)
            writer.writerows(data_rows)
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup so a failed run does not leave a stray .tmp file;
        # the original error is what the caller needs to see.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise

# ---------------------------- Core logic: Mark ---------------------------------

def process_daily(root: Path, t0: dt.datetime, t1: dt.datetime) -> int:
    """Mark daily files by renaming 'name.ext' to 'name.test.ext'.

    Every calendar day from t0's date up to, but not including, t1's date is
    visited. For each day the '.txt' and '.png' files named
    '<YYYYMMDD>_*' under '<root>/data/daily/<YYYYMM>/' are renamed. Files that
    already carry a '.test' suffix are ignored, and a file is skipped (with a
    message) when its marked name already exists.

    Returns:
        The number of files renamed by this call.
    """
    first_day = t0.date()
    day_count = (t1.date() - first_day).days
    renamed = 0

    for offset in range(day_count):
        day = first_day + dt.timedelta(days=offset)
        month_tag = f"{day.year:04d}{day.month:02d}"
        day_tag = f"{month_tag}{day.day:02d}"
        month_dir = root / "data" / "daily" / month_tag
        if not month_dir.exists():
            continue

        for ext in (".txt", ".png"):
            for original in month_dir.glob(f"{day_tag}_*{ext}"):
                # The glob also matches files that were marked on an earlier run.
                if ".test" in original.suffixes:
                    continue
                marked = add_test_suffix(original)
                if marked.exists():
                    print(f"  Skipped: {marked.name} already exists.")
                    continue
                ensure_parent(marked)
                os.replace(original, marked)
                print(f"  Renamed: {original.name} -> {marked.name}")
                renamed += 1

    return renamed


def process_monthly(root: Path, t0: dt.datetime, t1: dt.datetime) -> Tuple[int, int]:
    """Move in-window rows from monthly files into their '.test.txt' companions.

    For every month touched by the window [t0, t1), each
    '<root>/data/monthly/<YYYYMM>_*.txt' file that is not itself a '.test'
    file is read. Rows whose timestamp (column 2) falls inside the window are
    merged into the matching '.test.txt' file and removed from the source
    file. Both files are rewritten with the source file's header rows.

    Rows are de-duplicated on their full content and sorted by timestamp.
    The sort is stable over an order-preserving de-duplication, so rows that
    share a timestamp keep a reproducible order: rows already in the test
    file first, then the newly moved rows in source-file order.

    Returns:
        (files_processed, rows_moved): the number of source files examined
        and the number of rows that were not already in a test file.
    """
    files_processed = 0
    rows_moved = 0
    d0 = t0.date()
    # t1 is exclusive, so step back one second to find the last day it covers.
    d1 = (t1 - dt.timedelta(seconds=1)).date()
    monthly_dir = root / "data" / "monthly"
    if not monthly_dir.exists():
        return files_processed, rows_moved

    for yyyymm in months_between(d0, d1):
        for src_path in monthly_dir.glob(f"{yyyymm}_*.txt"):
            # The pattern also matches '<name>.test.txt'; those are targets, not sources.
            if ".test" in src_path.suffixes:
                continue
            files_processed += 1
            test_path = add_test_suffix(src_path)
            src_headers, src_data = read_csv_data(src_path)
            _, existing_test_data = read_csv_data(test_path)

            new_test_rows, remaining_src_rows = [], []
            for row in src_data:
                when = parse_local_datetime(row[1])
                if when is not None and t0 <= when < t1:
                    new_test_rows.append(row)
                else:
                    remaining_src_rows.append(row)
            if not new_test_rows:
                print(f"  No new rows in range for {src_path.name}.")
                continue

            # dict.fromkeys drops duplicates while keeping first-seen order,
            # unlike a set whose iteration order varies between runs.
            existing_keys = {tuple(row) for row in existing_test_data}
            merged = dict.fromkeys(tuple(row) for row in existing_test_data + new_test_rows)
            unique_rows = sorted(
                (list(key) for key in merged),
                key=lambda r: parse_local_datetime(r[1]),
            )

            # Write the test file first: if the run is interrupted the rows
            # exist in both files, and the next run de-duplicates them.
            write_csv_data(test_path, src_headers, unique_rows)
            write_csv_data(src_path, src_headers, remaining_src_rows)

            # Count against the distinct existing rows so duplicates already
            # present in the test file cannot skew the figure.
            newly_added_count = len(merged) - len(existing_keys)
            rows_moved += newly_added_count
            print(f"  {src_path.name}: moved {newly_added_count} new rows to {test_path.name}. Total test rows: {len(unique_rows)}")
    return files_processed, rows_moved

# ---------------------------- Core logic: Revert ---------------------------------

def revert_daily(root: Path, t0: dt.datetime, t1: dt.datetime) -> int:
    """Un-mark daily files by renaming 'name.test.ext' back to 'name.ext'.

    Every calendar day from t0's date up to, but not including, t1's date is
    visited. For each day the '<YYYYMMDD>_*.test.txt' and
    '<YYYYMMDD>_*.test.png' files under '<root>/data/daily/<YYYYMM>/' are
    renamed. A file is skipped (with a message) when its un-marked name
    already exists.

    Returns:
        The number of files renamed by this call.
    """
    first_day = t0.date()
    day_count = (t1.date() - first_day).days
    reverted = 0

    for offset in range(day_count):
        day = first_day + dt.timedelta(days=offset)
        month_tag = f"{day.year:04d}{day.month:02d}"
        day_tag = f"{month_tag}{day.day:02d}"
        month_dir = root / "data" / "daily" / month_tag
        if not month_dir.exists():
            continue

        for ext in (".txt", ".png"):
            for marked in month_dir.glob(f"{day_tag}_*.test{ext}"):
                restored = remove_test_suffix(marked)
                if restored.exists():
                    print(f"  Skipped revert: {restored.name} already exists.")
                    continue
                ensure_parent(restored)
                os.replace(marked, restored)
                print(f"  Reverted: {marked.name} -> {restored.name}")
                reverted += 1

    return reverted


def revert_monthly(root: Path, t0: dt.datetime, t1: dt.datetime) -> Tuple[int, int]:
    """Move in-window rows from monthly '.test.txt' files back to their main files.

    For every month touched by the window [t0, t1), each
    '<root>/data/monthly/<YYYYMM>_*.test.txt' file is read. Rows whose
    timestamp (column 2) falls inside the window are merged into the main
    file and removed from the test file. A test file left without data rows
    is deleted.

    The main file keeps its own header rows, falling back to the test file's
    headers when it has none (for example when it does not exist yet). Rows
    are de-duplicated on their full content and sorted by timestamp. The sort
    is stable over an order-preserving de-duplication, so rows that share a
    timestamp keep a reproducible order: rows already in the main file first,
    then the reverted rows in test-file order.

    Returns:
        (files_processed, rows_moved): the number of test files examined and
        the number of rows taken out of test files.
    """
    files_processed = 0
    rows_moved = 0
    d0 = t0.date()
    # t1 is exclusive, so step back one second to find the last day it covers.
    d1 = (t1 - dt.timedelta(seconds=1)).date()
    monthly_dir = root / "data" / "monthly"
    if not monthly_dir.exists():
        return files_processed, rows_moved

    for yyyymm in months_between(d0, d1):
        for test_path in monthly_dir.glob(f"{yyyymm}_*.test.txt"):
            files_processed += 1
            src_path = remove_test_suffix(test_path)
            test_headers, test_data = read_csv_data(test_path)
            src_headers, src_data = read_csv_data(src_path)

            rows_to_revert, rows_to_keep = [], []
            for row in test_data:
                when = parse_local_datetime(row[1])
                if when is not None and t0 <= when < t1:
                    rows_to_revert.append(row)
                else:
                    rows_to_keep.append(row)
            if not rows_to_revert:
                print(f"  No rows in range to revert for {test_path.name}.")
                continue

            final_src_headers = src_headers if src_headers else test_headers
            # dict.fromkeys drops duplicates while keeping first-seen order,
            # unlike a set whose iteration order varies between runs.
            merged = dict.fromkeys(tuple(row) for row in src_data + rows_to_revert)
            unique_src_rows = sorted(
                (list(key) for key in merged),
                key=lambda r: parse_local_datetime(r[1]),
            )

            # Update the main file first: if the run is interrupted the rows
            # exist in both files, and the next run de-duplicates them.
            write_csv_data(src_path, final_src_headers, unique_src_rows)

            if rows_to_keep:
                write_csv_data(test_path, test_headers, rows_to_keep)
            else:
                os.remove(test_path)
                print(f"  Removed empty file: {test_path.name}")

            rows_moved += len(rows_to_revert)
            print(f"  {test_path.name}: reverted {len(rows_to_revert)} rows to {src_path.name}.")
    return files_processed, rows_moved

# ---------------------------- CLI ---------------------------------

def main(argv: List[str] | None = None) -> None:
    """Parse the command line and mark or un-mark data in the requested window.

    Args:
        argv: Argument list to parse instead of the process command line.
            The default (None) makes argparse read sys.argv, which is what
            the console entry point and the script guard rely on.

    Raises:
        SystemExit: on argparse errors (status 2), and with an
            'Error: ...' message (status 1) when the dates are invalid, so
            that callers and shell scripts can detect the failure.
    """
    ap = argparse.ArgumentParser(
        description="Mark or un-mark SQM test data.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    ap.add_argument("start", help="Start date YYYYMMDD (inclusive, Julian day starts at 12:00)")
    ap.add_argument("end", help="End date YYYYMMDD (inclusive, Julian day ends next day at 12:00)")
    ap.add_argument("--root", default=Path.cwd(), type=Path, help="Root folder containing the 'data' directory")
    ap.add_argument("--revert", action="store_true", help="Un-mark data in the date range instead of marking it.")
    args = ap.parse_args(argv)

    try:
        t0, t1 = julian_window(args.start, args.end)
    except ValueError as e:
        # A string passed to SystemExit is printed to stderr and gives exit
        # status 1, instead of reporting success after a bad date.
        raise SystemExit(f"Error: {e}") from None

    print(f"Window: [{t0} -> {t1}) (Julian noon boundaries)")
    print(f"Root: {args.root.resolve()}")

    if args.revert:
        print("\n--- Reverting Daily Files (Un-marking) ---")
        daily_count = revert_daily(args.root, t0, t1)
        print(f"Daily summary: Reverted {daily_count} files.")

        print("\n--- Reverting Monthly Files (Un-marking) ---")
        monthly_files, moved_rows = revert_monthly(args.root, t0, t1)
        print(f"Monthly summary: Processed {monthly_files} .test.txt files, reverted {moved_rows} rows.")
    else:
        print("\n--- Processing Daily Files (Marking) ---")
        daily_count = process_daily(args.root, t0, t1)
        print(f"Daily summary: Renamed {daily_count} new files.")

        print("\n--- Processing Monthly Files (Marking) ---")
        monthly_files, moved_rows = process_monthly(args.root, t0, t1)
        print(f"Monthly summary: Processed {monthly_files} source files, moved {moved_rows} new rows to .test.txt files.")


if __name__ == "__main__":
    main()

setup.py

0 → 100644
+29 −0
Original line number Diff line number Diff line
from pathlib import Path

from setuptools import setup

# Read the README that sits next to this file, so the build does not depend
# on the directory it is started from.
long_description = (Path(__file__).parent / "README.md").read_text(encoding="utf-8")

setup(
    name="pysqm_mark_test",
    version="1.0",
    author="Dario Barghini",
    author_email="dario.barghini@inaf.it",
    description="A tool to mark or un-mark test data in an SQM directory tree.",
    long_description=long_description,
    long_description_content_type="text/markdown",
    url="https://www.ict.inaf.it/gitlab/dario.barghini/pysqm_mark_test.git",
    # Must name the module file that is shipped (pysqm_mark_test.py); the
    # console script below imports `main` from it.
    py_modules=["pysqm_mark_test"],
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: GNU General Public License (GPL)",
        "Operating System :: OS Independent",
        "Topic :: Utilities",
    ],
    python_requires='>=3.7',
    entry_points={
        'console_scripts': [
            'pysqm_mark_test = pysqm_mark_test:main',
        ],
    },
)