Skip to content

Commit

Permalink
refactor: extract zip/tar extraction from pull_ftp.migrate_files into common.utils.process_archive
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Oct 3, 2023
1 parent f817d71 commit 3b29216
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 24 deletions.
34 changes: 10 additions & 24 deletions dags/common/pull_ftp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import base64
import io
import os
import tarfile
import zipfile
from datetime import datetime
Expand All @@ -9,6 +8,7 @@
from common.ftp_service import FTPService
from common.repository import IRepository
from common.sftp_service import SFTPService
from common.utils import process_archive
from structlog import PrintLogger

SFTP_FTP_TYPE = (FTPService, SFTPService)
Expand All @@ -22,33 +22,19 @@ def migrate_files(
):
logger.msg("Processing files.", filenames=filenames)
extracted_filenames = []

for _file in filenames:
logger.msg("Getting file from SFTP.", file=_file)
file_bytes = s_ftp.get_file(_file)

if zipfile.is_zipfile(file_bytes):
file_bytes.seek(0)
with zipfile.ZipFile(file_bytes) as zip:
for zip_filename in zip.namelist():
file_prefix = ".".join(_file.split(".")[:-1])
zip_file_content = zip.read(zip_filename)
s3_filename = os.path.join(file_prefix, zip_filename)
repo.save(s3_filename, io.BytesIO(zip_file_content))
if repo.is_meta(s3_filename):
extracted_filenames.append("extracted/" + s3_filename)
repo.save(_file, file_bytes)

elif tarfile.is_tarfile(file_bytes):
file_bytes.seek(0)
with tarfile.open(fileobj=file_bytes, mode="r") as tar:
for tar_filename in tar.getnames():
file_prefix = ".".join(_file.split(".")[:-1])
tar_file_content = tar.extractfile(tar_filename).read()
s3_filename = os.path.join(file_prefix, tar_filename)
repo.save(s3_filename, io.BytesIO(tar_file_content))
if repo.is_meta(s3_filename):
extracted_filenames.append("extracted/" + s3_filename)
repo.save(_file, file_bytes)
if zipfile.is_zipfile(file_bytes) or tarfile.is_tarfile(file_bytes):
for (archive_file_content, s3_filename) in process_archive(
file_bytes=file_bytes, _file=_file
):
repo.save(s3_filename, io.BytesIO(archive_file_content))
if repo.is_meta(s3_filename):
extracted_filenames.append("extracted/" + s3_filename)
repo.save(_file, file_bytes)

else:
logger.info(
Expand Down
29 changes: 29 additions & 0 deletions dags/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime
import json
import os
import re
import tarfile
import xml.etree.ElementTree as ET
import zipfile
from ftplib import error_perm
from io import StringIO
from os.path import basename
Expand Down Expand Up @@ -198,3 +200,30 @@ def check_dagrun_state(dagrun: DagRun, not_allowed_states=[], allowed_states=[])
value = dagrun.get_state() == dag_run_states[allowed_state]
states_values.append(value)
return all(states_values)


def process_zip_file(file_bytes, _file):
    """Yield ``(content_bytes, s3_filename)`` for every entry of a zip archive.

    Args:
        file_bytes: seekable binary file-like object holding the zip data.
        _file: original archive filename; its extension is stripped and the
            remainder used as the key prefix for every extracted entry.

    Yields:
        tuple[bytes, str]: the entry's raw content and its destination key
        (``<archive-name-without-extension>/<entry-name>``).
    """
    file_bytes.seek(0)
    # Loop-invariant: the prefix depends only on the archive name.
    file_prefix = ".".join(_file.split(".")[:-1])
    # `archive` instead of `zip` — don't shadow the builtin.
    with zipfile.ZipFile(file_bytes) as archive:
        for member_name in archive.namelist():
            content = archive.read(member_name)
            s3_filename = os.path.join(file_prefix, member_name)
            yield (content, s3_filename)


def process_tar_file(file_bytes, _file):
    """Yield ``(content_bytes, s3_filename)`` for every regular file in a tar.

    Args:
        file_bytes: seekable binary file-like object holding the tar data.
        _file: original archive filename; its extension is stripped and the
            remainder used as the key prefix for every extracted entry.

    Yields:
        tuple[bytes, str]: the member's raw content and its destination key
        (``<archive-name-without-extension>/<member-name>``).
    """
    file_bytes.seek(0)
    # Loop-invariant: the prefix depends only on the archive name.
    file_prefix = ".".join(_file.split(".")[:-1])
    with tarfile.open(fileobj=file_bytes, mode="r") as tar:
        for member in tar.getmembers():
            # extractfile() returns None for directories/links — the old
            # code crashed with AttributeError on such members; skip them.
            if not member.isfile():
                continue
            extracted = tar.extractfile(member)
            if extracted is None:
                continue
            s3_filename = os.path.join(file_prefix, member.name)
            yield (extracted.read(), s3_filename)


def process_archive(file_bytes, _file):
    """Dispatch ``file_bytes`` to the matching archive extractor.

    Args:
        file_bytes: seekable binary file-like object holding the archive.
        _file: original archive filename, forwarded to the extractor.

    Returns:
        An iterator of ``(content_bytes, s3_filename)`` tuples; empty when
        the data is neither a zip nor a tar archive (the old code returned
        ``None`` here, which made iterating callers raise ``TypeError``).
    """
    file_bytes.seek(0)
    if zipfile.is_zipfile(file_bytes):
        return process_zip_file(file_bytes, _file)
    # is_zipfile may leave the stream mid-file; rewind before probing tar.
    file_bytes.seek(0)
    if tarfile.is_tarfile(file_bytes):
        return process_tar_file(file_bytes, _file)
    return iter(())

0 comments on commit 3b29216

Please sign in to comment.