Source code for VideoService.files_manager


#################################################*
##### ***  IMPORTS  *** #########################*
#################################################*

# *** External modules *** #
import cv2

# *** Python modules *** #
import shutil
import os
from datetime import datetime

# *** Internal modules *** #
from .video import Video
from .__errors__ import error_parser

#################################################*
##### ***  CODE  ***    #########################*
#################################################*

### *** FILE MANAGER CLASS *** ###
[docs]class FileManager: """ FileManager =========== Manages video and thumbnail files on a server and database. """ ## *** CLASS CONSTRUCTOR *** ## def __init__(self, UPLOADS: str = ".\\Uploads", VIDEOS: str = ".\\Database\\Videos", THUMBNAILS: str = ".\\Database\\THUMBNAILS" ) -> None: """ Class Constructor ================= Parameters ---------- UPLOADS : str, optional The path to the directory where the uploaded files are stored. VIDEOS : str, optional The path to the directory where the video files are stored. THUMBNAILS : str, optional The path to the directory where the thumbnail files are stored. """ # Set arguments as class attributes self.UPLOADS = UPLOADS self.VIDEOS = VIDEOS self.THUMBNAILS = THUMBNAILS ## *** CLASS METHODS *** ## # *** UPLOAD FILE TO SERVER AND DATABASE *** #
[docs] def upload_file(self, **kwargs) -> Video | dict: """ Upload File =========== Uploads a video and thumbnail files to the server and database. Parameters ---------- VIDEO_FILENAME : str The name of the video file. THUMBNAIL_FILENAME : str, optional The name of the thumbnail file. TITLE : str The title of the video. OWNER : str The username of the video owner. VISIBILITY : str The visibility of the video. DESCRIPTION : str, optional The description of the video. TAGS : list, optional The tags of the video. Returns ------- Video The video object if the files are uploaded successfully. dict A dictionary with an error message if the files are not uploaded. """ # Get the video and thumbnail filenames video_filename = kwargs.get("VIDEO_FILENAME") or None thumbnail_filename = kwargs.get("THUMBNAIL_FILENAME") or None # Check if the video filename is provided if not video_filename: # Return an error message if the video filename is not provided return { "message": "ERROR [FileManager]: 'VIDEO_FILENAME' is required", "status": 400 } # Get the next index for file naming index = self.__get_index() # Rename the files video_filename = self.__rename_file( file = video_filename, file_type = "video", index = index ) # Check if the thumbnail filename is provided if not thumbnail_filename: # Create a thumbnail if the thumbnail filename is not provided thumbnail_filename = self.__create_thumbnail( file = video_filename, index = index ) else: # Rename the thumbnail file if it is provided thumbnail_filename = self.__rename_file( file = thumbnail_filename, file_type = "thumbnail", index = index ) # Upload the files to the database self.__upload_to_database([video_filename, thumbnail_filename]) # Create a video object parameters video = { "TITLE": kwargs.get("TITLE"), "VIDEO_FILENAME": video_filename, "VIDEO_FILETYPE": video_filename.split('.')[1], "THUMBNAIL_FILENAME": thumbnail_filename, "THUMBNAIL_FILETYPE": thumbnail_filename.split('.')[1], "UPLOAD_DATE": datetime.now().strftime("%d/%m/%Y %H:%M:%S"), "OWNER": kwargs.get("OWNER"), "VISIBILITY": kwargs.get("VISIBILITY"), "LENGTH": self.__get_length(video_filename), "DESCRIPTION": kwargs.get("DESCRIPTION"), "TAGS": kwargs.get("TAGS") } # Return the video object return Video(**video)
# *** DELETE FILE FROM SERVER AND DATABASE *** #
[docs] @staticmethod def delete_file(file_name: str, directory: str) -> None: """ Delete File =========== Deletes a file from the server and database. Parameters ---------- file_name : str The name of the file to delete. directory : str The directory where the file is stored. """ try: # Get the file path file = os.path.join(directory, file_name) # Remove the file os.remove(file) except Exception as error: # Print the error message print(error_parser(error))
## *** PRIVATE METHODS *** ## # *** Upload files to the database *** # def __upload_to_database(self, files: list) -> None: try: # Iterate over the files for file in files: # Set the source and destiny paths source_dist = os.path.join(self.UPLOADS, file) # Set the destiny path destiny_dist = os.path.join( # Check if the file is a video or a thumbnail self.VIDEOS if "video" in file else self.THUMBNAILS, file ) # Move the file to the destiny shutil.move(source_dist, destiny_dist) except Exception as error: # Print the error message print(error_parser(error)) # *** Get the next index for file naming *** # def __get_index(self) -> int: # Set the index to 0 index: int = 0 try: # Get the files in the videos directory files = os.listdir(self.VIDEOS) # Get the last file file = files[-1] # Get the index from the file name index = int(file.split("_")[1].split(".")[0]) # Check if the index is valid if index < 0: raise ValueError("ERROR [FileManager]: Invalid index") except Exception as error: # Print the error message print(error_parser(error)) return index # *** Rename the file *** # def __rename_file(self, file_name: str, file_type: str, index: int) -> str: # Set the new name to None new_name: str = "" try: # Set the paths old_dist = os.path.join(self.UPLOADS, file_name) # Set the old path new_name = f"{file_type}_{index}.{file_name.split('.')[1]}" # Set the new name new_dist = os.path.join(self.UPLOADS, new_name) # Set the new path # Rename the file os.rename(old_dist, new_dist) except Exception as error: # Print the error message print(error_parser(error)) return new_name # *** Create a thumbnail *** # def __create_thumbnail(self, file_name: str, index: int) -> str: # Set the new name to None new_name: str = "" try: # Open the video video = os.path.join(self.UPLOADS, file_name) videoObj = cv2.VideoCapture(video) # Create a video object # Check if the video is opened if not videoObj.isOpened(): raise FileNotFoundError("ERROR [FileManager]: Video not found") # Get the frame located in the end of the first 10% of the video length = videoObj.get(cv2.CAP_PROP_FRAME_COUNT) if length == 0: raise ValueError("ERROR [FileManager]: Video has no frames") frame = length // 10 + 1 videoObj.set(cv2.CAP_PROP_POS_FRAMES, frame) success, image = videoObj.read() # Check if the frame was found if not success or image is None: raise RuntimeError("ERROR [FileManager]: Frame not found") # Create the thumbnail new_name = f"miniature_{index}.jpg" # Set the new name thumbnail_path = os.path.join(self.THUMBNAILS, new_name) # Set the thumbnail path cv2.imwrite(thumbnail_path, image) # Save the thumbnail except Exception as error: # Print the error message print(error_parser(error)) finally: # Release the video object if 'videoObj' in locals() and videoObj.isOpened(): videoObj.release() return new_name # *** Get the video length *** # def __get_length(self, file_name: str) -> int: # Set the duration to 0 duration: int = 0 try: # Open the video video = os.path.join(self.VIDEOS, file_name) videoObj = cv2.VideoCapture(video) # Create a video object # Check if the video is opened if not videoObj.isOpened(): raise FileNotFoundError("ERROR [FileManager]: Video not found") # Calculate the video duration in seconds using frames and fps length = videoObj.get(cv2.CAP_PROP_FRAME_COUNT) # Get the number of frames fps = videoObj.get(cv2.CAP_PROP_FPS) # Get the frames per second duration = length // fps # Calculate the duration except Exception as error: # Print the error message print(error_parser(error)) finally: # Release the video object if 'videoObj' in locals() and videoObj.isOpened(): videoObj.release() return duration