• python安全工具开发基础


    拷贝、with

    ==、is

    a = [11, 22, 33]
    b = [11, 22, 33]
    c=a
    print(id(a))
    print(id(b))
    print(id(c))
    
    print(a == b)
    print(a == c)
    print(a is b)
    print(a is c)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    1655503254336
    1655503220608
    1655503254336
    True
    True
    False
    True

    深拷贝、浅拷贝

    • 浅拷贝:对于一个对象的顶层拷贝
    • 深拷贝:对于一个对象所有层次的拷贝(递归)
    import copy
    _list=['a',4]
    a=[1,2,3,_list]
    b=a
    c=a.copy()
    d=copy.deepcopy(a)
    
    a.append(9)
    a[3].append(8)
    
    print(a)
    print(b)
    print(c)
    print(d)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    [1, 2, 3, [‘a’, 4, 8], 9]
    [1, 2, 3, [‘a’, 4, 8], 9]
    [1, 2, 3, [‘a’, 4, 8]]
    [1, 2, 3, [‘a’, 4]]

    注意:copy.copy对于可变类型会进行浅拷贝,对于不可变类型(例如元组)不会拷贝,仅仅是指向

    with

    上下文管理器

    任何实现了 __enter__() __exit__() 方法的对象都可称之为上下文管理器,上下文管理器对象可以使用 with 关键字

    class File():
        def __init__(self, filename, mode):
            self.filename = filename
            self.mode = mode
    
        def __enter__(self):
            print("entering")
            self.f = open(self.filename, self.mode)
            return self.f
    
        def __exit__(self, *args):
            print("will exit")
            self.f.close()
    
    with File('test', 'w') as f:
        print("coleak")
        f.write('hello, python')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    entering
    coleak
    will exit

    contextmanager 的装饰器

    通过 yield 将函数分割成两部分,yield 之前的语句在__enter__方法中执行,yield 之后的语句在 __exit__ 方法中执行。紧跟在 yield 后面的值是函数的返回值。

    from contextlib import contextmanager
    
    @contextmanager
    def my_open(path, mode):
        f = open(path, mode)
        yield f
        f.close()
    
    
    with my_open('test', 'w') as m:
        m.write("hello , the simplest context manager")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    三器一闭

    迭代器

    • 只要是可以通过for…in…的形式进行遍历的,那么这个数据类型就是可以迭代的
    • 只要是通过isinstance来判断出是Iterable类的实例,即isinstance的结果是True那么就表示,这个数据类型是可以迭代的数据类型
    • 迭代器是一个可以记住遍历的位置的对象。迭代器对象从第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。
    • listtuple等都是可迭代对象,我们可以通过iter()函数获取这些可迭代对象的迭代器。然后我们可以对获取到的迭代器不断使用next()函数来获取下一条数据
    • 只要在类中,定义__iter__方法,那么这个类创建出来的对象一定是可迭代对象
    • 凡是可作用于for 循环的对象都是 Iterable 类型;
    • 凡是可作用于 next() 函数的对象都是 Iterator 类型
    • 集合数据类型如 listdictstr等是 Iterable 但不是Iterator,不过可以通过 iter() 函数获得一个 Iterator 对象

    Iterable, Iterator

    from collections.abc import Iterator
    nums = [11, 22, 33, 44]
    nums_iter = iter(nums)
    print("nums", isinstance(nums, Iterator))
    print("nums_iter", isinstance(nums_iter, Iterator))
    num1 = next(nums_iter)
    print(num1)
    num2 = next(nums_iter)
    print(num2)
    num3 = next(nums_iter)
    print(num3)
    num4 = next(nums_iter)
    print(num4)
    
    # nums False
    # nums_iter True
    # 11
    # 22
    # 33
    # 44
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    next,iter

    from collections.abc import Iterator
    
    class MyList(object):
        """自定义的一个可迭代对象"""
        def __init__(self):
            self.items = []
            self.current = 0
    
    
        def add(self, val):
            self.items.append(val)
    
        def __iter__(self):
            return self
    
        def __next__(self):
            if self.current < len(self.items):
                item = self.items[self.current]
                self.current += 1
                return item
            else:
            	self.current = 0
                raise StopIteration
    
    if __name__ == '__main__':
        mylist = MyList()
        mylist.add(1)
        mylist.add(2)
        mylist.add(3)
        mylist.add(4)
        mylist.add(5)
        for num in mylist:
            print(num)
        print("mylist是否是迭代器", isinstance(mylist, Iterator))
        print(next(mylist))
        print(next(mylist))
        print(next(mylist))
        print(next(mylist))
        print(next(mylist))
    
    
    # 1
    # 2
    # 3
    # 4
    # 5
    # mylist是否是迭代器 True
    # 1
    # 2
    # 3
    # 4
    # 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    生成器

    • 生成器是一种特殊的迭代器
    • def函数中有yield关键字的 就称为 生成器

    yield

    def fib_generator():
        num1 = 1
        num2 = 1
        while True:
            temp_num = num1
            num1, num2 = num2, num1+num2
            # return temp_num
            yield temp_num
    
    fib = fib_generator()
    print(next(fib))
    print(next(fib))
    print(next(fib))
    print(next(fib))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    def generator_test():
        while True:
            print("--1--")
            num = yield 100
            print("--2--", "num=", num)
    
    
    g = generator_test()
    print(g.send(None))
    print(g.send(11))
    
    # --1--
    # 100
    # --2-- num= 11
    # --1--
    # 100
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    闭包

    • 闭包定义是在函数内再嵌套函数
    • 闭包是可以访问另一个函数局部作用域中变量的函数
    • 闭包可以读取另外一个函数内部的变量
    • 闭包可以让参数和变量不会被垃圾回收机制回收,始终保持在内存中(而普通的函数调用结束后 会被Python解释器自动释放局部变量)
    def who(name):
        def talk(content):
            print("(%s):%s" % (name, content))
        return talk
    
    zhangsan = who("张三")
    lisi = who("李四")
    
    zhangsan("zsan")
    lisi("lsi")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    def make_filter(keep):  
        def the_filter(file_name):  
            file = open(file_name)  
            lines = file.readlines()  
            file.close()  
            filter_doc = [i for i in lines if keep in i]  
            return filter_doc  
        return the_filter  
    
    filter = make_filter("163.com")  
    filter_result = filter("result.txt")
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    装饰器

    • 引入日志
    • 函数执行时间统计
    • 执行函数前预备处理
    • 执行函数后清理功能
    • 权限校验等场景
    • 缓存

    简单装饰器

    import time
    def out_hello(fn):
        def inner_hello():
            before = time.time()
            fn()
            after = time.time()
            print("函数所用时间是:", after-before)
        return inner_hello
    @out_hello
    def print_hello():
        for i in range(10000):
            print("hello:%d" % i)
    ph = print_hello()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    执行顺序

    def timefun(func):
        print("----开始装饰----")
        def wrapped_func():
            print("----开始调用原函数----")
            func()
            print("----结束调用原函数----")
    
        print("----完成装饰----")
        return wrapped_func
    
    @timefun
    def helloworld():
        print("helloworld")
    
    helloworld()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    带参数

    from time import ctime, sleep
    
    def timefun(func):
        def wrapped_func(a, b):
            print("%s called at %s" % (func.__name__, ctime()))
            print(a, b)
            func(a, b)
        return wrapped_func
    
    @timefun
    def foo(a, b):
        print(a+b)
    
    foo(3,5)
    sleep(2)
    foo(2,4)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    带return的函数

    from time import ctime, sleep
    def timefun(func):
        def wrapped_func():
            print("%s called at %s" % (func.__name__, ctime()))
            return func()
        return wrapped_func
    
    @timefun
    def foo():
        print("I am foo")
    
    @timefun
    def get_info():
        return '----hahah---'
    
    foo()
    sleep(2)
    foo()
    print(get_info())  # 可以看到这里并没有 get_info这个函数 返回的数据,因此这里有不完善的地方
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    类对函数进行装饰

    class Test(object):
        def __init__(self, func):
            print("---初始化---")
            print("func name is %s" % func.__name__)
            self.__func = func
        def __call__(self):
            print("---装饰器中的功能---")
            self.__func()
    
    @Test
    def test():
        print("----test---")
    
    test()  # 如果把这句话注释,重新运行程序,依然会看到"--初始化--"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    动态绑定

    import types
    
    class Person():
        num = 0
        def __init__(self, name = None, age = None):
            self.name = name
            self.age = age
        def eat(self):
            print("---默认的实例方法---")
    
    # 定义一个实例方法
    def run(self, speed):
        print("----实例方法--1--")
        print("%s在移动, 速度是 %d km/h"%(self.name, speed))
        print("----实例方法--2--")
    
    # 定义一个类方法
    @classmethod
    def test_class(cls):
        print("----类方法--1--")
        print("num=%d" % cls.num)
        cls.num = 100
        print("num=%d" % cls.num)
        print("----类方法--2--")
    
    # 定义一个静态方法
    @staticmethod
    def test_static():
        print("----静态方法--1--")
        print("---static method----")
        print("----静态方法--2--")
    
    # 创建一个实例对象
    p = Person("老王", 24)
    # 调用在class中的方法
    p.eat()
    
    # 给这个对象添加实例方法
    p.run = types.MethodType(run,p)
    # 调用实例方法
    p.run(180)
    
    # 给Person类绑定类方法
    Person.test_class = test_class
    
    # 调用类方法
    Person.test_class()
    
    # 给Person类绑定静态方法
    Person.test_static = test_static
    # 调用静态方法
    Person.test_static()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    slots

    __slots__ = ("name", "age")
    
    • 1

    限制实例的属性,只允许对Person实例添加nameage属性

    垃圾回收

    • python采用的是引用计数机制为主,标记-清除和**分代收集(隔代回收)**两种机制为辅的策略。
    • [-5, 256] 这些整数对象是提前建立好的,不会被垃圾回收
    • 大整数不共用内存,引用计数为0,销毁
    • 单个单词,不可修改,默认开启intern机制,共用对象,引用计数为0,则销毁
    • 字符串(含有空格),不可修改,没开启intern机制,不共用对象,引用计数为0,销毁

    引用计数机制的优点

    • 简单
    • 实时性:一旦没有引用,内存就直接释放了。不用像其他机制等到特定时机。实时性还带来一个好处:处理回收内存的时间分摊到了平时

    引用计数机制的缺点

    • 维护引用计数消耗资源
    • 循环引用

    GC系统

    • 为新生成的对象分配内存
    • 识别哪些是垃圾对象
    • 回收垃圾对象占用的内存

    导致引用计数+1的情况

    • 对象被创建,例如a=23
    • 对象被引用,例如b=a
    • 对象被作为参数,传入到一个函数中,例如func(a)
    • 对象作为一个元素,存储在容器中,例如list1=[a,a]

    导致引用计数-1的情况

    • 对象的别名被显式销毁,例如del a
    • 对象的别名被赋予新的对象,例如a=24
    • 一个对象离开它的作用域,例如f函数执行完毕时,func函数中的局部变量(全局变量不会)
    • 对象所在的容器被销毁,或从容器中删除对象

    分代回收

    • 分代回收是一种以空间换时间的操作方式,Python将内存根据对象的存活时间划分为不同的集合,每个集合称为一个代,Python将内存分为了3“代”,分别为年轻代(第0代)、中年代(第1代)、老年代(第2代),他们对应的是3个链表,它们的垃圾收集频率随着对象存活时间的增大而减小。
    • 新创建的对象都会分配在年轻代,年轻代链表的总数达到上限时,Python垃圾收集机制就会被触发,把那些可以被回收的对象回收掉,而那些不会回收的对象就会被移到中年代去,依此类推,老年代中的对象是存活时间最久的对象,甚至是存活于整个系统的生命周期内。
    • 同时,分代回收是建立在标记清除技术基础之上。分代回收同样作为Python的辅助垃圾收集技术处理那些容器对象

    gc模块

    • gc.get_count():获取当前自动执行垃圾回收的计数器,返回一个长度为3的列表
    • gc.get_threshold():获取gc模块中自动执行垃圾回收的频率,默认是(700, 10, 10)
    • gc.set_threshold(threshold0[,threshold1,threshold2]):设置自动执行垃圾回收的频率
    • gc.disable():python3默认开启gc机制,可以使用该方法手动关闭gc机制
    • gc.collect():手动调用垃圾回收机制回收垃圾

    查看引用计数

    import sys
    a = "hello world"
    sys.getrefcount(a)
    
    • 1
    • 2
    • 3

    查看阈值

    import gc
    
    print(gc.get_threshold())
    #(700, 10, 10)
    # 700:表示当分配对象的个数达到700时,进行一次0代回收
    # 10:当进行10次0代回收以后触发一次1代回收
    # 10:当进行10次1代回收以后触发一次2代回收
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    内存泄漏

    import gc
    
    class ClassA():
        def __init__(self):
            print('object born,id:%s'%str(id(self)))
    
    def f2():
        while True:
            c1 = ClassA()
            c2 = ClassA()
            c1.t = c2
            c2.t = c1
            del c1
            del c2
            #gc.collect() 手动调用垃圾回收功能,这样在自动垃圾回收被关闭的情况下,也会进行回收
    
    #python默认是开启垃圾回收的,可以通过下面代码来将其关闭
    gc.disable()
    f2()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    有三种情况会触发垃圾回收

    当gc模块的计数器达到阈值的时候,自动回收垃圾
    调用gc.collect(),手动回收垃圾
    程序退出的时候,python解释器来回收垃圾
    
    • 1
    • 2
    • 3
    import gc
    class ClassA():
        pass
    
    print(gc.get_count())
    a = ClassA()
    print(gc.get_count())
    del a
    print(gc.get_count())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    网络编程

    函数 socket.socket 创建一个套接字,该函数带有两个参数:

    • Address Family:可以选择 AF_INET(用于 Internet 进程间通信) 或者 AF_UNIX(用于同一台机器进程间通信),实际工作中常用AF_INET
    • Type:套接字类型,可以是 SOCK_STREAM(流式套接字,主要用于 TCP 协议)或者 SOCK_DGRAM(数据报套接字,主要用于 UDP 协议)

    Udp

    数据传输

    from socket import *
    
    
    def send_msg(udp_socket):
        """获取键盘数据,并将其发送给对方"""
        # 1. 从键盘输入数据
        msg = input("\n请输入要发送的数据:")
        # 2. 输入对方的ip地址
        dest_ip = input("\n请输入对方的ip地址:")
        # 3. 输入对方的port
        dest_port = int(input("\n请输入对方的Aport:"))
        # 4. 发送数据
        udp_socket.sendto(msg.encode("utf-8"), (dest_ip, dest_port))
    
    
    def recv_msg(udp_socket):
        """接收数据并显示"""
        # 1. 接收数据
        recv_msg = udp_socket.recvfrom(1024)
        # 2. 解码
        recv_ip = recv_msg[1]
        recv_msg = recv_msg[0].decode("utf-8")
        # 3. 显示接收到的数据
        print(">>>%s:%s" % (str(recv_ip), recv_msg))
    
    
    def main():
        # 1. 创建套接字
        udp_socket = socket(AF_INET,SOCK_DGRAM)
        # 2. 绑定本地信息
        udp_socket.bind(("", 7890))
        while True:
            # 3. 选择功能
            print("="*30)
            print("1:发送消息")
            print("2:接收消息")
            print("="*30)
            op_num = input("请输入要操作的功能序号:")
    
            # 4. 根据选择调用相应的函数
            if op_num == "1":
                send_msg(udp_socket)
            elif op_num == "2":
                recv_msg(udp_socket)
            else:
                print("输入有误,请重新输入...")
    
    if __name__ == "__main__":
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    广播

    import socket
    
    # 1. 创建UDP套接字
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    # 2. 设置UDP套接字允许其广播(注意如果udp套接字需要广播,则一定要添加此语句)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    
    # 选做 绑定本地信息
    # s.bind(("", 8080))
    
    # 4. 向本局域网中发送广播数据
    # 此时只要是本局域网中的电脑上有 用1060端口的udp程序 它就会收到此数据
    dest_info = ("", 1060)  # 会自动改为本局域网的广播ip
    s.sendto('hello world !'.encode('utf-8'), dest_info)
    
    # 5. 关闭套接字
    s.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    Tcp

    数据传输

    import socket
    
    # 1. 创建TCP套接字
    server_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 2. 绑定本地信息
    server_s.bind(("", 8082))
    
    # 3. 设置为被动的
    server_s.listen(128)
    
    # 4. 等待客户端链接
    new_s,client_info = server_s.accept()
    
    # 5. 用新的套接字为已经连接好的客户端服务器
    while True:
        recv_content = new_s.recv(1024)
        print(f"{str(client_info)},{recv_content.decode('utf-8')}")
    
        if not recv_content:
            # 当客户端调用了close后,recv返回值为空,此时服务套接字就可以close了
            # 6. 关闭服务套接字
            new_s.close()
            break
    
    # 7. 关闭监听套接字
    server_s.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    from socket import *
    
    # 1. 创建socket
    tcp_client_socket = socket(AF_INET, SOCK_STREAM)
    
    # 2. 链接服务器
    tcp_client_socket.connect(("127.0.0.1", 8082))
    
    # 3. 向服务器发送数据
    while True:
        ch=int (input("选择1则发送信息"))
        if ch==1:
            send_data = input("请输入要发送的数据:")
            tcp_client_socket.send(send_data.encode("utf-8"))
        else:
            break
    
    tcp_client_socket.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    文件下载功能

    from socket import *
    import sys
    
    
    def get_file_content(file_name):
        """获取文件的内容"""
        try:
            with open(file_name, "rb") as f:
                content = f.read()
            return content
        except:
            print("没有下载的文件:%s" % file_name)
    
    
    def main():
        port = int(input("开启的端口"))
        # 创建socket
        tcp_server_socket = socket(AF_INET, SOCK_STREAM)
        # 本地信息
        address = ('', port)
        # 绑定本地信息
        tcp_server_socket.bind(address)
        # 将主动套接字变为被动套接字
        tcp_server_socket.listen(128)
    
        # 等待客户端的链接,即为这个客户端发送文件
        client_socket, clientAddr = tcp_server_socket.accept()
        # 接收对方发送过来的数据
        recv_data = client_socket.recv(1024)  # 接收1024个字节
        file_name = recv_data.decode("utf-8")
        print("对方请求下载的文件名为:%s" % file_name)
        file_content = get_file_content(file_name)
        # 发送文件的数据给客户端
        # 因为获取打开文件时是以rb方式打开,所以file_content中的数据已经是二进制的格式,因此不需要encode编码
        if file_content:
            client_socket.send(file_content)
        # 关闭这个套接字
        client_socket.close()
        # 关闭监听套接字
        tcp_server_socket.close()
    
    if __name__ == "__main__":
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    import time
    from socket import *
    
    def main():
    
        # 创建socket
        tcp_client_socket = socket(AF_INET, SOCK_STREAM)
    
        # 目的信息
        server_ip = input("请输入服务器ip:")
        server_port = int(input("请输入服务器port:"))
    
        # 链接服务器
        tcp_client_socket.connect((server_ip, server_port))
    
        # 输入需要下载的文件名
        file_name = input("请输入要下载的文件名:")
    
        # 发送文件下载请求
        tcp_client_socket.send(file_name.encode("utf-8"))
    
        # 接收对方发送过来的数据,最大接收1024个字节(1K)
        recv_data = tcp_client_socket.recv(1024)
        # print('接收到的数据为:', recv_data.decode('utf-8'))
        # 如果接收到数据再创建文件,否则不创建
        if recv_data:
            with open(f"new_{time.time()}", "wb") as f:
                f.write(recv_data)
            print('下载完毕')
    
        # 关闭套接字
        tcp_client_socket.close()
    
    
    if __name__ == "__main__":
        main()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    协程

    yield

    import time
    
    def work1():
        while True:
            print("----work1---")
            yield
            time.sleep(0.5)
    
    def work2():
        while True:
            print("----work2---")
            yield
            time.sleep(0.5)
    
    def main():
        w1 = work1()
        w2 = work2()
        while True:
            next(w1)
            next(w2)
    
    if __name__ == "__main__":
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    greenlet

    from greenlet import greenlet
    import time
    
    def test1():
        while True:
            print("---A1--")
            gr2.switch()
            time.sleep(0.5)
            print("---A2--")
    
    
    def test2():
        while True:
            print("---B1--")
            gr1.switch()
            time.sleep(0.5)
            print("---B2--")
    
    
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    
    #切换到gr1中运行
    gr1.switch()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    gevent

    import gevent
    import time
    from gevent import monkey
    
    
    monkey.patch_all()
    
    
    def f1(n):
        for i in range(n):
            print("-----f1-----", i)
            # gevent.sleep(1)
            time.sleep(1)
    
    
    def f2(n):
        for i in range(n):
            print("-----f2-----", i)
            # gevent.sleep(1)
            time.sleep(1)
    
    def f3(n):
        for i in range(n):
            print("-----f3-----", i)
            # gevent.sleep(1)
            time.sleep(1)
    
    
    g1 = gevent.spawn(f1, 5)
    g2 = gevent.spawn(f2, 5)
    g3 = gevent.spawn(f3, 5)
    g1.join()  # join会等待g1标识的那个任务执行完毕之后 对其进行清理工作,其实这就是一个 耗时操作
    g2.join()
    g3.join()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    通过添加monkey.patch_all()能够让程序中看上去的time.sleep也具备了自动切换任务的功能,实际上它会悄悄的修改程序中time.sleepgevent.sleep从而实现功能

    import gevent
    import random
    import time
    from gevent import monkey
    
    monkey.patch_all()
    
    def coroutine_work(coroutine_name):
        for i in range(10):
            print(coroutine_name, i)
            time.sleep(random.random())
    
    
    def coroutine_work2(coroutine_name):
        for i in range(10):
            print(coroutine_name, i)
            time.sleep(random.random())
    
    
    gevent.joinall([
            gevent.spawn(coroutine_work, "work1"),
            gevent.spawn(coroutine_work2, "work2")
    ])
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    gevent.joinall,只要将得到的协程对象放到里面即可

    mysql

    设置远程权限

    mysql -uroot -proot
    use mysql;
    select user,host from user;
    update user set host = '%'  where user ='root';
    grant all privileges on *.* to 'root'@'%' identified by 'root' with grant option;
    flush privileges;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    grant all privileges on . to ‘用户名’@‘%’ identified by ‘密码’ with grant option;
    注释: 第一个 * ,表示被授权访问的库
    第二个 *, 表示库下的所有表
    ‘用户名’@‘%’ 用户名 表示授权用户,%表示任意的ip地址
    【identified by ‘密码’】 访问mysql的密码
    整句命令的意思就是,允许在任何IP地址上用这个用户名和密码来访问这个mysql

    查看版本

    import pymysql
    conn=pymysql.Connection(
        host='192.168.10.133',
        port=3306,
        user='root',
        password='root'
    )
    # mysql数据库服务器的版本
    # 使用cursor()方法获取操作游标
    cursor = conn.cursor()
    
    # 使用execute方法执行SQL语句
    cursor.execute("SELECT VERSION();")
    
    # 使用 fetchone() 方法获取一条数据
    data = cursor.fetchone()
    
    print(data)
    
    # 关闭数据库连接
    conn.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    创建表

    import pymysql
    
    db = pymysql.connect(
        host='192.168.10.133',
        port=3306,
        user='root',
        password='root',
        autocommit=True
        )
    
    # 使用 cursor() 方法创建一个游标对象 cursor
    cursor = db.cursor()
    db.select_db('test')
    # 使用 execute() 方法执行 SQL,如果表存在则删除
    cursor.execute("DROP TABLE IF EXISTS EMPLOYEE;")
    
    # 使用预处理语句创建表
    sql = """CREATE TABLE EMPLOYEE (
             FIRST_NAME  CHAR(20) NOT NULL,
             LAST_NAME  CHAR(20),
             AGE INT,  
             SEX CHAR(1),
             INCOME FLOAT )"""
    
    cursor.execute(sql)
    
    # 关闭数据库连接
    db.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    插入数据

    # SQL 插入语句
    sql = "INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME)VALUES ('%s', '%s',  %s,  '%s',  %s)" %('Mac', 'Mohan', 21, 'M', 2000)
    try:
        # 执行sql语句
        cursor.execute(sql)
    except:
        # 如果发生错误则回滚
        db.rollback()
    
    # 关闭数据库连接
    db.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    查询操作

    # SQL 插入语句
    sql = "SELECT * FROM EMPLOYEE \
           WHERE INCOME > %s" % (1000)
    try:
        # 执行SQL语句
        cursor.execute(sql)
        # 获取所有记录列表
        results = cursor.fetchall()
        # result=cursor.fetchone()
        # print(result)
        print(results)
        for row in results:
            fname = row[0]
            lname = row[1]
            age = row[2]
            sex = row[3]
            income = row[4]
            # 打印结果
            print("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
                  (fname, lname, age, sex, income))
    except:
        print("Error: unable to fetch data")
    
    # 关闭数据库连接
    db.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    更新操作

    # SQL 更新语句
    sql = "UPDATE EMPLOYEE SET AGE = AGE + 10 WHERE SEX = '%c'" % ('M')
    try:
        # 执行SQL语句
        cursor.execute(sql)
    except:
        # 发生错误时回滚
        db.rollback()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    删除操作

    # SQL 删除语句
    sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
    try:
        # 执行SQL语句
        cursor.execute(sql)
        # 提交修改
        db.commit()
    except:
        # 发生错误时回滚
        db.rollback()
    
    # 关闭连接
    db.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    预处理防止注入

    开启日志

    set global general_log_file='/tmp/general_log';
    set global general_log=on;
    show global variables like '%general%';
    
    • 1
    • 2
    • 3

    普通查询操作

    cursor = db.cursor()
    db.select_db('test')
    fn='Mac2'
    # SQL 插入语句
    sql = f"SELECT * FROM EMPLOYEE WHERE FIRST_NAME = '{fn}';"
    print(sql)
    try:
        # 执行SQL语句
        cursor.execute(sql)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    日志记录

    85 Connect  root@192.168.10.1 on  using TCP/IP
    85 Init DB  test
    85 Query    SELECT * FROM EMPLOYEE WHERE FIRST_NAME = 'Mac2'
    85 Quit
    
    • 1
    • 2
    • 3
    • 4

    修改fn为

    fn=f"1' or 1=1 #"
    即SELECT * FROM EMPLOYEE WHERE FIRST_NAME = '1' or 1=1 #';
    
    • 1
    • 2

    日志记录

    88 Connect  root@192.168.10.1 on  using TCP/IP
    88 Init DB  test
    88 Query    SELECT * FROM EMPLOYEE WHERE FIRST_NAME = '1' or 1=1 --+ '
    88 Quit
    
    • 1
    • 2
    • 3
    • 4

    此时存在sql注入,返回所有数据

    预处理

    fn=f"1' or 1=1 #"
    # SQL 插入语句
    sql = "select * from EMPLOYEE where FIRST_NAME = %s"
    print(sql)
    try:
        # 执行SQL语句
        cursor.execute(sql,fn)
        # 获取所有记录列表
        results = cursor.fetchall()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    日志记录

    93 Connect  root@192.168.10.1 on  using TCP/IP
    93 Init DB  test
    93 Query    select * from EMPLOYEE where FIRST_NAME = '1\' or 1=1 #'
    93 Quit
    
    • 1
    • 2
    • 3
    • 4

    发现此时单引号已经被转义

    mysql原预编译语法为

     prepare emp from 'select * from EMPLOYEE where FIRST_NAME = ?';
     set @FIRST_NAME='Mac';
     execute emp using @FIRST_NAME;
    
    • 1
    • 2
    • 3

    redis

    未授权/弱密码

    import socket
    import sys
    def check(ip, port, file,timeout):
        passfile=open(file,'r')
        PASSWORD_DIC=passfile.read().splitlines()
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, int(port)))
        s.send("INFO\r\n".encode("utf-8"))
        result = s.recv(1024).decode()
        if "redis_version" in result:
            return "未授权访问"
        elif "Authentication" in result:
            for pass_ in PASSWORD_DIC:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect((ip, int(port)))
                s.send(f"AUTH {pass_}\r\n".encode("utf-8"))
                result = s.recv(1024).decode()
                if 'OK' in result:
                    return "存在弱口令,密码:%s" % (pass_)
    if __name__ == '__main__':
        ip=sys.argv[1]
        port=sys.argv[2]
        file=sys.argv[3]
        print(check(ip, port,file, timeout=10))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    python redis.py 127.0.0.1 6379 pass.txt
    
    • 1
  • 相关阅读:
    centos7.9安装jenkins
    【AGC】如何集成华为AGC性能管理- iOS
    Windows10操作系统安装AD DS
    vue2学习之axios在项目中的优化
    Vue中的$nextTick有什么作用?
    Python网页解析库:用requests-html爬取网页
    最新基于 R 语言 lavaan 结构方程模型(SEM)实践技术应用
    STM32F407ZGT6|串口发送数据给上位机(定时)
    Ubuntu安装pyenv,配置虚拟环境
    Python.03.函数使用
  • 原文地址:https://blog.csdn.net/qq_63701832/article/details/133657136