# Source code for arrakis.utils.database

#!/usr/bin/env python
"""Database utilities"""

from __future__ import annotations

import warnings

import pymongo
from astropy.utils.exceptions import AstropyWarning
from pymongo.collection import Collection
from spectral_cube.utils import SpectralCubeWarning

from arrakis.logger import logger

# Module-level side effect: these filters are installed process-wide as soon
# as this module is imported.
# append=True adds the SpectralCubeWarning rule at the END of the filter list,
# so any filter registered earlier still takes precedence over it.
warnings.filterwarnings(action="ignore", category=SpectralCubeWarning, append=True)
# simplefilter inserts at the FRONT of the filter list by default, so this
# AstropyWarning rule takes precedence over previously registered filters.
warnings.simplefilter("ignore", category=AstropyWarning)


def validate_sbid_field_pair(field_name: str, sbid: int, field_col: Collection) -> bool:
    """Check whether an SBID is stored against the given field name.

    A single document whose ``SBID`` equals ``sbid`` is fetched from
    ``field_col`` and its ``FIELD_NAME`` entry is compared with
    ``field_name``.

    Args:
        field_name (str): Field name.
        sbid (int): SBID.
        field_col (Collection): Field collection.

    Returns:
        bool: True if the stored ``FIELD_NAME`` equals ``field_name``.

    Raises:
        ValueError: If no document with the given SBID is found.
    """
    logger.info(f"Validating field name and SBID pair: {field_name}, {sbid}")
    record: dict | None = field_col.find_one({"SBID": sbid})
    if record is not None:
        return record["FIELD_NAME"] == field_name
    # No document matched the SBID at all, so the pair cannot be checked.
    raise ValueError(f"SBID {sbid} not found in database")
def test_db(
    host: str, username: str | None = None, password: str | None = None
) -> bool:
    """Test connection to MongoDB

    Opens a client against ``host`` and lists the database names to confirm
    that a server is reachable.

    Args:
        host (str): Mongo host IP.
        username (str, optional): Mongo username. Defaults to None.
        password (str, optional): Mongo password. Defaults to None.

    Returns:
        bool: True if connection successful

    Raises:
        Exception: If server selection times out. The underlying
            ``ServerSelectionTimeoutError`` is chained as the cause.
    """
    logger.info("Testing MongoDB connection...")
    # connect=False defers connecting until the first operation, so the
    # list_database_names() call below is what actually contacts the server.
    with pymongo.MongoClient(
        host=host,
        connect=False,
        username=username,
        password=password,
        authMechanism="SCRAM-SHA-256",
    ) as dbclient:
        try:
            dbclient.list_database_names()
        except pymongo.errors.ServerSelectionTimeoutError as err:
            # Chain explicitly so the driver's diagnostic stays attached as
            # the cause of the friendlier message.
            raise Exception("Please ensure 'mongod' is running") from err

        logger.info("MongoDB connection successful!")

    return True
def get_db(
    host: str,
    epoch: int,
    username: str | None = None,
    password: str | None = None,
) -> tuple[Collection, Collection, Collection]:
    """Open the beams, islands and components collections for one epoch.

    Args:
        host (str): Mongo host IP.
        epoch (int): Epoch number; selects the ``arrakis_epoch_{epoch}``
            database.
        username (str, optional): Username. Defaults to None.
        password (str, optional): Password. Defaults to None.

    Returns:
        tuple[Collection, Collection, Collection]: beams_col, island_col,
            comp_col
    """
    client = pymongo.MongoClient(
        host=host,
        connect=True,
        username=username,
        password=password,
        authMechanism="SCRAM-SHA-256",
    )
    # The client is intentionally left open: the returned collections use it.
    epoch_db = client[f"arrakis_epoch_{epoch}"]
    return epoch_db["beams"], epoch_db["islands"], epoch_db["components"]
def get_field_db(
    host: str,
    epoch: int,
    username: str | None = None,
    password: str | None = None,
) -> Collection:
    """Get the ``fields`` collection for one epoch.

    Args:
        host (str): Mongo host IP.
        epoch (int): Epoch number; selects the ``arrakis_epoch_{epoch}``
            database.
        username (str, optional): Username. Defaults to None.
        password (str, optional): Password. Defaults to None.

    Returns:
        Collection: field_col, the ``fields`` collection of the epoch
            database.
    """
    # connect=False: the client connects on the first operation rather than
    # at construction time.
    dbclient = pymongo.MongoClient(
        host=host,
        connect=False,
        username=username,
        password=password,
        authMechanism="SCRAM-SHA-256",
    )
    mydb = dbclient[f"arrakis_epoch_{epoch}"]  # Create/open database
    field_col = mydb["fields"]  # Create/open collection
    return field_col
def get_beam_inf_db(
    host: str,
    epoch: int,
    username: str | None = None,
    password: str | None = None,
) -> Collection:
    """Get the ``beam_inf`` collection for one epoch.

    Args:
        host (str): Mongo host IP.
        epoch (int): Epoch number; selects the ``arrakis_epoch_{epoch}``
            database.
        username (str, optional): Username. Defaults to None.
        password (str, optional): Password. Defaults to None.

    Returns:
        Collection: beam_inf_col, the ``beam_inf`` collection of the epoch
            database.
    """
    # connect=False: the client connects on the first operation rather than
    # at construction time.
    dbclient = pymongo.MongoClient(
        host=host,
        connect=False,
        username=username,
        password=password,
        authMechanism="SCRAM-SHA-256",
    )
    mydb = dbclient[f"arrakis_epoch_{epoch}"]  # Create/open database
    beam_inf_col = mydb["beam_inf"]  # Create/open collection
    return beam_inf_col