pythonでPLCのデータを取得するbyMCプロトコル通信
いつもお世話になっております。
駆け出しエンジニアのチャオです。
本日はキーエンスや三菱のPLCで使用できるMCプロトコル通信(SLMP)のプログラムをpythonで書きたいと思います。
import socket import binascii from contextlib import closing """ ### MCプロトコルについて ### ##################################### 〇フォーマット サブヘッダ 5000 ネットワーク番号00 PC番号 FF 要求ユニット FF03 要求先ユニット 00 要求データ長 XXXX CPU監視タイマ 0000 コマンド 0304#ランダム読出し サブコマンド 0000 ###################################### 〇コマンド一覧 ・一括読み出し ビット:0401 0001 →01040100 ワード:0401 0000 →01040000 ・一括書き込み ビット:1401 0001 →01140100 ワード:1401 0000 →01140000 ・ランダム読出し ワード:0403 0000 →03040000 ・ランダム書き込み ビット:1402 0001 →02140100 ワード:1402 0000 →02140000 ・モニタ登録 :0801 0000 →01080000 ・モニタ読出し :0802 0000 →02080000 ・複数ブロック一括読み出し ワード:0406 0000 →06040000 ・複数ブロック一括書き込み ワード:1406 0000 →06140000 ・バッファメモリ読出し ワード:0613 0000 →13060000 ・バッファメモリ読出し ワード:0613 0000 →13060000 ・インテリユニットバッファメモリ読出し ワード:0601 0000 →01060000 ・インテリユニットバッファメモリ書き込み ワード:0601 0000 →01060000 ・リモートRUN :1001 0000 →01100000 ・リモートSTOP :1002 0000 →02100000 ・CPU形名読み出し :0101 0000 →01010000 ・折り返しテスト :0619 0000 →19060000 ######################################### 〇デバイスコード リレー 9C リンクリレー A0 内部補助リレー 90 ラッチリレー 92 コントロールリレー 91 コントロールメモリ A9 データメモリ A8 拡張データメモリ A8 ファイルレジスタ AF B0 リンクレジスタ B4 タイマ(現在値) C2 タイマ(接点) C1 カウンタ(現在値) C5 カウンタ(接点) C4 ########################################### 〇一括読み出し サブヘッダ 5000 ネットワーク番号 00 PC番号 FF I/O番号 FF03 局番 XXXX 監視タイマ 0000 コマンド 0104 サブコマンド 0000 デバイス番号 XXXXXX(3なら030000) デバイス種類 A8 データ数 XXXX(1点なら0100) 5000,00,FF,FF03,0C00,0000,0104,0000,000000,A8,3200 →DM0からDM49までの一括読み出しを行うコマンド 〇KV-7500の時刻の置き場所 'BC0200A9BD0200A9BE0200A9BF0200A9C00200A9C10200A9' """ class Mc: def __init__(self, ip, port, bit_or_word, device_num, device_code, data_num, word_num='00', double_word_num='00'): self.ip = ip self.port = port self.subhead = '5000' self.network = '00' self.pc_num = 'FF' self.i_o_num = 'FF03' self.office_num = '00' self.monitoring_timer = '0000' self.bit_or_word = bit_or_word self.device_num = device_num self.device_code = device_code self.data_num = data_num self.word_num = word_num self.double_word_num = double_word_num # 一括読み出し # リストとして返す def socket_0401(self): sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) try: with closing(sock): command = '0104' if(self.bit_or_word == 'bit'): sub_command = '0100' else: sub_command = '0000'# ワード(ビットは0100) _device_start_point = hex_change_XXXXXX(self.device_num) _device_code = self.device_code _data_num = hex_change_XXXX(self.data_num) header = self.subhead + self.network + self.pc_num + self.i_o_num + self.office_num time_to_sub_command =self.monitoring_timer + command + sub_command point_to_data_num = _device_start_point + _device_code + _data_num command_protocol = time_to_sub_command + point_to_data_num code_num = hex_change_XXXX(str(round(len(command_protocol)/2))) _send = header + code_num + command_protocol mc_send=binascii.a2b_hex(_send) try: sock.settimeout(1) sock.connect((self.ip,self.port)) except sock.timeout: return except socket.error: return except BaseException as be: print(be) return try: sock.settimeout(1) sock.send(mc_send) except sock.timeout: return except socket.error: return except BaseException as be: print(be) return try: sock.settimeout(1) res_data=sock.recv(4096) except sock.timeout: return except socket.error: return except BaseException as be: print(be) return res_data=binascii.b2a_hex(res_data) res_data=res_data.decode() except BaseException as be: print(be) return error_check(res_data[18:22]) response_data = res_data[22:] L = [response_data[i: i+4] for i in range(0, len(response_data), 4)]#返答数字を四分割にする Z = [L[i:i+2] for i in range(0, len(L), 1)] # 2文字づつチャンク化 Z = [Z[i:i+1] for i in range(0, len(Z), 1)] # 2x2文字単位にブロック化 Z = [z[::2] + z[::2] for z in Z] # ブロック内チャンクをinterleave res_list = [] for i in Z: n = (len(i) + 1)//4 s = ''.join(sum(i[n:],[])) t = s[2:4] + s[0:2] h = int(t,16) res_list.append(h) return res_list def hex_change_XX(num): if(type(num)==str): num_int=int(num) else: num_int=num num=hex(num_int) num=num[2:4] if(num_int<=15): num='0'+num return num def hex_change_XXXX(data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='0'+data[2:]+'00' if(data_int<=255 and data_int>15): data=data[2:]+'00' if(data_int>255 and data_int<=4095): data=data[3:5]+'0'+data[2:3] if(data_int>4095 and data_int<=65535): data=data[4:6]+data[2:4] return data def hex_change_XXXXXX(data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='0'+data[2:]+'0000' if(data_int<=255 and data_int>15): data=data[2:]+'0000' if(data_int>255 and data_int<=4095): data=data[3:5]+'0'+data[2:3]+'00' if(data_int>4095 and data_int<=65535): data=data[4:6]+data[2:4]+'00' return data def error_check(error_code): if(error_code=='0000'): return else: print('errorコードのレスポンスがありました。'+error_code) mc_send = Mc(ip = '192.168.0.140', port = 5000, bit_or_word = 'word',# 'word' or 'bit' device_num = 0, device_code = 'A8',# SD = 'A9' data_num = 10) print(mc_send.socket_0401())