sgmp/solicitacoes/intf_sqlserver.py

614 lines
21 KiB
Python

# /SGMP_PROD/solicitacoes/intf_sqlserver.py
import pymssql
from django.conf import settings
def get_sqlserver_connection():
"""
Cria uma conexão com o banco SQL Server do TOTVS RM.
A configuração é obtida a partir do settings do Django,
permitindo separação por ambiente (dev, homologação, produção).
"""
return pymssql.connect(
server=settings.SQLSERVER_CONFIG["SERVER"],
port=settings.SQLSERVER_CONFIG["PORT"],
user=settings.SQLSERVER_CONFIG["USER"],
password=settings.SQLSERVER_CONFIG["PASSWORD"],
database=settings.SQLSERVER_CONFIG["DATABASE"],
)
def listar_para_selecionar_colaborador(apenas_desligados=False):
"""
Lista colaboradores do RM para seleção.
Args:
apenas_desligados (bool): Se True, retorna apenas pessoas DESLIGADAS
(útil para admissão por substituição).
Se False, retorna pessoas que não estão desligadas (padrão).
Returns:
list: Lista de dicionários com dados dos colaboradores
"""
# Define o filtro de situação baseado no parâmetro
if apenas_desligados:
# Para admissão por substituição: busca pessoas DESLIGADAS
# CODSITUACAO = 'D' e DATADEMISSAO IS NOT NULL (seguindo padrão do sgmp)
filtro_situacao = "PF.CODSITUACAO = 'D' AND PF.DATADEMISSAO IS NOT NULL"
else:
# Padrão: busca pessoas que não estão desligadas (inclui admitidos)
filtro_situacao = "PF.CODSITUACAO <> 'D'"
query = f"""
WITH SALDO_ATUAL AS (
SELECT
SB.CODCOLIGADA,
SB.CHAPA,
SB.INICIOPER,
SB.FIMPER,
((SB.EXTRAANT - SB.ATRASOANT - SB.FALTAANT) +
(SB.EXTRAATU - SB.ATRASOATU - SB.FALTAATU)) AS SALDO_MINUTOS,
ROW_NUMBER() OVER (
PARTITION BY SB.CODCOLIGADA, SB.CHAPA
ORDER BY SB.FIMPER DESC
) AS RN
FROM ASALDOBANCOHOR SB
)
SELECT
PF.CODCOLIGADA,
PF.CHAPA,
PF.NOME,
P.CPF,
PS.DESCRICAO AS SECAO,
PU.NOME AS FUNCAO,
PF.DATAADMISSAO,
PF.DATADEMISSAO,
PF.CODSITUACAO,
PF.CODSECAO,
PF.CODFUNCAO,
PF.SALARIO,
PF.CODSINDICATO,
SA.INICIOPER,
SA.FIMPER,
SA.SALDO_MINUTOS
FROM PFUNC PF
JOIN PPESSOA P
ON P.CODIGO = PF.CODPESSOA
JOIN PSECAO PS
ON PS.CODCOLIGADA = PF.CODCOLIGADA
AND PS.CODIGO = PF.CODSECAO
JOIN PFUNCAO PU
ON PU.CODCOLIGADA = PF.CODCOLIGADA
AND PU.CODIGO = PF.CODFUNCAO
LEFT JOIN SALDO_ATUAL SA
ON SA.CODCOLIGADA = PF.CODCOLIGADA
AND SA.CHAPA = PF.CHAPA
AND SA.RN = 1
WHERE
{filtro_situacao}
ORDER BY PF.NOME
"""
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query)
return cursor.fetchall()
def buscar_colaboradores_rm_desligados(nome: str = None):
"""
Busca colaboradores DESLIGADOS no RM para fluxo de Substituição.
Seguindo padrão do sgmp:
- CODSITUACAO = 'D'
- DATADEMISSAO IS NOT NULL
Args:
nome (str, optional): Nome para filtrar (busca parcial, case-insensitive)
Returns:
list: Lista de dicionários com dados dos colaboradores desligados
"""
query = """
SELECT
PF.CODCOLIGADA,
PF.CHAPA,
P.NOME,
P.CPF,
PS.DESCRICAO AS SECAO,
PU.NOME AS FUNCAO,
PF.DATAADMISSAO,
PF.DATADEMISSAO,
PF.CODSITUACAO,
PF.CODSECAO,
PF.CODFUNCAO,
PF.SALARIO,
PF.CODSINDICATO
FROM PFUNC PF
JOIN PPESSOA P
ON P.CODIGO = PF.CODPESSOA
JOIN PSECAO PS
ON PS.CODCOLIGADA = PF.CODCOLIGADA
AND PS.CODIGO = PF.CODSECAO
JOIN PFUNCAO PU
ON PU.CODCOLIGADA = PF.CODCOLIGADA
AND PU.CODIGO = PF.CODFUNCAO
WHERE
PF.CODSITUACAO = 'D'
AND PF.DATADEMISSAO IS NOT NULL
"""
params = []
if nome:
query += " AND LOWER(P.NOME) LIKE LOWER(%s)"
params.append(f"%{nome}%")
query += " ORDER BY P.NOME"
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query, tuple(params) if params else None)
return cursor.fetchall()
def listar_cargos_ativos_rm():
"""
Lista cargos existentes no RM com base em uso real:
- Apenas cargos associados a colaboradores ATIVOS
- Remove duplicidade por nome
- Não depende de codcoligada do usuário
Retorna lista simples para uso em selects.
"""
query = """
SELECT DISTINCT
PU.CODIGO AS cod_funcao,
PU.NOME AS nome_funcao,
PU.CARGO AS cargo
FROM PFUNC PF
JOIN PFUNCAO PU
ON PU.CODCOLIGADA = PF.CODCOLIGADA
AND PU.CODIGO = PF.CODFUNCAO
WHERE
PF.CODSITUACAO <> 'D'
AND PU.INATIVA = 0
ORDER BY PU.NOME
"""
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query)
rows = cursor.fetchall()
# Normalização leve
cargos = []
vistos = set()
for row in rows:
nome = (row.get("nome_funcao") or "").strip()
if not nome:
continue
chave = nome.lower()
if chave in vistos:
continue
vistos.add(chave)
cargos.append({
"codigo": row.get("cod_funcao"),
"nome": nome,
"cargo": row.get("cargo"),
})
return cargos
def listar_secoes_ativas_rm():
"""
Lista seções/centros de custo existentes no RM com base em uso real:
- Apenas seções associadas a colaboradores ATIVOS
- Remove duplicidade por nome
- Não depende de codcoligada do usuário
Retorna lista simples para uso em selects.
"""
query = """
SELECT DISTINCT
PS.CODIGO AS cod_secao,
PS.DESCRICAO AS descricao_secao
FROM PFUNC PF
JOIN PSECAO PS
ON PS.CODCOLIGADA = PF.CODCOLIGADA
AND PS.CODIGO = PF.CODSECAO
WHERE
PF.CODSITUACAO <> 'D'
ORDER BY PS.DESCRICAO
"""
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query)
rows = cursor.fetchall()
# Normalização leve
secoes = []
vistos = set()
for row in rows:
descricao = (row.get("descricao_secao") or "").strip()
if not descricao:
continue
chave = descricao.lower()
if chave in vistos:
continue
vistos.add(chave)
secoes.append({
"codigo": row.get("cod_secao"),
"descricao": descricao,
})
return secoes
def listar_coligadas_rm():
"""
Lista coligadas existentes no RM para uso em selects (ex.: formulário de admissão por aumento).
Tenta GCOLIGADA (NOME); senão usa CODCOLIGADA distintos de PFUNC.
Retorna lista de {"codigo": int, "nome": str}. Exclui sempre CODCOLIGADA=0 (inválido no RM).
"""
try:
query = """
SELECT CODCOLIGADA AS codigo, NOME AS nome
FROM GCOLIGADA
WHERE CODCOLIGADA <> 0 AND NOME IS NOT NULL
ORDER BY NOME
"""
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query)
rows = cursor.fetchall()
result = [
{"codigo": row["codigo"], "nome": (row.get("nome") or str(row["codigo"])).strip()}
for row in rows
if row.get("codigo") and ((row.get("nome") or str(row["codigo"])).strip())
]
except Exception:
query = """
SELECT DISTINCT CODCOLIGADA AS codigo
FROM PFUNC
WHERE CODSITUACAO <> 'D' AND CODCOLIGADA <> 0
ORDER BY CODCOLIGADA
"""
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query)
rows = cursor.fetchall()
result = [{"codigo": row["codigo"], "nome": str(row["codigo"])} for row in rows if row.get("codigo")]
return [r for r in result if r.get("codigo") != 0]
def listar_filiais_rm():
"""
Lista filiais existentes no RM para uso em selects (ex.: formulário de admissão por aumento).
Tenta GFILIAL; senão usa CODCOLIGADA/CODFILIAL distintos de PFUNC quando existir CODFILIAL.
Retorna lista de {"cod_coligada": int, "codigo": int, "nome": str} para permitir filtrar por coligada no front.
"""
try:
query = """
SELECT CODCOLIGADA AS cod_coligada, CODFILIAL AS codigo,
COALESCE(NOME, CAST(CODFILIAL AS VARCHAR)) AS nome
FROM GFILIAL
ORDER BY CODCOLIGADA, CODFILIAL
"""
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query)
rows = cursor.fetchall()
out = [
{
"cod_coligada": int(row["cod_coligada"]),
"codigo": int(row["codigo"]),
"nome": (row.get("nome") or str(row["codigo"])).strip(),
}
for row in rows
]
return out
except Exception:
try:
query = """
SELECT DISTINCT CODCOLIGADA AS cod_coligada, CODFILIAL AS codigo
FROM PFUNC
WHERE CODSITUACAO <> 'D'
ORDER BY CODCOLIGADA, CODFILIAL
"""
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query)
rows = cursor.fetchall()
out = [
{"cod_coligada": int(row["cod_coligada"]), "codigo": int(row["codigo"]), "nome": str(int(row["codigo"]))}
for row in rows
]
return out
except Exception:
return []
def verificar_estabilidades_colaborador(id_rm: str):
"""
Verifica todas as estabilidades legais/operacionais de um colaborador
que podem bloquear um desligamento.
O id_rm deve estar no formato "CODCOLIGADA-CHAPA" (ex: "3-052412")
Retorna uma lista de dicionários com informações sobre cada estabilidade encontrada.
Args:
id_rm: Identificador do colaborador no formato "CODCOLIGADA-CHAPA"
Returns:
list: Lista de dicionários com:
- tipo: Tipo de estabilidade (MATERNIDADE, PREVIDENCIARIO, ACIDENTE_TRABALHO, CIPA)
- bloqueado: bool - Se o desligamento está bloqueado
- data_fim: date - Data até quando a estabilidade vale (None se indefinida)
- mensagem: str - Mensagem explicativa para o usuário
- detalhes: dict - Detalhes adicionais do afastamento
"""
# Extrai codcoligada e chapa do id_rm
try:
codcoligada_str, chapa = id_rm.split('-', 1)
codcoligada = int(codcoligada_str)
except (ValueError, AttributeError):
# Se não conseguir extrair, retorna lista vazia
return []
# Chama a função auxiliar
return verificar_estabilidades_colaborador_por_codcoligada_chapa(codcoligada, chapa)
def _normalizar_para_date(valor):
"""
Normaliza um valor para datetime.date.
Aceita date, datetime, ou outros tipos que tenham método .date()
Garante que sempre retorne date (não datetime).
"""
from datetime import date, datetime
if valor is None:
return None
# Se já é date (e não datetime), retorna direto
if isinstance(valor, date):
# Verifica se é datetime (datetime é subclasse de date)
if isinstance(valor, datetime):
return valor.date()
return valor
# Se é datetime, converte para date
if isinstance(valor, datetime):
return valor.date()
# Se tem método date(), tenta usar
if hasattr(valor, 'date'):
try:
resultado = valor.date()
# Se o resultado ainda for datetime, converte novamente
if isinstance(resultado, datetime):
return resultado.date()
return resultado
except (AttributeError, TypeError):
pass
# Se chegou aqui e não conseguiu converter, retorna None ou tenta converter de outra forma
return valor
def verificar_estabilidades_colaborador_por_codcoligada_chapa(codcoligada: int, chapa: str):
"""
Versão alternativa que aceita codcoligada e chapa diretamente.
Usada internamente pela função principal.
"""
from datetime import date, timedelta
try:
from dateutil.relativedelta import relativedelta
except ImportError:
# Fallback se dateutil não estiver instalado
def relativedelta(**kwargs):
months = kwargs.get('months', 0)
if months:
# Aproximação simples: 30 dias por mês
return timedelta(days=months * 30)
return timedelta()
estabilidades = []
query = """
SELECT
HS.TIPO,
HS.MOTIVO,
HS.DTINICIO,
HS.DTFINAL,
HS.OBSERVACAO,
HS.CODESTABILIDADE,
HS.AFASTPELAPREVIDENCIA
FROM PFHSTAFT HS
WHERE
HS.CODCOLIGADA = %s
AND HS.CHAPA = %s
AND (
-- Licença Maternidade
(HS.TIPO = 'E' AND HS.MOTIVO = '03')
OR
-- Afastamento Previdenciário
(HS.TIPO = 'P' AND HS.AFASTPELAPREVIDENCIA = 1)
OR
-- Acidente de Trabalho
(HS.TIPO = 'T')
OR
-- Outros tipos que podem indicar estabilidade
(HS.CODESTABILIDADE IS NOT NULL AND HS.CODESTABILIDADE <> '')
)
ORDER BY HS.DTINICIO DESC
"""
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query, (codcoligada, chapa))
rows = cursor.fetchall()
hoje = date.today() # Garantir que hoje é sempre date
for row in rows:
tipo_afast = row.get('TIPO', '').upper()
motivo = row.get('MOTIVO', '')
observacao = (row.get('OBSERVACAO') or '').upper()
dt_inicio = row.get('DTINICIO')
dt_final = row.get('DTFINAL')
afast_previdencia = row.get('AFASTPELAPREVIDENCIA', 0)
# 1. LICENÇA MATERNIDADE
if tipo_afast == 'E' and motivo == '03' and 'MATERNIDADE' in observacao:
if dt_final:
# Estabilidade: durante licença + 30 dias após retorno
data_retorno = _normalizar_para_date(dt_final)
data_fim_estabilidade = data_retorno + timedelta(days=30)
# Garante que data_fim_estabilidade seja date
data_fim_estabilidade = _normalizar_para_date(data_fim_estabilidade)
bloqueado = hoje <= data_fim_estabilidade
estabilidades.append({
'tipo': 'MATERNIDADE',
'bloqueado': bloqueado,
'data_fim': data_fim_estabilidade,
'mensagem': f"Colaboradora em estabilidade por licença maternidade. O desligamento não é permitido até {data_fim_estabilidade.strftime('%d/%m/%Y')}.",
'detalhes': {
'dt_inicio': dt_inicio,
'dt_final': dt_final,
'observacao': row.get('OBSERVACAO')
}
})
else:
# Licença em andamento sem data final
estabilidades.append({
'tipo': 'MATERNIDADE',
'bloqueado': True,
'data_fim': None,
'mensagem': "Colaboradora em licença maternidade. O desligamento não é permitido durante a licença e até 30 dias após o retorno.",
'detalhes': {
'dt_inicio': dt_inicio,
'dt_final': dt_final,
'observacao': row.get('OBSERVACAO')
}
})
# 2. AFASTAMENTO PREVIDENCIÁRIO
elif tipo_afast == 'P' and afast_previdencia == 1:
if dt_final:
data_fim = _normalizar_para_date(dt_final)
bloqueado = hoje <= data_fim
else:
# Afastamento em andamento
bloqueado = True
data_fim = None
if bloqueado:
estabilidades.append({
'tipo': 'PREVIDENCIARIO',
'bloqueado': True,
'data_fim': data_fim,
'mensagem': "Colaborador encontra-se afastado pelo INSS. O desligamento só poderá ser avaliado após o término do afastamento." + (f" (até {data_fim.strftime('%d/%m/%Y')})" if data_fim else ""),
'detalhes': {
'dt_inicio': dt_inicio,
'dt_final': dt_final,
'observacao': row.get('OBSERVACAO')
}
})
# 3. ACIDENTE DE TRABALHO
elif tipo_afast == 'T':
if dt_final:
# Estabilidade de 12 meses após retorno
data_retorno = _normalizar_para_date(dt_final)
if data_retorno is None or not isinstance(data_retorno, date):
continue # Não conseguiu normalizar, pula este registro
# Soma os meses usando relativedelta
# IMPORTANTE: relativedelta pode retornar datetime mesmo quando somado a date
data_fim_estabilidade = data_retorno + relativedelta(months=12)
# Garante que seja sempre date (não datetime)
# relativedelta pode retornar datetime, então normaliza novamente
from datetime import datetime as dt
if isinstance(data_fim_estabilidade, dt):
data_fim_estabilidade = data_fim_estabilidade.date()
elif not isinstance(data_fim_estabilidade, date):
data_fim_estabilidade = _normalizar_para_date(data_fim_estabilidade)
if data_fim_estabilidade is None or not isinstance(data_fim_estabilidade, date):
continue # Não conseguiu normalizar, pula
bloqueado = hoje <= data_fim_estabilidade
if bloqueado:
estabilidades.append({
'tipo': 'ACIDENTE_TRABALHO',
'bloqueado': True,
'data_fim': data_fim_estabilidade,
'mensagem': f"Colaborador possui estabilidade por acidente de trabalho até {data_fim_estabilidade.strftime('%d/%m/%Y')}.",
'detalhes': {
'dt_inicio': dt_inicio,
'dt_final': dt_final,
'dt_retorno': data_retorno,
'observacao': row.get('OBSERVACAO')
}
})
# 4. CIPA (verificar em tabela separada se existir)
query_cipa = """
SELECT TOP 1
DTINICIO,
DTFINAL
FROM VCANDIDATOSCIPA
WHERE
CODCOLIGADA = %s
AND CHAPA = %s
AND (DTFINAL IS NULL OR DTFINAL >= GETDATE())
ORDER BY DTINICIO DESC
"""
try:
with get_sqlserver_connection() as conn:
cursor = conn.cursor(as_dict=True)
cursor.execute(query_cipa, (codcoligada, chapa))
cipa_row = cursor.fetchone()
if cipa_row:
dt_fim_cipa = cipa_row.get('DTFINAL')
if dt_fim_cipa:
data_fim = _normalizar_para_date(dt_fim_cipa)
bloqueado = hoje <= data_fim
else:
# CIPA em andamento
bloqueado = True
data_fim = None
if bloqueado:
estabilidades.append({
'tipo': 'CIPA',
'bloqueado': True,
'data_fim': data_fim,
'mensagem': "Colaborador possui estabilidade por participação na CIPA. Desligamento não permitido neste período." + (f" (até {data_fim.strftime('%d/%m/%Y')})" if data_fim else ""),
'detalhes': {
'dt_inicio': cipa_row.get('DTINICIO'),
'dt_final': dt_fim_cipa
}
})
except Exception:
# Se a tabela CIPA não existir, ignora silenciosamente
pass
return estabilidades