Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)一键备份 Excel 数据到 MySQL 数据库。def import_data:from sqlalchemy import create_e

大家好,欢迎来到IT知识分享网。

自从学了Python和MySQL,再也不用为处理Exce报表而挠头了。下面请看笔者是如何处理 Excel报表的。

一、Excel 文件

F:/public/玉米报表.xls

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

内容如下图:成品入库表

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

成品出库表

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

二、需求

1、一键备份 Excel 数据到 MySQL 数据库

2、一键查询库存报表

三、使用展示

1、点击Run,一键运行

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

2、登录

选择第1项登录,没有用户名的,可以根据提示进行注册,具体注册方法,这里不再展示。

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

3、第1项查看数据库,第2项选择使用数据库,第3项创建数据库及表

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

4、选择使用的数据库,这里笔者选择 demdb。

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

5、选择第1项导入Excel 报表,可以重复导入,以更新数据表。

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

6、选择第5项,可以查询入库情况

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

7、选择第6项,可以查询出库情况

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

8、选择第7项,可以查询库存情况

Python 连接 MySQL 处理 Excel 报表(一键备份和查询)

四、代码

def import_data(user, password, db_name):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}”, echo=False)

df1 = pd.read_excel(“F:/public/玉米报表.xls”, sheet_name=”大包装入库”)

df2 = pd.read_excel(“F:/public/玉米报表.xls”, sheet_name=”小包装入库”)

df3 = pd.read_excel(“F:/public/玉米报表.xls”, sheet_name=”半成品入库”)

df4 = pd.read_excel(“F:/public/玉米报表.xls”, sheet_name=”成品入库”)

df5 = pd.read_excel(“F:/public/玉米报表.xls”, sheet_name=”成品出库”)

print(“read excel data successfully ! “)

# engine=create_engine(“postgresql+psycopg2://wyj:wyj@127.0.0.1:5432/corn”)

# engine = create_engine(“mysql+mysqlconnector://wyj:wyj@127.0.0.1:3306/corn”, echo=False)

# sql1 = “truncate table `大包装入库`;”

# sql2 = “truncate table `小包装入库`;”

# sql3 = “truncate table `半成品入库`;”

# sql4 = “truncate table `成品入库`;”

# sql5 = “truncate table `成品出库`;”

sql1 = “drop table if exists `big_bag_in`;”

sql2 = “drop table if exists `small_bag_in`;”

sql3 = “drop table if exists `semi_in`;”

sql4 = “drop table if exists `finished_in`;”

sql5 = “drop table if exists `finished_out`;”

with engine.connect() as conn:

conn.execute(sql1)

conn.execute(sql2)

conn.execute(sql3)

conn.execute(sql4)

conn.execute(sql5)

print(“executed successfully !”)

df1.to_sql(name=”big_bag_in”, con=engine, index=False, if_exists=”replace”)

df2.to_sql(name=”small_bag_in”, con=engine, index=False, if_exists=”replace”)

df3.to_sql(name=”semi_in”, con=engine, index=False, if_exists=”replace”)

df4.to_sql(name=”finished_in”, con=engine, index=False, if_exists=”replace”)

df5.to_sql(name=”finished_out”, con=engine, index=False, if_exists=”replace”)

print(“imported successfully !”)

except Exception as e:

print(e)

def query_data(user, password, db_name, tb_name):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}”, echo=False)

sql = f”select * from {tb_name} limit 5;”

df = pd.read_sql(sql, engine)

print(“””

查询结果

“””)

print(“————————————————-“)

print(df)

print(“————————————————-“)

except Exception as e:

print(e)

def delete_data(user, password, db_name, tb_name, tb_id, product_name):

from sqlalchemy import create_engine

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306″, echo=False)

with engine.connect() as conn:

conn.execute(f”use {db_name}”)

print(f”{db_name} selected !”)

sql1 = “show tables;”

result = conn.execute(sql1).fetchall()

print(result)

sql2 = f”delete from {tb_name} where sht_id = %s and product=%s”

paras = (tb_id, product_name)

conn.execute(sql2, paras)

print(“delete successfully !”)

except Exception as e:

print(e)

def update(user, password, db_name, tb_name, tb_id, product_name, new_qty1, new_qty2):

from sqlalchemy import create_engine

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306″, echo=False)

with engine.connect() as conn:

conn.execute(f”use {db_name}”)

print(f”{db_name} selected !”)

sql1 = “show tables;”

result = conn.execute(sql1).fetchall()

print(result)

sql2 = f”update {tb_name} set 发货=%s,实销=%s where sht_id = %s and product=%s”

paras = (new_qty1, new_qty2, tb_id, product_name)

conn.execute(sql2, paras)

print(“updated successfully !”)

except Exception as e:

print(e)

def stockIn_list(user, password, db_name, tb_in):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}”, echo=False)

sql = f”select product,small_bag,小袋数量 as 入库 from {tb_in} where product is not null;”

df = pd.read_sql(sql, engine)

print(“””

入库明细

“””)

print(“——————————————–“)

print(df)

print(“——————————————–“)

except Exception as e:

print(e)

def stockOut_list(user, password, db_name, tb_out):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}”, echo=False)

sql = f”select customer,product,包装类型,发货 as 出库 from {tb_out};”

df = pd.read_sql(sql, engine)

print(“””

出库明细

“””)

print(“——————————————–“)

print(df)

print(“——————————————–“)

except Exception as e:

print(e)

def inventory(user, password, db_name, tb_in, tb_out):

from sqlalchemy import create_engine

# import pandas as pd

pd.set_option(‘display.unicode.ambiguous_as_wide’, True)

pd.set_option(‘display.unicode.east_asian_width’, True)

pd.set_option(‘expand_frame_repr’, False)

pd.set_option(‘display.max_columns’, None, ‘display.max_rows’, None)

pd.set_option(‘display.max_rows’, 5000)

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306/{db_name}”, echo=False)

sql1 = “select product,description,小袋数量 as 入库 from %s where product is not null;” % tb_in

sql2 = “select product,description,发货 as 出库,其他出库 from %s;” % tb_out

df1 = pd.read_sql(sql1, engine)

df2 = pd.read_sql(sql2, engine)

df1.fillna(0, inplace=True)

df2.fillna(0, inplace=True)

print(“””

入库明细

“””)

print(“——————————————–“)

print(df1)

print(“——————————————–“)

print(“””

出库明细

“””)

print(“——————————————–“)

print(df2)

print(“——————————————–“)

df1 = df1.groupby([‘product’, ‘description’])[[‘入库’]].sum()

df2 = df2.groupby([‘product’, ‘description’])[[‘出库’, ‘其他出库’]].sum()

df = df1.merge(df2, how=’left’, left_on=[‘product’, ‘description’], right_on=[‘product’, ‘description’])

df.fillna(0, inplace=True)

df[‘库存’] = df[‘入库’] – df[‘出库’] – df[‘其他出库’]

df = pd.pivot_table(df, index=[‘product’, ‘description’], values=[‘入库’, ‘出库’, ‘其他出库’, ‘库存’], aggfunc=sum,

margins=True)

print(“””

库存汇总

“””)

print(“————————————————————“)

print(df)

print(“————————————————————“)

print(‘Finished !’)

except Exception as e:

print(e)

def show_databases(user, password):

from sqlalchemy import create_engine

# import pandas as pd

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306″, echo=False)

sql = “show databases;”

df = pd.read_sql(sql, engine)

print(df)

except Exception as e:

print(e)

def use_database(user, password, db_name):

from sqlalchemy import create_engine

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306″, echo=False)

with engine.connect() as conn:

conn.execute(f”use {db_name}”)

print(f”{db_name} selected !”)

sql = “show tables;”

result = conn.execute(sql).fetchall()

print(“table list:”)

print(“————-“)

for re in result:

print(re[0])

print(“————-“)

except Exception as e:

print(e)

def create_db(user, password, db_name, tb1_name, tb2_name):

from sqlalchemy import create_engine

try:

engine = create_engine(f”mysql+mysqlconnector://{user}:{password}@127.0.0.1:3306″, echo=False)

sql1 = f”create database if not exists {db_name};”

with engine.connect() as conn:

conn.execute(sql1)

conn.execute(“commit;”)

print(f”{db_name} created successfully !”)

with engine.connect() as conn:

conn.execute(f”use {db_name}”)

conn.execute(“commit;”)

sql2 = f”””create table if not exists {tb1_name}(

product_id int auto_increment primary key,

product_name varchar(25) not null ,

description varchar(50),

price decimal(4,2),

qty int not null default 0,

ctime datetime default current_timestamp on update current_timestamp,

utime datetime default current_timestamp on update current_timestamp

) engine=InnoDB default charset=utf8;

“””

sql3 = f”””create table if not exists {tb2_name}(

product_id int auto_increment primary key,

product_name varchar(25) not null ,

description varchar(50),

price decimal(4,2),

qty int not null default 0,

ctime datetime default current_timestamp on update current_timestamp,

utime datetime default current_timestamp on update current_timestamp

) engine=InnoDB default charset=utf8;

“””

with engine.connect() as conn:

conn.execute(sql2)

conn.execute(“commit;”)

print(f”{tb1_name} created successfully !”)

conn.execute(sql3)

print(f”{tb2_name} created successfully !”)

except Exception as e:

print(e)

def main():

# import pandas as pd

import login_func as login

import getpass

pd.set_option(‘display.unicode.ambiguous_as_wide’, True)

pd.set_option(‘display.unicode.east_asian_width’, True)

pd.set_option(‘expand_frame_repr’, False)

pd.set_option(‘display.max_columns’, None, ‘display.max_rows’, None)

pd.set_option(‘display.max_rows’, 5000)

msg1 = “””

登录界面

——————————————–

0. exit/quit

1. login

2. register

3. insert employee info

——————————————–

“””

msg2 = “””

欢迎进入商品进销存管理系统 2.0 版

————————————————

0. quit

1. show databases

2. use database

3. create new database and new table

————————————————-

“””

while True:

print(msg1)

choice = input(‘please input your choice: ‘)

if choice == ‘q’ or choice == ‘Q’ or choice == ‘0’:

print(‘quit !’)

break

elif choice == ‘1’:

user = input(“please input user name: “)

password = getpass.getpass(“enter password:”)

# password = input(“please input password: “)

is_login = login.login(user, password)

if not is_login:

print(“用户名或密码错误!”)

else:

print(f”{user} login successfully!”)

while True:

print(msg2)

handle = input(‘请选择操作项目: ‘)

if handle == ‘q’ or handle == ‘Q’ or handle == ‘0’:

print(‘quit !’)

break

elif handle == ‘1’:

user = ‘wyj’

password = ‘wyj’

show_databases(user, password)

elif handle == ‘2’:

user = ‘wyj’

password = ‘wyj’

db_name = input(“please select database name: “)

use_database(user, password, db_name)

msg3 = “””

商品管理系统 2.0 版

——————————–

0. exit

1. 导入 Excel 数据表

2. 查询

3. 删除数据

4. 修改数据

5. 入库查询

6. 出库查询

7. 库存查询

——————————–

“””

while True:

print(msg3)

num = input(‘请选择操作项目编号: ‘)

if num == ‘q’ or num == ‘Q’ or num == ‘0’:

print(‘退出系统 !’)

break

elif num == ‘1’:

import_data(user, password, db_name)

elif num == ‘2’:

print(“———-query table data———-“)

tb_name = input(“table name: “)

query_data(user, password, db_name, tb_name)

elif num == ‘3’:

print(“———-delete table data———-“)

tb_name = input(“table name: “)

tb_id = input(“table id: “)

product_name = input(“input product name: “)

delete_data(user, password, db_name, tb_name, tb_id, product_name)

elif num == ‘4’:

print(“———-update table data———-“)

tb_name = input(“please input table name: “)

tb_id = input(“please input table id: “)

product_name = input(“please input product name: “)

new_qty1 = input(“please input new qty1: “)

new_qty2 = input(“please input new qty2: “)

update(user, password, db_name, tb_name, tb_id, product_name, new_qty1, new_qty2)

elif num == ‘5’:

tb_in = “finished_in”

stockIn_list(user, password, db_name, tb_in)

elif num == ‘6’:

tb_out = “finished_out”

stockOut_list(user, password, db_name, tb_out)

elif num == ‘7’:

tb_in = “finished_in”

tb_out = “finished_out”

inventory(user, password, db_name, tb_in, tb_out)

else:

print(‘input error!,please input again !’)

elif handle == ‘3’:

user = ‘wyj’

password = ‘wyj’

db_name = input(“please input new database name: “)

tb1_name = input(“please input new table1 name: “)

tb2_name = input(“please input new table2 name: “)

create_db(user, password, db_name, tb1_name, tb2_name)

break

else:

print(‘Error! please input again !’)

elif choice == ‘2’:

register_name = input(“please input register name: “)

employee_id = input(“please input employee id : “)

is_user_exists = login.user_exists(register_name)

is_employee_exists = login.employee_exists(employee_id)

if is_user_exists:

print(“user exists ,can not register,please change another user name.”)

elif not is_employee_exists:

print(‘not employee , can not register.’)

else:

pwd1 = input(‘please input your password: ‘)

pwd2 = input(‘please confirm your password: ‘)

if pwd1 == pwd2:

login.register(register_name, pwd1)

print(“ok,registered successfully!”)

elif choice == ‘3’:

employee_id = input(“please input employee id: “)

employee_name = input(“please input employee name: “)

login.insert_employee_info(employee_name, employee_id)

else:

print(‘Error! please input again !’)

if __name__ == ‘__main__’:

import pandas as pd

pd.set_option(‘display.unicode.ambiguous_as_wide’, True)

pd.set_option(‘display.unicode.east_asian_width’, True)

pd.set_option(‘expand_frame_repr’, False)

pd.set_option(‘display.max_columns’, None, ‘display.max_rows’, None)

pd.set_option(‘display.max_rows’, 5000)

main()

好了,还不赶快去试一下。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/57354.html

(0)
上一篇 2024-07-12 19:33
下一篇 2024-07-12 21:26

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信