pythonでExcel内の文字列をPLCに書込むbyMCプロトコル
いつもお世話になっております。
駆け出しIIoTエンジニアのチャオです。
本日はExcelのセルにある文字列をPLCのデータメモリに書込むプログラムを書きたいと思います。
【注意】
対象PLCはKV-7500を想定しています。
MCプロトコルを利用するので、PLCの方でMCプログラムを用いるための設定が必要な場合があります。
今回の記述では1セル内ずつの対象になります。
Excelのセルの値はPLCへ書き込む際、アスキーコードに変換します。
Excel:セルA1[ C ] ⇒ KV:DM000[0043]
1セル内の文字が2文字以上になった場合、PLCには次のデータメモリにも書き込みます
Excel:セルA1[ CIAO ] ⇒ KV:DM000[4349]DM001[414f]
※キーエンスマニュアル『KV-8000/7000/5000/3000/1000 シリーズ・KV Nano シリーズ 命令語リファレンスマニュアル-付95 ASCIIコード表一覧』参考
書き込む際は工場等で稼働中のPLCではなく、まずテスト機等で試してから行うことを推奨します。
import socket import binascii from contextlib import closing from openpyxl import load_workbook # MCプロトコル用の変換を行う際に用いる関数群 def hex_change_XX(num): if(type(num)==str): num_int=int(num) else: num_int=num num=hex(num_int) num=num[2:4] if(num_int<=15): num='0'+num return num def hex_change_XXXX(data): if(type(data) == str): data_int = int(data) else: data_int = data data = hex(data_int) if(data_int <= 15): data = '0' + data[2:] + '00' if(data_int <= 255 and data_int > 15): data = data[2:]+'00' if(data_int>255 and data_int<=4095): data=data[3:5]+'0'+data[2:3] if(data_int>4095 and data_int<=65535): data=data[4:6]+data[2:4] return data def hex_change_XXXXXX(data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='0'+data[2:]+'0000' if(data_int<=255 and data_int>15): data=data[2:]+'0000' if(data_int>255 and data_int<=4095): data=data[3:5]+'0'+data[2:3]+'00' if(data_int>4095 and data_int<=65535): data=data[4:6]+data[2:4]+'00' if(data_int > 65535 and data_int <= 1048575): data=data[5:7]+data[3:5]+'0'+data[2:3] if(data_int > 1048575): data=data[6:8] + data[4:6] + data[2:4] return data def error_check(error_code): if(error_code=='0000'): return else: print('errorコードのレスポンスがありました。'+error_code) # MCプロトコルに用いるクラス class Mc: def __init__(self, ip, port, bit_or_word, device_num, device_code, data_num, word_num=0, double_word_num='00', write_list = [], device_list = []): self.ip = ip self.port = port self.subhead = '5000' # 今回は使わない self.network = '00' # 今回は使わない self.pc_num = 'FF' # 今回は使わない self.i_o_num = 'FF03' # 今回は使わない self.office_num = '00' # 今回は使わない self.monitoring_timer = '0000'# 今回は使わない self.bit_or_word = bit_or_word self.bit_or_word = bit_or_word self.device_num = device_num self.device_code = device_code self.data_num = data_num self.word_num = word_num self.double_word_num = double_word_num self.write_list = write_list self.device_list = device_list def excel_write_to_kv(self,load_file,read_sheet): # Excelの値を読み取る book = load_workbook(load_file)#ファイルの読込 active_sheet = book.active#ファイルのアクティブ化 read_word = active_sheet[read_sheet].value#ExcelファイルのA1に対して値を取得 ascii_change = binascii.b2a_hex(read_word.encode('utf-8')) ascii_str = str(ascii_change)[2:]# 文字列に変換し、先頭の'bを取る ascii_str = ascii_str[:-1]# 後尾の'を取る replace等を使うのもあり # 文字列が奇数だった場合は帳尻合わせのために'00'を挿入する if(len(ascii_str) / 2 % 2 == 1): ascii_str = ascii_str[:-2] + '00'+ascii_str[-2:] # 以下変数はアスキーコードを上位2桁と下位2桁反転させるために行う ascii_str_list = [ascii_str[i: i+4] for i in range(0, len(ascii_str), 4)] #返答数字を四分割にする ascii_list = [ascii_str_list[i:i+2] for i in range(0, len(ascii_str_list), 1)] # 2文字づつチャンク化 ascii_list = [ascii_list[i:i+1] for i in range(0, len(ascii_list), 1)] # 2x2文字単位にブロック化 ascii_list = [i[::2] + i[::2] for i in ascii_list] # 上下2位の値を反転させる revers_ascii_list=[] for i in ascii_list: n = (len(i) + 1)//4 s = ''.join(sum(i[n:],[])) t = s[2:4] + s[0:2] h = t revers_ascii_list.append(h) # アスキーコード変換したリストの長さを取得 revers_ascii_len = hex_change_XX(len(revers_ascii_list)) # 出力先のアドレスを指定する device_num = self.device_num#インスタンスで指定した書込み開始アドレス device_code = self.device_code#インスタンスで指定した書込み先デバイスコードDM = A8 device_list = [] for i in range(len(revers_ascii_list)): device_list.append(hex_change_XXXXXX(device_num + i) + device_code) header = '500000FFFF0300' if(self.bit_or_word == 'bit'): bit_or_word = '0100' else: bit_or_word = '0000'# ワード(ビットは0100) command = '04000214' + bit_or_word # ランダム書込みワード word_num = hex_change_XX(len(revers_ascii_list))#ワードの書込み点数 w_word_num = '00' #ダブルワードの書込み点数。今回は使わない設定なので'00' command_protocol = command + word_num + w_word_num for i in range(len(revers_ascii_list)): command_protocol += device_list[i] + revers_ascii_list[i] code_num = hex_change_XXXX(str(round(len(command_protocol)/2))) _send = header + code_num + command_protocol mc_send=binascii.a2b_hex(_send) sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) with closing(sock): try: sock.connect((self.ip,self.port)) sock.send(mc_send) res_data=sock.recv(1024) res_data=binascii.b2a_hex(res_data) res_data=res_data.decode() error_check(res_data[18:22]) except: print('write error') mc_send = Mc(ip = '192.168.0.140', port = 5000, bit_or_word = 'word', # 'word' or 'bit' device_num = 100, # アドレス device_code = 'A8', # デバイスコード data_num = 10) # データ数 今回は使わない # load_fileには本プログラムと同じディレクトリ内のExcelファイルを想定しています。 # r"./07_20/sample.xlsx"でもファイル指定が可能かと思います。 # read_sheetにはload_fileの書き込みたい文字列が入ったセルを指定します。 mc_send.excel_write_to_kv(load_file = 'sample.xlsx',read_sheet='A1')
pythonでPLCのビットをONするbyMCプロトコル通信
いつもお世話になっております。
駆け出しIIoTエンジニアのチャオです。
前回、MCプロトコルでのデータ読出しを行いましたが、
今回はビットに対してON/OFFの書出しを行います。
想定しているPLCはキーエンスのKV-7500です。
import socket import binascii from contextlib import closing """ ### MCプロトコルについて ### ##################################### 〇フォーマット サブヘッダ 5000 ネットワーク番号00 PC番号 FF 要求ユニット FF03 要求先ユニット 00 要求データ長 XXXX CPU監視タイマ 0000 コマンド 0304#ランダム読出し サブコマンド 0000 ###################################### 〇コマンド一覧 ・一括読み出し ビット:0401 0001 →01040100 ワード:0401 0000 →01040000 ・一括書き込み ビット:1401 0001 →01140100 ワード:1401 0000 →01140000 ・ランダム読出し ワード:0403 0000 →03040000 ・ランダム書き込み ビット:1402 0001 →02140100 ワード:1402 0000 →02140000 ・モニタ登録 :0801 0000 →01080000 ・モニタ読出し :0802 0000 →02080000 ・複数ブロック一括読み出し ワード:0406 0000 →06040000 ・複数ブロック一括書き込み ワード:1406 0000 →06140000 ・バッファメモリ読出し ワード:0613 0000 →13060000 ・バッファメモリ読出し ワード:0613 0000 →13060000 ・インテリユニットバッファメモリ読出し ワード:0601 0000 →01060000 ・インテリユニットバッファメモリ書き込み ワード:0601 0000 →01060000 ・リモートRUN :1001 0000 →01100000 ・リモートSTOP :1002 0000 →02100000 ・CPU形名読み出し :0101 0000 →01010000 ・折り返しテスト :0619 0000 →19060000 ######################################### 〇デバイスコード リレー 9C リンクリレー A0 内部補助リレー 90 ラッチリレー 92 コントロールリレー 91 コントロールメモリ A9 データメモリ A8 拡張データメモリ A8 ファイルレジスタ AF B0 リンクレジスタ B4 タイマ(現在値) C2 タイマ(接点) C1 カウンタ(現在値) C5 カウンタ(接点) C4 ########################################### 〇一括書出し サブヘッダ 5000 ネットワーク番号 00 PC番号 FF I/O番号 FF03 局番 00 データ長 XXXX 監視タイマ 0400 コマンド 0114 サブコマンド 0100#ビット対象は0100 デバイス番号 XXXXXX(3なら030000) デバイス種類 90# 内部補助リレー MR デバイス点数 XXXX(1点なら0100) ONOFF制御 10 #デバイス点数が奇数と偶数では多少変わってくる。今回のような奇数の場合はこちらで先頭がON 5000,00,FF,FF03,0D00,0400,0114,0100,010000,90,0100,10 →MR1に対してON制御を行うコマンド """ class Mc: def __init__(self, ip, port, bit_or_word, device_num, device_code, data_num, word_num='00', double_word_num='00', bit_on_off='00'): self.ip = ip self.port = port self.subhead = '5000' self.network = '00' self.pc_num = 'FF' self.i_o_num = 'FF03' self.office_num = '00' self.monitoring_timer = '0400' self.bit_or_word = bit_or_word if(self.bit_or_word == 'bit'): sub_command = '0100' else: sub_command = '0000'# ワード(ビットは0100) self.device_num = device_num self.device_code = device_code self.data_num = data_num self.word_num = word_num self.double_word_num = double_word_num self.bit_on_off = bit_on_off # 一括書出し # レスポンスはエラーコード等 def pin_write(self): header = '500000FFFF0300' command = self.monitoring_timer + '0114' + sub_command #command = self.monitoring_timer + '01140100' #device = self.device_num + self.device_code #インタンスを使う場合はこちらをCIで下行をCO device = '01000090'# MR001指定の場合 #number = self.data_num +self.bit_on_off #インタンスを使う場合はこちらをCIで下行をCO number = '0100'+'10' command_protocol = command + device + number code_num = hex_change_XXXX(str(round(len(command_protocol)/2))) _send = header + code_num + command_protocol print(_send) mc_send=binascii.a2b_hex(_send) sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) with closing(sock): try: sock.connect((self.ip,self.port)) sock.send(mc_send) res_data=sock.recv(4096) res_data=binascii.b2a_hex(res_data) res_data=res_data.decode() error_check(res_data[18:22]) except: print('write error') # ONしたものをOFFする #number = self.data_num +self.bit_on_off #インタンスを使う場合はこちらをCIで下行をCO number = '0100'+'00' command_protocol = command + device + number code_num = hex_change_XXXX(str(round(len(command_protocol)/2))) _send = header + code_num + command_protocol mc_send=binascii.a2b_hex(_send) sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) with closing(sock): try: sock.connect((self.ip,self.port)) sock.send(mc_send) res_data=sock.recv(4096) res_data=binascii.b2a_hex(res_data) res_data=res_data.decode() error_check(res_data[18:22]) except: print('write error') def hex_change_XX(num): if(type(num)==str): num_int=int(num) else: num_int=num num=hex(num_int) num=num[2:4] if(num_int<=15): num='0'+num return num def hex_change_XXXX(data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='0'+data[2:]+'00'#語尾の00はビット指定の時に用いるためチャネルでは00固定 if(data_int<=255 and data_int>15): data=data[2:]+'00' if(data_int>255 and data_int<=4095): data=data[3:5]+'0'+data[2:3] if(data_int>4095 and data_int<=65535): data=data[4:6]+data[2:4] return data def hex_change_XXXXXX(data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='0'+data[2:]+'0000'#語尾の00はビット指定の時に用いるためチャネルでは00固定 if(data_int<=255 and data_int>15): data=data[2:]+'0000' if(data_int>255 and data_int<=4095): data=data[3:5]+'0'+data[2:3]+'00' if(data_int>4095 and data_int<=65535): data=data[4:6]+data[2:4]+'00' if(data_int > 65535 and data_int <= 1048575): data=data[5:7]+data[3:5]+'0'+data[2:3] if(data_int > 1048575): data=data[6:8] + data[4:6] + data[2:4] return data def error_check(error_code): if(error_code=='0000'): return else: print('errorコードのレスポンスがありました。'+error_code) mc_send = Mc(ip = '192.168.0.140', port = 5000, bit_or_word = 'bit',# 'word' or 'bit' device_num = 1,# アドレス device_code = '90', data_num = 1 bit_on_off = '10')# 対象数 mc_send.pin_write()# ビットのONとOFF
pythonでPLCのデータを取得するbyMCプロトコル通信
いつもお世話になっております。
駆け出しエンジニアのチャオです。
本日はキーエンスや三菱のPLCで使用できるMCプロトコル通信(SLMP)のプログラムをpythonで書きたいと思います。
import socket import binascii from contextlib import closing """ ### MCプロトコルについて ### ##################################### 〇フォーマット サブヘッダ 5000 ネットワーク番号00 PC番号 FF 要求ユニット FF03 要求先ユニット 00 要求データ長 XXXX CPU監視タイマ 0000 コマンド 0304#ランダム読出し サブコマンド 0000 ###################################### 〇コマンド一覧 ・一括読み出し ビット:0401 0001 →01040100 ワード:0401 0000 →01040000 ・一括書き込み ビット:1401 0001 →01140100 ワード:1401 0000 →01140000 ・ランダム読出し ワード:0403 0000 →03040000 ・ランダム書き込み ビット:1402 0001 →02140100 ワード:1402 0000 →02140000 ・モニタ登録 :0801 0000 →01080000 ・モニタ読出し :0802 0000 →02080000 ・複数ブロック一括読み出し ワード:0406 0000 →06040000 ・複数ブロック一括書き込み ワード:1406 0000 →06140000 ・バッファメモリ読出し ワード:0613 0000 →13060000 ・バッファメモリ読出し ワード:0613 0000 →13060000 ・インテリユニットバッファメモリ読出し ワード:0601 0000 →01060000 ・インテリユニットバッファメモリ書き込み ワード:0601 0000 →01060000 ・リモートRUN :1001 0000 →01100000 ・リモートSTOP :1002 0000 →02100000 ・CPU形名読み出し :0101 0000 →01010000 ・折り返しテスト :0619 0000 →19060000 ######################################### 〇デバイスコード リレー 9C リンクリレー A0 内部補助リレー 90 ラッチリレー 92 コントロールリレー 91 コントロールメモリ A9 データメモリ A8 拡張データメモリ A8 ファイルレジスタ AF B0 リンクレジスタ B4 タイマ(現在値) C2 タイマ(接点) C1 カウンタ(現在値) C5 カウンタ(接点) C4 ########################################### 〇一括読み出し サブヘッダ 5000 ネットワーク番号 00 PC番号 FF I/O番号 FF03 局番 XXXX 監視タイマ 0000 コマンド 0104 サブコマンド 0000 デバイス番号 XXXXXX(3なら030000) デバイス種類 A8 データ数 XXXX(1点なら0100) 5000,00,FF,FF03,0C00,0000,0104,0000,000000,A8,3200 →DM0からDM49までの一括読み出しを行うコマンド 〇KV-7500の時刻の置き場所 'BC0200A9BD0200A9BE0200A9BF0200A9C00200A9C10200A9' """ class Mc: def __init__(self, ip, port, bit_or_word, device_num, device_code, data_num, word_num='00', double_word_num='00'): self.ip = ip self.port = port self.subhead = '5000' self.network = '00' self.pc_num = 'FF' self.i_o_num = 'FF03' self.office_num = '00' self.monitoring_timer = '0000' self.bit_or_word = bit_or_word self.device_num = device_num self.device_code = device_code self.data_num = data_num self.word_num = word_num self.double_word_num = double_word_num # 一括読み出し # リストとして返す def socket_0401(self): sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) try: with closing(sock): command = '0104' if(self.bit_or_word == 'bit'): sub_command = '0100' else: sub_command = '0000'# ワード(ビットは0100) _device_start_point = hex_change_XXXXXX(self.device_num) _device_code = self.device_code _data_num = hex_change_XXXX(self.data_num) header = self.subhead + self.network + self.pc_num + self.i_o_num + self.office_num time_to_sub_command =self.monitoring_timer + command + sub_command point_to_data_num = _device_start_point + _device_code + _data_num command_protocol = time_to_sub_command + point_to_data_num code_num = hex_change_XXXX(str(round(len(command_protocol)/2))) _send = header + code_num + command_protocol mc_send=binascii.a2b_hex(_send) try: sock.settimeout(1) sock.connect((self.ip,self.port)) except sock.timeout: return except socket.error: return except BaseException as be: print(be) return try: sock.settimeout(1) sock.send(mc_send) except sock.timeout: return except socket.error: return except BaseException as be: print(be) return try: sock.settimeout(1) res_data=sock.recv(4096) except sock.timeout: return except socket.error: return except BaseException as be: print(be) return res_data=binascii.b2a_hex(res_data) res_data=res_data.decode() except BaseException as be: print(be) return error_check(res_data[18:22]) response_data = res_data[22:] L = [response_data[i: i+4] for i in range(0, len(response_data), 4)]#返答数字を四分割にする Z = [L[i:i+2] for i in range(0, len(L), 1)] # 2文字づつチャンク化 Z = [Z[i:i+1] for i in range(0, len(Z), 1)] # 2x2文字単位にブロック化 Z = [z[::2] + z[::2] for z in Z] # ブロック内チャンクをinterleave res_list = [] for i in Z: n = (len(i) + 1)//4 s = ''.join(sum(i[n:],[])) t = s[2:4] + s[0:2] h = int(t,16) res_list.append(h) return res_list def hex_change_XX(num): if(type(num)==str): num_int=int(num) else: num_int=num num=hex(num_int) num=num[2:4] if(num_int<=15): num='0'+num return num def hex_change_XXXX(data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='0'+data[2:]+'00' if(data_int<=255 and data_int>15): data=data[2:]+'00' if(data_int>255 and data_int<=4095): data=data[3:5]+'0'+data[2:3] if(data_int>4095 and data_int<=65535): data=data[4:6]+data[2:4] return data def hex_change_XXXXXX(data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='0'+data[2:]+'0000' if(data_int<=255 and data_int>15): data=data[2:]+'0000' if(data_int>255 and data_int<=4095): data=data[3:5]+'0'+data[2:3]+'00' if(data_int>4095 and data_int<=65535): data=data[4:6]+data[2:4]+'00' return data def error_check(error_code): if(error_code=='0000'): return else: print('errorコードのレスポンスがありました。'+error_code) mc_send = Mc(ip = '192.168.0.140', port = 5000, bit_or_word = 'word',# 'word' or 'bit' device_num = 0, device_code = 'A8',# SD = 'A9' data_num = 10) print(mc_send.socket_0401())
pythonでPLCのデータを取得するbyFINS通信
いつもお世話になっております。
駆け出しIIoTエンジニアのチャオです。
業務でもpythonを記述することが増えているのですが、オムロンのPLCの内部データを読み取るプログラムを掲示します。
以下、プログラムはFINS通信でデータ連続取得を記述しています。
import socket import binascii from contextlib import closing ## I/Oメモリアドレス # bit b_CIO='30' b_WR='31' b_HR='32' b_AR='33' b_DM='02' b_TK='06' # force bit fb_CIO='70' fb_WR='71' fb_HR='72' # word w_CIO='B0' w_WR='B1' w_HR='B2' w_AR='B3' w_DM='82' # force word fw_CIO='F0' fw_WR='F1' fw_HR='F2' # upflg upflg_TIM='09' upflg_CNT='09' # force upflg f_upflg_TIM='49' f_upflg_CNT='49' # current value cv_TIM='89' cv_CNT='89' cv_IR='DC' cv_DR='BC' # status s_TK='46' class Fins: def __init__(self,ip,port,dna,da1,sna,sa1,data): self.ip=ip self.port=port self.ICF='80' #ほぼ固定 self.RSV='00' #固定 self.GCT='02' #ほぼ固定 self.dna=dna self.da1=da1 self.DA2='00' self.sna=sna self.sa1=sa1 self.SA2='00' self.SID='00' #command_code self.data=data def hex_change(self,num): if(type(num)==str): num_int=int(num) else: num_int=num num=hex(num_int) num=num[2:4] if(num_int<=15): num='0'+num return num def hex_change_points(self,num): if(type(num)==str): num_int=int(num) else: num_int=num num=hex(num_int) if(num_int<=15): num='000'+num[2:] if(num_int<=255 and num_int>15): num='00'+num[2:] if(num_int>255 and num_int<=4095): data='0'+data[2:] if(num_int>4095 and num_int<=65535): num=num[2:] return num def hex_change_data(self,data): if(type(data)==str): data_int=int(data) else: data_int=data data=hex(data_int) if(data_int<=15): data='000'+data[2:]+'00'#語尾の00はビット指定の時に用いるためチャネルでは00固定 if(data_int<=255 and data_int>15): data='00'+data[2:]+'00' if(data_int>255 and data_int<=4095): data='0'+data[2:]+'00' if(data_int>4095 and data_int<=65535): data=data[2:]+'00' return data def error_check(self,error_code): if(error_code=='0000'): return else: print('errorコードのレスポンスがありました。'+error_code) return def socket_0101(self):#連続読取 ## COMMAND FORMAT ## command code = '0101' ## I/Omemory = 'XX' ## read start address = 'XXXXXX' ## read points = 'XXXX' sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) with closing(sock): DNA=self.hex_change(self.dna) DA1=self.hex_change(self.da1) SNA=self.hex_change(self.sna) SA1=self.hex_change(self.sa1) MRC_SRC='0101'#連続読取コマンド times='B3016200B3016100B3016000B3015f00' data=self.data fins_head=self.ICF+self.RSV+self.GCT fins_head=fins_head+DNA+DA1+self.DA2+SNA+SA1+self.SA2 fins_head=fins_head+self.SID+MRC_SRC fins_send=fins_head+times+data fins_send=fins_head+data #print(fins_send) fins_send=binascii.a2b_hex(fins_send) try: sock.settimeout(1) sock.connect((self.ip,self.port)) #print(self.ip,self.port) except socket.timeout: print(1) return except socket.error: print(2) return except BaseException as be: print(be) return try: sock.settimeout(1) sock.send(fins_send) except socket.timeout: print('socket.timeout') return except socket.error: print('socket.error') return except BaseException as be: print(be) return try: sock.settimeout(1) res_data=sock.recv(4096) except socket.timeout: print('socket.timeout') return except socket.error: print('socket.error') return except BaseException as be: print(be) return res_data=binascii.b2a_hex(res_data) res_data=res_data.decode() #print(res_data) self.error_check(res_data[24:28]) res_data=res_data[28:] res_data = [res_data[i: i+4] for i in range(0, len(res_data), 4)]#返答数字を4分割にする #print(res_data) res_data_list=[] res_data_list+=[int(res_data,16) for res_data in res_data] return res_data_list