pythonでPLCのデータを取得するbyFINS通信
いつもお世話になっております。
駆け出しIIoTエンジニアのチャオです。
業務でもpythonを記述することが増えているのですが、オムロンのPLCの内部データを読み取るプログラムを掲示します。
以下、プログラムはFINS通信でデータ連続取得を記述しています。
import socket import binascii from contextlib import closing ## I/Oメモリアドレス # bit b_CIO='30' b_WR='31' b_HR='32' b_AR='33' b_DM='02' b_TK='06' # force bit fb_CIO='70' fb_WR='71' fb_HR='72' # word w_CIO='B0' w_WR='B1' w_HR='B2' w_AR='B3' w_DM='82' # force word fw_CIO='F0' fw_WR='F1' fw_HR='F2' # upflg upflg_TIM='09' upflg_CNT='09' # force upflg f_upflg_TIM='49' f_upflg_CNT='49' # current value cv_TIM='89' cv_CNT='89' cv_IR='DC' cv_DR='BC' # status s_TK='46' class Fins: def __init__(self,ip,port,dna,da1,sna,sa1,data): self.ip=ip self.port=port self.ICF='80' #ほぼ固定 self.RSV='00' #固定 self.GCT='02' #ほぼ固定 self.dna=dna self.da1=da1 self.DA2='00' self.sna=sna self.sa1=sa1 self.SA2='00' self.SID='00' #command_code self.data=data def hex_change(self,num): if(type(num)==str): num_int=int(num) else: num_int=num num=hex(num_int) num=num[2:4] if(num_int<=15): num='0'+num return num def hex_change_points(self,num): if(type(num)==str): num_int=int(num) else: num_int=num num=hex(num_int) if(num_int<=15): num='000'+num[2:] if(num_int<=255 and num_int>15): num='00'+num[2:] if(num_int>255 and num_int<=4095): data='0'+data[2:] if(num_int>4095 and num_int<=65535): num=num[2:] return num def hex_change_data(self,data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='000'+data[2:]+'00'#語尾の00はビット指定の時に用いるためチャネルでは00固定 if(data_int<=255 and data_int>15): data='00'+data[2:]+'00' if(data_int>255 and data_int<=4095): data='0'+data[2:]+'00' if(data_int>4095 and data_int<=65535): data=data[2:]+'00' return data def error_check(self,error_code): if(error_code=='0000'): return else: print('errorコードのレスポンスがありました。'+error_code) return def socket_0101(self):#連続読取 ## COMMAND FORMAT ## command code = '0101' ## I/Omemory = 'XX' ## read start address = 'XXXXXX' ## read points = 'XXXX' sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) with closing(sock): DNA=self.hex_change(self.dna) DA1=self.hex_change(self.da1) SNA=self.hex_change(self.sna) SA1=self.hex_change(self.sa1) MRC_SRC='0101'#連続読取コマンド times='B3016200B3016100B3016000B3015f00' data=self.data fins_head=self.ICF+self.RSV+self.GCT fins_head=fins_head+DNA+DA1+self.DA2+SNA+SA1+self.SA2 fins_head=fins_head+self.SID+MRC_SRC fins_send=fins_head+times+data fins_send=fins_head+data #print(fins_send) fins_send=binascii.a2b_hex(fins_send) try: sock.settimeout(1) sock.connect((self.ip,self.port)) #print(self.ip,self.port) except socket.timeout: print(1) return except socket.error: print(2) return except BaseException as be: print(be) return try: sock.settimeout(1) sock.send(fins_send) except socket.timeout: print('socket.timeout') return except socket.error: print('socket.error') return except BaseException as be: print(be) return try: sock.settimeout(1) res_data=sock.recv(4096) except socket.timeout: print('socket.timeout') return except socket.error: print('socket.error') return except BaseException as be: print(be) return res_data=binascii.b2a_hex(res_data) res_data=res_data.decode() #print(res_data) self.error_check(res_data[24:28]) res_data=res_data[28:] res_data = [res_data[i: i+4] for i in range(0, len(res_data), 4)]#返答数字を4分割にする #print(res_data) res_data_list=[] res_data_list+=[int(res_data,16) for res_data in res_data] return res_data_list