首页 > 技术收集 > work tools – ip log analysis and record

work tools – ip log analysis and record

2011年7月27日 发表评论 阅读评论

收录一个工作中使用的日志分析脚本，使用 Python 写成，简单明了，直接上代码：

#! /usr/bin/env python
# -*- coding: utf8
 
import MySQLdb
import datetime
import sys, time
import traceback
import devintool as t
from threading import Thread
import threading
import Queue as q
# Number of mysqlThread workers the script starts; also used as the maximum
# size of the queue that carries SQL statements to those workers.
THREAD_NUM = 100
# Page size iplogThread.run passes to get_user_ids when reading login rows.
BATCH_SIZE = 1000
 
# Client ids consulted by iplogThread when classifying a login; replaced with
# the result of getClients() when the file is run as a script.
clients=[]
 
class Conn(object):
    """Hold MySQL connection settings and the connection opened from them.

    The connection is not established by the constructor; call open() first.
    ``is_open`` reflects whether open() has succeeded without a later close().
    """

    def __init__(self, host, port, db="fb", user="", passwd="", charset="utf8"):
        """Store the connection parameters; no network activity happens here."""
        self.host = host
        self.port = port
        self.db = db
        self.user = user
        self.passwd = passwd
        self.charset = charset
        self.is_open = False
        self.__conn = None

    def open(self):
        """Connect with the stored settings and return the MySQLdb connection.

        Any exception raised by MySQLdb.connect propagates; in that case
        ``is_open`` is left unchanged.
        """
        self.__conn = MySQLdb.connect(host=self.host, port=self.port, user=self.user,
                                      passwd=self.passwd, db=self.db, charset=self.charset)
        self.is_open = True
        return self.__conn

    def close(self):
        """Close the connection if it is currently open.

        Calling this on a connection that was never opened, or calling it
        twice, is a no-op instead of an error. ``is_open`` is reset even if
        the underlying close() raises.
        """
        handle = self.__conn
        was_open = self.is_open
        self.is_open = False
        if handle is not None and was_open:
            handle.close()

    def cursor(self):
        """Return a new cursor from the connection created by open()."""
        return self.__conn.cursor()
 
# Module-level lock, presumably meant to serialize console output between
# threads; nothing in the active code shown here acquires it.
stdoutMutex = threading.Lock()
class mysqlThread(Thread):
    """Worker thread that executes SQL statements taken from a shared queue.

    Each worker opens its own connection (with autocommit switched on) when it
    is constructed. run() never returns on its own.
    """

    def __init__(self, host1, port1, db1, queue):
        """Connect to ``db1`` on ``host1:port1`` and remember ``queue``.

        ``queue`` must provide get() and task_done(); every item is passed
        unchanged to cursor.execute().
        """
        Thread.__init__(self)
        self.queue = queue
        self.conn1 = Conn(host=host1, port=port1, db=db1)
        self.cursor1 = self._open_cursor()
        # Not read anywhere in this class; kept so the attribute still exists.
        self._stop = threading.Event()

    def _open_cursor(self):
        """Open conn1 and return a cursor with autocommit enabled."""
        cursor = self.conn1.open().cursor()
        cursor.execute('set autocommit=1')
        return cursor

    def run(self):
        """Execute queued statements forever, one at a time.

        A failing statement is reported with a traceback and then skipped, so
        one bad statement no longer kills the worker. task_done() is called
        for every item taken from the queue, even on failure; otherwise a
        queue.join() in another thread could never return.
        """
        print("run")
        while True:
            sql = self.queue.get()
            try:
                if not self.cursor1:
                    self.cursor1 = self._open_cursor()
                self.cursor1.execute(sql)
            except Exception:
                traceback.print_exc()
            finally:
                self.queue.task_done()
class iplogThread(Thread):
    """Producer thread: pages through one day's login rows and enqueues SQL.

    Rows of (id, type) are read from ``iplog_typeid_YYYYMMDD`` in ascending id
    order. For each page, the ids are grouped by login kind and, per non-empty
    group, an ``insert ignore`` plus an ``update`` against
    ``user_login_YYYYMM`` are put on the queue. Both statements set bit
    ``day - 1`` in ``login_days`` and in the column for that login kind.
    """

    # Columns of user_login_YYYYMM, in the order statements are emitted.
    _COLUMNS = ("web", "wap", "im", "client", "im_click", "others")
    # Login types that map directly to a column. Any other type is "client"
    # when it appears in the module-level ``clients`` list, else "others".
    _COLUMN_BY_TYPE = {0: "web", 1: "wap", 3: "im", 4: "im_click"}

    def __init__(self, queue, date):
        """Remember ``queue`` and ``date`` and open the source-log connection.

        ``date`` must provide ``strftime`` and ``day`` (a date or datetime).
        """
        Thread.__init__(self)
        self.queue = queue
        self.date = date
        self.conn = Conn(host="log1.kdb.d.kaixin.com", port=3306, db="fb")
        self.cursor = self.conn.open().cursor()

    def run(self):
        """Read pages of BATCH_SIZE rows until exhausted, enqueueing SQL."""
        user_cnt = 0
        id_begin = 0
        while True:
            print(datetime.datetime.now())
            user_set = self.get_user_ids(self.cursor, id_begin, BATCH_SIZE)
            if not user_set:
                break

            user_cnt += len(user_set)
            print(id_begin)
            # Next page starts after the largest id seen so far.
            id_begin = int(user_set[-1][0])
            self.__handle(user_set)
            print("%d, %d" % (len(user_set), user_cnt))
            print(datetime.datetime.now())
            if len(user_set) < BATCH_SIZE:
                # A short page means there is nothing left to read.
                break
        # NOTE(review): fixed pause before the thread ends; presumably meant
        # to give the workers time to drain the queue -- confirm before
        # changing it.
        time.sleep(300)

    def __handle(self, user_set):
        """Group one page of (id, type) rows by login kind and enqueue SQL.

        For every column in _COLUMNS that received at least one id, two
        statements are put on the queue (insert first, then update). Columns
        with no ids only produce a "len=0" line on stdout.
        """
        client_ids = set(clients)  # build once per page for O(1) membership
        buckets = dict((column, []) for column in self._COLUMNS)
        for user in user_set:
            user_id, user_type = user[0], user[1]
            if user_type in self._COLUMN_BY_TYPE:
                column = self._COLUMN_BY_TYPE[user_type]
            elif user_type in client_ids:
                column = "client"
            else:
                column = "others"
            buckets[column].append(user_id)

        month = self.date.strftime('%Y%m')
        day_bit = self.date.day - 1  # bit 0 stands for the 1st of the month
        for column in self._COLUMNS:
            ids = buckets[column]
            if not ids:
                print("len=0: %s" % column)
                continue
            rows = ",".join("(%d, 1<<%d, 1<<%d)" % (uid, day_bit, day_bit) for uid in ids)
            id_list = ",".join("%d" % uid for uid in ids)
            # The insert creates missing rows; the update then ORs the day bit
            # into rows that already existed.
            sql = "insert ignore into user_login_%s (id, login_days, %s) values %s" % (month, column, rows)
            self.queue.put(sql)
            sql = "update user_login_%s set login_days=login_days|1<<%d, %s=%s|1<<%d where id in (%s)" % (month, day_bit, column, column, day_bit, id_list)
            self.queue.put(sql)

    def get_user_ids(self, cursor, id_begin=0, limit=10000):
        """Fetch up to ``limit`` (id, type) rows with id greater than ``id_begin``.

        The query is run through devintool's get_set_cur; the result is
        assumed to be an indexable sequence of rows -- verify against that
        helper.
        """
        sql = "select id, type from iplog_typeid_%s where id>%d order by id limit %d" % (self.date.strftime("%Y%m%d"), id_begin, limit)
        return t.get_set_cur(sql, cursor)
 
def getClients():
    """Return the ``app_id`` values stored in the ``ClientDefine`` table.

    Opens a dedicated connection to the ``xnstat`` database, runs the query
    through devintool's get_set_cur, and closes the connection again before
    returning, including when the query fails.

    Returns:
        A list holding the first column of every row.
    """
    sql = "select app_id from ClientDefine;"
    # NOTE(review): host and credentials are hard-coded in source; consider
    # moving them to configuration that is not checked in or published.
    conn = MySQLdb.connect(host="10.22.241.148", user="xiaoneilogs", passwd="Pycd8452", port=3306, db="xnstat", charset="utf8")
    try:
        rs = t.get_set_cur(sql, conn.cursor())
        # Materialize the ids before the connection is closed.
        return [row[0] for row in rs]
    finally:
        conn.close()
 
if __name__ == "__main__":
    # Day to process: "YYYY-MM-DD" from the first argument, else yesterday.
    if len(sys.argv) > 1:
        date_str = sys.argv[1]
        print(date_str)
        year, month, day = [int(part) for part in date_str.split('-')[:3]]
        date = datetime.datetime(year, month, day)
        print(date)
    else:
        date = (datetime.datetime.today() - datetime.timedelta(days=1)).date()
    print(date)

    # Rebinds the module-level list that iplogThread reads when classifying.
    clients = getClients()
    print("clients:n %s" % (clients,))

    # Make sure this month's table exists before any worker writes to it.
    conn1 = Conn(host="10.22.200.224", db="ugc_xn", port=3306)
    for conn in (conn1,):
        sql = "create table if not exists user_login_%s like user_login; " % date.strftime('%Y%m')
        print(sql)
        conn.open()
        try:
            conn.cursor().execute(sql)
        finally:
            # Release the connection even if the DDL statement fails.
            conn.close()

    # Bounded queue: the producer blocks once THREAD_NUM statements are pending.
    queue = q.Queue(THREAD_NUM)
    for i in range(THREAD_NUM):
        th = mysqlThread(host1='10.22.200.224', port1=3306, db1='ugc_xn', queue=queue)
        # Daemon workers do not keep the process alive once the work is done.
        th.setDaemon(True)
        th.start()

    iplog = iplogThread(queue, date)
    iplog.start()
    # Wait for the producer first: joining the queue before anything has been
    # enqueued would return immediately.
    iplog.join()
    queue.join()
分类: 技术收集
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.