Refactoring process tree.

master
Alexander Markevych 2012-07-24 16:58:42 +03:00
parent fdc5ae9480
commit d9b32e67d8
14 changed files with 731 additions and 640 deletions

View File

@ -1,11 +1,17 @@
{application,erlmines,
[{description,[]},
{vsn,"0.0.1"},
{registered,[erlmines_app,erlmines_sup,erlmines]},
{registered,[erlmines_app,erlmines_sup,connection_sup,connection_disp]},
{applications,[kernel,stdlib]},
{mod,{erlmines_app,[]}},
{env,[]},
{modules,[clientserver,connection,connection_channels,
connection_incoming_split_buffer,connection_peers,
connection_reliable_packet_buffer,erlmines,
erlmines_app,erlmines_sup]}]}.
{modules,[erlmines_app,
erlmines_sup,
erlmines,
connection_sup,
connection_disp,
peer_channels,
peer_packet_buffers,
peer_mgr,
peer_reliable_packet_buffer,
clientserver]}]}.

View File

@ -1,3 +1,4 @@
#!/bin/bash
#erl -noshell -pa ./ebin -eval 'application:start(erlmines)'
erl -smp auto -pa ./ebin -eval 'application:start(erlmines)'

View File

@ -9,6 +9,26 @@
-define(U16, 16/unsigned-little-integer).
-define(U32, 32/unsigned-little-integer).
%% @type udp_server_option() =
%% {option(), port(), max_restarts(), time(), shutdown(),recv_length(), recv_timeout()}.
%% A data structure holding the options.
%% @type option() = [term()].
%% @type port() = integer().
%% @type max_restarts() = integer().
%% @type time() = integer().
%% @type shutdown() = integer().
%% @type recv_length() = integer().
%% @type recv_timeout() = integer() | infinity.
-record(udp_server_option, {
option = [binary],
port = 4000,
max_restarts = 3,
time = 60,
shutdown = 2000,
recv_length = 0,
recv_timeout = infinity
}).
-record(buffered_packet, {
data, % Data of the packet, including headers
time, % Seconds from buffering the packet or re-sending

View File

@ -1,93 +0,0 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(connection).
-include("connection.hrl").
-export([readPeerId/1,readChannel/1]).
-export([makePacket/5,
makeOriginalPacket/1,
makeSplitPacket/3,
makeAutoSplitPacket/3,
makeReliablePacket/2,
test/0]).
ceiling(X) ->
T = trunc(X),
case X - T == 0 of
true -> T;
false -> T + 1
end.
readPeerId(PacketData) ->
<<_P:?U32,SenderPeerId:?U16,_Other/binary>> = PacketData,
SenderPeerId.
readChannel(PacketData) ->
<<_P:?U32,_S:?U16,Channel:?U8,_Other/binary>> = PacketData,
Channel.
% This adds the base headers to the data and makes a packet out of it
makePacket(Address, Data, ProtocolId,SenderPeerId,Channel)->
#buffered_packet{
address=Address,
data = <<ProtocolId:?U32,SenderPeerId:?U16,Channel:?U8,Data/binary>>
}.
% Add the TYPE_ORIGINAL header to the data
makeOriginalPacket(Data) ->
#original_packet{ data = <<?TYPE_ORIGINAL:?U8,Data/binary>> }.
% Split data in chunks and add TYPE_SPLIT headers to them
makeSplitPacket(Data,ChunkSizeMax,SeqNum) ->
ChunkHeaderSize = 7,
MaxDataSize = ChunkSizeMax - ChunkHeaderSize,
ChunkCount=ceiling(byte_size(Data)/MaxDataSize),
Chunks = makeSplitPacketList(MaxDataSize, SeqNum, ChunkCount, 0, Data),
Chunks.
makeSplitPacketList(_MaxDataSize, _SeqNum, ChunkCount, ChunkCount, _Data) ->[];
makeSplitPacketList(MaxDataSize, SeqNum, ChunkCount, ChunkNum, Data) ->
DataSize = byte_size(Data),
ChunkDataSize = case MaxDataSize - DataSize < 0 of
true -> MaxDataSize;
false -> DataSize
end,
<<ChunkData:ChunkDataSize/binary,Tile/binary>> = Data,
%%ChunkNum1 = ChunkNum+1,
[<<?TYPE_SPLIT:?U8, SeqNum:?U16, ChunkCount:?U16, ChunkNum:?U16,ChunkData/binary>>|makeSplitPacketList(MaxDataSize,SeqNum,ChunkCount,ChunkNum+1,Tile)].
test() ->
makeSplitPacket(list_to_binary([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22]),9,65500).
% Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
% Increments split_seqnum if a split packet is made
makeAutoSplitPacket(_Data,_ChunkSizeMax,_SplitSeqNum) ->
todo.
% core::list<SharedBuffer<u8> > makeAutoSplitPacket(
% SharedBuffer<u8> data,
% u32 chunksize_max,
% u16 &split_seqnum)
%{
% u32 original_header_size = 1;
% core::list<SharedBuffer<u8> > list;
% if(data.getSize() + original_header_size > chunksize_max)
% {
% list = makeSplitPacket(data, chunksize_max, split_seqnum);
% split_seqnum++;
% return list;
% }
% else
% {
% list.push_back(makeOriginalPacket(data));
% }
% return list;
%}
%
% Add the TYPE_RELIABLE header to the data
makeReliablePacket(Data, SeqNum) ->
#reliable_packet{ data = <<?TYPE_RELIABLE:?U8,SeqNum:?U16,Data/binary>> }.

View File

@ -0,0 +1,181 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(connection_disp).
-behaviour(gen_server).
-export([start_link/0
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([set_socket/2, readPeerId/1,readChannel/1]).
-export([makePacket/5,
makeOriginalPacket/1,
makeSplitPacket/3,
makeAutoSplitPacket/3,
makeReliablePacket/2,
test/0]).
-include("connection.hrl").
%% Client API
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%% Server functions
init(_Args) ->
erlang:process_flag(trap_exit, true),
io:format("===connection:init===~n",[]),
case gen_udp:open(30000,[binary]) of
{ok,Socket} -> {ok, [Socket]};
{error, Reason} -> {stop, Reason}
end.
set_socket(Peer, Socket) when is_pid(Peer) andalso is_port(Socket) ->
gen_tcp:controlling_process(Socket, Peer),
gen_server:call(Peer, {set_socket, Socket}).
handle_call({msg,Msg},From,State) ->
io:format("===connection:handle_call===~p, ~p, ~p~n",[Msg,From,State]),
{reply, ok, State};
handle_call({set_socket, Socket}, _From, State) ->
inet:setopts(Socket, [{packet,raw},{active,once}]),
{reply, ok, State}.
handle_cast(calc, State) ->
io:format("result 2+2=4~n"),
{noreply, State};
handle_cast(calcbad, State) ->
io:format("result 1/0~n"),
1 / 0,
{noreply, State};
handle_cast(_Msg, State) ->
io:format("===connection:handle_cast===~n",[]),
{noreply,State}.
handle_info({udp, Socket, _IP, _InPortNo, _Bin} = UdpMsg, State) ->
inet:setopts(Socket, [{active,once}]),
process_packet(UdpMsg),
%%{ok, Decoder1, Frames} = decode(Bin, Decoder),
%%[Consumer ! Frame || Frame <- Frames],
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
process_packet({udp,_Socket,Host,Port,Bin}) ->
io:format("========================================~n",[]),
io:format("Host = ~p, Port = ~p~n",[Host,Port]),
<<_ProtocolId:?U32, SenderPeerId:?U16, Channel:?U8,
PacketType:?U8, Data/binary>> = Bin,
case PacketType of
?TYPE_CONTROL -> Type = "TYPE_CONTROL (0)",
type_control(Data);
?TYPE_ORIGINAL -> Type = "TYPE_ORIGINAL(1)",
type_original(Data);
?TYPE_SPLIT -> Type = "TYPE_SPLIT(2)",
type_split(Data);
?TYPE_RELIABLE -> Type = "TYPE_RELIABLE(2)",
type_reliable(Data)
end,
%io:format("ProtocolId: ~p, SenderPeerId:~p, Channel:~p, PacketType: ~p,~n Data:~p~n",[ProtocolId,SenderPeerId,Channel,Type,Data]).
io:format("SenderPeerId:~p, Channel:~p, PacketType: ~p,~n Data:~p~n",
[SenderPeerId,Channel,Type,Data]).
type_control(<<_ControlType:?U8,_Data/binary>>)-> ok.
type_original(_Data)-> ok.
type_split(<<_SeqNum:?U16,_ChunkCount:?U16,_ChunkNum:?U16,_Data/binary>>)-> ok.
type_reliable(<<_SeqNum:?U16,_Data/binary>>)-> ok.
ceiling(X) ->
T = trunc(X),
case X - T == 0 of
true -> T;
false -> T + 1
end.
readPeerId(PacketData) ->
<<_P:?U32,SenderPeerId:?U16,_Other/binary>> = PacketData,
SenderPeerId.
readChannel(PacketData) ->
<<_P:?U32,_S:?U16,Channel:?U8,_Other/binary>> = PacketData,
Channel.
% This adds the base headers to the data and makes a packet out of it
makePacket(Address, Data, ProtocolId,SenderPeerId,Channel)->
#buffered_packet{
address=Address,
data = <<ProtocolId:?U32,SenderPeerId:?U16,Channel:?U8,Data/binary>>
}.
% Add the TYPE_ORIGINAL header to the data
makeOriginalPacket(Data) ->
#original_packet{ data = <<?TYPE_ORIGINAL:?U8,Data/binary>> }.
% Split data in chunks and add TYPE_SPLIT headers to them
makeSplitPacket(Data,ChunkSizeMax,SeqNum) ->
ChunkHeaderSize = 7,
MaxDataSize = ChunkSizeMax - ChunkHeaderSize,
ChunkCount=ceiling(byte_size(Data)/MaxDataSize),
Chunks = makeSplitPacketList(MaxDataSize, SeqNum, ChunkCount, 0, Data),
Chunks.
makeSplitPacketList(_MaxDataSize, _SeqNum, ChunkCount, ChunkCount, _Data) ->[];
makeSplitPacketList(MaxDataSize, SeqNum, ChunkCount, ChunkNum, Data) ->
DataSize = byte_size(Data),
ChunkDataSize = case MaxDataSize - DataSize < 0 of
true -> MaxDataSize;
false -> DataSize
end,
<<ChunkData:ChunkDataSize/binary,Tile/binary>> = Data,
%%ChunkNum1 = ChunkNum+1,
[<<?TYPE_SPLIT:?U8, SeqNum:?U16, ChunkCount:?U16, ChunkNum:?U16,ChunkData/binary>>|makeSplitPacketList(MaxDataSize,SeqNum,ChunkCount,ChunkNum+1,Tile)].
test() ->
makeSplitPacket(list_to_binary([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22]),9,65500).
% Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
% Increments split_seqnum if a split packet is made
makeAutoSplitPacket(_Data,_ChunkSizeMax,_SplitSeqNum) ->
todo.
% core::list<SharedBuffer<u8> > makeAutoSplitPacket(
% SharedBuffer<u8> data,
% u32 chunksize_max,
% u16 &split_seqnum)
%{
% u32 original_header_size = 1;
% core::list<SharedBuffer<u8> > list;
% if(data.getSize() + original_header_size > chunksize_max)
% {
% list = makeSplitPacket(data, chunksize_max, split_seqnum);
% split_seqnum++;
% return list;
% }
% else
% {
% list.push_back(makeOriginalPacket(data));
% }
% return list;
%}
%
% Add the TYPE_RELIABLE header to the data
makeReliablePacket(Data, SeqNum) ->
#reliable_packet{ data = <<?TYPE_RELIABLE:?U8,SeqNum:?U16,Data/binary>> }.

View File

@ -0,0 +1,31 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(connection_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init(_Args) ->
io:format("===connection_sup:init===~n",[]),
RestartStrategy = one_for_one, % one_for_one | one_for_all | rest_for_one
MaxRestarts = 10,
MaxSecondsBetweenRestarts = 60,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent, % permanent | transient | temporary
Shutdown = 2000, % brutal_kill | int() >= 0 | infinity
Type = worker, % worker | supervisor
Connection = {connection_dispatcher, % used to identify the child spec internally by the supervisor
{connection_disp, start_link, []}, % StartFun = {M, F, A}
Restart, Shutdown, Type,
[connection_disp]}, % Modules = [Module] | dynamic
{ok, {SupFlags, [Connection]}}.

View File

@ -1,12 +0,0 @@
{application, erlmines,
[
{description, ""},
{vsn, "0.0.1"},
{registered, [erlmines_app, erlmines_sup, erlmines]},
{applications, [
kernel,
stdlib
]},
{mod, { erlmines_app, []}},
{env, []}
]}.

View File

@ -1,103 +1,47 @@
-module(erlmines).
-behaviour(gen_server).
-include("connection.hrl").
-export([start_link/0]).
-export([init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% @type udp_server_option() =
%% {option(), port(), max_restarts(), time(), shutdown(),recv_length(), recv_timeout()}.
%% A data structure holding the options.
%% @type option() = [term()].
%% @type port() = integer().
%% @type max_restarts() = integer().
%% @type time() = integer().
%% @type shutdown() = integer().
%% @type recv_length() = integer().
%% @type recv_timeout() = integer() | infinity.
-record(udp_server_option, {
option = [binary],
port = 4000,
max_restarts = 3,
time = 60,
shutdown = 2000,
recv_length = 0,
recv_timeout = infinity
}).
%% Client API
start_link() ->
gen_server:start_link(?MODULE, [], []).
%% Server functions
init([]) ->
case gen_udp:open(30000,[binary]) of
{ok,Socket} -> {ok, [Socket]};
{error, Reason} -> {stop, Reason}
end.
handle_call(_Request,From,State) ->
error_logger:info_report("handle_call"),
%% io:format("handle_call ========================================~n",[]),
{reply, ok, State}.
handle_cast({udp,Socket,Host,Port,Bin} = Message,State) ->
error_logger:info_report("handle_cast1"),
%% io:format("handle_cast ========================================~n",[]),
%% io:format("Data= ~p~n",[Bin]),
{noreply,State};
handle_cast(_Msg, State) ->
error_logger:info_report("handle_cast2"),
%% io:format("handle_cast ~n",[]),
%% io:format("Data= ~p~n",[Bin]),
{noreply,State}.
handle_info(UdpMsg,State) ->
process_packet(UdpMsg),
{noreply,State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
process_packet({udp,Socket,Host,Port,Bin}) ->
io:format("========================================~n",[]),
% io:format("Host = ~p, Port = ~p~n",[Host,Port]),
<<ProtocolId:?U32, SenderPeerId:?U16, Channel:?U8,
PacketType:?U8, Data/binary>> = Bin,
case PacketType of
?TYPE_CONTROL -> Type = "TYPE_CONTROL (0)",
type_control(Data);
?TYPE_ORIGINAL -> Type = "TYPE_ORIGINAL(1)",
type_original(Data);
?TYPE_SPLIT -> Type = "TYPE_SPLIT(2)",
type_split(Data);
?TYPE_RELIABLE -> Type = "TYPE_RELIABLE(2)",
type_reliable(Data)
end,
%io:format("ProtocolId: ~p, SenderPeerId:~p, Channel:~p, PacketType: ~p,~n Data:~p~n",[ProtocolId,SenderPeerId,Channel,Type,Data]).
io:format("SenderPeerId:~p, Channel:~p, PacketType: ~p,~n Data:~p~n",
[SenderPeerId,Channel,Type,Data]).
type_control(<<ControlType:?U8,Data/binary>>)-> ok.
type_original(Data)-> ok.
type_split(<<SeqNum:?U16,ChunkCount:?U16,ChunkNum:?U16,Data/binary>>)-> ok.
type_reliable(<<SeqNum:?U16,Data/binary>>)-> ok.
-module(erlmines).
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
%% Client API
start_link() ->
gen_server:start_link({local,?MODULE}, ?MODULE, [], []).
%% Server functions
init(_Args) ->
io:format("===erlmines:init===~n",[]),
erlang:process_flag(trap_exit, true),
io:format("erlmines has started (~w)~n", [self()]),
{ok,[]}.
handle_call(_Msg, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
%error_logger:info_report("handle_cast"),
{noreply,State}.
handle_info(_Msg, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
io:format("===erlmines:terminate===~n",[]),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -1,14 +1,14 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(erlmines_app).
-behaviour(application).
-export([start/2, stop/1]).
start(_Type, StartArgs) ->
erlmines_sup:start_link(StartArgs).
stop(_State) ->
ok.
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(erlmines_app).
-behaviour(application).
-export([start/2, stop/1]).
start(normal, _Args) ->
erlmines_sup:start_link().
stop(_State) ->
ok.

View File

@ -1,39 +1,52 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(erlmines_sup).
-behaviour(supervisor).
-export([start/0, start_link/1, init/1]).
start() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []).
start_link(Args) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
init([]) ->
RestartStrategy = one_for_one, % one_for_one | one_for_all | rest_for_one
MaxRestarts = 10,
MaxSecondsBetweenRestarts = 60,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent, % permanent | transient | temporary
Shutdown = 2000, % brutal_kill | int() >= 0 | infinity
Type = worker, % worker | supervisor
AChild = {erlmines_sup, % used to identify the child spec internally by the supervisor
{erlmines, start_link, []}, % StartFun = {M, F, A}
Restart, Shutdown, Type,
[erlmines]}, % Modules = [Module] | dynamic
{ok, {SupFlags, [AChild]}}.
% {ok, {{one_for_one, 100, 300},
% [{erlmines_sup,
% {erlmines, start_link, []},
% permanent, 10000, worker, [erlmines]}
% ]}}.
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(erlmines_sup).
-behaviour(supervisor).
-export([start_link/0, init/1,
shutdown/0
% start_shell/0
]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%start_shell() ->
% {ok, Pid} = supervisor:start_link(erlmines_sup, []),
% unlink(Pid).
init(_Args) ->
io:format("===erlmines_sup:init===~n",[]),
RestartStrategy = one_for_one, % one_for_one | one_for_all | rest_for_one
MaxRestarts = 10,
MaxSecondsBetweenRestarts = 60,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent, % permanent | transient | temporary
Shutdown = infinity, % brutal_kill | int() >= 0 | infinity
Type = supervisor, % worker | supervisor
% Erlmines = {erlmines_main, % used to identify the child spec internally by the supervisor
% {erlmines, start_link, []}, % StartFun = {M, F, A}
% Restart, Shutdown, Type,
% [erlmines]},
Connection_sup = {erlmines_connection_sup,
{connection_sup, start_link, []},
Restart, Shutdown, Type,
[connection_sup]},
%%{ok, {SupFlags, [Erlmines, Connection_sup]}}.
%, ?SUPERVISOR_LINK(connection_sup)]}}.
{ok, {SupFlags, [Connection_sup]}}.
% supervisor can be shutdown by calling exit(SupPid,shutdown)
% or, if it's linked to its parent, by parent calling exit/1.
shutdown() ->
exit(whereis(?MODULE), shutdown).
% or
% exit(normal).

View File

@ -1,19 +1,19 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(connection_channels).
-compile(export_all).
-define(RPB, connection_reliable_packet_buffer).
-define(ISB, connection_incoming_split_buffer).
init(ChannelID) ->
ets:new(channels,[public, named_table]),
?RPB:init(list_to_atom(atom_to_list(incoming_reliables_) ++ integer_to_list(ChannelID))),
?RPB:init(list_to_atom(atom_to_list(outgoing_reliables_) ++ integer_to_list(ChannelID))),
?ISB:init(list_to_atom(atom_to_list(incoming_splits_) ++ integer_to_list(ChannelID))).
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(peer_channels).
-compile(export_all).
-define(RPB, connection_reliable_packet_buffer).
-define(ISB, connection_incoming_split_buffer).
init(ChannelID) ->
ets:new(channels,[public, named_table]),
?RPB:init(list_to_atom(atom_to_list(incoming_reliables_) ++ integer_to_list(ChannelID))),
?RPB:init(list_to_atom(atom_to_list(outgoing_reliables_) ++ integer_to_list(ChannelID))),
?ISB:init(list_to_atom(atom_to_list(incoming_splits_) ++ integer_to_list(ChannelID))).

View File

@ -1,66 +1,66 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(connection_peers).
-compile(export_all).
-define(RPB, connection_reliable_packet_buffer).
-define(ISB, connection_incoming_split_buffer).
-record(peer,{
address,
id,
timeout_counter,
ping_timer,
resend_timeout,
avg_rtt,
has_sent_with_id,
m_sendtime_accu,
m_max_packets_per_second,
m_num_sent,
m_max_num_sent
}).
init() ->
ets:new(peers, [ordered_set, public, named_table]),
ok.
new(Address) ->
PeerId = case ets:match(peers,{{used,false},'$1'}) of
[] -> % If we can't find not used peer id's
case ets:last(peers) of % try get the last PeerId
LastKey when is_integer(LastKey) ->
LastKey + 1;
'$end_of_table' -> 2 % If peer list is empty-set Id=2
end;
[[#peer{id=UnusedPeerId}]] -> UnusedPeerId
end,
ets:insert(peers,
{PeerId,{
{used,true},
#peer{
address = Address,
id = PeerId,
timeout_counter = 0.0,
ping_timer = 0.0,
resend_timeout=0.5,
avg_rtt = -1.0,
has_sent_with_id = false,
m_sendtime_accu = 0,
m_max_packets_per_second = 10,
m_num_sent = 0,
m_max_num_sent = 0
}
}
}).
delete(PeerId) ->
ets:delete(peers,PeerId).
lookup(PeerId) ->
ets:lookup(peers,PeerId).
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(peer_mgr).
-compile(export_all).
-define(RPB, connection_reliable_packet_buffer).
-define(ISB, connection_incoming_split_buffer).
-record(peer,{
address,
id,
timeout_counter,
ping_timer,
resend_timeout,
avg_rtt,
has_sent_with_id,
m_sendtime_accu,
m_max_packets_per_second,
m_num_sent,
m_max_num_sent
}).
init() ->
ets:new(peers, [ordered_set, public, named_table]),
ok.
new(Address) ->
PeerId = case ets:match(peers,{{used,false},'$1'}) of
[] -> % If we can't find not used peer id's
case ets:last(peers) of % try get the last PeerId
LastKey when is_integer(LastKey) ->
LastKey + 1;
'$end_of_table' -> 2 % If peer list is empty-set Id=2
end;
[[#peer{id=UnusedPeerId}]] -> UnusedPeerId
end,
ets:insert(peers,
{PeerId,{
{used,true},
#peer{
address = Address,
id = PeerId,
timeout_counter = 0.0,
ping_timer = 0.0,
resend_timeout=0.5,
avg_rtt = -1.0,
has_sent_with_id = false,
m_sendtime_accu = 0,
m_max_packets_per_second = 10,
m_num_sent = 0,
m_max_num_sent = 0
}
}
}).
delete(PeerId) ->
ets:delete(peers,PeerId).
lookup(PeerId) ->
ets:lookup(peers,PeerId).

View File

@ -1,135 +1,135 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(connection_incoming_split_buffer).
-include("connection.hrl").
-export([init/1, insert/3, removeUnreliableTimedOuts/1]).
init(Table) ->
ets:new(Table, [public, named_table]),
ok.
% TODO: Split buffer need to more learning and understanding
insert(Table, BufferedPacket, _Reliable) ->
_HeaderSize = ?BASE_HEADER_SIZE + 7,
<<_Header:?BASE_HEADER_SIZE, _Type:?U8, SeqNum:?U16, _ChunkCount:?U16,
_ChunkNum:?U16, _Other/binary>> = BufferedPacket,
case ets:lookup(Table, SeqNum) of
[{SeqNum, _FindPacket}] ->
io:format("IncomingSplitPacket with SeqNum=~p is found
in ReliablePacketBuffer~n",[SeqNum]);
[] ->
ets:insert(Table, {SeqNum, BufferedPacket})
end.
%
% This will throw a GotSplitPacketException when a full
% split packet is constructed.
%
% SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
%{
% u32 headersize = BASE_HEADER_SIZE + 7;
% assert(p.data.getSize() >= headersize);
% u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
% assert(type == TYPE_SPLIT);
% u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
% u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
% u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
%
% // Add if doesn't exist
% if(m_buf.find(seqnum) == NULL)
% {
% IncomingSplitPacket *sp = new IncomingSplitPacket();
% sp->chunk_count = chunk_count;
% sp->reliable = reliable;
% m_buf[seqnum] = sp;
% }
%
% IncomingSplitPacket *sp = m_buf[seqnum];
%
% // TODO: These errors should be thrown or something? Dunno.
% if(chunk_count != sp->chunk_count)
% derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
% <<" != sp->chunk_count="<<sp->chunk_count
% <<std::endl;
% if(reliable != sp->reliable)
% derr_con<<"Connection: WARNING: reliable="<<reliable
% <<" != sp->reliable="<<sp->reliable
% <<std::endl;
%
% // If chunk already exists, cancel
% if(sp->chunks.find(chunk_num) != NULL)
% throw AlreadyExistsException("Chunk already in buffer");
%
% // Cut chunk data out of packet
% u32 chunkdatasize = p.data.getSize() - headersize;
% SharedBuffer<u8> chunkdata(chunkdatasize);
% memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
%
% // Set chunk data in buffer
% sp->chunks[chunk_num] = chunkdata;
%
% // If not all chunks are received, return empty buffer
% if(sp->allReceived() == false)
% return SharedBuffer<u8>();
%
% // Calculate total size
% u32 totalsize = 0;
% core::map<u16, SharedBuffer<u8> >::Iterator i;
% i = sp->chunks.getIterator();
% for(; i.atEnd() == false; i++)
% {
% totalsize += i.getNode()->getValue().getSize();
% }
%
% SharedBuffer<u8> fulldata(totalsize);
%
% // Copy chunks to data buffer
% u32 start = 0;
% for(u32 chunk_i=0; chunk_i<sp->chunk_count;
% chunk_i++)
% {
% SharedBuffer<u8> buf = sp->chunks[chunk_i];
% u16 chunkdatasize = buf.getSize();
% memcpy(&fulldata[start], *buf, chunkdatasize);
% start += chunkdatasize;;
% }
%
% // Remove sp from buffer
% m_buf.remove(seqnum);
% delete sp;
%
% return fulldata;
% }
removeUnreliableTimedOuts(_Table) -> ok.
% void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
% {
% core::list<u16> remove_queue;
% core::map<u16, IncomingSplitPacket*>::Iterator i;
% i = m_buf.getIterator();
% for(; i.atEnd() == false; i++)
% {
% IncomingSplitPacket *p = i.getNode()->getValue();
% // Reliable ones are not removed by timeout
% if(p->reliable == true)
% continue;
% p->time += dtime;
% if(p->time >= timeout)
% remove_queue.push_back(i.getNode()->getKey());
% }
% core::list<u16>::Iterator j;
% j = remove_queue.begin();
% for(; j != remove_queue.end(); j++)
% {
% dout_con<<"NOTE: Removing timed out unreliable split packet"
% <<std::endl;
% delete m_buf[*j];
% m_buf.remove(*j);
% }
% }
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(peer_packet_buffers).
-include("connection.hrl").
-export([init/1, insert/3, removeUnreliableTimedOuts/1]).
init(Table) ->
ets:new(Table, [public, named_table]),
ok.
% TODO: Split buffer need to more learning and understanding
insert(Table, BufferedPacket, _Reliable) ->
_HeaderSize = ?BASE_HEADER_SIZE + 7,
<<_Header:?BASE_HEADER_SIZE, _Type:?U8, SeqNum:?U16, _ChunkCount:?U16,
_ChunkNum:?U16, _Other/binary>> = BufferedPacket,
case ets:lookup(Table, SeqNum) of
[{SeqNum, _FindPacket}] ->
io:format("IncomingSplitPacket with SeqNum=~p is found
in ReliablePacketBuffer~n",[SeqNum]);
[] ->
ets:insert(Table, {SeqNum, BufferedPacket})
end.
%
% This will throw a GotSplitPacketException when a full
% split packet is constructed.
%
% SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
%{
% u32 headersize = BASE_HEADER_SIZE + 7;
% assert(p.data.getSize() >= headersize);
% u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
% assert(type == TYPE_SPLIT);
% u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
% u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
% u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
%
% // Add if doesn't exist
% if(m_buf.find(seqnum) == NULL)
% {
% IncomingSplitPacket *sp = new IncomingSplitPacket();
% sp->chunk_count = chunk_count;
% sp->reliable = reliable;
% m_buf[seqnum] = sp;
% }
%
% IncomingSplitPacket *sp = m_buf[seqnum];
%
% // TODO: These errors should be thrown or something? Dunno.
% if(chunk_count != sp->chunk_count)
% derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
% <<" != sp->chunk_count="<<sp->chunk_count
% <<std::endl;
% if(reliable != sp->reliable)
% derr_con<<"Connection: WARNING: reliable="<<reliable
% <<" != sp->reliable="<<sp->reliable
% <<std::endl;
%
% // If chunk already exists, cancel
% if(sp->chunks.find(chunk_num) != NULL)
% throw AlreadyExistsException("Chunk already in buffer");
%
% // Cut chunk data out of packet
% u32 chunkdatasize = p.data.getSize() - headersize;
% SharedBuffer<u8> chunkdata(chunkdatasize);
% memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
%
% // Set chunk data in buffer
% sp->chunks[chunk_num] = chunkdata;
%
% // If not all chunks are received, return empty buffer
% if(sp->allReceived() == false)
% return SharedBuffer<u8>();
%
% // Calculate total size
% u32 totalsize = 0;
% core::map<u16, SharedBuffer<u8> >::Iterator i;
% i = sp->chunks.getIterator();
% for(; i.atEnd() == false; i++)
% {
% totalsize += i.getNode()->getValue().getSize();
% }
%
% SharedBuffer<u8> fulldata(totalsize);
%
% // Copy chunks to data buffer
% u32 start = 0;
% for(u32 chunk_i=0; chunk_i<sp->chunk_count;
% chunk_i++)
% {
% SharedBuffer<u8> buf = sp->chunks[chunk_i];
% u16 chunkdatasize = buf.getSize();
% memcpy(&fulldata[start], *buf, chunkdatasize);
% start += chunkdatasize;;
% }
%
% // Remove sp from buffer
% m_buf.remove(seqnum);
% delete sp;
%
% return fulldata;
% }
removeUnreliableTimedOuts(_Table) -> ok.
% void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
% {
% core::list<u16> remove_queue;
% core::map<u16, IncomingSplitPacket*>::Iterator i;
% i = m_buf.getIterator();
% for(; i.atEnd() == false; i++)
% {
% IncomingSplitPacket *p = i.getNode()->getValue();
% // Reliable ones are not removed by timeout
% if(p->reliable == true)
% continue;
% p->time += dtime;
% if(p->time >= timeout)
% remove_queue.push_back(i.getNode()->getKey());
% }
% core::list<u16>::Iterator j;
% j = remove_queue.begin();
% for(; j != remove_queue.end(); j++)
% {
% dout_con<<"NOTE: Removing timed out unreliable split packet"
% <<std::endl;
% delete m_buf[*j];
% m_buf.remove(*j);
% }
% }

View File

@ -1,154 +1,154 @@
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(connection_reliable_packet_buffer).
-include("connection.hrl").
-export([init/1, insert/2, findPacket/2, print/1, empty/1,
size/1, notFound/1, getFirstSeqnum/1,
popFirst/1, popSeqnum/2,
incrementTimedOuts/2, resetTimedOuts/2,
anyTotaltimeReached/2, getTimedOuts/2]).
-export([test_insert/1, test_findPacket/1, test_increment/0]).
%-define(TABLE_ID, ?MODULE).
init(Table) ->
ets:new(Table, [public, named_table]).
insert(Table, #buffered_packet{data=BufferedPacket, time=_Time, totaltime=_TotalTime, address = _Address} = BPrec) ->
<<_Header:?BASE_HEADER_SIZE/binary, Type:?U8, SeqNum:?U16,_Other/binary>> = BufferedPacket,
io:format("Type=~p, SeqNum=~p~n",[Type,SeqNum]),
case ets:lookup(Table, SeqNum) of
[{SeqNum, FindPacket}] ->
io:format("Reliable packet with SeqNum=~p is found in ReliablePacketBuffer~n",[SeqNum]),
{exists,FindPacket};
[] -> ets:insert(Table, {SeqNum, BPrec}),
{ok,BPrec}
end.
test_insert(Table) ->
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,00,34,77,77,77,77]), time=0, totaltime=0}),
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,01,11,77,77,77,77]), time=0, totaltime=0}),
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,02,62,77,77,77,77]), time=0, totaltime=0}),
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,67,02,77,77,77,77]), time=0, totaltime=0}),
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,00,10,77,77,77,77]), time=0, totaltime=0}).
findPacket(Table, SeqNum) ->
case ets:lookup(Table, SeqNum) of
[{SeqNum, FindPacket}] -> {ok, FindPacket};
[]-> {error, not_found}
end.
test_findPacket(Table)->
findPacket(Table, 15874). % 8704, 2817, 579
print(Table) ->
ets:foldl(fun({SeqNum, Packet},AccIn) ->
io:format("SeqNum=~p: ~p ~n", [SeqNum,Packet]),
AccIn
end, end_list, Table).
empty(Table) ->
Size = ets:info(Table, size),
case Size of
0 -> true;
_ -> Size
end.
size(Table) ->
ets:info(Table, size).
% TODO Check the returning value
notFound(Table) ->
ets:last(Table).
%RPBSearchResult ReliablePacketBuffer::notFound()
%{
% return m_list.end();
%}
getFirstSeqnum(Table) ->
ets:first(Table).
popFirst(Table) ->
FirstSeqNum = ets:first(Table),
#buffered_packet{data=FirstPacket} = ets:lookup(Table, FirstSeqNum),
<<_Header:?BASE_HEADER_SIZE,_Type:?U8,SeqNum:?U16,_Other/binary>> = FirstPacket,
ets:delete(Table, SeqNum),
#buffered_packet{data=FirstPacket}.
popSeqnum(Table, SeqNum) ->
FindPacket = ets:lookup(Table, SeqNum),
ets:delete(Table, SeqNum),
FindPacket.
incrementTimedOuts(Table, Dtime) ->
ets:foldl(fun({SeqNum, Packet},AccIn) ->
#buffered_packet{data = Data, time = Time, totaltime = TotalTime, address = Address} = Packet,
ets:update_element(Table, SeqNum,
{2,#buffered_packet{data=Data,time = Time + Dtime, totaltime = TotalTime + Dtime, address = Address}}),
AccIn
end, end_list, Table).
test_increment()->
init(rpb_test),
test_insert(rpb_test),
print(rpb_test),
incrementTimedOuts(rpb_test, 35),
print(rpb_test).
resetTimedOuts(Table, Timeout) ->
ets:foldl(fun({SeqNum, Packet},AccIn) ->
#buffered_packet{data = Data, time = Time, totaltime = TotalTime, address = Address} = Packet,
if Time >= Timeout ->
ets:update_element(Table, SeqNum,
{2,#buffered_packet{data=Data,time = 0, totaltime = TotalTime, address = Address}});
true -> ok
end,
AccIn
end, end_list, Table).
anyTotaltimeReached(Table, Timeout) ->
%ets:select(Table, ets:fun2ms(fun({SeqNum,#buffered_packet{totaltime = TotalTime}}) when TotalTime >= Timeout -> true end)).
%ets:select(Table,
% ets:fun2ms(fun({Seq,#buffered_packet{data = _, time = _, totaltime = TotalTime, address = _}}) -> TotalTime end)).
%TODO This construction need to refactoring
case ets:select(Table, [{{'_',#buffered_packet{data='_',time='_',totaltime='$1',address='_'}},[{'>=','$1',Timeout}],['$1']}]) of
[] -> false;
_ -> true % or we must check list size
end.
%bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
%{
% core::list<BufferedPacket>::Iterator i;
% i = m_list.begin();
% for(; i != m_list.end(); i++){
% if(i->totaltime >= timeout)
% return true;
% }
% return false;
%}
getTimedOuts(Table, Timeout) ->
% TODO - Check the returning value: list of buffered_packet, or SeqNum
ets:select(Table, [{{'$1',#buffered_packet{data='_',time='$2',totaltime='_',address='_'}},[{'>=','$2',Timeout}],['$1']}]).
% core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
%{
% core::list<BufferedPacket> timed_outs;
% core::list<BufferedPacket>::Iterator i;
% i = m_list.begin();
% for(; i != m_list.end(); i++)
% {
% if(i->time >= timeout)
% timed_outs.push_back(*i);
% }
% return timed_outs;
%}
%%
%% erlmines - The minetest server written in Erlang
%% Copyright (C) 2012 hummermania, Markevych Alexander <rabota.pmr@gmail.com>
%%
-module(peer_reliable_packet_buffer).
-include("connection.hrl").
-export([init/1, insert/2, findPacket/2, print/1, empty/1,
size/1, notFound/1, getFirstSeqnum/1,
popFirst/1, popSeqnum/2,
incrementTimedOuts/2, resetTimedOuts/2,
anyTotaltimeReached/2, getTimedOuts/2]).
-export([test_insert/1, test_findPacket/1, test_increment/0]).
%-define(TABLE_ID, ?MODULE).
init(Table) ->
ets:new(Table, [public, named_table]).
insert(Table, #buffered_packet{data=BufferedPacket, time=_Time, totaltime=_TotalTime, address = _Address} = BPrec) ->
<<_Header:?BASE_HEADER_SIZE/binary, Type:?U8, SeqNum:?U16,_Other/binary>> = BufferedPacket,
io:format("Type=~p, SeqNum=~p~n",[Type,SeqNum]),
case ets:lookup(Table, SeqNum) of
[{SeqNum, FindPacket}] ->
io:format("Reliable packet with SeqNum=~p is found in ReliablePacketBuffer~n",[SeqNum]),
{exists,FindPacket};
[] -> ets:insert(Table, {SeqNum, BPrec}),
{ok,BPrec}
end.
test_insert(Table) ->
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,00,34,77,77,77,77]), time=0, totaltime=0}),
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,01,11,77,77,77,77]), time=0, totaltime=0}),
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,02,62,77,77,77,77]), time=0, totaltime=0}),
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,67,02,77,77,77,77]), time=0, totaltime=0}),
insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,00,10,77,77,77,77]), time=0, totaltime=0}).
findPacket(Table, SeqNum) ->
case ets:lookup(Table, SeqNum) of
[{SeqNum, FindPacket}] -> {ok, FindPacket};
[]-> {error, not_found}
end.
test_findPacket(Table)->
findPacket(Table, 15874). % 8704, 2817, 579
print(Table) ->
ets:foldl(fun({SeqNum, Packet},AccIn) ->
io:format("SeqNum=~p: ~p ~n", [SeqNum,Packet]),
AccIn
end, end_list, Table).
empty(Table) ->
Size = ets:info(Table, size),
case Size of
0 -> true;
_ -> Size
end.
size(Table) ->
ets:info(Table, size).
% TODO Check the returning value
notFound(Table) ->
ets:last(Table).
%RPBSearchResult ReliablePacketBuffer::notFound()
%{
% return m_list.end();
%}
getFirstSeqnum(Table) ->
ets:first(Table).
popFirst(Table) ->
FirstSeqNum = ets:first(Table),
#buffered_packet{data=FirstPacket} = ets:lookup(Table, FirstSeqNum),
<<_Header:?BASE_HEADER_SIZE,_Type:?U8,SeqNum:?U16,_Other/binary>> = FirstPacket,
ets:delete(Table, SeqNum),
#buffered_packet{data=FirstPacket}.
popSeqnum(Table, SeqNum) ->
FindPacket = ets:lookup(Table, SeqNum),
ets:delete(Table, SeqNum),
FindPacket.
incrementTimedOuts(Table, Dtime) ->
ets:foldl(fun({SeqNum, Packet},AccIn) ->
#buffered_packet{data = Data, time = Time, totaltime = TotalTime, address = Address} = Packet,
ets:update_element(Table, SeqNum,
{2,#buffered_packet{data=Data,time = Time + Dtime, totaltime = TotalTime + Dtime, address = Address}}),
AccIn
end, end_list, Table).
test_increment()->
init(rpb_test),
test_insert(rpb_test),
print(rpb_test),
incrementTimedOuts(rpb_test, 35),
print(rpb_test).
resetTimedOuts(Table, Timeout) ->
ets:foldl(fun({SeqNum, Packet},AccIn) ->
#buffered_packet{data = Data, time = Time, totaltime = TotalTime, address = Address} = Packet,
if Time >= Timeout ->
ets:update_element(Table, SeqNum,
{2,#buffered_packet{data=Data,time = 0, totaltime = TotalTime, address = Address}});
true -> ok
end,
AccIn
end, end_list, Table).
anyTotaltimeReached(Table, Timeout) ->
%ets:select(Table, ets:fun2ms(fun({SeqNum,#buffered_packet{totaltime = TotalTime}}) when TotalTime >= Timeout -> true end)).
%ets:select(Table,
% ets:fun2ms(fun({Seq,#buffered_packet{data = _, time = _, totaltime = TotalTime, address = _}}) -> TotalTime end)).
%TODO This construction need to refactoring
case ets:select(Table, [{{'_',#buffered_packet{data='_',time='_',totaltime='$1',address='_'}},[{'>=','$1',Timeout}],['$1']}]) of
[] -> false;
_ -> true % or we must check list size
end.
%bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
%{
% core::list<BufferedPacket>::Iterator i;
% i = m_list.begin();
% for(; i != m_list.end(); i++){
% if(i->totaltime >= timeout)
% return true;
% }
% return false;
%}
getTimedOuts(Table, Timeout) ->
% TODO - Check the returning value: list of buffered_packet, or SeqNum
ets:select(Table, [{{'$1',#buffered_packet{data='_',time='$2',totaltime='_',address='_'}},[{'>=','$2',Timeout}],['$1']}]).
% core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
%{
% core::list<BufferedPacket> timed_outs;
% core::list<BufferedPacket>::Iterator i;
% i = m_list.begin();
% for(; i != m_list.end(); i++)
% {
% if(i->time >= timeout)
% timed_outs.push_back(*i);
% }
% return timed_outs;
%}