# socket 模块 API 文档

## API

### class socket(_socket.socket):

``` python
def __init__(self,*vars):...
```

``` python
def bind(self,host_port):...
```

``` python
def listen(self,num):...
```

``` python
def accept(self):...
```

``` python
def send(self,data):...
```

``` python
def close(self):...
```

``` python
def connect(self,host_port):...
```

``` python
def recv(self,num):...
```

``` python
def setblocking(self,sta):...
```

``` python
def gethostname():...
```

``` python
def gethostbyname(host):...
```

## Examples

### socket_thread.py

```python
import socket
import _thread
import random
import time

test_finished = False
server_started = False


def socket_server_task(host, port):
    """
    Echo server task: accept one client and send back whatever it
    sends until recv/send raises, then close both sockets.
    :return:
    """
    print("socket server start:", host, port)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(5)
    print("socket server waiting accept")
    # Tell the main script that the listening socket is ready.
    global server_started
    server_started = True
    accept, addr = s.accept()
    print("socket server accepted at", addr)
    while True:
        try:
            data = accept.recv(1024)
            print('socket server recv:', data.decode())
            accept.send(data)
        except Exception:
            print('socket server closing accept')
            accept.close()
            break
    print("socket server closing")
    s.close()
    # Tell the main script that the test is over.
    global test_finished
    test_finished = True


def socket_server_init(host='0.0.0.0', port=36500):
    """Run socket_server_task on a new thread."""
    _thread.start_new_thread(socket_server_task, (host, port))


def socket_client_task(host, port):
    """Connect to the server, send "hello", print the reply and close."""
    print("socket client start:", host, port)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    client.send("hello".encode())
    recv = client.recv(1024).decode()
    print("client recv:", recv)
    client.close()


def socket_server_test(host='0.0.0.0', port=36500):
    """Run socket_client_task on a new thread."""
    _thread.start_new_thread(socket_client_task, (host, port))


test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
# Wait until the server thread is listening before starting the client.
while not server_started:
    time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
    time.sleep(0.1)
```

### socket_DNS.py

```python
import socket

# Create a socket object
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Connect using a domain name and a port
s.connect(("dns.google", 53))

# Close the connection
s.close()

print("PASS")
```

### socket_GET.py

```python
import socket


def test_socket_GET():
    """Fetch "/" from the server over plain HTTP and return the raw response text."""
    # Create a socket object
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Get the server's IP address
    # server_ip = socket.gethostbyname('baidu.com')
    server_host = 'pikapython.com'
    server_port = 80

    # Connect to the server
    s.connect((server_host, server_port))

    # Build the HTTP GET request. The Host header is derived from the host
    # we actually connect to, and "Connection: close" makes the server close
    # the connection after the response so the read loop below sees EOF
    # instead of waiting for a keep-alive timeout.
    request = 'GET / HTTP/1.1\r\nHost: ' + server_host + \
        '\r\nConnection: close\r\n\r\n'
    # print('request:', request)
    s.send(request.encode())

    # Receive the server's response
    response = ''
    while True:
        try:
            recv = s.recv(1024)
        except Exception:
            break
        if not recv:
            break
        response += recv.decode()

    s.close()
    return response


# Retry up to 10 times; stop as soon as a response with an HTTP status line arrives.
for i in range(10):
    response = test_socket_GET()
    res = 'HTTP/1.1' in response
    if res:
        break
    print('test_socket_GET() failed, retrying...')
    print('response', response)
```

### server_client.py

```python
import socket
import random

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = "127.0.0.1"
port = 9999 + random.randint(0, 1000)
print("port:", port)
server.bind((host, port))
server.listen(5)

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))

accept, addr = server.accept()

print("recv from client: %s" % str(addr))

client.send("send test from client".encode())
print("server recv:", accept.recv(1024).decode())

accept.send("send test from server".encode())
print("client recv:", client.recv(1024).decode())

accept.close()
client.close()
server.close()
```

### gethostname.py

```python
import socket

hostname = socket.gethostname()
print(hostname)
```

### socket_json.py

```python
import socket
import _thread
import random
import time
import json

test_finished = False
server_started = False

test_data = {
    'result': {
        'a_a': {
            'value': 0.290000,
            'desc': 'A 相电流'
        }
    },
    'code': 0
}


def socket_server_task(host, port):
    """
    JSON server task: accept a client and answer every received message
    with the JSON-encoded test_data until an exception is raised.
    :return:
    """
    print("socket server start:", host, port)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(5)
    # Tell the main script that the listening socket is ready.
    global server_started
    server_started = True
    while True:
        try:
            print("socket server waiting accept")
            accept, addr = s.accept()
            print("socket server accepted at", addr)
            while True:
                data = accept.recv(1024)
                print('socket server recv:', data.decode())
                # accept.send(data)
                # Send bytes, like every other send() in these examples.
                accept.send(json.dumps(test_data).encode())
        except Exception:
            print('socket server closing accept')
            accept.close()
            break
    print("socket server closing")
    s.close()
    # Tell the main script that the test is over.
    global test_finished
    test_finished = True


def socket_server_init(host='0.0.0.0', port=36500):
    """Run socket_server_task on a new thread."""
    _thread.start_new_thread(socket_server_task, (host, port))


def socket_client_task(host, port):
    """Connect to the server, exchange two messages, print each reply and close."""
    print("socket client start:", host, port)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    for i in range(2):
        client.send("hello".encode())
        recv = client.recv(1024).decode()
        print("client recv:", recv)
    client.close()


def socket_server_test(host='0.0.0.0', port=36500):
    """Run socket_client_task on a new thread."""
    _thread.start_new_thread(socket_client_task, (host, port))


test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
# Wait until the server thread is listening before starting the client.
while not server_started:
    time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
    time.sleep(0.1)
```

### socket_download.py

```python
import socket
import time


def http_download_file(url: str, file_path: str, buff_size=1024):
    """
    Download `url` (plain HTTP only, port 80) into `file_path`.

    :param url: address starting with 'http://'
    :param file_path: file the response body is written to
    :param buff_size: maximum number of bytes requested per recv() call
    :return: 0 on success, -1 on an unsupported URL, a connection error,
             or when the server sends no data at all
    """
    # Parse the URL to extract the host and path
    if not url.startswith('http://'):
        print("Only HTTP protocol is supported")
        return -1

    host = url.split("//")[1].split('/')[0]
    path = url[url.find(host) + len(host):]
    if len(path) == 0:
        path = "/"

    print("Host:", host)
    print("Path:", path)

    # Establish a socket connection
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, 80))
        print("Connected to:", host)
    except Exception as e:
        print("Connection error:", e)
        return -1

    # Send HTTP GET request
    get_request = "GET " + path + " HTTP/1.1\r\nHost: " + \
        host + "\r\nConnection: close\r\n\r\n"
    sock.send(get_request.encode())

    # Open file to write
    f = open(file_path, 'wb')  # Manually open the file

    data_received = False
    head_received = False
    while True:
        try:
            data = sock.recv(buff_size)
        except Exception:
            print('End of data')
            break
        print("[Data received:", len(data), ']')
        # Once the header has been seen, every chunk is body data.
        if head_received:
            sz = f.write(data)
            # print("Data written:", sz)
            # print(data.decode())
        if len(data) == 0:
            print("Length of data:", len(data))
            if not data_received:
                print("No data received.")
                # Release both the file and the socket before bailing out.
                f.close()
                sock.close()
                return -1
            print("No more data to receive")
            break
        data_received = True
        # Handle the end of the HTTP header if it's still present.
        # NOTE: the terminator is searched per chunk, so a b'\r\n\r\n' that
        # is split across two recv() calls is not detected.
        if not head_received:
            if b'\r\n\r\n' in data:
                # print("Header received", data)
                head_received = True
                parts = data.split(b'\r\n\r\n', 1)
                if len(parts) == 2:
                    # Whatever follows the header in this chunk is body data.
                    sz = f.write(parts[1])

    # Close file and socket manually
    f.close()
    sock.close()
    print("Download completed")
    return 0


assert http_download_file("http://pikapython.com", "pikapython.html") == 0
print("PASS")
```