# /SGMP_PROD/solicitacoes/intf_sqlserver.py import pymssql from django.conf import settings def get_sqlserver_connection(): """ Cria uma conexão com o banco SQL Server do TOTVS RM. A configuração é obtida a partir do settings do Django, permitindo separação por ambiente (dev, homologação, produção). """ return pymssql.connect( server=settings.SQLSERVER_CONFIG["SERVER"], port=settings.SQLSERVER_CONFIG["PORT"], user=settings.SQLSERVER_CONFIG["USER"], password=settings.SQLSERVER_CONFIG["PASSWORD"], database=settings.SQLSERVER_CONFIG["DATABASE"], ) def listar_para_selecionar_colaborador(apenas_desligados=False): """ Lista colaboradores do RM para seleção. Args: apenas_desligados (bool): Se True, retorna apenas pessoas DESLIGADAS (útil para admissão por substituição). Se False, retorna pessoas que não estão desligadas (padrão). Returns: list: Lista de dicionários com dados dos colaboradores """ # Define o filtro de situação baseado no parâmetro if apenas_desligados: # Para admissão por substituição: busca pessoas DESLIGADAS # CODSITUACAO = 'D' e DATADEMISSAO IS NOT NULL (seguindo padrão do sgmp) filtro_situacao = "PF.CODSITUACAO = 'D' AND PF.DATADEMISSAO IS NOT NULL" else: # Padrão: busca pessoas que não estão desligadas (inclui admitidos) filtro_situacao = "PF.CODSITUACAO <> 'D'" query = f""" WITH SALDO_ATUAL AS ( SELECT SB.CODCOLIGADA, SB.CHAPA, SB.INICIOPER, SB.FIMPER, ((SB.EXTRAANT - SB.ATRASOANT - SB.FALTAANT) + (SB.EXTRAATU - SB.ATRASOATU - SB.FALTAATU)) AS SALDO_MINUTOS, ROW_NUMBER() OVER ( PARTITION BY SB.CODCOLIGADA, SB.CHAPA ORDER BY SB.FIMPER DESC ) AS RN FROM ASALDOBANCOHOR SB ) SELECT PF.CODCOLIGADA, PF.CHAPA, PF.NOME, P.CPF, PS.DESCRICAO AS SECAO, PU.NOME AS FUNCAO, PF.DATAADMISSAO, PF.DATADEMISSAO, PF.CODSITUACAO, PF.CODSECAO, PF.CODFUNCAO, PF.SALARIO, PF.CODSINDICATO, SA.INICIOPER, SA.FIMPER, SA.SALDO_MINUTOS FROM PFUNC PF JOIN PPESSOA P ON P.CODIGO = PF.CODPESSOA JOIN PSECAO PS ON PS.CODCOLIGADA = PF.CODCOLIGADA AND PS.CODIGO = PF.CODSECAO JOIN PFUNCAO PU ON PU.CODCOLIGADA = PF.CODCOLIGADA AND PU.CODIGO = PF.CODFUNCAO LEFT JOIN SALDO_ATUAL SA ON SA.CODCOLIGADA = PF.CODCOLIGADA AND SA.CHAPA = PF.CHAPA AND SA.RN = 1 WHERE {filtro_situacao} ORDER BY PF.NOME """ with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query) return cursor.fetchall() def buscar_colaboradores_rm_desligados(nome: str = None): """ Busca colaboradores DESLIGADOS no RM para fluxo de Substituição. Seguindo padrão do sgmp: - CODSITUACAO = 'D' - DATADEMISSAO IS NOT NULL Args: nome (str, optional): Nome para filtrar (busca parcial, case-insensitive) Returns: list: Lista de dicionários com dados dos colaboradores desligados """ query = """ SELECT PF.CODCOLIGADA, PF.CHAPA, P.NOME, P.CPF, PS.DESCRICAO AS SECAO, PU.NOME AS FUNCAO, PF.DATAADMISSAO, PF.DATADEMISSAO, PF.CODSITUACAO, PF.CODSECAO, PF.CODFUNCAO, PF.SALARIO, PF.CODSINDICATO FROM PFUNC PF JOIN PPESSOA P ON P.CODIGO = PF.CODPESSOA JOIN PSECAO PS ON PS.CODCOLIGADA = PF.CODCOLIGADA AND PS.CODIGO = PF.CODSECAO JOIN PFUNCAO PU ON PU.CODCOLIGADA = PF.CODCOLIGADA AND PU.CODIGO = PF.CODFUNCAO WHERE PF.CODSITUACAO = 'D' AND PF.DATADEMISSAO IS NOT NULL """ params = [] if nome: query += " AND LOWER(P.NOME) LIKE LOWER(%s)" params.append(f"%{nome}%") query += " ORDER BY P.NOME" with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query, tuple(params) if params else None) return cursor.fetchall() def listar_cargos_ativos_rm(): """ Lista cargos existentes no RM com base em uso real: - Apenas cargos associados a colaboradores ATIVOS - Remove duplicidade por nome - Não depende de codcoligada do usuário Retorna lista simples para uso em selects. """ query = """ SELECT DISTINCT PU.CODIGO AS cod_funcao, PU.NOME AS nome_funcao, PU.CARGO AS cargo FROM PFUNC PF JOIN PFUNCAO PU ON PU.CODCOLIGADA = PF.CODCOLIGADA AND PU.CODIGO = PF.CODFUNCAO WHERE PF.CODSITUACAO <> 'D' AND PU.INATIVA = 0 ORDER BY PU.NOME """ with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query) rows = cursor.fetchall() # Normalização leve cargos = [] vistos = set() for row in rows: nome = (row.get("nome_funcao") or "").strip() if not nome: continue chave = nome.lower() if chave in vistos: continue vistos.add(chave) cargos.append({ "codigo": row.get("cod_funcao"), "nome": nome, "cargo": row.get("cargo"), }) return cargos def listar_secoes_ativas_rm(): """ Lista seções/centros de custo existentes no RM com base em uso real: - Apenas seções associadas a colaboradores ATIVOS - Remove duplicidade por nome - Não depende de codcoligada do usuário Retorna lista simples para uso em selects. """ query = """ SELECT DISTINCT PS.CODIGO AS cod_secao, PS.DESCRICAO AS descricao_secao FROM PFUNC PF JOIN PSECAO PS ON PS.CODCOLIGADA = PF.CODCOLIGADA AND PS.CODIGO = PF.CODSECAO WHERE PF.CODSITUACAO <> 'D' ORDER BY PS.DESCRICAO """ with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query) rows = cursor.fetchall() # Normalização leve secoes = [] vistos = set() for row in rows: descricao = (row.get("descricao_secao") or "").strip() if not descricao: continue chave = descricao.lower() if chave in vistos: continue vistos.add(chave) secoes.append({ "codigo": row.get("cod_secao"), "descricao": descricao, }) return secoes def listar_coligadas_rm(): """ Lista coligadas existentes no RM para uso em selects (ex.: formulário de admissão por aumento). Tenta GCOLIGADA (NOME); senão usa CODCOLIGADA distintos de PFUNC. Retorna lista de {"codigo": int, "nome": str}. Exclui sempre CODCOLIGADA=0 (inválido no RM). """ try: query = """ SELECT CODCOLIGADA AS codigo, NOME AS nome FROM GCOLIGADA WHERE CODCOLIGADA <> 0 AND NOME IS NOT NULL ORDER BY NOME """ with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query) rows = cursor.fetchall() result = [ {"codigo": row["codigo"], "nome": (row.get("nome") or str(row["codigo"])).strip()} for row in rows if row.get("codigo") and ((row.get("nome") or str(row["codigo"])).strip()) ] except Exception: query = """ SELECT DISTINCT CODCOLIGADA AS codigo FROM PFUNC WHERE CODSITUACAO <> 'D' AND CODCOLIGADA <> 0 ORDER BY CODCOLIGADA """ with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query) rows = cursor.fetchall() result = [{"codigo": row["codigo"], "nome": str(row["codigo"])} for row in rows if row.get("codigo")] return [r for r in result if r.get("codigo") != 0] def listar_filiais_rm(): """ Lista filiais existentes no RM para uso em selects (ex.: formulário de admissão por aumento). Tenta GFILIAL; senão usa CODCOLIGADA/CODFILIAL distintos de PFUNC quando existir CODFILIAL. Retorna lista de {"cod_coligada": int, "codigo": int, "nome": str} para permitir filtrar por coligada no front. """ try: query = """ SELECT CODCOLIGADA AS cod_coligada, CODFILIAL AS codigo, COALESCE(NOME, CAST(CODFILIAL AS VARCHAR)) AS nome FROM GFILIAL ORDER BY CODCOLIGADA, CODFILIAL """ with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query) rows = cursor.fetchall() out = [ { "cod_coligada": int(row["cod_coligada"]), "codigo": int(row["codigo"]), "nome": (row.get("nome") or str(row["codigo"])).strip(), } for row in rows ] return out except Exception: try: query = """ SELECT DISTINCT CODCOLIGADA AS cod_coligada, CODFILIAL AS codigo FROM PFUNC WHERE CODSITUACAO <> 'D' ORDER BY CODCOLIGADA, CODFILIAL """ with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query) rows = cursor.fetchall() out = [ {"cod_coligada": int(row["cod_coligada"]), "codigo": int(row["codigo"]), "nome": str(int(row["codigo"]))} for row in rows ] return out except Exception: return [] def verificar_estabilidades_colaborador(id_rm: str): """ Verifica todas as estabilidades legais/operacionais de um colaborador que podem bloquear um desligamento. O id_rm deve estar no formato "CODCOLIGADA-CHAPA" (ex: "3-052412") Retorna uma lista de dicionários com informações sobre cada estabilidade encontrada. Args: id_rm: Identificador do colaborador no formato "CODCOLIGADA-CHAPA" Returns: list: Lista de dicionários com: - tipo: Tipo de estabilidade (MATERNIDADE, PREVIDENCIARIO, ACIDENTE_TRABALHO, CIPA) - bloqueado: bool - Se o desligamento está bloqueado - data_fim: date - Data até quando a estabilidade vale (None se indefinida) - mensagem: str - Mensagem explicativa para o usuário - detalhes: dict - Detalhes adicionais do afastamento """ # Extrai codcoligada e chapa do id_rm try: codcoligada_str, chapa = id_rm.split('-', 1) codcoligada = int(codcoligada_str) except (ValueError, AttributeError): # Se não conseguir extrair, retorna lista vazia return [] # Chama a função auxiliar return verificar_estabilidades_colaborador_por_codcoligada_chapa(codcoligada, chapa) def _normalizar_para_date(valor): """ Normaliza um valor para datetime.date. Aceita date, datetime, ou outros tipos que tenham método .date() Garante que sempre retorne date (não datetime). """ from datetime import date, datetime if valor is None: return None # Se já é date (e não datetime), retorna direto if isinstance(valor, date): # Verifica se é datetime (datetime é subclasse de date) if isinstance(valor, datetime): return valor.date() return valor # Se é datetime, converte para date if isinstance(valor, datetime): return valor.date() # Se tem método date(), tenta usar if hasattr(valor, 'date'): try: resultado = valor.date() # Se o resultado ainda for datetime, converte novamente if isinstance(resultado, datetime): return resultado.date() return resultado except (AttributeError, TypeError): pass # Se chegou aqui e não conseguiu converter, retorna None ou tenta converter de outra forma return valor def verificar_estabilidades_colaborador_por_codcoligada_chapa(codcoligada: int, chapa: str): """ Versão alternativa que aceita codcoligada e chapa diretamente. Usada internamente pela função principal. """ from datetime import date, timedelta try: from dateutil.relativedelta import relativedelta except ImportError: # Fallback se dateutil não estiver instalado def relativedelta(**kwargs): months = kwargs.get('months', 0) if months: # Aproximação simples: 30 dias por mês return timedelta(days=months * 30) return timedelta() estabilidades = [] query = """ SELECT HS.TIPO, HS.MOTIVO, HS.DTINICIO, HS.DTFINAL, HS.OBSERVACAO, HS.CODESTABILIDADE, HS.AFASTPELAPREVIDENCIA FROM PFHSTAFT HS WHERE HS.CODCOLIGADA = %s AND HS.CHAPA = %s AND ( -- Licença Maternidade (HS.TIPO = 'E' AND HS.MOTIVO = '03') OR -- Afastamento Previdenciário (HS.TIPO = 'P' AND HS.AFASTPELAPREVIDENCIA = 1) OR -- Acidente de Trabalho (HS.TIPO = 'T') OR -- Outros tipos que podem indicar estabilidade (HS.CODESTABILIDADE IS NOT NULL AND HS.CODESTABILIDADE <> '') ) ORDER BY HS.DTINICIO DESC """ with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query, (codcoligada, chapa)) rows = cursor.fetchall() hoje = date.today() # Garantir que hoje é sempre date for row in rows: tipo_afast = row.get('TIPO', '').upper() motivo = row.get('MOTIVO', '') observacao = (row.get('OBSERVACAO') or '').upper() dt_inicio = row.get('DTINICIO') dt_final = row.get('DTFINAL') afast_previdencia = row.get('AFASTPELAPREVIDENCIA', 0) # 1. LICENÇA MATERNIDADE if tipo_afast == 'E' and motivo == '03' and 'MATERNIDADE' in observacao: if dt_final: # Estabilidade: durante licença + 30 dias após retorno data_retorno = _normalizar_para_date(dt_final) data_fim_estabilidade = data_retorno + timedelta(days=30) # Garante que data_fim_estabilidade seja date data_fim_estabilidade = _normalizar_para_date(data_fim_estabilidade) bloqueado = hoje <= data_fim_estabilidade estabilidades.append({ 'tipo': 'MATERNIDADE', 'bloqueado': bloqueado, 'data_fim': data_fim_estabilidade, 'mensagem': f"Colaboradora em estabilidade por licença maternidade. O desligamento não é permitido até {data_fim_estabilidade.strftime('%d/%m/%Y')}.", 'detalhes': { 'dt_inicio': dt_inicio, 'dt_final': dt_final, 'observacao': row.get('OBSERVACAO') } }) else: # Licença em andamento sem data final estabilidades.append({ 'tipo': 'MATERNIDADE', 'bloqueado': True, 'data_fim': None, 'mensagem': "Colaboradora em licença maternidade. O desligamento não é permitido durante a licença e até 30 dias após o retorno.", 'detalhes': { 'dt_inicio': dt_inicio, 'dt_final': dt_final, 'observacao': row.get('OBSERVACAO') } }) # 2. AFASTAMENTO PREVIDENCIÁRIO elif tipo_afast == 'P' and afast_previdencia == 1: if dt_final: data_fim = _normalizar_para_date(dt_final) bloqueado = hoje <= data_fim else: # Afastamento em andamento bloqueado = True data_fim = None if bloqueado: estabilidades.append({ 'tipo': 'PREVIDENCIARIO', 'bloqueado': True, 'data_fim': data_fim, 'mensagem': "Colaborador encontra-se afastado pelo INSS. O desligamento só poderá ser avaliado após o término do afastamento." + (f" (até {data_fim.strftime('%d/%m/%Y')})" if data_fim else ""), 'detalhes': { 'dt_inicio': dt_inicio, 'dt_final': dt_final, 'observacao': row.get('OBSERVACAO') } }) # 3. ACIDENTE DE TRABALHO elif tipo_afast == 'T': if dt_final: # Estabilidade de 12 meses após retorno data_retorno = _normalizar_para_date(dt_final) if data_retorno is None or not isinstance(data_retorno, date): continue # Não conseguiu normalizar, pula este registro # Soma os meses usando relativedelta # IMPORTANTE: relativedelta pode retornar datetime mesmo quando somado a date data_fim_estabilidade = data_retorno + relativedelta(months=12) # Garante que seja sempre date (não datetime) # relativedelta pode retornar datetime, então normaliza novamente from datetime import datetime as dt if isinstance(data_fim_estabilidade, dt): data_fim_estabilidade = data_fim_estabilidade.date() elif not isinstance(data_fim_estabilidade, date): data_fim_estabilidade = _normalizar_para_date(data_fim_estabilidade) if data_fim_estabilidade is None or not isinstance(data_fim_estabilidade, date): continue # Não conseguiu normalizar, pula bloqueado = hoje <= data_fim_estabilidade if bloqueado: estabilidades.append({ 'tipo': 'ACIDENTE_TRABALHO', 'bloqueado': True, 'data_fim': data_fim_estabilidade, 'mensagem': f"Colaborador possui estabilidade por acidente de trabalho até {data_fim_estabilidade.strftime('%d/%m/%Y')}.", 'detalhes': { 'dt_inicio': dt_inicio, 'dt_final': dt_final, 'dt_retorno': data_retorno, 'observacao': row.get('OBSERVACAO') } }) # 4. CIPA (verificar em tabela separada se existir) query_cipa = """ SELECT TOP 1 DTINICIO, DTFINAL FROM VCANDIDATOSCIPA WHERE CODCOLIGADA = %s AND CHAPA = %s AND (DTFINAL IS NULL OR DTFINAL >= GETDATE()) ORDER BY DTINICIO DESC """ try: with get_sqlserver_connection() as conn: cursor = conn.cursor(as_dict=True) cursor.execute(query_cipa, (codcoligada, chapa)) cipa_row = cursor.fetchone() if cipa_row: dt_fim_cipa = cipa_row.get('DTFINAL') if dt_fim_cipa: data_fim = _normalizar_para_date(dt_fim_cipa) bloqueado = hoje <= data_fim else: # CIPA em andamento bloqueado = True data_fim = None if bloqueado: estabilidades.append({ 'tipo': 'CIPA', 'bloqueado': True, 'data_fim': data_fim, 'mensagem': "Colaborador possui estabilidade por participação na CIPA. Desligamento não permitido neste período." + (f" (até {data_fim.strftime('%d/%m/%Y')})" if data_fim else ""), 'detalhes': { 'dt_inicio': cipa_row.get('DTINICIO'), 'dt_final': dt_fim_cipa } }) except Exception: # Se a tabela CIPA não existir, ignora silenciosamente pass return estabilidades