博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python实现获取微信企业号access_token的Class
阅读量:5864 次
发布时间:2019-06-19

本文共 10177 字,大约阅读时间需要 33 分钟。

    微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效。而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。为兼容这两种方式,因此按照订阅号的方式处理。

    处理办法与接口文档中的要求相同:

    为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。

    下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。

    设计思路和使用方法:

  1. 自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据

  2. 尽可能的保证Class中每一个可执行的函数单独调用都能成功。

  3. Class中只将真正能被用到的方法和变量设置为public的。

  4. 使用时只需要修改此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使用WeiXinTokenClass().get()方法即可得到access_token。

脚本内容可以从github上获取,地址:

脚本内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/python
# encoding: utf-8
# -*- coding: utf8 -*-
"""
Created by PyCharm.
File:               LinuxBashShellScriptForOps:odbp_getToken.py
User:               Guodong
Create Date:        2016/8/10
Create Time:        17:04
 
"""
 
import 
os
import 
sqlite3
import 
sys
import 
urllib
import 
urllib2
import 
json
import 
datetime
 
# import time
 
enable_debug 
= 
True
 
 
def 
debug(msg, code
=
None
):
    
if 
enable_debug:
        
if 
code 
is 
None
:
            
print 
"message: %s" 
% 
msg
        
else
:
            
print 
"message: %s, code: %s " 
% 
(msg, code)
 
 
AUTHOR_MAIL 
= 
"uberurey_ups@163.com"
 
weixin_qy_CorpID 
= 
"your_corpid"
weixin_qy_Secret 
= 
"your_secret"
 
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR 
= 
os.path.dirname(os.path.abspath(__file__))
 
# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
 
DATABASES 
= 
{
    
'default'
: {
        
'ENGINE'
'db.backends.sqlite3'
,
        
'NAME'
: os.path.join(BASE_DIR, 
'.odbp_db.sqlite3'
),
    
}
}
 
sqlite3_db_file 
= 
str
(DATABASES[
'default'
][
'NAME'
])
 
 
def 
sqlite3_conn(database):
    
try
:
        
conn 
= 
sqlite3.connect(database)
    
except 
sqlite3.Error:
        
print 
>> sys.stderr, 
"""\
    
There was a problem connecting to Database:
 
        
%s
 
    
The error leading to this problem was:
 
        
%s
 
    
It's possible that this database is broken or permission denied.
 
    
If you cannot solve this problem yourself, please mail to:
 
        
%s
 
    
""" 
% 
(database, sys.exc_value, AUTHOR_MAIL)
        
sys.exit(
1
)
    
else
:
        
return 
conn
 
 
def 
sqlite3_commit(conn):
    
return 
conn.commit()
 
 
def 
sqlite3_close(conn):
    
return 
conn.close()
 
 
def 
sqlite3_execute(database, sql):
    
try
:
        
sql_conn 
= 
sqlite3_conn(database)
        
sql_cursor 
= 
sql_conn.cursor()
        
sql_cursor.execute(sql)
        
sql_conn.commit()
        
sql_conn.close()
    
except 
sqlite3.Error as e:
        
print 
e
        
sys.exit(
1
)
 
 
def 
sqlite3_create_table_token():
    
sql_conn 
= 
sqlite3_conn(sqlite3_db_file)
    
sql_cursor 
= 
sql_conn.cursor()
    
sql_cursor.execute(
'''CREATE TABLE "main"."weixin_token" (
                
"id"  INTEGER ,
                
"access_token"  TEXT,
                
"expires_in"  TEXT,
                
"expires_on"  TEXT,
                
"is_expired"  INTEGER
                
)
                
;
    
'''
)
    
sqlite3_commit(sql_conn)
    
sqlite3_close(sql_conn)
 
 
def 
sqlite3_create_table_account():
    
sql_conn 
= 
sqlite3_conn(sqlite3_db_file)
    
sql_cursor 
= 
sql_conn.cursor()
    
sql_cursor.execute(
'''CREATE TABLE "main"."weixin_account" (
                
"id"  INTEGER,
                
"name"  TEXT,
                
"corpid"  TEXT,
                
"secret"  TEXT,
                
"current"  INTEGER
                
)
                
;
    
'''
)
    
sqlite3_commit(sql_conn)
    
sqlite3_close(sql_conn)
 
 
def 
sqlite3_create_tables():
    
print 
"sqlite3_create_tables"
    
sql_conn 
= 
sqlite3_conn(sqlite3_db_file)
    
sql_cursor 
= 
sql_conn.cursor()
    
sql_cursor.execute(
'''CREATE TABLE "main"."weixin_token" (
                
"id"  INTEGER ,
                
"access_token"  TEXT,
                
"expires_in"  TEXT,
                
"expires_on"  TEXT,
                
"is_expired"  INTEGER
                
)
                
;
    
'''
)
    
sql_cursor.execute(
'''CREATE TABLE "main"."weixin_account" (
                
"id"  INTEGER,
                
"name"  TEXT,
                
"corpid"  TEXT,
                
"secret"  TEXT,
                
"current"  INTEGER
                
)
                
;
    
'''
)
    
sqlite3_commit(sql_conn)
    
sqlite3_close(sql_conn)
 
 
def 
sqlite3_set_credential(corpid, secret):
    
try
:
        
sql_conn 
= 
sqlite3_conn(sqlite3_db_file)
        
sql_cursor 
= 
sql_conn.cursor()
        
sql_cursor.execute(
'''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES
                                
(1,
                                
'odbp',
                                
?,
                                
?,
                                
1)
'''
, (corpid, secret))
        
sqlite3_commit(sql_conn)
        
sqlite3_close(sql_conn)
    
except 
sqlite3.Error:
        
sqlite3_create_table_account()
        
sqlite3_set_credential(corpid, secret)
 
 
def 
sqlite3_set_token(access_token, expires_in, expires_on, is_expired):
    
try
:
        
sql_conn 
= 
sqlite3_conn(sqlite3_db_file)
        
sql_cursor 
= 
sql_conn.cursor()
        
sql_cursor.execute(
'''INSERT INTO "weixin_token"
                              
("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES
                              
(
                              
1,
                              
?,
                              
?,
                              
?,
                              
?
                              
)
'''
, (access_token, expires_in, expires_on, is_expired))
        
sqlite3_commit(sql_conn)
        
sqlite3_close(sql_conn)
    
except 
sqlite3.Error:
        
sqlite3_create_table_token()
        
sqlite3_set_token(access_token, expires_in, expires_on, is_expired)
 
 
def 
sqlite3_get_credential():
    
try
:
        
sql_conn 
= 
sqlite3_conn(sqlite3_db_file)
        
sql_cursor 
= 
sql_conn.cursor()
        
credential 
= 
sql_cursor.execute(
'''SELECT "corpid", "secret"  FROM weixin_account WHERE current == 1;'''
)
        
result 
= 
credential.fetchall()
        
sqlite3_close(sql_conn)
    
except 
sqlite3.Error:
        
sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)
        
return 
sqlite3_get_credential()
    
else
:
        
if 
result 
is 
not 
None 
and 
len
(result) !
= 
0
:
            
return 
result
        
else
:
            
print 
"unrecoverable problem, please alter to %s" 
% 
AUTHOR_MAIL
            
sys.exit(
1
)
 
 
def 
sqlite3_get_token():
    
try
:
        
sql_conn 
= 
sqlite3_conn(sqlite3_db_file)
        
sql_cursor 
= 
sql_conn.cursor()
        
credential 
= 
sql_cursor.execute(
            
'''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;'''
)
        
result 
= 
credential.fetchall()
        
sqlite3_close(sql_conn)
    
except 
sqlite3.Error:
        
info 
= 
sys.exc_info()
        
print 
info[
0
], 
":"
, info[
1
]
    
else
:
        
if 
result 
is 
not 
None 
and 
len
(result) !
= 
0
:
            
return 
result
        
else
:
            
# print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL
            
# sys.exit(1)
            
return 
None
 
 
def 
sqlite3_update_token(access_token, expires_on):
    
sql_conn 
= 
sqlite3_conn(sqlite3_db_file)
    
sql_cursor 
= 
sql_conn.cursor()
    
sql_cursor.execute(
'''UPDATE "weixin_token" SET
                          
access_token=?,
                          
expires_on=?
                          
WHERE _ROWID_ = 1;'''
, (access_token, expires_on)
                       
)
    
sqlite3_commit(sql_conn)
    
sqlite3_close(sql_conn)
 
 
class 
WeiXinTokenClass(
object
):
    
def 
__init__(
self
):
        
self
.__corpid 
= 
None
        
self
.__corpsecret 
= 
None
        
self
.__use_persistence 
= 
True
 
        
self
.__access_token 
= 
None
        
self
.__expires_in 
= 
None
        
self
.__expires_on 
= 
None
        
self
.__is_expired 
= 
None
 
        
if 
self
.__use_persistence:
            
self
.__corpid 
= 
sqlite3_get_credential()[
0
][
0
]
            
self
.__corpsecret 
= 
sqlite3_get_credential()[
0
][
1
]
        
else
:
            
self
.__corpid 
= 
weixin_qy_CorpID
            
self
.__corpsecret 
= 
weixin_qy_Secret
 
    
def 
__get_token_from_weixin_qy_api(
self
):
        
parameters 
= 
{
            
"corpid"
self
.__corpid,
            
"corpsecret"
self
.__corpsecret
        
}
        
url_parameters 
= 
urllib.urlencode(parameters)
        
token_url 
= 
"https://qyapi.weixin.qq.com/cgi-bin/gettoken?"
        
url 
= 
token_url 
+ 
url_parameters
        
response 
= 
urllib2.urlopen(url)
        
result 
= 
response.read()
        
token_json 
= 
json.loads(result)
        
if 
token_json[
'access_token'
is 
not 
None
:
            
get_time_now 
= 
datetime.datetime.now()
            
# TODO(Guodong Ding) token will expired ahead of time or not expired after the time
            
expire_time 
= 
get_time_now 
+ 
datetime.timedelta(seconds
=
token_json[
'expires_in'
])
            
token_json[
'expires_on'
= 
str
(expire_time)
            
self
.__access_token 
= 
token_json[
'access_token'
]
            
self
.__expires_in 
= 
token_json[
'expires_in'
]
            
self
.__expires_on 
= 
token_json[
'expires_on'
]
            
self
.__is_expired 
= 
1
 
            
try
:
                
token_result_set 
= 
sqlite3_get_token()
            
except 
sqlite3.Error:
                
token_result_set 
= 
None
            
if 
token_result_set 
is 
None 
and 
len
(token_result_set) 
=
= 
0
:
                
sqlite3_set_token(
self
.__access_token, 
self
.__expires_in, 
self
.__expires_on, 
self
.__is_expired)
            
else
:
                
if 
self
.__is_token_expired() 
is 
True
:
                    
sqlite3_update_token(
self
.__access_token, 
self
.__expires_on)
                
else
:
                    
debug(
"pass"
)
                    
return
        
else
:
            
if 
token_json[
'errcode'
is 
not 
None
:
                
print 
"errcode is: %s" 
% 
token_json[
'errcode'
]
                
print 
"errmsg is: %s" 
% 
token_json[
'errmsg'
]
            
else
:
                
print 
result
 
    
def 
__get_token_from_persistence_storage(
self
):
        
try
:
            
token_result_set 
= 
sqlite3_get_token()
        
except 
sqlite3.Error:
            
self
.__get_token_from_weixin_qy_api()
        
finally
:
            
if 
token_result_set 
is 
None
:
                
self
.__get_token_from_weixin_qy_api()
                
token_result_set 
= 
sqlite3_get_token()
                
access_token 
= 
token_result_set[
0
][
0
]
                
expire_time 
= 
token_result_set[
0
][
1
]
            
else
:
                
access_token 
= 
token_result_set[
0
][
0
]
                
expire_time 
= 
token_result_set[
0
][
1
]
        
expire_time 
= 
datetime.datetime.strptime(expire_time, 
'%Y-%m-%d %H:%M:%S.%f'
)
        
now_time 
= 
datetime.datetime.now()
        
if 
now_time < expire_time:
            
# print "The token is %s" % access_token
            
# print "The token will expire on %s" % expire_time
            
return 
access_token
        
else
:
            
self
.__get_token_from_weixin_qy_api()
            
return 
self
.__get_token_from_persistence_storage()
 
    
@
staticmethod
    
def 
__is_token_expired():
        
try
:
            
token_result_set 
= 
sqlite3_get_token()
        
except 
sqlite3.Error as e:
            
print 
e
            
sys.exit(
1
)
        
expire_time 
= 
token_result_set[
0
][
1
]
        
expire_time 
= 
datetime.datetime.strptime(expire_time, 
'%Y-%m-%d %H:%M:%S.%f'
)
        
now_time 
= 
datetime.datetime.now()
        
if 
now_time < expire_time:
            
return 
False
        
else
:
            
return 
True
 
    
def 
get(
self
):
        
return 
self
.__get_token_from_persistence_storage()

tag:微信公众号,python,sqlite3

--end--

本文转自 urey_pp 51CTO博客,原文链接:http://blog.51cto.com/dgd2010/1837284,如需转载请自行联系原作者

你可能感兴趣的文章
wowza配置rtsp/rtp播放
查看>>
Swift 学习笔记
查看>>
关于m2eclipse插件在myeclipse 及其以后版本的问题及终极解决办法!
查看>>
Java静态代码块和非静态代码块、类加载、构造对象的机制
查看>>
ORM--UserType--Hibernate4.0注解版
查看>>
noclassdeffounderror和classnotfound
查看>>
干货 | Java中获取类名的3种方法!
查看>>
IT公司100题-2-设计带min函数的stack
查看>>
将CentOS安装到U盘
查看>>
c++友元与运算符重载
查看>>
nginx日志分析工具
查看>>
Linux简介
查看>>
提高 iOS App 通知功能启用率的三个策略
查看>>
做有中国特色的程序员
查看>>
网站UI设计小工具
查看>>
(转) Java 中十进制和十六进制的相互转换
查看>>
proxyTable配置接口代理
查看>>
【比较】view和viewcontroller加载方式
查看>>
linux C之access函数
查看>>
Android 自定义progressDialog实现
查看>>