chao_demiglaceのブログ

工場でPLCに対してのデータ収集等を行っています。

pythonでPLCのデータを取得するbyFINS通信

いつもお世話になっております。
駆け出しIIoTエンジニアのチャオです。

業務でもpythonを記述することが増えているのですが、オムロンのPLCの内部データを読み取るプログラムを掲示します。
以下、プログラムはFINS通信でデータ連続取得を記述しています。

import socket
import binascii
from contextlib import closing

## I/Oメモリアドレス
# bit
b_CIO='30'
b_WR='31'
b_HR='32'
b_AR='33'
b_DM='02'
b_TK='06'
# force bit
fb_CIO='70'
fb_WR='71'
fb_HR='72'
# word
w_CIO='B0'
w_WR='B1'
w_HR='B2'
w_AR='B3'
w_DM='82'
# force word
fw_CIO='F0'
fw_WR='F1'
fw_HR='F2'
# upflg
upflg_TIM='09'
upflg_CNT='09'
# force upflg
f_upflg_TIM='49'
f_upflg_CNT='49'
# current value
cv_TIM='89'
cv_CNT='89'
cv_IR='DC'
cv_DR='BC'
# status
s_TK='46'


class Fins:
    def __init__(self,ip,port,dna,da1,sna,sa1,data):
        self.ip=ip
        self.port=port
        self.ICF='80'    #ほぼ固定
        self.RSV='00'    #固定
        self.GCT='02'    #ほぼ固定
        self.dna=dna
        self.da1=da1
        self.DA2='00'
        self.sna=sna
        self.sa1=sa1
        self.SA2='00'
        self.SID='00'
        #command_code
        self.data=data

    def hex_change(self,num):
        if(type(num)==str):
            num_int=int(num)
        else:
            num_int=num
        num=hex(num_int)
        num=num[2:4]
        if(num_int<=15):
            num='0'+num
        return num
    def hex_change_points(self,num):
        if(type(num)==str):
            num_int=int(num)
        else:
            num_int=num
        num=hex(num_int)
        if(num_int<=15):
            num='000'+num[2:]
        if(num_int<=255 and num_int>15):
            num='00'+num[2:]
        if(num_int>255 and num_int<=4095):
            data='0'+data[2:]
        if(num_int>4095 and num_int<=65535):
            num=num[2:]
        return num
    def hex_change_data(self,data):
        if(type(data)==str):
            data_int=int(data)
        else:
            data_int=data
        data=hex(data_int)
        if(data_int<=15):
            data='000'+data[2:]+'00'#語尾の00はビット指定の時に用いるためチャネルでは00固定
        if(data_int<=255 and data_int>15):
            data='00'+data[2:]+'00'
        if(data_int>255 and data_int<=4095):
            data='0'+data[2:]+'00'
        if(data_int>4095 and data_int<=65535):
            data=data[2:]+'00'
        return data
        
    def error_check(self,error_code):
        if(error_code=='0000'):
            return
        else:
            print('errorコードのレスポンスがありました。'+error_code)
        return

    def socket_0101(self):#連続読取
        ## COMMAND FORMAT
        ## command code = '0101'
        ## I/Omemory = 'XX'
        ## read start address = 'XXXXXX'
        ## read points = 'XXXX'
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        with closing(sock):
            DNA=self.hex_change(self.dna)
            DA1=self.hex_change(self.da1)
            SNA=self.hex_change(self.sna)
            SA1=self.hex_change(self.sa1)
            MRC_SRC='0101'#連続読取コマンド
            times='B3016200B3016100B3016000B3015f00'
            data=self.data
            fins_head=self.ICF+self.RSV+self.GCT
            fins_head=fins_head+DNA+DA1+self.DA2+SNA+SA1+self.SA2
            fins_head=fins_head+self.SID+MRC_SRC
            fins_send=fins_head+times+data
            fins_send=fins_head+data
            #print(fins_send)
            fins_send=binascii.a2b_hex(fins_send)
            try:
                sock.settimeout(1)
                sock.connect((self.ip,self.port))
                #print(self.ip,self.port)
            except socket.timeout:
                print(1)
                return
            except socket.error:
                print(2)
                return
            except BaseException as be:
                print(be)
                return
            try:
                sock.settimeout(1)
                sock.send(fins_send)
            except socket.timeout:
                print('socket.timeout')
                return
            except socket.error:
                print('socket.error')
                return
            except BaseException as be:
                print(be)
                return
            try:
                sock.settimeout(1)
                res_data=sock.recv(4096)
            except socket.timeout:
                print('socket.timeout')
                return
            except socket.error:
                print('socket.error')
                return
            except BaseException as be:
                print(be)
                return
            res_data=binascii.b2a_hex(res_data)
            res_data=res_data.decode()
            #print(res_data)
            self.error_check(res_data[24:28])
            res_data=res_data[28:]
            res_data = [res_data[i: i+4] for i in range(0, len(res_data), 4)]#返答数字を4分割にする
            #print(res_data)
            res_data_list=[]
            res_data_list+=[int(res_data,16) for res_data in res_data]
            return res_data_list