使用Python sanic框架钉钉和第三方打卡机实现

使用Python sanic框架钉钉和第三方打卡机实现同样还是需要开通钉钉应用这里就不错多说了第一步:梳理逻辑流程前提:打卡的机器是使用postgres数据库,由于因为某些原因,钉钉userId 我已经提前获取到了存放到数据库里。1.

大家好,欢迎来到IT知识分享网。

同样还是需要开通钉钉应用这里就不错多说了

第一步:梳理逻辑流程

前提:打卡的机器是使用postgres数据库,由于因为某些原因,钉钉userId 我已经提前获取到了存放到数据库里。

1.用户打卡成功后,我们应该监听数据库进行查询,然后获取到打卡的时间,在通过钉钉的工作消息接口,发送消息给当前考勤打卡的用户,这样用户就可以知道我上班的打卡时间!

现在我们看看应该怎么去实现

先定义钉钉接口先Token:

appkey = "" appsecret = "" ding_url = "https://oapi.dingtalk.com/" async def dingTalkToken(): async with httpx.AsyncClient() as client: response = await client.get(ding_url + 'gettoken', params={"appkey": appkey, "appsecret": appsecret}) # 解析响应JSON result = response.json() # 提取Access Token access_token = result.get("access_token") return access_token

钉钉工作消息接口

async def dingTalkTokenAsyncsend_v2(access_token, kqTime, userid): params = { "agent_id": , "msg": { "msgtype": "text", "text": { "content": "打卡成功:" + kqTime } }, "userid_list": userid } async with httpx.AsyncClient() as client: response = await client.post( ding_url + 'topapi/message/corpconversation/asyncsend_v2?access_token=' + access_token, params=params) # 解析响应JSON result = response.json() # 提取Access Token errcode = result.get("errcode") return errcode

以上向钉钉工作发送消息的接口已经完成了

接下来就是核心代码:

async def check_notifications(): print('执行') try: # 连接到数据库 with psycopg2.connect(dbname=posql.dbname, user=posql.user, password=posql.password, host=posql.host, port=posql.port) as connection: # 创建一个游标对象,用于执行 SQL 语句 with connection.cursor() as cursor: # 执行查询 cursor.execute("LISTEN punch_event_channel") while True: connection.commit() # 提交事务 # 检查是否有通知 connection.poll() # 从服务器获取通知 notifies = connection.notifies # 获取所有通知列表 if notifies: for notify in notifies: # 逐一处理通知 # 检查通知是否已经处理过 if notify.payload not in processed_notifications: # 执行查询 cursor.execute("SELECT * FROM table WHERE id = %s" % int(notify.payload)) # 获取查询结果 result = cursor.fetchone() columns = [desc[0] for desc in cursor.description] result_dict = dict(zip(columns, result)) # 处理 datetime 对象的序列化 result_dict['timestamp'] = result_dict.get('timestamp', None) if result_dict['timestamp']: result_dict['timestamp'] = datetime.fromisoformat(result_dict['timestamp']) kqTime = result_dict['att_date'] + ' ' + result_dict['att_time'] # 获取用户ID user_name = result_dict['person_name'] sql_str = """EXEC GetUserDingByName @UserName = N'%s';""" % user_name user_id = query_user_info(sql_str) print(user_id) if user_id != 'null' and user_id is not None: # 发送HTTP POST请求获取Access Token access_token = await dingTalkToken() codes = await dingTalkTokenAsyncsend_v2(access_token, kqTime, user_id, user_name) print(f"考勤时间: " + result_dict['att_date'] + ' ' + result_dict['att_time']) if codes == 0: # 记录成功打卡的信息 success_message = f"成功打卡:{user_name},时间:{kqTime}" logging.info(success_message) print(success_message) else: # 记录打卡失败的信息 error_message = f"打卡失败:{user_name},时间:{kqTime},错误代码:{codes}" logging.error(error_message) print(error_message) break # 清除已处理的通知 processed_notifications.add(notify.payload) else: success_message = f"成功打卡:{user_name},用户ID不在系统中 'null',无法发送钉钉消息。" logging.info(success_message) print(f"用户ID为 'null',无法发送钉钉消息。") # 清空通知列表 connection.notifies.clear() break # 在没有通知时等待一段时间 else: print(f"在没有通知时等待一段时间") time.sleep(5) except Exception as e: print({"error": str(e)})

解释一下代码:

 # 执行查询 cursor.execute("LISTEN punch_event_channel")

这里我是在考勤机器的数据库里做了一个punch_event_channel 的频道,而这个频道是我创建了一个触发函数用来触发最新数据库里的数据

接下来是创建触发函数

-- 创建触发器函数 CREATE OR REPLACE FUNCTION notify_punch_event() RETURNS TRIGGER AS $ BEGIN PERFORM pg_notify('punch_event_channel', 'new_punch_event'); RETURN NEW; END; $ LANGUAGE plpgsql; 

-- 创建触发器,关联到表的 AFTER INSERT 事件上 CREATE TRIGGER 触发器名称XXXXX AFTER INSERT ON 表名 FOR EACH ROW EXECUTE PROCEDURE notify_punch_event();

在sql 工具执行这两句就可以了,替换成你自己的数据库

另外这里是我内部拿取钉钉Userid的数据库,我就不放代码了:

# 获取用户ID user_name = result_dict['person_name'] sql_str = """EXEC GetUserDingByName @UserName = N'%s';""" % user_name user_id = query_user_info(sql_str)

你们可以根据自己方式,来获取

最后,就是使用定时任务来

async def periodic_task(): while True: await check_notifications() await asyncio.sleep(1) # 1秒钟检查一次,可以根据需要调整间隔 if __name__ == '__main__': # 启动定时任务 app.add_task(periodic_task()) # 启动 Sanic 应用 app.run(host='0.0.0.0', port=8089, workers=1)

最后记得导入包

import psycopg2 import httpx import pymssql # 这是sql server 数据库连接

如果写的好动动你们发财的小手点赞,对你有帮助也可以打赏请我喝杯咖啡,提提神,感谢各位兄弟了

使用Python sanic框架钉钉和第三方打卡机实现

null

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/91062.html

(0)
上一篇 2024-10-19 12:15
下一篇 2024-10-20 12:45

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信