由于工作中一些需求,需要完成C++与Python之间的Socket通信,基于自定义协议的通信。
环境:
Centos 6.5+gcc (GCC) 4.8.2+kernel 2.6.32+cmake version 2.8.12.2+Python 3.4.3(windows环境)
Bug修复请参考:
难点:
1:Python Socket Struct解析。
由于C++的Socket开发是基于自定义协议的,通信中加入了结构体,在Python中需要对socket通信中的结构体进行解析。Python中正好有struct模块,所以,可以借助该模块进行自定义协议的通信。
2:Python中自定义协议与服务器端协议统一
客户端协议的解析读取,发送等都要与服务端相同,不然势必无法正常按照协议进行通信。
代码如下:
服务端(基于C语言):
socket_server.c:
#include <netdb.h> #include <sys/socket.h> #include <unistd.h> #include <string.h> #include <stdio.h> #include <memory.h> #include <signal.h> #include <time.h> #include <pthread.h> #include <errno.h> #include "msg.h" #define MAXCONNECTIONS 100 int sockfd; void sig_handler(int signo) { if (signo == SIGINT) { printf("server close\n"); close(sockfd); exit(1); } } /*输出连接上来的客户端的相关信息*/ void out_addr(struct sockaddr_in *clientaddr) { //将端口从网络字节序转换成主机字节序 int port = ntohs(clientaddr->sin_port); char ip[16]; memset(ip, 0, sizeof(ip)); //将ip地址从网络字节序转换成点分十进制 inet_ntop(AF_INET, &clientaddr->sin_addr.s_addr, ip, sizeof(ip)); printf("client: %s(%d) connected\n", ip, port); } /*输出服务器端时间*/ void do_service(int fd) { /*和客户端进行读写操作(双向通信)*/ char buff[1024]; while (1) { memset(buff, 0, sizeof(buff)); printf("start read and write...\n"); size_t size; //printf("sizeof buff is %d = \n", sizeof(buff)); if ((size = read_msg(fd, buff, sizeof(buff))) < 0) { print("protocal error"); break; } else if (size == 0) { break; } else { printf("Server Received = %s\n", buff); //printf("socket_server send size = %d \n", strlen(buff)); //printf("socket_server fd = %d \n", fd); //printf("socket_server send size = %d \n", sizeof(buff)); //printf("socket_server send size = %d \n", strlen(buff)); if (write_msg(fd, buff, sizeof(buff)) < 0) { printf("errorno = %d", errno); if (errno == EPIPE) { break; } perror("protocal error"); } } } } void out_fd(int fd) { struct sockaddr_in addr; socklen_t len = sizeof(addr); //从fd中获取连接的客户端相关信息 if (getpeername(fd, (struct sockaddr *)&addr, &len) < 0) { perror("getpeername error"); return; } char ip[16]; memset(ip, 0, sizeof(ip)); int port = ntohs(addr.sin_port); inet_ntop(AF_INET, &addr.sin_addr.s_addr, ip, sizeof(ip)); printf("%16s(%5d) closed!\n", ip, port); } void *th_fn(void *arg) { int fd = (int)arg; do_service(fd); out_fd(fd); close(fd); return (void *)0; } int main(int argc, char *argv[]) { if (argc < 2) { printf("usage: %s #port\n", argv[0]); exit(1); } if (signal(SIGINT, sig_handler) == SIG_ERR) { perror("signal sigint error"); exit(1); } signal(SIGPIPE, SIG_IGN); /*步骤1:创建socket(套接字)*/ sockfd = socket(AF_INET, SOCK_STREAM, 0); /*步骤2:将socket和地址(包括ip,port)进行绑定*/ struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(serveraddr)); /*向地址中填入ip,port,internet地址簇类型*/ serveraddr.sin_family = AF_INET; //ipv4 serveraddr.sin_port = htons(atoi(argv[1])); //port serveraddr.sin_addr.s_addr = INADDR_ANY; //接收所有网卡地址 if (bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) { perror("bind error"); exit(1); } /*步骤3:调用listen函数启动监听(指定port监听) 通知系统去接受来自客户端的连接请求 第二个参数:指定队列的长度*/ if (listen(sockfd, MAXCONNECTIONS) < 0) { perror("listen error"); exit(1); } /*步骤4:调用accept函数从队列中获得一个客户端的连接请求, 并返回新的socket描述符*/ struct sockaddr_in clientaddr; socklen_t clientaddr_len = sizeof(clientaddr); /*设置线程的分离属性*/ pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); while (1) { int fd = accept(sockfd, NULL, NULL); if (fd < 0) { perror("accept error"); continue; } /*步骤5:启动子线程去调用IO函数(read/write)和连接的客户端进行双向通信*/ pthread_t th; int err; /*以分离状态启动子线程*/ if ((err = pthread_create(&th, &attr, th_fn, (void *)fd)) != 0) { perror("pthread create error"); } pthread_attr_destroy(&attr); } return 0; }
msg.h:
#ifndef __MSG_H__ #define __MSG_H__ #include <sys/types.h> typedef struct { //协议头部 char head[13]; char checknum; //校验码 //协议体部 char buff[1024]; //数据 } Msg; /************************************************************************/ /* 发送一个基于自定义协议的message * 发送的数据存放在buff中*/ /************************************************************************/ extern int write_msg(int sockfd, char *buff, size_t len); /************************************************************************/ /* 读取一个基于自定义协议的message * 读取的数据存放在buff中*/ /************************************************************************/ extern int read_msg(int sockfd, char *buff, size_t len); #endif
msg.c:
#include "msg.h" #include <unistd.h> #include <string.h> #include <memory.h> #include <sys/types.h> /*计算校验码*/ static unsigned char msg_check(Msg *message) { //unsigned char s = 0; int s = 0; int i; //printf("message head = %s \n", message->head); //printf("message buff = %s \n", message->buff); for (i = 0; i < sizeof(message->head); i++) { //printf("i1 = %c \n", message->head[i]); s = ((s + message->head[i]) % 255); //printf("i1 checknum = %d \n", s); } for (i = 0; i < sizeof(message->buff); i++) { //printf("i2 = %c \n", message->buff[i]); s = ((s + message->buff[i]) % 255); //printf("i2 checknum = %d \n", s); } //printf("msg_check num is %d \n", s); return s; } /************************************************************************/ /* 发送一个基于自定义协议的message * 发送的数据存放在buff中*/ /************************************************************************/ int write_msg(int sockfd, char *buff, size_t len) { //printf("write_msg1 \n"); Msg message; memset(&message, 0, sizeof(message)); strcpy(message.head, "gmqh_sh_2016"); memcpy(message.buff, buff, len); message.checknum = msg_check(&message); //printf("write_msg2 \n"); if (write(sockfd, &message, sizeof(message)) != sizeof(message)) { return -1; } } /************************************************************************/ /* 读取一个基于自定义协议的message * 读取的数据存放在buff中*/ /************************************************************************/ int read_msg(int sockfd, char *buff, size_t len) { //printf("read_msg1 \n"); Msg message; memset(&message, 0, sizeof(message)); size_t size; //printf("read_msg1-1 \n"); if ((size = read(sockfd, &message, sizeof(message))) < 0) { //printf("read_msg1-2 \n"); return -1; } else if (size == 0) { return 0; } //printf("read_msg2 \n"); //进行校验码验证,判断接收到message是否完整 unsigned char s = msg_check(&message); //printf("read_msg message.head = %s \n", message.head); //printf("read_msg message.checknum = %d \n", message.checknum); //printf("read_msg message.buff = %s \n", message.buff); //printf("read_msg checknum cal = %d \n", s); if ((s == (unsigned char)message.checknum) && (!strcmp("gmqh_sh_2016", message.head))) { //printf("read_msg3 \n"); memcpy(buff, message.buff, len); return sizeof(message); } //printf("read_msg4 \n"); return -1; }
CMakeLists.txt:
cmake_minimum_required(VERSION 2.8) #project name project(multi_thread_socket_demo) set(CMAKE_CXX_COMPILER "g++") #set compiler for c++ language set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -lrt -lpthread -D_GLIBCXX_USE_NANOSLEEP") #source files(can add source files manually) #set(SOURCE_FILES main.cpp ThostFtdcMdApi.h ThostFtdcTraderApi.h ThostFtdcUserApiDataType.h ThostFtdcUserApiStruct.h MdSpi.cpp MdSpi.h TdSpi.cpp TdSpi.h CTP_Manager.cpp CTP_Manager.h Login.h Order.cpp Order.h User.cpp User.h Utils.cpp Utils.h DBManager.h DBManager.cpp Trader.h Trader.cpp FutureAccount.h FutureAccount.cpp Strategy.h Strategy.cpp Algorithm.h Algorithm.cpp) #source directory(add source files automatically) aux_source_directory(. SOURCE_FILES) find_package(Threads) #set extern libraries #set(LIBRARIES libthostmduserapi.so libthosttraderapi.so libmongoclient.so libboost_thread.so libboost_system.so libboost_regex.so) #add execute file add_executable(multi-thread-socket-server ${SOURCE_FILES}) #add link library target_link_libraries(multi-thread-socket-server ${CMAKE_THREAD_LIBS_INIT})
编译方式:
cmake . make
客户端(基于Python语言):
socket_client.py:
# -*- coding: utf-8 -*- from collections import namedtuple import socket import sys import struct Message = namedtuple("Message", "head checknum buff") # m = Message("gmqh", 0, "hello, world") # 计算校验码 def msg_check(message): #将收到的head以及buff分别累加 % 255 checknum = 0 for i in message.head: # print("i1 = %c \n" % i) checknum = ((checknum + ord(i)) % 255) # print("i1 checknum = %d \n" % checknum) for i in message.buff: # print("i2 = %c \n" %i) checknum = ((checknum + ord(i)) % 255) # print("i2 checknum = %d \n" % checknum) return checknum #发送数据 def write_msg(sockfd, buff): # print("send buff = ", buff) # print("send buff len = ", len(buff)) # print("send buff = ", buff.encode()) # print("send buff len = ", len(buff.encode())) #构造Message m = Message("gmqh_sh_2016", 0, buff) #数据发送前,将校验数据填入Message结构体 checknum = msg_check(m) m = Message("gmqh_sh_2016", checknum, buff) # print("send m.buff = ", m.buff.encode()) # print("send m.checknum = ", m.checknum) #打包数据(13位的head,1位校验码,不定长数据段) data = struct.pack(">13s1B" + str(len(m.buff.encode()) + 1) + "s", m.head.encode(), m.checknum, m.buff.encode()); print("send data = ", data) #发送数据 size = sockfd.send(data) # print(size) return size #读取数据 def read_msg(sockfd): try: #接收数据1038个字节(与服务器端统一:13位head+1位checknum+1024数据段) data = sockfd.recv(1038) except socket.error as e: print(e) #解包数据 head, checknum, buff = struct.unpack(">13s1B"+ str(len(data) - 14) +"s", data) # print(head, checknum, buff, '\n') #将解包的数据封装为Message结构体 m = Message(head.decode().split('\x00')[0], checknum, buff.decode()) tmp_checknum = msg_check(m) m = Message(head.decode().split('\x00')[0], tmp_checknum, buff.decode()) #将收到的标志位与收到数据重新计算的标志位进行对比+head内容对比 if ((m.checknum == checknum) and (m.head == "gmqh_sh_2016")): #打印接收到的数据 print("receive data = ", m.buff) return 1 else: return -1 if __name__ == '__main__': #创建socket套接字 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if s: # 连接服务器: IP,port try: #进行与服务端的连接(ip地址根据实际情况进行更改) s.connect(("192.168.1.12", 8888)) except socket.error as e: print("socket error", e) sys.exit(1) #输入提示符 prompt = b'->' while True: buff = input(prompt) if buff == "": continue #发送数据 if (write_msg(s, buff) < 0): print("write msg error") continue else: #接收数据 if (read_msg(s) < 0): print("read msg error") continue pass s.close() else: print("socket error")
运行效果:
文章的脚注信息由WordPress的wp-posturl插件自动生成
来自外部的引用: 1