sgmp/solicitacoes/admin.py

339 lines
9.8 KiB
Python
Raw Permalink Normal View History

from django import forms
2026-03-09 18:46:01 +00:00
from django.contrib import admin, messages
from django.core.exceptions import ValidationError
from django.forms import BaseInlineFormSet
2026-03-09 18:46:01 +00:00
from django.shortcuts import redirect
from django.urls import path, reverse
2026-03-09 18:46:01 +00:00
from django.utils import timezone
from .intf_sqlserver import listar_para_selecionar_colaborador
from .intf_winthor import buscar_colaborador_oracle
from .models import (
ConfiguracaoSGMP,
HeadGestor,
PessoaRM,
Solicitacao,
UsuarioPerfilExtra,
UsuarioSistema,
)
class ConfiguracaoSGMPForm(forms.ModelForm):
perfis_gerenciar_permissoes = forms.MultipleChoiceField(
label="Perfis com acesso à tela Gerenciar Permissões (/permissoes/)",
choices=UsuarioSistema.Perfil.choices,
widget=forms.CheckboxSelectMultiple,
required=True,
)
class Meta:
model = ConfiguracaoSGMP
fields = ("perfis_gerenciar_permissoes",)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.instance.pk:
v = self.instance.perfis_gerenciar_permissoes
if v is not None:
self.initial["perfis_gerenciar_permissoes"] = list(v)
else:
self.initial.setdefault(
"perfis_gerenciar_permissoes",
[UsuarioSistema.Perfil.ADMIN],
)
def clean_perfis_gerenciar_permissoes(self):
data = self.cleaned_data.get("perfis_gerenciar_permissoes") or []
if not data:
raise ValidationError("Selecione ao menos um perfil.")
valid = {c[0] for c in UsuarioSistema.Perfil.choices}
cleaned = [x for x in data if x in valid]
if not cleaned:
raise ValidationError("Nenhum código de perfil válido selecionado.")
return cleaned
@admin.register(ConfiguracaoSGMP)
class ConfiguracaoSGMPAdmin(admin.ModelAdmin):
form = ConfiguracaoSGMPForm
list_display = ("__str__",)
def has_add_permission(self, request):
return not ConfiguracaoSGMP.objects.filter(pk=1).exists()
def has_delete_permission(self, request, obj=None):
return False
def changelist_view(self, request, extra_context=None):
if ConfiguracaoSGMP.objects.filter(pk=1).exists():
return redirect(reverse("admin:solicitacoes_configuracaosgmp_change", args=(1,)))
return super().changelist_view(request, extra_context)
2026-03-09 18:46:01 +00:00
@admin.register(PessoaRM)
class PessoaRMAdmin(admin.ModelAdmin):
list_display = (
"nome",
"matricula",
"cargo",
"setor",
"centro_custo",
"matricula_winthor",
"sincronizado_em",
)
search_fields = (
"nome",
"matricula",
"id_rm",
"matricula_winthor",
)
list_filter = (
"setor",
"cargo",
)
readonly_fields = (
"id_rm",
"sincronizado_em",
"criado_em",
"atualizado_em",
)
# Diz ao Django para usar um template customizado
change_list_template = "admin/solicitacoes/pessoarm/pessoarm_changelist.html"
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path(
"sync-rm/",
self.admin_site.admin_view(self.sync_rm),
name="pessoarm-sync-rm",
),
]
return custom_urls + urls
def sync_rm(self, request):
dados_rm = listar_para_selecionar_colaborador()
criados = 0
atualizados = 0
winthor_ok = 0
winthor_erro = 0
winthor_sem_cpf = 0
agora = timezone.now()
for row in dados_rm:
id_rm = f"{row['CODCOLIGADA']}-{row['CHAPA']}"
matricula_winthor = None
# =========================
# Integração Winthor (CPF)
# =========================
cpf = row.get("CPF")
if cpf:
try:
dados_w = buscar_colaborador_oracle(cpf)
if dados_w and dados_w.get("matricula"):
matricula_winthor = dados_w["matricula"]
winthor_ok += 1
else:
winthor_erro += 1
except Exception:
winthor_erro += 1
else:
winthor_sem_cpf += 1
# =========================
# Sync RM (sempre executa)
# =========================
_, created = PessoaRM.objects.update_or_create(
id_rm=id_rm,
defaults={
"matricula": row["CHAPA"],
"nome": row["NOME"],
"cargo": row["FUNCAO"],
"setor": row["SECAO"],
"centro_custo": row["CODSECAO"],
"cpf": row["CPF"],
"data_admissao": row["DATAADMISSAO"],
"situacao": row["CODSITUACAO"],
"cod_funcao": row["CODFUNCAO"],
"salario": row["SALARIO"],
"cod_sindicato": row["CODSINDICATO"],
"saldo_banco_horas_minutos": row["SALDO_MINUTOS"],
"inicio_periodo_banco_horas": row["INICIOPER"],
"fim_periodo_banco_horas": row["FIMPER"],
"matricula_winthor": matricula_winthor,
"sincronizado_em": agora,
},
)
if created:
criados += 1
else:
atualizados += 1
# =========================
# Mensagens finais
# =========================
messages.success(
request,
(
"Sincronização RM concluída com sucesso. "
f"Criados: {criados} | Atualizados: {atualizados}"
),
)
if winthor_ok:
messages.info(
request,
f"Winthor: {winthor_ok} matrícula(s) sincronizada(s) com sucesso."
)
if winthor_sem_cpf:
messages.warning(
request,
f"Winthor: {winthor_sem_cpf} colaborador(es) sem CPF — ignorados."
)
if winthor_erro:
messages.warning(
request,
f"Winthor: {winthor_erro} falha(s) ao buscar matrícula."
)
return redirect("..")
class UsuarioPerfilExtraInlineFormSet(BaseInlineFormSet):
def clean(self):
super().clean()
vistos = set()
principal = self.data.get("perfil") or getattr(self.instance, "perfil", None)
for form in self.forms:
if not hasattr(form, "cleaned_data"):
continue
if form.cleaned_data.get("DELETE"):
continue
perfil_extra = form.cleaned_data.get("perfil")
if not perfil_extra:
continue
if perfil_extra == principal:
raise ValidationError(
"Perfil extra não pode ser igual ao perfil principal."
)
if perfil_extra in vistos:
raise ValidationError(f"Perfil extra duplicado: {perfil_extra}.")
vistos.add(perfil_extra)
class UsuarioPerfilExtraInline(admin.TabularInline):
model = UsuarioPerfilExtra
formset = UsuarioPerfilExtraInlineFormSet
extra = 0
fields = ("perfil", "criado_em")
readonly_fields = ("criado_em",)
2026-03-09 18:46:01 +00:00
@admin.register(UsuarioSistema)
class UsuarioSistemaAdmin(admin.ModelAdmin):
list_display = (
"nome",
"matricula",
"perfil",
"perfis_ativos_display",
2026-03-09 18:46:01 +00:00
"ativo",
"criado_em",
)
list_filter = (
"perfil",
"ativo",
)
search_fields = (
"nome",
"matricula",
)
readonly_fields = (
"criado_em",
"atualizado_em",
)
inlines = (UsuarioPerfilExtraInline,)
2026-03-09 18:46:01 +00:00
fieldsets = (
("Informações Básicas", {
"fields": ("matricula", "nome", "ativo")
}),
("Permissões", {
"fields": ("perfil",)
}),
("Auditoria", {
"fields": ("criado_em", "atualizado_em"),
"classes": ("collapse",)
}),
)
def get_queryset(self, request):
qs = super().get_queryset(request)
return qs.prefetch_related("perfis_extras")
@admin.display(description="Perfis ativos")
def perfis_ativos_display(self, obj):
codigos = obj.perfis_ativos()
labels = dict(UsuarioSistema.Perfil.choices)
resolved = [str(labels.get(c, c)) for c in codigos]
return ", ".join(resolved)
@admin.register(UsuarioPerfilExtra)
class UsuarioPerfilExtraAdmin(admin.ModelAdmin):
list_display = ("usuario", "perfil", "criado_em")
list_filter = ("perfil",)
search_fields = ("usuario__nome", "usuario__matricula")
autocomplete_fields = ("usuario",)
2026-03-09 18:46:01 +00:00
@admin.register(HeadGestor)
class HeadGestorAdmin(admin.ModelAdmin):
list_display = ("head", "gestor")
list_filter = ("head",)
search_fields = ("head__nome", "gestor__nome", "head__matricula", "gestor__matricula")
autocomplete_fields = ("head", "gestor")
@admin.register(Solicitacao)
class SolicitacaoAdmin(admin.ModelAdmin):
list_display = [
"id",
"tipo",
"status",
"funcionario",
"solicitante",
"criado_em",
]
list_filter = ("tipo", "status", "funcionario", "solicitante")
search_fields = ("id", "tipo", "status", "funcionario", "solicitante")
readonly_fields = ("id", "criado_em", "atualizado_em")
fieldsets = (
("Informações Básicas", {
"fields": ("id", "tipo", "status", "funcionario", "solicitante")
}),
("Auditoria", {
"fields": ("criado_em", "atualizado_em"),
"classes": ("collapse",)
}),
)