"""Utilities for downloading and analysing SOHO/LASCO level-0.5 coronagraph images."""
from astropy.io import fits
import numpy as np
import cupy as cp
import os
import urllib.request
import urllib.error
import time
from pathlib import Path
import shutil
import pandas as pd
import sys
import requests
from datetime import datetime, timedelta
from bs4 import BeautifulSoup


def download_file(url: str, dest_path: Path):
    """Download a single file, creating parent directories as needed.

    Retries indefinitely on transient network failures (connection refused,
    DNS down, ...), but re-raises HTTP errors such as 404 immediately so the
    caller can skip files that do not exist on the server.
    """
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    while True:
        try:
            urllib.request.urlretrieve(url, dest_path)
            return
        except urllib.error.HTTPError:
            # The server answered (404, 403, ...): retrying cannot help.
            raise
        except urllib.error.URLError:
            print("Network down, retrying in 15 seconds…")
            time.sleep(15)


def get_lasco_data(dateList, CI=('c2', 'c3'), archive=None):
    """Download LASCO level-0.5 images for each date in *dateList*.

    Parameters
    ----------
    dateList : list of str
        Dates to download; accepted formats are YYYY-MM-DD, YYYY/MM/DD,
        YYYYMMDD and "YYYY MM DD".
    CI : iterable of str
        Instruments to fetch: 'c2', 'c3' or both.
    archive : str or None
        Optional local archive directory.  Files already present there are
        moved into the download folder instead of being re-downloaded, and
        the finished per-camera folder is filed under archive/YYYY/MM/DD.

    Raises
    ------
    ValueError
        If a date string matches none of the accepted formats.
    """
    for date in dateList:
        # Parse the date, trying a few common formats.
        for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y%m%d", "%Y %m %d"):
            try:
                dt = datetime.strptime(date, fmt)
                break
            except ValueError:
                continue
        else:
            raise ValueError(f"Unable to parse date string: {date}")

        year, month, day = dt.year, dt.month, dt.day
        # Format components exactly as the original IDL code did.
        y = f"{year:04d}"[2:]   # last two digits of the year
        m = f"{month:02d}"
        d = f"{day:02d}"

        for camera in CI:
            # Base URL of the level-0.5 archive for this day and camera.
            base_url = f"https://umbra.nascom.nasa.gov/pub/lasco_level05/{y}{m}{d}/{camera}/"

            # --- download the header file (img_hdr.txt) -------------------
            hdr_local_dir = Path("data/lasco") / camera.upper()
            hdr_local_path = hdr_local_dir / "img_hdr.txt"
            hdr_url = base_url + "img_hdr.txt"
            print(f"Downloading header file from {hdr_url} …")
            try:
                download_file(hdr_url, dest_path=hdr_local_path)
            except urllib.error.HTTPError:
                print('Error: ' + hdr_url + ' not found, skipping')
                continue

            # --- parse the header file ------------------------------------
            # Each valid line has 14 whitespace-separated fields:
            # filename, date, time, ?, exposure, n1, n2, a, b,
            # filter, polarizer, program, z, os-number.
            # Keep files matching the original IDL filter:
            # filter in ('Orange', 'Clear') and polarizer == 'Clear'
            # and program == 'Normal'.
            selected_files = []
            with open(hdr_local_path, "r") as fh:
                for line in fh:
                    parts = line.strip().split()
                    if len(parts) != 14:
                        continue  # skip malformed lines
                    filename = parts[0]
                    filt, pol, prog = parts[9], parts[10], parts[11]
                    if filt in ("Orange", "Clear") and pol == "Clear" and prog == "Normal":
                        selected_files.append(filename)

            # --- download each selected image file ------------------------
            for fname in selected_files:
                img_url = base_url + fname
                img_dest = hdr_local_dir / fname
                archived = Path(archive) / fname if archive else None
                if archived is not None and archived.is_file():
                    # Already in the archive: move it instead of re-downloading.
                    print(f"{fname} already exists, skipping download.")
                    shutil.move(str(archived), img_dest)
                else:
                    print(f"Downloading {fname} …")
                    download_file(img_url, img_dest)

            # --- file the finished folder away in the archive -------------
            if archive and Path(archive).is_dir():
                day_dir = Path(archive) / f"{year:04d}" / m / d
                day_dir.mkdir(parents=True, exist_ok=True)
                shutil.move(str(hdr_local_dir), str(day_dir))
            print(camera + " files have been downloaded.")
        print(date + " files have been downloaded.")
    print("All selected files have been downloaded.")
def get_halo_list(archive=''):
    """Download the CDAW halo-CME list and save it as ``HaloList.csv``.

    Parameters
    ----------
    archive : str
        Directory prefix prepended to the output file name (default:
        current working directory).

    Raises
    ------
    RuntimeError
        If the page cannot be fetched or no usable table is found.
    """
    # --- fetch the HTML page ---------------------------------------------
    URL = "https://cdaw.gsfc.nasa.gov/CME_list/halo/halo.html"
    try:
        resp = requests.get(URL, timeout=10)
        resp.raise_for_status()
    except Exception as exc:
        raise RuntimeError(f"Failed to download page: {exc}")
    html = resp.text

    # --- parse with BeautifulSoup (lxml parser is fast & tolerant) --------
    soup = BeautifulSoup(html, "lxml")

    def _extract_tables(soup_obj):
        """Return a list of raw HTML strings for each <table> found."""
        # NOTE(review): tables embedded in HTML comments are not extracted;
        # the original code had a commented-out attempt at this.
        return [str(tbl) for tbl in soup_obj.find_all("table")]

    raw_tables = _extract_tables(soup)
    if not raw_tables:
        raise RuntimeError("No <table> elements could be located on the page.")

    # --- convert each raw table into a pandas DataFrame --------------------
    dfs = []
    for idx, tbl_html in enumerate(raw_tables):
        # Save each table to a file for debugging.
        with open(f"halo_table_{idx+1}.html", "w") as f:
            f.write(tbl_html)
        df_list = pd.read_html(tbl_html, header=0)  # first row → column names
        if df_list:
            dfs.append(df_list[0])
        else:
            print(f"⚠️ Table #{idx} could not be parsed by pandas.")

    # dfs[0] is the page-header table; dfs[1] (if present) is the halo-CME list.
    halo_df = dfs[1] if len(dfs) > 1 else None
    if halo_df is None:
        raise RuntimeError("Halo-CME table was not found on the page.")

    halo_df = halo_df.rename(columns={
        'First C2 Appearance Date Time [UT]': 'DATE_OBS',
        'First C2 Appearance Date Time [UT].1': 'TIME_OBS',
    })
    # Merge DATE_OBS and TIME_OBS into an ISO-format DATE_TIME column.
    halo_df['DATE_TIME'] = pd.to_datetime(
        halo_df['DATE_OBS'] + ' ' + halo_df['TIME_OBS'],
        format='%Y/%m/%d %H:%M:%S')
    halo_df['DATE_TIME'] = halo_df['DATE_TIME'].dt.strftime('%Y-%m-%dT%H:%M:%S')

    print("Halo CME list has been downloaded.")
    halo_df.to_csv(archive + 'HaloList.csv', index=False)
    print('HaloList.csv file has been created.')
class LascoImage:
    """A LASCO level-0.5 FITS image with region masks and statistics.

    Subclasses (C2, C3) must set the geometric attributes ``DcornersR``,
    ``NcornersR``, ``occulterR``, ``fovR`` and ``cuda`` before calling
    :meth:`get_masks`.
    """

    def __init__(self, filepath):
        self.filepath = filepath
        self.hdul = fits.open(filepath)
        self.header = self.hdul[0].header

    def get_keyword(self, keyword):
        """Return the value of a primary-header keyword, or None if absent."""
        return self.header.get(keyword, None)

    def get_data(self):
        """Return the primary-HDU pixel data."""
        return self.hdul[0].data

    def close(self):
        """Close the underlying FITS file."""
        self.hdul.close()

    def get_masks(self, cuda=None):
        """Build boolean region masks for a 1024x1024 image.

        Parameters
        ----------
        cuda : bool or None
            Build the masks with CuPy (True) or NumPy (False).  When None
            (the default) ``self.cuda`` is used, so the masks live on the
            same device as the image data.  (The old default of False meant
            CuPy data could be indexed with NumPy masks — likely a bug.)
        """
        if cuda is None:
            cuda = getattr(self, 'cuda', False)
        cy, cx = 512, 512      # image centre
        ny, nx = 1024, 1024    # image size
        xp = cp if cuda else np
        # Radial distance of every pixel from the centre (cheap broadcasting grids).
        y_idx, x_idx = xp.ogrid[:ny, :nx]
        r = xp.sqrt((y_idx - cy) ** 2 + (x_idx - cx) ** 2)
        # Boolean masks for each region of interest.
        self.mask_DC = r > self.DcornersR                          # dark corners
        self.mask_NC = r > self.NcornersR                          # normal corners
        self.mask_OC = r < self.occulterR                          # occulter
        self.mask_FOV = (r >= self.fovR[0]) & (r <= self.fovR[1])  # field of view

    def compute_stat(self, DC, NC, OC, FOV, bias, nsigma):
        """Fill ``self.stat`` and ``self.stat_header`` with per-region statistics.

        For each region (dark corners, normal corners, occulter, field of
        view and the combined "bias" region) this records mean, pixel count
        and standard deviation, plus, for n = 1..nsigma, the number of
        pixels further than n*std from the region mean.  Empty regions get
        NaN statistics.
        """
        nsigma = int(nsigma)  # tolerate a float nsigma (e.g. the old 3.0 default)
        xp = cp if self.cuda else np
        regions = {'DC': DC, 'NC': NC, 'OC': OC, 'FOV': FOV, 'bias': bias}

        # Header order matches the original hand-written list.
        self.stat_header = ['FILENAME']
        for name in regions:
            self.stat_header += [f'{name}_mean', f'{name}_count', f'{name}_std']
        for n in range(1, nsigma + 1):
            for name in regions:
                self.stat_header.append(f'{name}_{n}sigma_count')

        self.stat = {'FILENAME': os.path.basename(self.filepath)}
        means, stds = {}, {}
        for name, arr in regions.items():
            if arr.size > 0:
                means[name] = xp.mean(arr)
                stds[name] = xp.std(arr)
            else:
                means[name] = stds[name] = np.nan  # empty region: undefined stats
            self.stat[f'{name}_mean'] = means[name]
            self.stat[f'{name}_count'] = arr.size
            self.stat[f'{name}_std'] = stds[name]

        # Count outliers per region for each sigma multiple.
        for n in range(1, nsigma + 1):
            for name, arr in regions.items():
                if arr.size > 0:
                    count = xp.sum(xp.abs(arr - means[name]) > n * stds[name])
                else:
                    count = np.nan
                self.stat[f'{name}_{n}sigma_count'] = count
class C2(LascoImage):
    """LASCO C2 coronagraph image."""

    def __init__(self, filepath, cuda=False):
        super().__init__(filepath)
        if self.get_keyword('DETECTOR') != 'C2':
            raise ValueError("The provided file is not a LASCO C2 image.")
        # LASCO C2 specific geometric parameters (pixels).
        self.DcornersR = 703    # above this radius: dark corners
        self.NcornersR = 653    # above this radius: normal corners
        self.occulterR = 60     # below this radius: occulter
        self.fovR = [150, 555]  # field-of-view min and max radii
        self.cuda = cuda
        self.get_masks()

    def get_stat(self, nsigma=3, previous=None):
        """Compute region statistics for this image (or a difference image).

        If *previous* is None the statistics are computed on this image's
        data; otherwise on ``self.get_data() - previous.get_data()``.
        Results are stored via :meth:`compute_stat`.

        Raises
        ------
        ValueError
            If the image is not 1024x1024.
        """
        if previous is None:
            # Transfer to GPU when requested.
            data = cp.asarray(self.get_data()) if self.cuda else self.get_data()
        elif self.cuda:
            data = cp.asarray(self.get_data()) - cp.asarray(previous.get_data())
        else:
            data = self.get_data() - previous.get_data()
        ny, nx = data.shape
        if nx != 1024 or ny != 1024:
            raise ValueError("Unexpected image dimensions (" + str(nx) + "," + str(ny) + ") for LASCO C2.")
        DC = data[self.mask_DC]
        NC = data[self.mask_NC]
        OC = data[self.mask_OC]
        FOV = data[self.mask_FOV]
        # "bias" region = dark corners + occulter combined.
        bias = cp.concatenate([DC, OC]) if self.cuda else np.concatenate([DC, OC])
        self.compute_stat(DC, NC, OC, FOV, bias, nsigma)


class C3(LascoImage):
    """LASCO C3 coronagraph image."""

    def __init__(self, filepath, cuda=False):
        super().__init__(filepath)
        if self.get_keyword('DETECTOR') != 'C3':
            raise ValueError("The provided file is not a LASCO C3 image.")
        # LASCO C3 specific geometric parameters (pixels).
        self.DcornersR = 618   # above this radius: dark corners
        self.NcornersR = 582   # above this radius: normal corners
        self.occulterR = 30    # below this radius: occulter
        self.fovR = [90, 554]  # field-of-view min and max radii
        self.cuda = cuda
        self.get_masks()

    def get_stat(self, nsigma=3, previous=None):
        """Compute region statistics for this image (or a difference image).

        If *previous* is None the statistics are computed on this image's
        data; otherwise on ``self.get_data() - previous.get_data()``.
        Note: the default nsigma is now the int 3 (was the float 3.0, which
        crashed ``range()`` inside compute_stat).

        Raises
        ------
        ValueError
            If the image is not 1024x1024.
        """
        if previous is None:
            data = cp.asarray(self.get_data()) if self.cuda else self.get_data()
        elif self.cuda:
            data = cp.asarray(self.get_data()) - cp.asarray(previous.get_data())
        else:
            data = self.get_data() - previous.get_data()
        ny, nx = data.shape
        if nx != 1024 or ny != 1024:
            # Fixed copy-paste bug: the message previously said "LASCO C2".
            raise ValueError("Unexpected image dimensions (" + str(nx) + "," + str(ny) + ") for LASCO C3.")
        DC = data[self.mask_DC]
        NC = data[self.mask_NC]
        OC = data[self.mask_OC]
        FOV = data[self.mask_FOV]
        bias = cp.concatenate([DC, OC]) if self.cuda else np.concatenate([DC, OC])
        self.compute_stat(DC, NC, OC, FOV, bias, nsigma)
# NOTE(review): everything that followed here was an accidental verbatim
# duplicate of the entire module above (imports, download_file,
# get_lasco_data, get_halo_list, LascoImage, C2, C3), apparently produced
# by pasting the diff-viewer page twice.  It re-defined the same names with
# identical bodies, so removing it changes no behavior.