本文共 10177 字,大约阅读时间需要 33 分钟。
微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效。而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。为兼容这两种方式,因此按照订阅号的方式处理。
处理办法与接口文档中的要求相同:
为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。
下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。
设计思路和使用方法:
自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据
尽可能的保证Class中每一个可执行的函数单独调用都能成功。
Class中只将真正能被用到的方法和变量设置为public的。
使用时只需要修改此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使用WeiXinTokenClass().get()方法即可得到access_token。
脚本内容可以从github上获取,地址:
脚本内容如下:
| #!/usr/bin/python # encoding: utf-8 # -*- coding: utf8 -*- """ Created by PyCharm. File: LinuxBashShellScriptForOps:odbp_getToken.py User: Guodong Create Date: 2016/8/10 Create Time: 17:04 """ import os import sqlite3 import sys import urllib import urllib2 import json import datetime # import time enable_debug = True def debug(msg, code = None ): if enable_debug: if code is None : print "message: %s" % msg else : print "message: %s, code: %s " % (msg, code) AUTHOR_MAIL = "uberurey_ups@163.com" weixin_qy_CorpID = "your_corpid" weixin_qy_Secret = "your_secret" # Build paths inside the project like this: os.path.join(BASE_DIR, ...) BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # Database # https://docs.djangoproject.com/en/1.9/ref/settings/#databases DATABASES = { 'default' : { 'ENGINE' : 'db.backends.sqlite3' , 'NAME' : os.path.join(BASE_DIR, '.odbp_db.sqlite3' ), } } sqlite3_db_file = str (DATABASES[ 'default' ][ 'NAME' ]) def sqlite3_conn(database): try : conn = sqlite3.connect(database) except sqlite3.Error: print >> sys.stderr, """\ There was a problem connecting to Database: %s The error leading to this problem was: %s It's possible that this database is broken or permission denied. If you cannot solve this problem yourself, please mail to: %s """ % (database, sys.exc_value, AUTHOR_MAIL) sys.exit( 1 ) else : return conn def sqlite3_commit(conn): return conn.commit() def sqlite3_close(conn): return conn.close() def sqlite3_execute(database, sql): try : sql_conn = sqlite3_conn(database) sql_cursor = sql_conn.cursor() sql_cursor.execute(sql) sql_conn.commit() sql_conn.close() except sqlite3.Error as e: print e sys.exit( 1 ) def sqlite3_create_table_token(): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute( '''CREATE TABLE "main"."weixin_token" ( "id" INTEGER , "access_token" TEXT, "expires_in" TEXT, "expires_on" TEXT, "is_expired" INTEGER ) ; ''' ) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) def sqlite3_create_table_account(): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute( '''CREATE TABLE "main"."weixin_account" ( "id" INTEGER, "name" TEXT, "corpid" TEXT, "secret" TEXT, "current" INTEGER ) ; ''' ) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) def sqlite3_create_tables(): print "sqlite3_create_tables" sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute( '''CREATE TABLE "main"."weixin_token" ( "id" INTEGER , "access_token" TEXT, "expires_in" TEXT, "expires_on" TEXT, "is_expired" INTEGER ) ; ''' ) sql_cursor.execute( '''CREATE TABLE "main"."weixin_account" ( "id" INTEGER, "name" TEXT, "corpid" TEXT, "secret" TEXT, "current" INTEGER ) ; ''' ) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) def sqlite3_set_credential(corpid, secret): try : sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute( '''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES (1, 'odbp', ?, ?, 1) ''' , (corpid, secret)) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) except sqlite3.Error: sqlite3_create_table_account() sqlite3_set_credential(corpid, secret) def sqlite3_set_token(access_token, expires_in, expires_on, is_expired): try : sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute( '''INSERT INTO "weixin_token" ("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES ( 1, ?, ?, ?, ? ) ''' , (access_token, expires_in, expires_on, is_expired)) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) except sqlite3.Error: sqlite3_create_table_token() sqlite3_set_token(access_token, expires_in, expires_on, is_expired) def sqlite3_get_credential(): try : sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() credential = sql_cursor.execute( '''SELECT "corpid", "secret" FROM weixin_account WHERE current == 1;''' ) result = credential.fetchall() sqlite3_close(sql_conn) except sqlite3.Error: sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret) return sqlite3_get_credential() else : if result is not None and len (result) ! = 0 : return result else : print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL sys.exit( 1 ) def sqlite3_get_token(): try : sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() credential = sql_cursor.execute( '''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;''' ) result = credential.fetchall() sqlite3_close(sql_conn) except sqlite3.Error: info = sys.exc_info() print info[ 0 ], ":" , info[ 1 ] else : if result is not None and len (result) ! = 0 : return result else : # print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL # sys.exit(1) return None def sqlite3_update_token(access_token, expires_on): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute( '''UPDATE "weixin_token" SET access_token=?, expires_on=? WHERE _ROWID_ = 1;''' , (access_token, expires_on) ) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) class WeiXinTokenClass( object ): def __init__( self ): self .__corpid = None self .__corpsecret = None self .__use_persistence = True self .__access_token = None self .__expires_in = None self .__expires_on = None self .__is_expired = None if self .__use_persistence: self .__corpid = sqlite3_get_credential()[ 0 ][ 0 ] self .__corpsecret = sqlite3_get_credential()[ 0 ][ 1 ] else : self .__corpid = weixin_qy_CorpID self .__corpsecret = weixin_qy_Secret def __get_token_from_weixin_qy_api( self ): parameters = { "corpid" : self .__corpid, "corpsecret" : self .__corpsecret } url_parameters = urllib.urlencode(parameters) token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?" url = token_url + url_parameters response = urllib2.urlopen(url) result = response.read() token_json = json.loads(result) if token_json[ 'access_token' ] is not None : get_time_now = datetime.datetime.now() # TODO(Guodong Ding) token will expired ahead of time or not expired after the time expire_time = get_time_now + datetime.timedelta(seconds = token_json[ 'expires_in' ]) token_json[ 'expires_on' ] = str (expire_time) self .__access_token = token_json[ 'access_token' ] self .__expires_in = token_json[ 'expires_in' ] self .__expires_on = token_json[ 'expires_on' ] self .__is_expired = 1 try : token_result_set = sqlite3_get_token() except sqlite3.Error: token_result_set = None if token_result_set is None and len (token_result_set) = = 0 : sqlite3_set_token( self .__access_token, self .__expires_in, self .__expires_on, self .__is_expired) else : if self .__is_token_expired() is True : sqlite3_update_token( self .__access_token, self .__expires_on) else : debug( "pass" ) return else : if token_json[ 'errcode' ] is not None : print "errcode is: %s" % token_json[ 'errcode' ] print "errmsg is: %s" % token_json[ 'errmsg' ] else : print result def __get_token_from_persistence_storage( self ): try : token_result_set = sqlite3_get_token() except sqlite3.Error: self .__get_token_from_weixin_qy_api() finally : if token_result_set is None : self .__get_token_from_weixin_qy_api() token_result_set = sqlite3_get_token() access_token = token_result_set[ 0 ][ 0 ] expire_time = token_result_set[ 0 ][ 1 ] else : access_token = token_result_set[ 0 ][ 0 ] expire_time = token_result_set[ 0 ][ 1 ] expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f' ) now_time = datetime.datetime.now() if now_time < expire_time: # print "The token is %s" % access_token # print "The token will expire on %s" % expire_time return access_token else : self .__get_token_from_weixin_qy_api() return self .__get_token_from_persistence_storage() @ staticmethod def __is_token_expired(): try : token_result_set = sqlite3_get_token() except sqlite3.Error as e: print e sys.exit( 1 ) expire_time = token_result_set[ 0 ][ 1 ] expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f' ) now_time = datetime.datetime.now() if now_time < expire_time: return False else : return True def get( self ): return self .__get_token_from_persistence_storage() |
tag:微信公众号,python,sqlite3
--end--
本文转自 urey_pp 51CTO博客,原文链接:http://blog.51cto.com/dgd2010/1837284,如需转载请自行联系原作者