最近, cloudnetという学会に論文を投稿したのだが, そこでデータを集める際に制作したシミュレータがいい感じにできたのでそれについて書いてみる。

https://github.com/adshidtadka/server-allocation

Parameterクラス

まず良かった点は, Parameterクラスを作ったことである。


import numpy as np
import pandas as pd
import itertools
import sys

import Constant


class Parameter:
    """One simulation configuration plus the random input generated from it.

    The class-level ``*_CONST`` values are the defaults returned by
    ``get_const`` and used to initialise a fresh instance.
    """

    USER_NUM_CONST = 500
    SERVER_NUM_CONST = 10
    CAPACITY_CONST = 50

    def __init__(self, seed):
        """Seed NumPy's global random generator and load the default sizes.

        Args:
            seed: Value passed to ``np.random.seed``.
        """
        np.random.seed(seed)
        # Take the defaults from the class constants so the two cannot drift.
        self.USER_NUM = Parameter.USER_NUM_CONST
        self.SERVER_NUM = Parameter.SERVER_NUM_CONST
        self.DELAY_MAX = 10
        self.DELAY_SERVER = 1
        self.CAPACITY = Parameter.CAPACITY_CONST

    def create_input(self):
        """Generate the problem input from the current sizes.

        Sets:
            e_u: every (user, server) index pair.
            e_s: every unordered pair of distinct server indices.
            d_us: USER_NUM x SERVER_NUM array of integer delays drawn from
                ``np.random.randint(1, DELAY_MAX)`` (upper bound exclusive).
            m_s: length-SERVER_NUM array filled with CAPACITY.
        """
        users = range(self.USER_NUM)
        servers = range(self.SERVER_NUM)
        self.e_u = list(itertools.product(users, servers))
        self.e_s = list(itertools.combinations(servers, 2))
        self.d_us = np.random.randint(1, self.DELAY_MAX, (self.USER_NUM, self.SERVER_NUM))
        self.m_s = np.full(self.SERVER_NUM, self.CAPACITY)

    def set_param(self, var_name, consts, var):
        """Set the swept quantity to ``var`` and the other two from ``consts``.

        Args:
            var_name: 'user', 'server' or 'capacity' - the quantity being varied.
            consts: mapping holding values for the two quantities that are
                not ``var_name`` (keyed by the same names).
            var: value assigned to the quantity named by ``var_name``.

        Exits the process via ``sys.exit`` when ``var_name`` is not recognised.
        """
        if var_name not in ('user', 'server', 'capacity'):
            sys.exit('invalid var_name = ' + str(var_name))

        # The swept value always wins over any same-named entry in consts.
        values = dict(consts)
        values[var_name] = var
        self.USER_NUM = values['user']
        self.SERVER_NUM = values['server']
        self.CAPACITY = values['capacity']

    @staticmethod
    def get_const(var_name):
        """Return the class-level default for 'user', 'server' or 'capacity'.

        Declared as a static method so it works both as
        ``Parameter.get_const(name)`` and on an instance.
        Exits the process via ``sys.exit`` when ``var_name`` is not recognised.
        """
        defaults = {
            'user': Parameter.USER_NUM_CONST,
            'server': Parameter.SERVER_NUM_CONST,
            'capacity': Parameter.CAPACITY_CONST,
        }
        if var_name not in defaults:
            sys.exit('invalid var_name = ' + str(var_name))
        return defaults[var_name]

これを作ることで1つのインスタンスが1組のパラメータ設定に対応することになるので, 全部グローバル変数で書いていた前回のシミュレータよりもはるかにわかりやすくなった。また, インスタンスメソッドを呼び出すだけでパラメータを調整でき, 設定処理をメソッドとしてまとめたことでエラーを極力避けられるようになった。

IlpクラスとMmdクラス


class Ilp:
    """ILP model of the allocation problem, built with PuLP from a Parameter.

    Construction builds the data frames and the objective; the constraints
    are added and the model solved by ``solve_by_ilp``.
    """

    def __init__(self, param):
        """Build the edge/server tables from ``param`` and set up the model."""
        self.create_dataframe(param)
        self.set_input()
        # Set by solve_by_ilp(); read by print_result().
        self.cpu_time = None

    def set_input(self):
        """Create the PuLP problem, its decision variables and the objective."""
        # optimization problem
        problem = LpProblem()

        # decision variables: one binary per user-server edge, per
        # server-server edge and per server, plus two integer variables
        self.df_e_u['variable'] = [LpVariable('x_us%d' % i, cat=LpBinary) for i in self.df_e_u.index]
        self.df_e_s['variable'] = [LpVariable('x_st%d' % i, cat=LpBinary) for i in self.df_e_s.index]
        self.df_v_s['variable'] = [LpVariable('y%d' % i, cat=LpBinary) for i in self.df_v_s.index]
        self.D_u = LpVariable('D_u', cat=LpInteger)
        self.D_s = LpVariable('D_s', cat=LpInteger)

        # objective function
        problem += 2 * self.D_u + self.D_s

        self.problem = problem

    def create_dataframe(self, param):
        """Turn the arrays held by ``param`` into three pandas tables.

        Sets ``df_e_u`` (user, server, delay), ``df_e_s`` (server_1,
        server_2, delay) and ``df_v_s`` (server, capacity).
        """
        # dataframe for E_U; flatten() is row-major, which lines up with the
        # (user, server) order of param.e_u when that list is user-major
        df_e_u = pd.DataFrame([(i, j) for i, j in param.e_u], columns=['user', 'server'])
        df_e_u['delay'] = param.d_us.flatten()
        self.df_e_u = df_e_u

        # dataframe for E_S; every server pair gets the same delay
        df_e_s = pd.DataFrame([(i, j) for i, j in param.e_s], columns=['server_1', 'server_2'])
        df_e_s['delay'] = param.DELAY_SERVER
        self.df_e_s = df_e_s

        # dataframe for V_S
        df_v_s = pd.DataFrame(list(range(0, param.SERVER_NUM)), columns=['server'])
        df_v_s['capacity'] = param.m_s
        self.df_v_s = df_v_s

    def solve_by_ilp(self, solver=None):
        """Add the constraints, solve the model and return the elapsed time.

        Args:
            solver: 'glpk' to solve with GLPK_CMD; any other value (including
                the default ``None`` and 'cplex') solves with CPLEX_CMD.

        Returns:
            Elapsed ``time.perf_counter`` seconds, covering constraint
            creation and the solve. The value is also stored in
            ``self.cpu_time``.
        """
        # The original mapping was inverted ('cplex' selected GLPK); the
        # default solver is still CPLEX.
        solver_cls = GLPK_CMD if solver == 'glpk' else CPLEX_CMD

        t_0 = time.perf_counter()
        # solve
        try:
            # constraints
            self.problem = self.create_constraints(self.problem)
            self.problem.solve(solver_cls(msg=0))
        except PulpSolverError:
            # Report the solver that was actually requested.
            print(solver_cls().path, 'is not installed')

        t_1 = time.perf_counter()
        self.cpu_time = t_1 - t_0
        return self.cpu_time

    def print_result(self):
        """Print the objective value and solve time, or the status code."""
        if self.problem.status == 1:
            print('objective function is ', value(self.problem.objective))
            print('cpu time is ' + str(self.cpu_time) + ' sec')
        else:
            print('status code is ', self.problem.status)

    def create_constraints(self, m):
        """Add constraints (1b)-(1i) to problem ``m`` and return it."""
        # constraints
        # (1b) the x_us variables of each user sum to exactly 1
        for k, v in self.df_e_u.groupby('user'):
            m += lpSum(v.variable) == 1

        # (1c) the x_us variables of each server sum to at most its capacity
        for k, v in self.df_e_u.groupby('server'):
            m += lpSum(v.variable) <= self.df_v_s['capacity'][k]

        # (1d) D_u bounds the delay of every selected user-server edge
        for k, v in self.df_e_u.iterrows():
            m += v.variable * v.delay <= self.D_u

        # (1e) D_s bounds the delay of every selected server-server edge
        for k, v in self.df_e_s.iterrows():
            m += v.variable * v.delay <= self.D_s

        # (1f)
        # NOTE(review): v.variable here is the whole column of x_us variables
        # for one user, not a single variable - confirm that comparing it
        # against one y variable expresses the intended constraint.
        for k, v in self.df_e_u.groupby('user'):
            for l, w in self.df_v_s.iterrows():
                m += w.variable >= v.variable

        # (1g) x_st is forced to 1 when both endpoint y variables are 1
        for k, v in self.df_e_s.iterrows():
            m += self.df_v_s.iloc[v.server_1].variable + \
                self.df_v_s.iloc[v.server_2].variable - 1 <= v.variable

        # (1h) x_st cannot exceed the y variable of its first endpoint
        for k, v in self.df_e_s.iterrows():
            m += v.variable <= self.df_v_s.iloc[v.server_1].variable

        # (1i) x_st cannot exceed the y variable of its second endpoint
        for k, v in self.df_e_s.iterrows():
            m += v.variable <= self.df_v_s.iloc[v.server_2].variable

        return m

class Mmd:
    """Matching-based algorithm that searches for the smallest feasible user delay.

    ``edges`` is an (n, 3) integer array whose rows are
    (user index, server index, delay).
    """

    def __init__(self, param):
        """Build the initial user-server edge list from ``param``."""
        self.set_input(param)
        # Set by start_algorithm(); read by print_result().
        self.cpu_time = None

    def set_input(self, param):
        """Store one (user, server, delay) row per entry of ``param.d_us``."""
        # Build the rows in one pass. Starting from np.empty(3) and stacking
        # onto it left an uninitialised row at the head of the array, and the
        # repeated vstack copied the whole array for every edge.
        rows = [
            (user, server, delay)
            for user, delays in enumerate(param.d_us)
            for server, delay in enumerate(delays)
        ]
        # reshape keeps the (n, 3) shape even when there are no edges.
        self.edges = np.array(rows, dtype=int).reshape(-1, 3)

    def start_algorithm(self, param):
        """Run both cases, record the outcome and return the elapsed time.

        Sets ``status`` to False when the best delay exceeds
        ``param.DELAY_MAX``; otherwise sets it to True and sets
        ``objective_function`` to ``2 * D_u + param.DELAY_SERVER``.

        Returns:
            Elapsed ``time.perf_counter`` seconds, also stored in
            ``self.cpu_time``.
        """
        t_0 = time.perf_counter()
        L_1 = self.one_server_case(param)
        L_2 = self.multiple_server_case(param)
        D_u = min([L_1, L_2])

        if D_u > param.DELAY_MAX:
            self.status = False
        else:
            self.status = True
            self.objective_function = D_u * 2 + param.DELAY_SERVER
        t_1 = time.perf_counter()
        self.cpu_time = t_1 - t_0
        return self.cpu_time

    def one_server_case(self, param):
        """Step 1: best delay when a single server takes every user.

        Returns:
            The smallest per-server maximum delay among servers whose
            capacity is at least ``param.USER_NUM``, or ``Constant.INF``
            when no server is that large.
        """
        worst_delays = [
            param.d_us[:, server].max()
            for server, capacity in enumerate(param.m_s)
            if capacity >= param.USER_NUM
        ]
        if worst_delays:
            return min(worst_delays)
        return Constant.INF

    def multiple_server_case(self, param):
        """Step 2: smallest delay bound that admits a full matching.

        Each server with capacity ``c`` gets ``c - 1`` extra copy nodes,
        numbered from ``param.SERVER_NUM`` upward. Every user is linked to
        each copy with the same delay as to the original server. The extended
        edge list replaces ``self.edges`` and the total node count is stored
        in ``param.COPY_SERVER_NUM``, so calling this twice on the same
        instance appends the copy edges again.

        Returns:
            The first ``i`` in ``range(1, param.DELAY_MAX)`` for which
            ``hc.flow()`` over the edges with delay <= ``i`` equals
            ``param.USER_NUM``, or ``Constant.INF`` if there is none.
        """
        # initialize the bipartite graph
        next_copy = param.SERVER_NUM
        copy_edges = []
        for server, capacity in enumerate(param.m_s):
            copies = range(next_copy, next_copy + capacity - 1)
            for user in range(param.USER_NUM):
                delay = param.d_us[user][server]
                copy_edges.extend((user, copy, delay) for copy in copies)
            # Advance once per server, not once per user. Otherwise every
            # user gets private copies and the capacity limit is never
            # enforced by the matching.
            next_copy += len(copies)
        if copy_edges:
            self.edges = np.vstack((self.edges, np.array(copy_edges, dtype=int)))
        param.COPY_SERVER_NUM = int(next_copy)

        # search matching
        # NOTE(review): HopcroftKarp is defined outside this listing;
        # flow() is presumably the matching size - confirm.
        for i in range(1, param.DELAY_MAX):
            hc = HopcroftKarp(param.USER_NUM, param.COPY_SERVER_NUM)
            for j in np.where(self.edges[:, -1] <= i)[0]:
                hc.add_edge(self.edges[j][0], self.edges[j][1])
            if hc.flow() == param.USER_NUM:
                return i

        return Constant.INF

    def print_result(self):
        """Print the objective value and run time, or 'Error' on failure."""
        if self.status:
            print('objective function is ', str(self.objective_function))
            print('cpu time is ' + str(self.cpu_time) + ' sec')
        else:
            print('Error')

inputを入れればoutputが出るツールとして2つのクラスを作った。これにさっき定義したParameterのインスタンスをそのまま渡せばよいので, ブラックボックスとして扱える。set_inputなどのメソッドは共通化してもよかった。また, IlpではPuLPを使ってソルバを簡単に切り替えられるようにした。

Result クラス


import time
import csv
import os
import slackweb

import Constant
from Parameter import Parameter
from Mmd import Mmd


class Result:
    """Interactive runner that sweeps one quantity and records averaged timings."""

    def __init__(self, var_name):
        """Ask whether to run the sweep for ``var_name`` and, if so, its settings.

        Args:
            var_name: 'user', 'server' or 'capacity' - the quantity to sweep.
        """
        self.var_name = var_name
        self.const_names = ['user', 'server', 'capacity']
        # After this line the instance attribute shadows the method of the
        # same name; callers read ``is_execute`` as a plain boolean.
        self.is_execute = self.is_execute()
        if self.is_execute:
            self.var_range = Result.set_range(Constant.get_range(var_name))
            self.consts = self.set_consts()

    def is_execute(self):
        """Prompt on stdin; return True only when the answer is exactly 'y'."""
        print("Do you execute " + self.var_name + " simulator? [y/N]", end=' > ')
        return input() == 'y'

    @staticmethod
    def set_range(var_range_def):
        """Prompt for 'start stop step' and return the matching ``range``.

        Falls back to ``var_range_def`` when the line is not three integers,
        the step is invalid, or stdin is closed.
        """
        print("Please set range [start stop step]", end=' > ')
        try:
            x, y, z = map(int, input().split())
            return range(x, y, z)
        except (ValueError, EOFError):
            # Only bad or missing input selects the default; Ctrl-C still
            # interrupts the program.
            print(str(var_range_def) + ' set.')
            return var_range_def

    def set_consts(self):
        """Prompt for the two quantities that are not swept.

        Removes ``self.var_name`` from ``self.const_names``, then reads one
        integer per remaining name, using ``Parameter.get_const`` when the
        input is not an integer or stdin is closed.

        Returns:
            dict mapping each remaining name to its chosen value.
        """
        self.const_names.remove(self.var_name)
        consts = dict()
        for const_name in self.const_names:
            print("Please set " + const_name + ".", end=' > ')
            try:
                consts[const_name] = int(input())
            except (ValueError, EOFError):
                f = Parameter.get_const(const_name)
                print(str(f) + ' set.')
                consts[const_name] = f
        return consts

    @staticmethod
    def rotate_file_name(file_name):
        """Return ``file_name + '_<n>.csv'`` for the smallest unused n >= 1."""
        file_index = 1
        while os.path.exists(file_name + '_' + str(file_index) + '.csv'):
            file_index += 1
        return file_name + '_' + str(file_index) + '.csv'

    def get_result(self):
        """Run the sweep, posting each average to Slack and appending it to a CSV."""
        message = '\nGet result for {' + self.var_name + ': ' + str(self.var_range) + '} with ' + str(self.consts)
        Result.post_to_slack(message)
        file_name = Result.rotate_file_name('../result/' + self.var_name + str(self.consts))

        for var in self.var_range:
            average_result = self.get_average(var)
            Result.post_to_slack(str(average_result) + ' for {' + self.var_name + ': ' + str(var) + '} and ' + str(self.consts))
            # Append after every point so a partial sweep is still on disk;
            # the context manager closes the file even if the write fails.
            with open(file_name, 'a') as f:
                f.write(str(var) + ',' + str(average_result) + '\n')

    def get_average(self, var):
        """Return the mean Mmd run time over ``Constant.ITERATION_NUM`` seeds."""
        iterated_result = []
        for i in range(Constant.ITERATION_NUM):
            # create param
            param = Parameter(Constant.SEED + i)
            param.set_param(self.var_name, self.consts, var)
            param.create_input()

            # solve by algorithm
            mmd = Mmd(param)
            cpu_time_mmd = mmd.start_algorithm(param)
            iterated_result.append(cpu_time_mmd)
        return sum(iterated_result) / len(iterated_result)

    @staticmethod
    def post_to_slack(text):
        """Print ``text`` and send it to the configured Slack webhook."""
        print(text)
        slack = slackweb.Slack(url="xxxx")
        slack.notify(text=text)


# Make sure the output directory used by Result.get_result exists.
result_dir = '../result'
if not os.path.exists(result_dir):
    os.mkdir(result_dir)

# Ask all the questions first (each constructor prompts on stdin) ...
result_user, result_server, result_capacity = [
    Result(name) for name in ('user', 'server', 'capacity')
]

# ... then run only the sweeps the user agreed to, in the same order.
for result in (result_user, result_server, result_capacity):
    if result.is_execute:
        result.get_result()

コマンドラインでインタラクティブにパラメータを設定できるようにした。こうするとサーバー上のファイルを直接書き換える必要がなくなり, 書き換えミスでバグを引き起こす可能性が大幅に下がるので, 絶対にやるべきだと思った。また, Slackに途中結果を通知するようにしたことで, 無駄なシミュレーションを回さずにすんだ。

まとめ

時間はかかったが, ほとんどバグなく実装できたので, これからも堅牢なシミュレータ開発を心がけたい。