最近, cloudnetという学会に論文を投稿したのだが, そこでデータを集める際に制作したシミュレータがいい感じにできたのでそれについて書いてみる。
https://github.com/adshidtadka/server-allocation
Parameter
クラス
まず良かった点としてParameter
クラスを作ったことである。
import numpy as np
import pandas as pd
import itertools
import sys
import Constant
class Parameter:
USER_NUM_CONST = 500
SERVER_NUM_CONST = 10
CAPACITY_CONST = 50
def __init__(self, seed):
np.random.seed(seed)
self.USER_NUM = 500
self.SERVER_NUM = 10
self.DELAY_MAX = 10
self.DELAY_SERVER = 1
self.CAPACITY = 50
def create_input(self):
# inputs
self.e_u = list(itertools.product(range(self.USER_NUM), range(self.SERVER_NUM)))
self.e_s = list(itertools.combinations(list(range(0, self.SERVER_NUM)), 2))
self.d_us = np.random.randint(1, self.DELAY_MAX, (self.USER_NUM, self.SERVER_NUM))
self.m_s = np.full(self.SERVER_NUM, self.CAPACITY)
def set_param(self, var_name, consts, var):
if var_name == 'user':
self.USER_NUM = var
self.SERVER_NUM = consts['server']
self.CAPACITY = consts['capacity']
elif var_name == 'server':
self.USER_NUM = consts['user']
self.SERVER_NUM = var
self.CAPACITY = consts['capacity']
elif var_name == 'capacity':
self.USER_NUM = consts['user']
self.SERVER_NUM = consts['server']
self.CAPACITY = var
else:
sys.exit('invalid var_name = ' + str(var_name))
def get_const(var_name):
if var_name == 'user':
return Parameter.USER_NUM_CONST
elif var_name == 'server':
return Parameter.SERVER_NUM_CONST
elif var_name == 'capacity':
return Parameter.CAPACITY_CONST
else:
sys.exit('invalid var_name = ' + str(var_name))
これを作ることで1つのインスタンスが1つのパラメータに対応することになるので, 全部グローバル変数で書いていた前回のシミュレータよりめちゃめちゃわかりやすくなった。また, インスタンスメソッドを呼び出してパラ調整がしやすく, メソッドとしてパッケージ化することでエラーを極力避けれるようになった。
Ilp
クラスとMmd
クラス
class Ilp:
def __init__(self, param):
self.create_dataframe(param)
self.set_input()
def set_input(self):
# optimization problem
problem = LpProblem()
# decision variables
self.df_e_u['variable'] = [LpVariable('x_us%d' % i, cat=LpBinary) for i in self.df_e_u.index]
self.df_e_s['variable'] = [LpVariable('x_st%d' % i, cat=LpBinary) for i in self.df_e_s.index]
self.df_v_s['variable'] = [LpVariable('y%d' % i, cat=LpBinary) for i in self.df_v_s.index]
self.D_u = LpVariable('D_u', cat=LpInteger)
self.D_s = LpVariable('D_s', cat=LpInteger)
# objective function
problem += 2 * self.D_u + self.D_s
self.problem = problem
def create_dataframe(self, param):
# dataframe for E_U
df_e_u = pd.DataFrame([(i, j) for i, j in param.e_u], columns=['user', 'server'])
df_e_u['delay'] = param.d_us.flatten()
self.df_e_u = df_e_u
# dataframe for E_S
df_e_s = pd.DataFrame([(i, j) for i, j in param.e_s], columns=['server_1', 'server_2'])
df_e_s['delay'] = param.DELAY_SERVER
self.df_e_s = df_e_s
# dataframe for V_S
df_v_s = pd.DataFrame(list(range(0, param.SERVER_NUM)), columns=['server'])
df_v_s['capacity'] = param.m_s
self.df_v_s = df_v_s
def solve_by_ilp(self, solver=None):
t_0 = time.perf_counter()
# solve
try:
# constraints
self.problem = self.create_constraints(self.problem)
if solver == 'cplex':
self.problem.solve(GLPK_CMD(msg=0))
else:
self.problem.solve(CPLEX_CMD(msg=0))
except PulpSolverError:
print(CPLEX_CMD().path, 'is not installed')
t_1 = time.perf_counter()
return t_1 - t_0
def print_result(self):
if self.problem.status == 1:
print('objective function is ', value(self.problem.objective))
print('cpu time is ' + str(self.cpu_time) + ' sec')
else:
print('status code is ', self.problem.status)
def create_constraints(self, m):
# constraints
# (1b)
for k, v in self.df_e_u.groupby('user'):
m += lpSum(v.variable) == 1
# (1c)
for k, v in self.df_e_u.groupby('server'):
m += lpSum(v.variable) <= self.df_v_s['capacity'][k]
# (1d)
for k, v in self.df_e_u.iterrows():
m += v.variable * v.delay <= self.D_u
# (1e)
for k, v in self.df_e_s.iterrows():
m += v.variable * v.delay <= self.D_s
# (1f)
for k, v in self.df_e_u.groupby('user'):
for l, w in self.df_v_s.iterrows():
m += w.variable >= v.variable
# (1g)
for k, v in self.df_e_s.iterrows():
m += self.df_v_s.iloc[v.server_1].variable + \
self.df_v_s.iloc[v.server_2].variable - 1 <= v.variable
# (1h)
for k, v in self.df_e_s.iterrows():
m += v.variable <= self.df_v_s.iloc[v.server_1].variable
# (1i)
for k, v in self.df_e_s.iterrows():
m += v.variable <= self.df_v_s.iloc[v.server_2].variable
return m
class Mmd:
def __init__(self, param):
self.set_input(param)
def set_input(self, param):
# edges list
edges = np.empty(3, dtype=int)
for k, v in enumerate(param.d_us):
for i, j in enumerate(v):
edges = np.vstack((edges, np.array([k, i, j])))
self.edges = edges
def start_algorithm(self, param):
t_0 = time.perf_counter()
L_1 = self.one_server_case(param)
L_2 = self.multiple_server_case(param)
D_u = min([L_1, L_2])
if D_u > param.DELAY_MAX:
self.status = False
else:
self.status = True
self.objective_function = D_u * 2 + param.DELAY_SERVER
t_1 = time.perf_counter()
return t_1 - t_0
def one_server_case(self, param):
# step 1: consider one server case
# allocate all user and get L_1
dic_l = dict()
for k, v in enumerate(param.m_s):
if v >= param.USER_NUM:
D_u = param.d_us[:, k].max()
dic_l[k] = D_u
# search minimum D_u
if bool(dic_l):
return min(dic_l.values())
else:
return Constant.INF
def multiple_server_case(self, param):
# step 2: consider multiple server case
# initialize the bipartite graph
added_server = param.SERVER_NUM
for k, v in enumerate(param.m_s):
for j in range(param.USER_NUM):
delay = param.d_us[j][k]
for i in range(added_server, added_server + v - 1):
self.edges = np.vstack((self.edges, np.array([j, i, delay])))
added_server += v - 1
param.COPY_SERVER_NUM = added_server
# search matching
for i in range(1, param.DELAY_MAX):
hc = HopcroftKarp(param.USER_NUM, param.COPY_SERVER_NUM)
for j in np.where(self.edges[:, -1] <= i)[0]:
hc.add_edge(self.edges[j][0], self.edges[j][1])
if hc.flow() == param.USER_NUM:
return i
return Constant.INF
def print_result(self):
if self.status:
print('objective function is ', str(self.objective_function))
print('cpu time is ' + str(self.cpu_time) + ' sec')
else:
print('Error')
inputを入れればoutputが出るツールとして2つのクラスを作った。これにさっき定義したParameterをそのままいれればいいので, ブラックボックスとして扱える。set_input
なんかのメソッドを共通化してもよかった。また Ilp
ではPulpを使ってソルバを簡単に切り替えられるようにした。
Result
クラス
import time
import csv
import os
import slackweb
import Constant
from Parameter import Parameter
from Mmd import Mmd
class Result:
def __init__(self, var_name):
self.var_name = var_name
self.const_names = ['user', 'server', 'capacity']
self.is_execute = self.is_execute()
if self.is_execute:
self.var_range = Result.set_range(Constant.get_range(var_name))
self.consts = self.set_consts()
def is_execute(self):
print("Do you execute " + self.var_name + " simulator? [y/N]", end=' > ')
if input() == 'y':
return True
else:
return False
def set_range(var_range_def):
print("Please set range [start stop step]", end=' > ')
try:
x, y, z = map(int, input().split())
return range(x, y, z)
except:
print(str(var_range_def) + ' set.')
return var_range_def
def set_consts(self):
self.const_names.remove(self.var_name)
consts = dict()
for const_name in self.const_names:
print("Please set " + const_name + ".", end=' > ')
try:
consts[const_name] = int(input())
except:
f = Parameter.get_const(const_name)
print(str(f) + ' set.')
consts[const_name] = f
return consts
def rotate_file_name(file_name):
file_index = 1
while os.path.exists(file_name + '_' + str(file_index) + '.csv'):
file_index += 1
return file_name + '_' + str(file_index) + '.csv'
def get_result(self):
message = '\nGet result for {' + self.var_name + ': ' + str(self.var_range) + '} with ' + str(self.consts)
Result.post_to_slack(message)
file_name = Result.rotate_file_name('../result/' + self.var_name + str(self.consts))
for var in self.var_range:
average_result = self.get_average(var)
Result.post_to_slack(str(average_result) + ' for {' + self.var_name + ': ' + str(var) + '} and ' + str(self.consts))
f = open(file_name, 'a')
f.write(str(var) + ',' + str(average_result) + '\n')
f.close()
def get_average(self, var):
iterated_result = []
for i in range(Constant.ITERATION_NUM):
# create param
param = Parameter(Constant.SEED + i)
param.set_param(self.var_name, self.consts, var)
param.create_input()
# solve by algorithm
mmd = Mmd(param)
cpu_time_mmd = mmd.start_algorithm(param)
iterated_result.append(cpu_time_mmd)
return sum(iterated_result) / len(iterated_result)
def post_to_slack(text):
print(text)
slack = slackweb.Slack(url="xxxx")
slack.notify(text=text)
if not os.path.exists('../result'):
os.mkdir('../result')
result_user = Result('user')
result_server = Result('server')
result_capacity = Result('capacity')
if result_user.is_execute:
result_user.get_result()
if result_server.is_execute:
result_server.get_result()
if result_capacity.is_execute:
result_capacity.get_result()
コマンドラインでインタラクティブにパラメータを設定できるようにした。これはサーバーにあるファイルを書き換えてバグを引き起こす可能性が大幅に下がるので絶対にやるべきだと思った。また, Slackに通知を送ることで無駄なシミュレーションを回さずにすんだ。
まとめ
時間はかかるがほとんどバグなく実装できたのでこれからも堅牢なシミュレーター開発を心がけたい。