551 lines
16 KiB
Python
551 lines
16 KiB
Python
# from libs import lib_mariadb as mdb
|
|
# from libs import lib_csv as csv
|
|
# from libs import lib_matplotlib as mtp
|
|
import configparser as cp
|
|
import os
|
|
from sankeyflow import Sankey
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.patches as mpatches
|
|
import re
|
|
import datetime
|
|
import mariadb
|
|
import sys
|
|
import math
|
|
import numpy as np
|
|
from colorsys import hls_to_rgb
|
|
#node_central = 'total'
|
|
|
|
class BudgetLine():
|
|
|
|
def __init__(self, desc:str, charge:int, revenue:int):
|
|
self.desc = desc
|
|
self.charge = charge
|
|
self.revenue = revenue
|
|
|
|
|
|
|
|
|
|
class DBInfos():
|
|
name:str
|
|
user:str
|
|
password:str
|
|
host:str
|
|
port:int
|
|
|
|
|
|
class Account():
|
|
def __init__(self, label:str, number:int, tmp:int):
|
|
self.label = label
|
|
self.number = number
|
|
self.type = tmp
|
|
|
|
|
|
def main(path_b:str,
|
|
path_wp:str,
|
|
save_dir:str):
|
|
budget=import_csv(path_b)
|
|
budget = sort_des_budget(budget, 0)
|
|
#create_budget_sankey(budget,save_dir)
|
|
db_infos = get_db_infos(path_wp)
|
|
cursor = connect_mariadb(db_infos)
|
|
create_accounting_sankey(cursor, save_dir)
|
|
|
|
finances, last_update = import_finances(cursor)
|
|
#finances = import_csv(path_b)
|
|
#finances = test_finance(finances)
|
|
#last_update = rf'2025-01-01'
|
|
finances = sort_des_budget(finances, 0)
|
|
tot_rev = get_total_revenues(finances)
|
|
tot_ch = get_total_charges(finances)
|
|
totals = get_totals_lists_for_charts(
|
|
tot_ch_bud = get_total_charges(budget),
|
|
tot_ch_real = tot_ch,
|
|
tot_re_bud = get_total_revenues(budget),
|
|
tot_re_real = tot_rev
|
|
)
|
|
revenue_ratios = get_revenue_ratios(tot_rev, finances)
|
|
charge_ratios = get_charge_ratios(tot_ch, finances)
|
|
create_total_diagram(save_dir, totals, last_update)
|
|
create_pie_diagrams(save_dir, revenue_ratios, charge_ratios, last_update)
|
|
print('oucou')
|
|
|
|
#connection à la db
|
|
# Lister les comptes qui m'intéressent
|
|
|
|
# Pour chaque compte 7 puis 6 :
|
|
# - calculer somme entre le 1 er janvier et ajd
|
|
# - ajouter valeur dans flow et nodes
|
|
|
|
|
|
def test_finance(budget:list[BudgetLine]):
|
|
coef = -500
|
|
for line in budget:
|
|
if coef > 0 :
|
|
if line.revenue == 0 :
|
|
line.charge += coef
|
|
else :
|
|
line.revenue += coef
|
|
if coef < 0 :
|
|
if line.revenue == 0 and line.charge > abs(coef) :
|
|
line.charge += coef
|
|
elif line.charge == 0 and line.revenue > abs(coef) :
|
|
line.revenue += coef
|
|
return budget
|
|
|
|
|
|
def sort_des_budget(budget:list[BudgetLine], li:int):
|
|
if li < len(budget)-1 :
|
|
ch = budget[li].charge
|
|
rev = budget[li].revenue
|
|
if (ch > 0 and ch >= budget[li+1].charge) or (ch == 0 and budget[li+1].charge ==0 and rev >=budget[li+1].revenue ):
|
|
budget = sort_des_budget(budget, li+1)
|
|
else :
|
|
if li == 0 :
|
|
budget = [budget[li+1], budget[li]] + budget[li+2:]
|
|
budget = sort_des_budget(budget, li)
|
|
elif li < len(budget) - 2 :
|
|
budget = budget[:li] + [budget[li+1], budget[li]] + budget[li+2:]
|
|
budget = sort_des_budget(budget, li-1)
|
|
else :
|
|
budget = budget[:li] + [budget[li+1], budget[li]]
|
|
budget = sort_des_budget(budget, li-1)
|
|
|
|
return budget
|
|
|
|
|
|
def get_revenue_ratios(tot:int, budget:list[BudgetLine]):
|
|
values = {}
|
|
oth = 0
|
|
coef = 4
|
|
i = 0
|
|
for line in budget:
|
|
if line.revenue >0 :
|
|
if i < coef:
|
|
values.update(
|
|
{line.desc:line.revenue}
|
|
)
|
|
i+=1
|
|
else:
|
|
oth += line.revenue
|
|
if oth > 0 :
|
|
values.update(
|
|
{'Autres':oth}
|
|
)
|
|
return values
|
|
|
|
|
|
def get_charge_ratios(tot:int, budget:list[BudgetLine]):
|
|
values = {}
|
|
oth = 0
|
|
coef = 4
|
|
i = 0
|
|
for line in budget:
|
|
if line.charge >0 :
|
|
if i < coef:
|
|
values.update(
|
|
{line.desc:line.charge}
|
|
)
|
|
i+=1
|
|
else:
|
|
oth += line.charge
|
|
if oth > 0 :
|
|
values.update(
|
|
{'Autres':oth}
|
|
)
|
|
return values
|
|
|
|
def import_finances(cursor):
|
|
# extraire le compte et la caisse au 1 er janvier de l'année
|
|
year = datetime.datetime.now().year
|
|
first_day = rf'{year}-01-01'
|
|
accounts = get_income_and_expense_accounts(cursor)
|
|
finances = []
|
|
last_update = first_day
|
|
for a in accounts:
|
|
tot_deb, tot_cred, last_entrie = get_sum_of_operations(
|
|
cursor,
|
|
first_day,
|
|
a
|
|
)
|
|
if a.type == 6 :
|
|
#charge
|
|
finances.append(BudgetLine(a.label, tot_deb - tot_cred, 0))
|
|
elif a.type == 7:
|
|
finances.append(BudgetLine(a.label, 0, tot_cred - tot_deb))
|
|
if last_entrie >= last_update :
|
|
last_update = last_entrie
|
|
|
|
return finances, last_update
|
|
|
|
|
|
def get_total_charges(budget:list[BudgetLine]):
|
|
tot = 0
|
|
for line in budget:
|
|
tot += line.charge
|
|
return int(tot)
|
|
|
|
|
|
def get_total_revenues(budget:list[BudgetLine]):
|
|
tot = 0
|
|
for line in budget:
|
|
tot += line.revenue
|
|
return int(tot)
|
|
|
|
|
|
def get_totals_lists_for_charts(tot_ch_bud, tot_ch_real, tot_re_bud, tot_re_real):
|
|
rest_ch = abs(tot_ch_bud - tot_ch_real)
|
|
rest_re = abs(tot_re_bud - tot_re_real)
|
|
if tot_ch_bud >= tot_ch_real and tot_re_bud >= tot_re_real:
|
|
|
|
values = {
|
|
f'actuel' : [tot_re_real, tot_ch_real],
|
|
f'budget' : [rest_re, rest_ch]
|
|
}
|
|
else :
|
|
values = {
|
|
f'bot_budget' : [int(0.98 * tot_re_bud), int(0.98 * tot_ch_bud)],
|
|
f'budget' : [int(0.02 *tot_re_bud), int(0.02 *tot_ch_bud)],
|
|
f'actuel' : [rest_re, rest_ch]
|
|
}
|
|
return values
|
|
|
|
|
|
def hls_to_rgbs(s_hsl):
|
|
rgb = hls_to_rgb(s_hsl["Hue"], s_hsl["Luminosité"], s_hsl["Saturation"] )
|
|
return "#" + "".join("%02X" % round(i*255) for i in rgb )
|
|
|
|
|
|
def get_color_list(nb, s_hsl, sens = "Blanc") :
|
|
temp = s_hsl
|
|
luminosite = temp["Luminosité"]
|
|
|
|
if sens == "Blanc":
|
|
ecart = (1-luminosite ) /(nb +1)
|
|
else :
|
|
ecart = luminosite / (nb + 1)
|
|
|
|
l_couleur = []
|
|
for x in range(nb) :
|
|
l_couleur.append(hls_to_rgbs(temp ))
|
|
if sens == "Blanc" :
|
|
luminosite += ecart
|
|
else :
|
|
print(luminosite)
|
|
luminosite -= ecart
|
|
temp["Luminosité"] = luminosite
|
|
return l_couleur
|
|
|
|
|
|
def create_pie_diagrams(path:str,
|
|
revenue_ratios:dict[str, int],
|
|
charge_ratios:dict[str, int],
|
|
last_update:str):
|
|
|
|
bleu_hls = {"Hue":190 / 360,
|
|
"Saturation":100/100,
|
|
"Luminosité":30/100}
|
|
|
|
marron_hls = {"Hue":23 / 360,
|
|
"Saturation":72/100,
|
|
"Luminosité":30/100}
|
|
|
|
fig, axs = plt.subplots(ncols=2,figsize=(10, 4), layout='constrained')
|
|
labels, values = transform_dict_for_diag(revenue_ratios)
|
|
colors = get_color_list(len(values),bleu_hls)
|
|
p = axs[0].pie(x=values, labels=labels, colors=colors)
|
|
|
|
labels, values = transform_dict_for_diag(charge_ratios)
|
|
colors = get_color_list(len(values),marron_hls)
|
|
p = axs[1].pie(x=values, labels=labels, colors=colors)
|
|
|
|
axs[1].set_title('Répartition des dépenses réalisées')
|
|
axs[0].set_title('Répartition des recettes réalisées')
|
|
path = os.path.join(path, 'ratios.svg')
|
|
plt.savefig(path)
|
|
plt.close()
|
|
|
|
|
|
def transform_dict_for_diag(dicts:dict):
|
|
labels = []
|
|
values = []
|
|
for label, value in dicts.items():
|
|
labels.append(f'{label}\n{value}€')
|
|
values.append(value)
|
|
|
|
return labels, values
|
|
|
|
|
|
def create_total_diagram(path:str,
|
|
totals: dict[str, list[int]],
|
|
last_upadate:str):
|
|
|
|
fig, ax = plt.subplots()
|
|
plt.grid(True, 'major', 'y', color="#555555")
|
|
width = 0.6
|
|
types = ('dépenses', 'revenus')
|
|
leg = [mpatches.Patch(color='#007e97', label=f'au {last_upadate}'),
|
|
mpatches.Patch(color='#999999', label='budgeté')
|
|
]
|
|
bottom = np.zeros(2)
|
|
if len(totals) ==2 :
|
|
col = ['#007e97', '#999999']
|
|
else:
|
|
col = ['#007e97', '#999999', '#007e97']
|
|
|
|
x = 0
|
|
for position, values in totals.items():
|
|
p = ax.bar(types, values, width, label=position, bottom=bottom, color=col[x] )
|
|
bottom += values
|
|
x+=1
|
|
|
|
#ax.invert_yaxis() # labels read top-to-bottom
|
|
|
|
ax.set_title('Total des dépenses et des revenus de 2025')
|
|
box = ax.get_position()
|
|
ax.set_position(
|
|
[box.x0, box.y0 + box.height * 0.1,
|
|
box.width, box.height * 0.9]# type: ignore
|
|
)
|
|
ax.legend(handles=leg, loc='upper center', bbox_to_anchor=(0.5, -0.07),
|
|
fancybox=True, shadow=True, ncol=2)
|
|
path = os.path.join(path, 'total.svg')
|
|
plt.savefig(path)
|
|
plt.close()
|
|
|
|
|
|
def create_accounting_sankey(cursor, save_dir):
|
|
NR = []
|
|
NC = []
|
|
flows = []
|
|
|
|
# extraire le compte et la caisse au 1 er janvier de l'année
|
|
year = datetime.datetime.now().year
|
|
first_day = rf'{year}-01-01'
|
|
reserves = int(get_bank_reserves(cursor, first_day))
|
|
ro_name = 'reserves au début de l\'année'
|
|
ra_name = 'reserves aujourd\'hui'
|
|
t_name = 'Total'
|
|
NR.append((ro_name, reserves, {'label_pos':'left', 'color':'#007e97'}))
|
|
flows.append((ro_name, t_name, reserves))
|
|
|
|
# lister les comptes de charges et produit
|
|
accounts = get_income_and_expense_accounts(cursor)
|
|
tmp_res = reserves
|
|
tot = reserves
|
|
for a in accounts:
|
|
tot_deb, tot_cred, last_entrie = get_sum_of_operations(
|
|
cursor,
|
|
first_day,
|
|
a
|
|
)
|
|
tmp_res = tmp_res + tot_cred - tot_deb
|
|
if a.type == 6 :
|
|
tmp_tot = tot_deb-tot_cred
|
|
if tmp_tot > 0 :
|
|
NC.append((a.label, tmp_tot, {'label_pos':'right', 'color':'#007e97' }))
|
|
flows.append((t_name, a.label, tmp_tot))
|
|
else :
|
|
tmp_tot = tot_cred-tot_deb
|
|
if tmp_tot > 0 :
|
|
tot += tmp_tot
|
|
NR.append((a.label, tmp_tot, {'label_pos':'left', 'color':'#007e97' }))
|
|
flows.append((a.label, t_name, tmp_tot))
|
|
|
|
NC.insert(0, (ra_name, tmp_res, {'label_pos':'right', 'color':'#007e97'}))
|
|
flows.insert(0, (t_name, ra_name, tmp_res))
|
|
nodes = [
|
|
NR,
|
|
[('Total', tot, {'label_pos':'top', 'color':'#007e97'})],
|
|
NC
|
|
]
|
|
|
|
plt.figure(figsize=(30, 10), dpi=144)
|
|
s = Sankey(
|
|
flows=flows,
|
|
nodes=nodes,
|
|
node_opts=dict(label_format='{label}:{value}')
|
|
)
|
|
s.draw()
|
|
path = os.path.join(save_dir, 'compta.svg')
|
|
plt.savefig(path)
|
|
plt.close()
|
|
|
|
|
|
def get_income_and_expense_accounts(cursor):
|
|
VERBOSE = False
|
|
if VERBOSE : print("get_income_and_expense_accounts")
|
|
cursor.execute(
|
|
"SELECT account_number, label FROM llx_accounting_account WHERE fk_pcg_version = 'PCG-BSC' AND active = 1;"
|
|
)
|
|
accounts = []
|
|
for account_number, label in cursor:
|
|
tmp = account_number[0]
|
|
if (tmp == '6' or tmp == '7') and int(account_number) > 10:
|
|
accounts.append(Account(label, account_number, int(tmp)))
|
|
|
|
if VERBOSE : [print(f'account {a.number} / {a.label}') for a in accounts]
|
|
return accounts
|
|
|
|
|
|
def get_bank_reserves(cursor, date:str):
|
|
VERBOSE = False
|
|
if VERBOSE : print('get_bank_reserves')
|
|
total = 0
|
|
cursor.execute(
|
|
"SELECT piece_num, numero_compte, label_compte, doc_date, code_journal, debit, credit FROM llx_accounting_bookkeeping WHERE (numero_compte = ? OR numero_compte = ?) AND doc_date = ? AND code_journal = ?;",
|
|
(5121, 5311, date, 'AN',)
|
|
)
|
|
for piece_num, numero_compte, label_compte, doc_date, code_journal, debit, credit in cursor:
|
|
#print(piece_num, numero_compte, label_compte, doc_date, code_journal, debit, credit, sep=" | ")
|
|
if VERBOSE : print(f'ajout de : {debit} provenant du compte {label_compte} au total')
|
|
total += debit
|
|
if VERBOSE : print(f'total = {total}')
|
|
return total
|
|
|
|
|
|
def connect_mariadb(db_infos:DBInfos):
|
|
try:
|
|
conn = mariadb.connect(
|
|
user=db_infos.user,
|
|
password=db_infos.password,
|
|
host=db_infos.host,
|
|
port=db_infos.port,
|
|
database=db_infos.name
|
|
|
|
)
|
|
except mariadb.Error as e:
|
|
print(f"Error connecting to MariaDB Platform: {e}")
|
|
sys.exit(1)
|
|
|
|
# Get Cursor
|
|
return conn.cursor()
|
|
|
|
|
|
def import_csv(csv:str):
|
|
with open(csv, "r") as file:
|
|
content = file.read()
|
|
values = []
|
|
for line in content.split('\n'):
|
|
tmp = line.split(';')
|
|
values.append(
|
|
BudgetLine(
|
|
desc=tmp[0],
|
|
charge=int(tmp[1]),
|
|
revenue=int(tmp[2])
|
|
)
|
|
)
|
|
return values
|
|
|
|
|
|
def extract_value(content:str, param:str):
|
|
tmp = re.search(rf'define...{param}.+', content)
|
|
if tmp is not None:
|
|
return tmp.group().split(',')[1].split('\'')[1]
|
|
else:
|
|
return ""
|
|
|
|
|
|
def get_sum_of_operations(cursor, date:str, account:Account):
|
|
VERBOSE = False
|
|
tot_cred = tot_deb = 0
|
|
cursor.execute(
|
|
"SELECT piece_num, numero_compte, label_compte, doc_date, code_journal, debit, credit FROM llx_accounting_bookkeeping WHERE numero_compte = ? AND doc_date >= ?",
|
|
(account.number, date)
|
|
)
|
|
last_entrie = date
|
|
for piece_num, numero_compte, label_compte, doc_date, code_journal, debit, credit in cursor :
|
|
tot_cred += credit
|
|
tot_deb += debit
|
|
if str(doc_date) > last_entrie :
|
|
last_entrie = str(doc_date)
|
|
|
|
if VERBOSE : print(f'pour le compte {account.number} : credit = {tot_cred} / debit = {tot_deb} à partir du {date}')
|
|
return int(tot_deb), int(tot_cred), last_entrie
|
|
|
|
|
|
def get_db_infos(path_wp:str):
|
|
with open(path_wp, "r") as file:
|
|
content = file.read()
|
|
db_infos = DBInfos()
|
|
db_infos.name = "doli_bsc"
|
|
db_infos.user = extract_value(content, 'DB_USER')
|
|
db_infos.password = extract_value(content, 'DB_PASSWORD')
|
|
db_infos.host = extract_value(content, 'DB_HOST')
|
|
db_infos.port = 3306
|
|
return db_infos
|
|
|
|
|
|
|
|
def get_budget_elements(budget:list[BudgetLine]):
|
|
nodes = []
|
|
flows=[]
|
|
total = 0
|
|
CL = []
|
|
RL = []
|
|
for line in budget:
|
|
charge = line.charge
|
|
desc = line.desc
|
|
revenue = line.revenue
|
|
if charge != 0 and revenue == 0:
|
|
total += charge
|
|
CL.append((desc, charge, {'label_pos':'right', 'color':'#007e97' }))
|
|
flows.append(('Total', desc, charge))
|
|
|
|
elif charge == 0 and revenue != 0:
|
|
RL.append((desc, revenue, {'label_pos':'left', 'color':'#007e97' }))
|
|
flows.append((desc, 'Total', revenue))
|
|
|
|
nodes = [
|
|
RL,
|
|
[('Total', total, {'label_pos':'top', 'color':'#007e97' })],
|
|
CL
|
|
]
|
|
|
|
return nodes, flows
|
|
|
|
|
|
def create_budget_sankey(
|
|
budget:list[BudgetLine],
|
|
save:str):
|
|
|
|
nodes, flows = get_budget_elements(budget)
|
|
plt.figure(figsize=(25, 10), dpi=144)
|
|
s = Sankey(
|
|
flows=flows,
|
|
nodes=nodes,
|
|
node_opts=dict(label_format='{label}:{value}')
|
|
)
|
|
s.draw()
|
|
path = os.path.join(save, 'budget.svg')
|
|
plt.savefig(path)
|
|
plt.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
VERBOSE = False
|
|
conf = cp.ConfigParser()
|
|
conf.read('config')
|
|
|
|
path_b = conf['path']['budget']
|
|
path_wp_config = conf['path']['wp-config']
|
|
path_save = conf['path']['save_directory']
|
|
|
|
|
|
|
|
if os.path.exists(path_b) and os.path.exists(path_wp_config):
|
|
print("les 2 fichiers budget et wp-config ont été trouvé")
|
|
main(path_b, path_wp_config, path_save)
|
|
else :
|
|
if not os.path.exists(path_b) :
|
|
msg = "le chemin indiqué pour le paramètre budget dans le fichier config est incorrect. Le document a été supprimé ou déplacer. Merci de préciser un autre chemin"
|
|
print(msg)
|
|
print(f'budget = {path_b}')
|
|
if not os.path.exists(path_wp_config) :
|
|
msg = "le chemin indiqué pour le paramètre wp-config dans le fichier config est incorrect. Le document a été supprimé ou déplacer. Merci de préciser un autre chemin"
|
|
print(msg)
|
|
print(f'wp-config = {path_wp_config}')
|
|
|
|
if VERBOSE :
|
|
respo = input('Appuyer sur entrée pour terminer')
|
|
|