Browse Source

Version 2

new version of pytutils
master
Josue Gomez 1 year ago
parent
commit
2d12eeffd1
8 changed files with 220 additions and 26 deletions
  1. +4
    -0
      .gitignore
  2. +26
    -24
      __init__.py
  3. +55
    -0
      localenv.py
  4. +16
    -0
      oracle.py
  5. +3
    -1
      pysyspath2
  6. +3
    -1
      pysyspath3
  7. +52
    -0
      remote.py
  8. +61
    -0
      sendmail.py

+ 4
- 0
.gitignore View File

@@ -1,2 +1,6 @@

__pycache__/

\.idea/

*.pyc

+ 26
- 24
__init__.py View File

@@ -4,47 +4,49 @@ r"""
To use, simply 'import pyutils'

shellExecute can receive a cmd as str or arr example
>>> pyutils.shellExecute('date')
>>> pyutils.shell_execute('date')
(b'mi\xc3\xa9 ene 30 11:35:00 CST 2019\n', b'')
>>> pyutils.shellExecute(['echo',"'hola mundo'"])
>>> pyutils.shell_execute(['echo',"'hola mundo'"])
(b"'hola mundo'\n", b'')
"""

__author__ = 'Josue Gomez <jgomez@jesrat.com>'
__maintainer__ = "Josue Gomez"
__email__ = "jgomez@binkfe.com"
__license__ = "GPL"
__version__ = '2.0'
__all__ = [ 'resizeTTY', 'shellExecute', 'progressBar', 'getSensible', ]
__status__ = "production"
__date__ = "30 January 2019"
__all__ = ['', ]
__status__ = "production"
__date__ = "30 January 2019"

import os, sys, subprocess
from dotenv import load_dotenv

import sys
import subprocess

result = os.environ.get('MYENV')
load_dotenv(result)

def getSensible(key):
return os.environ.get(key)

def pysyspath():
print(sys.version)
for pth in sys.path:
print(pth)

def resizeTTY(rows, cols):

def resize_tty(rows, cols):
sys.stdout.write("\x1b[8;{rows};{cols}t".format(rows=rows, cols=cols))

def shellExecute(cmd,stdin=None):
rpt = subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = rpt.communicate()
return stdout, stderr

def progressBar(progress, total, status=''):
ttySize = shellExecute(['stty','size'])
ttySize = ttySize[0].decode().split(' ')
barLen = round(int(ttySize[1])/100*90)
fillLen = int(round(barLen * progress / float(total)))
def shell_execute(cmd, stdin=None):
proc = subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
return stdout, stderr, proc.returncode


def progress_bar(progress, total, status=''):
ttysize = shell_execute(['stty','size'])
ttysize = ttysize[0].decode().split(' ')
barlen = round(int(ttysize[1])/100*90)
fill_len = int(round(barlen * progress / float(total)))
percent = round(100.0 * progress / float(total), 1)
bar = '■' * fillLen + '-' * (barLen - fillLen)
bar = '■' * fill_len + '-' * (barlen - fill_len)
sys.stdout.write('[%s] %s%s ...%s\r' % (bar, percent, '%', status))
sys.stdout.flush()
sys.stdout.flush()

+ 55
- 0
localenv.py View File

@@ -0,0 +1,55 @@
import os
import sys


class NoEnvironmentFile(Exception):
pass


class LocalEnv:
def __init__(self):
self.file = None
self.data = {}

def load(self, file=None):
"""
If no file is defined, the .env file will be searched
in invoker module's directory
"""
if file is not None:
self.file = file
else:
self.file = self._invoker()

if not os.path.isfile(self.file):
raise NoEnvironmentFile(f'for file {self.file}')

with open(self.file) as f:
for line in f:
line = line.strip()
if not line or line.startswith('#') or '=' not in line:
continue
key, value = line.split('=', 1)
key = key.replace('export', '')
key = key.strip()
value = value.strip().strip('\'"')
self.data[key] = value

def get(self, key, cast=None):
if cast is None:
return self.data[key]

return cast(self.data[key])

@staticmethod
def _invoker():
# tip from:
# https://github.com/henriquebastos/python-decouple/blob/master/decouple.py
# MAGIC! Get the caller's module path.
frame = sys._getframe()
path = os.path.dirname(frame.f_back.f_back.f_code.co_filename)
file = os.path.join(path, '.env')
return file


localenv = LocalEnv()

+ 16
- 0
oracle.py View File

@@ -0,0 +1,16 @@
import cx_Oracle


class OraConn:
def __init__(self, str_conn, conn_name=None):
self.conn = cx_Oracle.connect(str_conn, encoding='UTF-8')
self.cur = self.conn.cursor()
if conn_name is not None:
self.cur.callproc('DBMS_APPLICATION_INFO.SET_MODULE', [conn_name, None])
self.cur.close()

def __enter__(self):
return self.conn

def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.close()

+ 3
- 1
pysyspath2 View File

@@ -1,8 +1,10 @@
#!/usr/bin/env python2
import pyutils as utl


def main():
utl.pysyspath()


if __name__ == "__main__":
main()
main()

+ 3
- 1
pysyspath3 View File

@@ -1,8 +1,10 @@
#!/usr/bin/env python3
import pyutils as utl


def main():
utl.pysyspath()


if __name__ == "__main__":
main()
main()

+ 52
- 0
remote.py View File

@@ -0,0 +1,52 @@
import paramiko


class RemoteSSH:
def __init__(self, auth_info):
if not isinstance(auth_info, tuple) or len(auth_info) != 4:
raise AssertionError(f'expected tuple (host, port, user, password) instead got {auth_info}')

self.host = auth_info[0]
self.port = auth_info[1]
self.user = auth_info[2]
self.pssw = auth_info[3]
self.sock = paramiko.Transport((self.host, self.port))
self.sock.connect(username=self.user, password=self.pssw)

def __exit__(self, exc_type, exc_val, exc_tb):
self.sock.close()


class RemoteExecute(RemoteSSH):
def __init__(self, auth_info):
super().__init__(auth_info)
self.session = self.sock.open_channel(kind='session')

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()
super().__exit__(exc_type, exc_val, exc_tb)

def execute(self, command):
stdout = b''
self.session.exec_command(command)
while True:
stdout += self.session.recv(4096)
if self.session.exit_status_ready():
break
return stdout


class Sftp(RemoteSSH):
def __init__(self, auth_info):
super().__init__(auth_info)
self.sftp = paramiko.SFTPClient.from_transport(self.sock)

def __enter__(self):
return self.sftp

def __exit__(self, exc_type, exc_val, exc_tb):
self.sftp.close()
super().__exit__(exc_type, exc_val, exc_tb)

+ 61
- 0
sendmail.py View File

@@ -0,0 +1,61 @@
import smtplib
from os.path import basename
from email.mime.text import MIMEText
from email.utils import COMMASPACE
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication


class SendMail:
def __init__(self, auth_info):
if not isinstance(auth_info, tuple) or len(auth_info) != 4:
raise AssertionError(f'expected tuple (host, port, user, password) instead got {auth_info}')

self.host = auth_info[0]
self.port = auth_info[1]
self.user = auth_info[2]
self.pssw = auth_info[3]
self.conn = None
self.msg = None

def __enter__(self):
self.open_conn()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.close()

def open_conn(self):
if int(self.port) == 465:
self.conn = smtplib.SMTP_SSL(self.host, 465)
else:
self.conn = smtplib.SMTP(self.host, self.port)
self.conn.starttls()
self.conn.login(self.user, self.pssw)

def content(self, fromaddr, toaddr, subject, msg):
if not isinstance(toaddr, list):
raise AssertionError('destination address should be a list []')
self.msg = MIMEMultipart()
self.msg['Subject'] = subject
self.msg['From'] = fromaddr
self.msg['To'] = COMMASPACE.join(toaddr)
self.msg.attach(MIMEText(msg, 'html'))

def attach(self, files):
if not isinstance(files, list):
raise AssertionError('file(s) to attach should be a list []')

for file in files:
filename = basename(file)
with open(file, "rb") as f:
fl = MIMEApplication(f.read(), Name=filename)
fl['Content-Disposition'] = f'attachment; filename="{filename}"'
self.msg.attach(fl)

print(self.msg)

def send(self):
if self.msg is None:
raise Exception('msg is not defined')
self.conn.send_message(self.msg)

Loading…
Cancel
Save