From d9b32e67d89cf9888f11bed293cfc2e7415d1c42 Mon Sep 17 00:00:00 2001 From: Alexander Markevych Date: Tue, 24 Jul 2012 16:58:42 +0300 Subject: [PATCH] Refactoring process tree. --- ebin/erlmines.app | 16 +- erlmines.sh | 1 + include/connection.hrl | 20 ++ src/connection/connection.erl | 93 ------ src/connection/connection_disp.erl | 181 ++++++++++ src/connection/connection_sup.erl | 31 ++ src/erlmines.app.src | 12 - src/erlmines.erl | 150 +++------ src/erlmines_app.erl | 28 +- src/erlmines_sup.erl | 91 +++--- .../peer_channels.erl} | 38 +-- .../peer_mgr.erl} | 132 ++++---- .../peer_packet_buffers.erl} | 270 +++++++-------- .../peer_reliable_packet_buffer.erl} | 308 +++++++++--------- 14 files changed, 731 insertions(+), 640 deletions(-) delete mode 100644 src/connection/connection.erl create mode 100644 src/connection/connection_disp.erl create mode 100644 src/connection/connection_sup.erl delete mode 100644 src/erlmines.app.src rename src/{connection/connection_channels.erl => peer/peer_channels.erl} (93%) rename src/{connection/connection_peers.erl => peer/peer_mgr.erl} (95%) rename src/{connection/connection_incoming_split_buffer.erl => peer/peer_packet_buffers.erl} (96%) rename src/{connection/connection_reliable_packet_buffer.erl => peer/peer_reliable_packet_buffer.erl} (96%) diff --git a/ebin/erlmines.app b/ebin/erlmines.app index e5f655e..1e22ab0 100644 --- a/ebin/erlmines.app +++ b/ebin/erlmines.app @@ -1,11 +1,17 @@ {application,erlmines, [{description,[]}, {vsn,"0.0.1"}, - {registered,[erlmines_app,erlmines_sup,erlmines]}, + {registered,[erlmines_app,erlmines_sup,connection_sup,connection_disp]}, {applications,[kernel,stdlib]}, {mod,{erlmines_app,[]}}, {env,[]}, - {modules,[clientserver,connection,connection_channels, - connection_incoming_split_buffer,connection_peers, - connection_reliable_packet_buffer,erlmines, - erlmines_app,erlmines_sup]}]}. + {modules,[erlmines_app, + erlmines_sup, + erlmines, + connection_sup, + connection_disp, + peer_channels, + peer_packet_buffers, + peer_mgr, + peer_reliable_packet_buffer, + clientserver]}]}. diff --git a/erlmines.sh b/erlmines.sh index 9d8e42f..40a9621 100755 --- a/erlmines.sh +++ b/erlmines.sh @@ -1,3 +1,4 @@ #!/bin/bash #erl -noshell -pa ./ebin -eval 'application:start(erlmines)' +erl -smp auto -pa ./ebin -eval 'application:start(erlmines)' diff --git a/include/connection.hrl b/include/connection.hrl index 3b35721..fc61bc5 100644 --- a/include/connection.hrl +++ b/include/connection.hrl @@ -9,6 +9,26 @@ -define(U16, 16/unsigned-little-integer). -define(U32, 32/unsigned-little-integer). +%% @type udp_server_option() = +%% {option(), port(), max_restarts(), time(), shutdown(),recv_length(), recv_timeout()}. +%% A data structure holding the options. +%% @type option() = [term()]. +%% @type port() = integer(). +%% @type max_restarts() = integer(). +%% @type time() = integer(). +%% @type shutdown() = integer(). +%% @type recv_length() = integer(). +%% @type recv_timeout() = integer() | infinity. +-record(udp_server_option, { + option = [binary], + port = 4000, + max_restarts = 3, + time = 60, + shutdown = 2000, + recv_length = 0, + recv_timeout = infinity +}). + -record(buffered_packet, { data, % Data of the packet, including headers time, % Seconds from buffering the packet or re-sending diff --git a/src/connection/connection.erl b/src/connection/connection.erl deleted file mode 100644 index b31da03..0000000 --- a/src/connection/connection.erl +++ /dev/null @@ -1,93 +0,0 @@ -%% -%% erlmines - The minetest server written in Erlang -%% Copyright (C) 2012 hummermania, Markevych Alexander -%% - --module(connection). --include("connection.hrl"). - --export([readPeerId/1,readChannel/1]). --export([makePacket/5, - makeOriginalPacket/1, - makeSplitPacket/3, - makeAutoSplitPacket/3, - makeReliablePacket/2, - test/0]). - -ceiling(X) -> - T = trunc(X), - case X - T == 0 of - true -> T; - false -> T + 1 - end. - -readPeerId(PacketData) -> - <<_P:?U32,SenderPeerId:?U16,_Other/binary>> = PacketData, - SenderPeerId. - -readChannel(PacketData) -> - <<_P:?U32,_S:?U16,Channel:?U8,_Other/binary>> = PacketData, - Channel. - -% This adds the base headers to the data and makes a packet out of it -makePacket(Address, Data, ProtocolId,SenderPeerId,Channel)-> - #buffered_packet{ - address=Address, - data = <> - }. -% Add the TYPE_ORIGINAL header to the data -makeOriginalPacket(Data) -> - #original_packet{ data = <> }. - -% Split data in chunks and add TYPE_SPLIT headers to them -makeSplitPacket(Data,ChunkSizeMax,SeqNum) -> - ChunkHeaderSize = 7, - MaxDataSize = ChunkSizeMax - ChunkHeaderSize, - ChunkCount=ceiling(byte_size(Data)/MaxDataSize), - Chunks = makeSplitPacketList(MaxDataSize, SeqNum, ChunkCount, 0, Data), - Chunks. - -makeSplitPacketList(_MaxDataSize, _SeqNum, ChunkCount, ChunkCount, _Data) ->[]; -makeSplitPacketList(MaxDataSize, SeqNum, ChunkCount, ChunkNum, Data) -> - DataSize = byte_size(Data), - ChunkDataSize = case MaxDataSize - DataSize < 0 of - true -> MaxDataSize; - false -> DataSize - end, - <> = Data, - %%ChunkNum1 = ChunkNum+1, - [<>|makeSplitPacketList(MaxDataSize,SeqNum,ChunkCount,ChunkNum+1,Tile)]. - -test() -> - makeSplitPacket(list_to_binary([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22]),9,65500). - -% Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet -% Increments split_seqnum if a split packet is made -makeAutoSplitPacket(_Data,_ChunkSizeMax,_SplitSeqNum) -> - todo. - -% core::list > makeAutoSplitPacket( -% SharedBuffer data, -% u32 chunksize_max, -% u16 &split_seqnum) -%{ -% u32 original_header_size = 1; -% core::list > list; -% if(data.getSize() + original_header_size > chunksize_max) -% { -% list = makeSplitPacket(data, chunksize_max, split_seqnum); -% split_seqnum++; -% return list; -% } -% else -% { -% list.push_back(makeOriginalPacket(data)); -% } -% return list; -%} -% - -% Add the TYPE_RELIABLE header to the data -makeReliablePacket(Data, SeqNum) -> - #reliable_packet{ data = <> }. - diff --git a/src/connection/connection_disp.erl b/src/connection/connection_disp.erl new file mode 100644 index 0000000..738ab24 --- /dev/null +++ b/src/connection/connection_disp.erl @@ -0,0 +1,181 @@ +%% +%% erlmines - The minetest server written in Erlang +%% Copyright (C) 2012 hummermania, Markevych Alexander +%% + +-module(connection_disp). + +-behaviour(gen_server). + + +-export([start_link/0 + , init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-export([set_socket/2, readPeerId/1,readChannel/1]). +-export([makePacket/5, + makeOriginalPacket/1, + makeSplitPacket/3, + makeAutoSplitPacket/3, + makeReliablePacket/2, + test/0]). + +-include("connection.hrl"). + +%% Client API +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%% Server functions +init(_Args) -> + erlang:process_flag(trap_exit, true), + io:format("===connection:init===~n",[]), + case gen_udp:open(30000,[binary]) of + {ok,Socket} -> {ok, [Socket]}; + {error, Reason} -> {stop, Reason} + end. + +set_socket(Peer, Socket) when is_pid(Peer) andalso is_port(Socket) -> + gen_tcp:controlling_process(Socket, Peer), + gen_server:call(Peer, {set_socket, Socket}). + +handle_call({msg,Msg},From,State) -> + io:format("===connection:handle_call===~p, ~p, ~p~n",[Msg,From,State]), + {reply, ok, State}; + +handle_call({set_socket, Socket}, _From, State) -> + inet:setopts(Socket, [{packet,raw},{active,once}]), + {reply, ok, State}. + +handle_cast(calc, State) -> + io:format("result 2+2=4~n"), + {noreply, State}; +handle_cast(calcbad, State) -> + io:format("result 1/0~n"), + 1 / 0, + {noreply, State}; +handle_cast(_Msg, State) -> + io:format("===connection:handle_cast===~n",[]), + {noreply,State}. + +handle_info({udp, Socket, _IP, _InPortNo, _Bin} = UdpMsg, State) -> + inet:setopts(Socket, [{active,once}]), + process_packet(UdpMsg), + %%{ok, Decoder1, Frames} = decode(Bin, Decoder), + %%[Consumer ! Frame || Frame <- Frames], + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +process_packet({udp,_Socket,Host,Port,Bin}) -> + io:format("========================================~n",[]), + io:format("Host = ~p, Port = ~p~n",[Host,Port]), + <<_ProtocolId:?U32, SenderPeerId:?U16, Channel:?U8, + PacketType:?U8, Data/binary>> = Bin, + case PacketType of + ?TYPE_CONTROL -> Type = "TYPE_CONTROL (0)", + type_control(Data); + ?TYPE_ORIGINAL -> Type = "TYPE_ORIGINAL(1)", + type_original(Data); + ?TYPE_SPLIT -> Type = "TYPE_SPLIT(2)", + type_split(Data); + ?TYPE_RELIABLE -> Type = "TYPE_RELIABLE(2)", + type_reliable(Data) + end, + %io:format("ProtocolId: ~p, SenderPeerId:~p, Channel:~p, PacketType: ~p,~n Data:~p~n",[ProtocolId,SenderPeerId,Channel,Type,Data]). + io:format("SenderPeerId:~p, Channel:~p, PacketType: ~p,~n Data:~p~n", + [SenderPeerId,Channel,Type,Data]). + +type_control(<<_ControlType:?U8,_Data/binary>>)-> ok. + +type_original(_Data)-> ok. +type_split(<<_SeqNum:?U16,_ChunkCount:?U16,_ChunkNum:?U16,_Data/binary>>)-> ok. +type_reliable(<<_SeqNum:?U16,_Data/binary>>)-> ok. + + +ceiling(X) -> + T = trunc(X), + case X - T == 0 of + true -> T; + false -> T + 1 + end. + +readPeerId(PacketData) -> + <<_P:?U32,SenderPeerId:?U16,_Other/binary>> = PacketData, + SenderPeerId. + +readChannel(PacketData) -> + <<_P:?U32,_S:?U16,Channel:?U8,_Other/binary>> = PacketData, + Channel. + +% This adds the base headers to the data and makes a packet out of it +makePacket(Address, Data, ProtocolId,SenderPeerId,Channel)-> + #buffered_packet{ + address=Address, + data = <> + }. +% Add the TYPE_ORIGINAL header to the data +makeOriginalPacket(Data) -> + #original_packet{ data = <> }. + +% Split data in chunks and add TYPE_SPLIT headers to them +makeSplitPacket(Data,ChunkSizeMax,SeqNum) -> + ChunkHeaderSize = 7, + MaxDataSize = ChunkSizeMax - ChunkHeaderSize, + ChunkCount=ceiling(byte_size(Data)/MaxDataSize), + Chunks = makeSplitPacketList(MaxDataSize, SeqNum, ChunkCount, 0, Data), + Chunks. + +makeSplitPacketList(_MaxDataSize, _SeqNum, ChunkCount, ChunkCount, _Data) ->[]; +makeSplitPacketList(MaxDataSize, SeqNum, ChunkCount, ChunkNum, Data) -> + DataSize = byte_size(Data), + ChunkDataSize = case MaxDataSize - DataSize < 0 of + true -> MaxDataSize; + false -> DataSize + end, + <> = Data, + %%ChunkNum1 = ChunkNum+1, + [<>|makeSplitPacketList(MaxDataSize,SeqNum,ChunkCount,ChunkNum+1,Tile)]. + +test() -> + makeSplitPacket(list_to_binary([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22]),9,65500). + +% Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet +% Increments split_seqnum if a split packet is made +makeAutoSplitPacket(_Data,_ChunkSizeMax,_SplitSeqNum) -> + todo. + +% core::list > makeAutoSplitPacket( +% SharedBuffer data, +% u32 chunksize_max, +% u16 &split_seqnum) +%{ +% u32 original_header_size = 1; +% core::list > list; +% if(data.getSize() + original_header_size > chunksize_max) +% { +% list = makeSplitPacket(data, chunksize_max, split_seqnum); +% split_seqnum++; +% return list; +% } +% else +% { +% list.push_back(makeOriginalPacket(data)); +% } +% return list; +%} +% + +% Add the TYPE_RELIABLE header to the data +makeReliablePacket(Data, SeqNum) -> + #reliable_packet{ data = <> }. + diff --git a/src/connection/connection_sup.erl b/src/connection/connection_sup.erl new file mode 100644 index 0000000..2056d73 --- /dev/null +++ b/src/connection/connection_sup.erl @@ -0,0 +1,31 @@ +%% +%% erlmines - The minetest server written in Erlang +%% Copyright (C) 2012 hummermania, Markevych Alexander +%% +-module(connection_sup). + +-behaviour(supervisor). + +-export([start_link/0, init/1]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init(_Args) -> + io:format("===connection_sup:init===~n",[]), + RestartStrategy = one_for_one, % one_for_one | one_for_all | rest_for_one + MaxRestarts = 10, + MaxSecondsBetweenRestarts = 60, + SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, + + Restart = permanent, % permanent | transient | temporary + Shutdown = 2000, % brutal_kill | int() >= 0 | infinity + Type = worker, % worker | supervisor + + Connection = {connection_dispatcher, % used to identify the child spec internally by the supervisor + {connection_disp, start_link, []}, % StartFun = {M, F, A} + Restart, Shutdown, Type, + [connection_disp]}, % Modules = [Module] | dynamic + + {ok, {SupFlags, [Connection]}}. diff --git a/src/erlmines.app.src b/src/erlmines.app.src deleted file mode 100644 index 5541783..0000000 --- a/src/erlmines.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, erlmines, - [ - {description, ""}, - {vsn, "0.0.1"}, - {registered, [erlmines_app, erlmines_sup, erlmines]}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { erlmines_app, []}}, - {env, []} - ]}. diff --git a/src/erlmines.erl b/src/erlmines.erl index eb8ecf5..71f5cf3 100644 --- a/src/erlmines.erl +++ b/src/erlmines.erl @@ -1,103 +1,47 @@ --module(erlmines). - --behaviour(gen_server). --include("connection.hrl"). - --export([start_link/0]). --export([init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - -%% @type udp_server_option() = -%% {option(), port(), max_restarts(), time(), shutdown(),recv_length(), recv_timeout()}. -%% A data structure holding the options. -%% @type option() = [term()]. -%% @type port() = integer(). -%% @type max_restarts() = integer(). -%% @type time() = integer(). -%% @type shutdown() = integer(). -%% @type recv_length() = integer(). -%% @type recv_timeout() = integer() | infinity. --record(udp_server_option, { - option = [binary], - port = 4000, - max_restarts = 3, - time = 60, - shutdown = 2000, - recv_length = 0, - recv_timeout = infinity -}). - -%% Client API -start_link() -> - gen_server:start_link(?MODULE, [], []). - -%% Server functions -init([]) -> - case gen_udp:open(30000,[binary]) of - {ok,Socket} -> {ok, [Socket]}; - {error, Reason} -> {stop, Reason} - end. - -handle_call(_Request,From,State) -> - error_logger:info_report("handle_call"), - %% io:format("handle_call ========================================~n",[]), - {reply, ok, State}. - -handle_cast({udp,Socket,Host,Port,Bin} = Message,State) -> - error_logger:info_report("handle_cast1"), - %% io:format("handle_cast ========================================~n",[]), - %% io:format("Data= ~p~n",[Bin]), - {noreply,State}; - -handle_cast(_Msg, State) -> - error_logger:info_report("handle_cast2"), - %% io:format("handle_cast ~n",[]), - %% io:format("Data= ~p~n",[Bin]), - {noreply,State}. - -handle_info(UdpMsg,State) -> - process_packet(UdpMsg), - {noreply,State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -process_packet({udp,Socket,Host,Port,Bin}) -> - io:format("========================================~n",[]), - % io:format("Host = ~p, Port = ~p~n",[Host,Port]), - <> = Bin, - case PacketType of - ?TYPE_CONTROL -> Type = "TYPE_CONTROL (0)", - type_control(Data); - ?TYPE_ORIGINAL -> Type = "TYPE_ORIGINAL(1)", - type_original(Data); - ?TYPE_SPLIT -> Type = "TYPE_SPLIT(2)", - type_split(Data); - ?TYPE_RELIABLE -> Type = "TYPE_RELIABLE(2)", - type_reliable(Data) - end, - %io:format("ProtocolId: ~p, SenderPeerId:~p, Channel:~p, PacketType: ~p,~n Data:~p~n",[ProtocolId,SenderPeerId,Channel,Type,Data]). - io:format("SenderPeerId:~p, Channel:~p, PacketType: ~p,~n Data:~p~n", - [SenderPeerId,Channel,Type,Data]). - -type_control(<>)-> ok. - -type_original(Data)-> ok. -type_split(<>)-> ok. -type_reliable(<>)-> ok. - - - - - - - +-module(erlmines). + +-behaviour(gen_server). + +-export([start_link/0]). +-export([init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +%% Client API +start_link() -> + gen_server:start_link({local,?MODULE}, ?MODULE, [], []). + +%% Server functions +init(_Args) -> + io:format("===erlmines:init===~n",[]), + erlang:process_flag(trap_exit, true), + io:format("erlmines has started (~w)~n", [self()]), + {ok,[]}. + + +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + %error_logger:info_report("handle_cast"), + {noreply,State}. + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + io:format("===erlmines:terminate===~n",[]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + + + + + diff --git a/src/erlmines_app.erl b/src/erlmines_app.erl index 4743b11..6083827 100644 --- a/src/erlmines_app.erl +++ b/src/erlmines_app.erl @@ -1,14 +1,14 @@ -%% -%% erlmines - The minetest server written in Erlang -%% Copyright (C) 2012 hummermania, Markevych Alexander -%% - --module(erlmines_app). --behaviour(application). --export([start/2, stop/1]). - -start(_Type, StartArgs) -> - erlmines_sup:start_link(StartArgs). - -stop(_State) -> - ok. +%% +%% erlmines - The minetest server written in Erlang +%% Copyright (C) 2012 hummermania, Markevych Alexander +%% + +-module(erlmines_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(normal, _Args) -> + erlmines_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/erlmines_sup.erl b/src/erlmines_sup.erl index dd4db1f..fe988bd 100644 --- a/src/erlmines_sup.erl +++ b/src/erlmines_sup.erl @@ -1,39 +1,52 @@ -%% -%% erlmines - The minetest server written in Erlang -%% Copyright (C) 2012 hummermania, Markevych Alexander -%% --module(erlmines_sup). - --behaviour(supervisor). - --export([start/0, start_link/1, init/1]). - -start() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []). - -start_link(Args) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, Args). - -init([]) -> - RestartStrategy = one_for_one, % one_for_one | one_for_all | rest_for_one - MaxRestarts = 10, - MaxSecondsBetweenRestarts = 60, - SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, - - Restart = permanent, % permanent | transient | temporary - Shutdown = 2000, % brutal_kill | int() >= 0 | infinity - Type = worker, % worker | supervisor - - AChild = {erlmines_sup, % used to identify the child spec internally by the supervisor - {erlmines, start_link, []}, % StartFun = {M, F, A} - Restart, Shutdown, Type, - [erlmines]}, % Modules = [Module] | dynamic - - {ok, {SupFlags, [AChild]}}. - - -% {ok, {{one_for_one, 100, 300}, -% [{erlmines_sup, -% {erlmines, start_link, []}, -% permanent, 10000, worker, [erlmines]} -% ]}}. +%% +%% erlmines - The minetest server written in Erlang +%% Copyright (C) 2012 hummermania, Markevych Alexander +%% +-module(erlmines_sup). + +-behaviour(supervisor). + +-export([start_link/0, init/1, + shutdown/0 + % start_shell/0 + ]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%start_shell() -> +% {ok, Pid} = supervisor:start_link(erlmines_sup, []), +% unlink(Pid). + +init(_Args) -> + io:format("===erlmines_sup:init===~n",[]), + RestartStrategy = one_for_one, % one_for_one | one_for_all | rest_for_one + MaxRestarts = 10, + MaxSecondsBetweenRestarts = 60, + SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, + + Restart = permanent, % permanent | transient | temporary + Shutdown = infinity, % brutal_kill | int() >= 0 | infinity + Type = supervisor, % worker | supervisor + +% Erlmines = {erlmines_main, % used to identify the child spec internally by the supervisor +% {erlmines, start_link, []}, % StartFun = {M, F, A} +% Restart, Shutdown, Type, +% [erlmines]}, + + Connection_sup = {erlmines_connection_sup, + {connection_sup, start_link, []}, + Restart, Shutdown, Type, + [connection_sup]}, + + %%{ok, {SupFlags, [Erlmines, Connection_sup]}}. + %, ?SUPERVISOR_LINK(connection_sup)]}}. + {ok, {SupFlags, [Connection_sup]}}. + + +% supervisor can be shutdown by calling exit(SupPid,shutdown) +% or, if it's linked to its parent, by parent calling exit/1. +shutdown() -> + exit(whereis(?MODULE), shutdown). + % or + % exit(normal). \ No newline at end of file diff --git a/src/connection/connection_channels.erl b/src/peer/peer_channels.erl similarity index 93% rename from src/connection/connection_channels.erl rename to src/peer/peer_channels.erl index 9919b8e..b1808dd 100644 --- a/src/connection/connection_channels.erl +++ b/src/peer/peer_channels.erl @@ -1,19 +1,19 @@ -%% -%% erlmines - The minetest server written in Erlang -%% Copyright (C) 2012 hummermania, Markevych Alexander -%% --module(connection_channels). - --compile(export_all). - --define(RPB, connection_reliable_packet_buffer). --define(ISB, connection_incoming_split_buffer). - -init(ChannelID) -> - ets:new(channels,[public, named_table]), - ?RPB:init(list_to_atom(atom_to_list(incoming_reliables_) ++ integer_to_list(ChannelID))), - ?RPB:init(list_to_atom(atom_to_list(outgoing_reliables_) ++ integer_to_list(ChannelID))), - ?ISB:init(list_to_atom(atom_to_list(incoming_splits_) ++ integer_to_list(ChannelID))). - - - +%% +%% erlmines - The minetest server written in Erlang +%% Copyright (C) 2012 hummermania, Markevych Alexander +%% +-module(peer_channels). + +-compile(export_all). + +-define(RPB, connection_reliable_packet_buffer). +-define(ISB, connection_incoming_split_buffer). + +init(ChannelID) -> + ets:new(channels,[public, named_table]), + ?RPB:init(list_to_atom(atom_to_list(incoming_reliables_) ++ integer_to_list(ChannelID))), + ?RPB:init(list_to_atom(atom_to_list(outgoing_reliables_) ++ integer_to_list(ChannelID))), + ?ISB:init(list_to_atom(atom_to_list(incoming_splits_) ++ integer_to_list(ChannelID))). + + + diff --git a/src/connection/connection_peers.erl b/src/peer/peer_mgr.erl similarity index 95% rename from src/connection/connection_peers.erl rename to src/peer/peer_mgr.erl index 23a83ac..d9f6f70 100644 --- a/src/connection/connection_peers.erl +++ b/src/peer/peer_mgr.erl @@ -1,66 +1,66 @@ -%% -%% erlmines - The minetest server written in Erlang -%% Copyright (C) 2012 hummermania, Markevych Alexander -%% --module(connection_peers). - --compile(export_all). - --define(RPB, connection_reliable_packet_buffer). --define(ISB, connection_incoming_split_buffer). - --record(peer,{ - address, - id, - timeout_counter, - ping_timer, - resend_timeout, - avg_rtt, - has_sent_with_id, - m_sendtime_accu, - m_max_packets_per_second, - m_num_sent, - m_max_num_sent - }). - - -init() -> - ets:new(peers, [ordered_set, public, named_table]), - ok. - -new(Address) -> - PeerId = case ets:match(peers,{{used,false},'$1'}) of - [] -> % If we can't find not used peer id's - case ets:last(peers) of % try get the last PeerId - LastKey when is_integer(LastKey) -> - LastKey + 1; - '$end_of_table' -> 2 % If peer list is empty-set Id=2 - end; - [[#peer{id=UnusedPeerId}]] -> UnusedPeerId - end, - ets:insert(peers, - {PeerId,{ - {used,true}, - #peer{ - address = Address, - id = PeerId, - timeout_counter = 0.0, - ping_timer = 0.0, - resend_timeout=0.5, - avg_rtt = -1.0, - has_sent_with_id = false, - m_sendtime_accu = 0, - m_max_packets_per_second = 10, - m_num_sent = 0, - m_max_num_sent = 0 - } - } - }). - -delete(PeerId) -> - ets:delete(peers,PeerId). - -lookup(PeerId) -> - ets:lookup(peers,PeerId). - - +%% +%% erlmines - The minetest server written in Erlang +%% Copyright (C) 2012 hummermania, Markevych Alexander +%% +-module(peer_mgr). + +-compile(export_all). + +-define(RPB, connection_reliable_packet_buffer). +-define(ISB, connection_incoming_split_buffer). + +-record(peer,{ + address, + id, + timeout_counter, + ping_timer, + resend_timeout, + avg_rtt, + has_sent_with_id, + m_sendtime_accu, + m_max_packets_per_second, + m_num_sent, + m_max_num_sent + }). + + +init() -> + ets:new(peers, [ordered_set, public, named_table]), + ok. + +new(Address) -> + PeerId = case ets:match(peers,{{used,false},'$1'}) of + [] -> % If we can't find not used peer id's + case ets:last(peers) of % try get the last PeerId + LastKey when is_integer(LastKey) -> + LastKey + 1; + '$end_of_table' -> 2 % If peer list is empty-set Id=2 + end; + [[#peer{id=UnusedPeerId}]] -> UnusedPeerId + end, + ets:insert(peers, + {PeerId,{ + {used,true}, + #peer{ + address = Address, + id = PeerId, + timeout_counter = 0.0, + ping_timer = 0.0, + resend_timeout=0.5, + avg_rtt = -1.0, + has_sent_with_id = false, + m_sendtime_accu = 0, + m_max_packets_per_second = 10, + m_num_sent = 0, + m_max_num_sent = 0 + } + } + }). + +delete(PeerId) -> + ets:delete(peers,PeerId). + +lookup(PeerId) -> + ets:lookup(peers,PeerId). + + diff --git a/src/connection/connection_incoming_split_buffer.erl b/src/peer/peer_packet_buffers.erl similarity index 96% rename from src/connection/connection_incoming_split_buffer.erl rename to src/peer/peer_packet_buffers.erl index 578461f..fdf25d9 100644 --- a/src/connection/connection_incoming_split_buffer.erl +++ b/src/peer/peer_packet_buffers.erl @@ -1,135 +1,135 @@ -%% -%% erlmines - The minetest server written in Erlang -%% Copyright (C) 2012 hummermania, Markevych Alexander -%% - --module(connection_incoming_split_buffer). --include("connection.hrl"). - --export([init/1, insert/3, removeUnreliableTimedOuts/1]). - - -init(Table) -> - ets:new(Table, [public, named_table]), - ok. - -% TODO: Split buffer need to more learning and understanding - -insert(Table, BufferedPacket, _Reliable) -> - _HeaderSize = ?BASE_HEADER_SIZE + 7, - <<_Header:?BASE_HEADER_SIZE, _Type:?U8, SeqNum:?U16, _ChunkCount:?U16, - _ChunkNum:?U16, _Other/binary>> = BufferedPacket, - case ets:lookup(Table, SeqNum) of - [{SeqNum, _FindPacket}] -> - io:format("IncomingSplitPacket with SeqNum=~p is found - in ReliablePacketBuffer~n",[SeqNum]); - [] -> - ets:insert(Table, {SeqNum, BufferedPacket}) - end. -% -% This will throw a GotSplitPacketException when a full -% split packet is constructed. -% -% SharedBuffer IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) -%{ -% u32 headersize = BASE_HEADER_SIZE + 7; -% assert(p.data.getSize() >= headersize); -% u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); -% assert(type == TYPE_SPLIT); -% u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); -% u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]); -% u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); -% -% // Add if doesn't exist -% if(m_buf.find(seqnum) == NULL) -% { -% IncomingSplitPacket *sp = new IncomingSplitPacket(); -% sp->chunk_count = chunk_count; -% sp->reliable = reliable; -% m_buf[seqnum] = sp; -% } -% -% IncomingSplitPacket *sp = m_buf[seqnum]; -% -% // TODO: These errors should be thrown or something? Dunno. -% if(chunk_count != sp->chunk_count) -% derr_con<<"Connection: WARNING: chunk_count="<chunk_count="<chunk_count -% <reliable) -% derr_con<<"Connection: WARNING: reliable="<reliable="<reliable -% <chunks.find(chunk_num) != NULL) -% throw AlreadyExistsException("Chunk already in buffer"); -% -% // Cut chunk data out of packet -% u32 chunkdatasize = p.data.getSize() - headersize; -% SharedBuffer chunkdata(chunkdatasize); -% memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize); -% -% // Set chunk data in buffer -% sp->chunks[chunk_num] = chunkdata; -% -% // If not all chunks are received, return empty buffer -% if(sp->allReceived() == false) -% return SharedBuffer(); -% -% // Calculate total size -% u32 totalsize = 0; -% core::map >::Iterator i; -% i = sp->chunks.getIterator(); -% for(; i.atEnd() == false; i++) -% { -% totalsize += i.getNode()->getValue().getSize(); -% } -% -% SharedBuffer fulldata(totalsize); -% -% // Copy chunks to data buffer -% u32 start = 0; -% for(u32 chunk_i=0; chunk_ichunk_count; -% chunk_i++) -% { -% SharedBuffer buf = sp->chunks[chunk_i]; -% u16 chunkdatasize = buf.getSize(); -% memcpy(&fulldata[start], *buf, chunkdatasize); -% start += chunkdatasize;; -% } -% -% // Remove sp from buffer -% m_buf.remove(seqnum); -% delete sp; -% -% return fulldata; -% } - -removeUnreliableTimedOuts(_Table) -> ok. - -% void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) -% { -% core::list remove_queue; -% core::map::Iterator i; -% i = m_buf.getIterator(); -% for(; i.atEnd() == false; i++) -% { -% IncomingSplitPacket *p = i.getNode()->getValue(); -% // Reliable ones are not removed by timeout -% if(p->reliable == true) -% continue; -% p->time += dtime; -% if(p->time >= timeout) -% remove_queue.push_back(i.getNode()->getKey()); -% } -% core::list::Iterator j; -% j = remove_queue.begin(); -% for(; j != remove_queue.end(); j++) -% { -% dout_con<<"NOTE: Removing timed out unreliable split packet" -% < +%% + +-module(peer_packet_buffers). +-include("connection.hrl"). + +-export([init/1, insert/3, removeUnreliableTimedOuts/1]). + + +init(Table) -> + ets:new(Table, [public, named_table]), + ok. + +% TODO: Split buffer need to more learning and understanding + +insert(Table, BufferedPacket, _Reliable) -> + _HeaderSize = ?BASE_HEADER_SIZE + 7, + <<_Header:?BASE_HEADER_SIZE, _Type:?U8, SeqNum:?U16, _ChunkCount:?U16, + _ChunkNum:?U16, _Other/binary>> = BufferedPacket, + case ets:lookup(Table, SeqNum) of + [{SeqNum, _FindPacket}] -> + io:format("IncomingSplitPacket with SeqNum=~p is found + in ReliablePacketBuffer~n",[SeqNum]); + [] -> + ets:insert(Table, {SeqNum, BufferedPacket}) + end. +% +% This will throw a GotSplitPacketException when a full +% split packet is constructed. +% +% SharedBuffer IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) +%{ +% u32 headersize = BASE_HEADER_SIZE + 7; +% assert(p.data.getSize() >= headersize); +% u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); +% assert(type == TYPE_SPLIT); +% u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); +% u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]); +% u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); +% +% // Add if doesn't exist +% if(m_buf.find(seqnum) == NULL) +% { +% IncomingSplitPacket *sp = new IncomingSplitPacket(); +% sp->chunk_count = chunk_count; +% sp->reliable = reliable; +% m_buf[seqnum] = sp; +% } +% +% IncomingSplitPacket *sp = m_buf[seqnum]; +% +% // TODO: These errors should be thrown or something? Dunno. +% if(chunk_count != sp->chunk_count) +% derr_con<<"Connection: WARNING: chunk_count="<chunk_count="<chunk_count +% <reliable) +% derr_con<<"Connection: WARNING: reliable="<reliable="<reliable +% <chunks.find(chunk_num) != NULL) +% throw AlreadyExistsException("Chunk already in buffer"); +% +% // Cut chunk data out of packet +% u32 chunkdatasize = p.data.getSize() - headersize; +% SharedBuffer chunkdata(chunkdatasize); +% memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize); +% +% // Set chunk data in buffer +% sp->chunks[chunk_num] = chunkdata; +% +% // If not all chunks are received, return empty buffer +% if(sp->allReceived() == false) +% return SharedBuffer(); +% +% // Calculate total size +% u32 totalsize = 0; +% core::map >::Iterator i; +% i = sp->chunks.getIterator(); +% for(; i.atEnd() == false; i++) +% { +% totalsize += i.getNode()->getValue().getSize(); +% } +% +% SharedBuffer fulldata(totalsize); +% +% // Copy chunks to data buffer +% u32 start = 0; +% for(u32 chunk_i=0; chunk_ichunk_count; +% chunk_i++) +% { +% SharedBuffer buf = sp->chunks[chunk_i]; +% u16 chunkdatasize = buf.getSize(); +% memcpy(&fulldata[start], *buf, chunkdatasize); +% start += chunkdatasize;; +% } +% +% // Remove sp from buffer +% m_buf.remove(seqnum); +% delete sp; +% +% return fulldata; +% } + +removeUnreliableTimedOuts(_Table) -> ok. + +% void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) +% { +% core::list remove_queue; +% core::map::Iterator i; +% i = m_buf.getIterator(); +% for(; i.atEnd() == false; i++) +% { +% IncomingSplitPacket *p = i.getNode()->getValue(); +% // Reliable ones are not removed by timeout +% if(p->reliable == true) +% continue; +% p->time += dtime; +% if(p->time >= timeout) +% remove_queue.push_back(i.getNode()->getKey()); +% } +% core::list::Iterator j; +% j = remove_queue.begin(); +% for(; j != remove_queue.end(); j++) +% { +% dout_con<<"NOTE: Removing timed out unreliable split packet" +% < -%% --module(connection_reliable_packet_buffer). --include("connection.hrl"). - --export([init/1, insert/2, findPacket/2, print/1, empty/1, - size/1, notFound/1, getFirstSeqnum/1, - popFirst/1, popSeqnum/2, - incrementTimedOuts/2, resetTimedOuts/2, - anyTotaltimeReached/2, getTimedOuts/2]). - --export([test_insert/1, test_findPacket/1, test_increment/0]). - -%-define(TABLE_ID, ?MODULE). - -init(Table) -> - ets:new(Table, [public, named_table]). - -insert(Table, #buffered_packet{data=BufferedPacket, time=_Time, totaltime=_TotalTime, address = _Address} = BPrec) -> - <<_Header:?BASE_HEADER_SIZE/binary, Type:?U8, SeqNum:?U16,_Other/binary>> = BufferedPacket, - io:format("Type=~p, SeqNum=~p~n",[Type,SeqNum]), - case ets:lookup(Table, SeqNum) of - [{SeqNum, FindPacket}] -> - io:format("Reliable packet with SeqNum=~p is found in ReliablePacketBuffer~n",[SeqNum]), - {exists,FindPacket}; - [] -> ets:insert(Table, {SeqNum, BPrec}), - {ok,BPrec} - end. - -test_insert(Table) -> - insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,00,34,77,77,77,77]), time=0, totaltime=0}), - insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,01,11,77,77,77,77]), time=0, totaltime=0}), - insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,02,62,77,77,77,77]), time=0, totaltime=0}), - insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,67,02,77,77,77,77]), time=0, totaltime=0}), - insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,00,10,77,77,77,77]), time=0, totaltime=0}). - - -findPacket(Table, SeqNum) -> - case ets:lookup(Table, SeqNum) of - [{SeqNum, FindPacket}] -> {ok, FindPacket}; - []-> {error, not_found} - end. - -test_findPacket(Table)-> - findPacket(Table, 15874). % 8704, 2817, 579 - -print(Table) -> - ets:foldl(fun({SeqNum, Packet},AccIn) -> - io:format("SeqNum=~p: ~p ~n", [SeqNum,Packet]), - AccIn - end, end_list, Table). - -empty(Table) -> - Size = ets:info(Table, size), - case Size of - 0 -> true; - _ -> Size - end. - -size(Table) -> - ets:info(Table, size). - -% TODO Check the returning value -notFound(Table) -> - ets:last(Table). - -%RPBSearchResult ReliablePacketBuffer::notFound() -%{ -% return m_list.end(); -%} - -getFirstSeqnum(Table) -> - ets:first(Table). - -popFirst(Table) -> - FirstSeqNum = ets:first(Table), - #buffered_packet{data=FirstPacket} = ets:lookup(Table, FirstSeqNum), - <<_Header:?BASE_HEADER_SIZE,_Type:?U8,SeqNum:?U16,_Other/binary>> = FirstPacket, - ets:delete(Table, SeqNum), - #buffered_packet{data=FirstPacket}. - -popSeqnum(Table, SeqNum) -> - FindPacket = ets:lookup(Table, SeqNum), - ets:delete(Table, SeqNum), - FindPacket. - -incrementTimedOuts(Table, Dtime) -> - ets:foldl(fun({SeqNum, Packet},AccIn) -> - #buffered_packet{data = Data, time = Time, totaltime = TotalTime, address = Address} = Packet, - ets:update_element(Table, SeqNum, - {2,#buffered_packet{data=Data,time = Time + Dtime, totaltime = TotalTime + Dtime, address = Address}}), - AccIn - end, end_list, Table). - -test_increment()-> - init(rpb_test), - test_insert(rpb_test), - print(rpb_test), - incrementTimedOuts(rpb_test, 35), - print(rpb_test). - -resetTimedOuts(Table, Timeout) -> - ets:foldl(fun({SeqNum, Packet},AccIn) -> - #buffered_packet{data = Data, time = Time, totaltime = TotalTime, address = Address} = Packet, - if Time >= Timeout -> - ets:update_element(Table, SeqNum, - {2,#buffered_packet{data=Data,time = 0, totaltime = TotalTime, address = Address}}); - true -> ok - end, - AccIn - end, end_list, Table). - - -anyTotaltimeReached(Table, Timeout) -> - %ets:select(Table, ets:fun2ms(fun({SeqNum,#buffered_packet{totaltime = TotalTime}}) when TotalTime >= Timeout -> true end)). - %ets:select(Table, - % ets:fun2ms(fun({Seq,#buffered_packet{data = _, time = _, totaltime = TotalTime, address = _}}) -> TotalTime end)). - - %TODO This construction need to refactoring - case ets:select(Table, [{{'_',#buffered_packet{data='_',time='_',totaltime='$1',address='_'}},[{'>=','$1',Timeout}],['$1']}]) of - [] -> false; - _ -> true % or we must check list size - end. - -%bool ReliablePacketBuffer::anyTotaltimeReached(float timeout) -%{ -% core::list::Iterator i; -% i = m_list.begin(); -% for(; i != m_list.end(); i++){ -% if(i->totaltime >= timeout) -% return true; -% } -% return false; -%} - - -getTimedOuts(Table, Timeout) -> - % TODO - Check the returning value: list of buffered_packet, or SeqNum - ets:select(Table, [{{'$1',#buffered_packet{data='_',time='$2',totaltime='_',address='_'}},[{'>=','$2',Timeout}],['$1']}]). - -% core::list ReliablePacketBuffer::getTimedOuts(float timeout) -%{ -% core::list timed_outs; -% core::list::Iterator i; -% i = m_list.begin(); -% for(; i != m_list.end(); i++) -% { -% if(i->time >= timeout) -% timed_outs.push_back(*i); -% } -% return timed_outs; -%} +%% +%% erlmines - The minetest server written in Erlang +%% Copyright (C) 2012 hummermania, Markevych Alexander +%% +-module(peer_reliable_packet_buffer). +-include("connection.hrl"). + +-export([init/1, insert/2, findPacket/2, print/1, empty/1, + size/1, notFound/1, getFirstSeqnum/1, + popFirst/1, popSeqnum/2, + incrementTimedOuts/2, resetTimedOuts/2, + anyTotaltimeReached/2, getTimedOuts/2]). + +-export([test_insert/1, test_findPacket/1, test_increment/0]). + +%-define(TABLE_ID, ?MODULE). + +init(Table) -> + ets:new(Table, [public, named_table]). + +insert(Table, #buffered_packet{data=BufferedPacket, time=_Time, totaltime=_TotalTime, address = _Address} = BPrec) -> + <<_Header:?BASE_HEADER_SIZE/binary, Type:?U8, SeqNum:?U16,_Other/binary>> = BufferedPacket, + io:format("Type=~p, SeqNum=~p~n",[Type,SeqNum]), + case ets:lookup(Table, SeqNum) of + [{SeqNum, FindPacket}] -> + io:format("Reliable packet with SeqNum=~p is found in ReliablePacketBuffer~n",[SeqNum]), + {exists,FindPacket}; + [] -> ets:insert(Table, {SeqNum, BPrec}), + {ok,BPrec} + end. + +test_insert(Table) -> + insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,00,34,77,77,77,77]), time=0, totaltime=0}), + insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,01,11,77,77,77,77]), time=0, totaltime=0}), + insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,02,62,77,77,77,77]), time=0, totaltime=0}), + insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,67,02,77,77,77,77]), time=0, totaltime=0}), + insert(Table, #buffered_packet{data=list_to_binary([1,2,3,4,5,6,7,33,00,10,77,77,77,77]), time=0, totaltime=0}). + + +findPacket(Table, SeqNum) -> + case ets:lookup(Table, SeqNum) of + [{SeqNum, FindPacket}] -> {ok, FindPacket}; + []-> {error, not_found} + end. + +test_findPacket(Table)-> + findPacket(Table, 15874). % 8704, 2817, 579 + +print(Table) -> + ets:foldl(fun({SeqNum, Packet},AccIn) -> + io:format("SeqNum=~p: ~p ~n", [SeqNum,Packet]), + AccIn + end, end_list, Table). + +empty(Table) -> + Size = ets:info(Table, size), + case Size of + 0 -> true; + _ -> Size + end. + +size(Table) -> + ets:info(Table, size). + +% TODO Check the returning value +notFound(Table) -> + ets:last(Table). + +%RPBSearchResult ReliablePacketBuffer::notFound() +%{ +% return m_list.end(); +%} + +getFirstSeqnum(Table) -> + ets:first(Table). + +popFirst(Table) -> + FirstSeqNum = ets:first(Table), + #buffered_packet{data=FirstPacket} = ets:lookup(Table, FirstSeqNum), + <<_Header:?BASE_HEADER_SIZE,_Type:?U8,SeqNum:?U16,_Other/binary>> = FirstPacket, + ets:delete(Table, SeqNum), + #buffered_packet{data=FirstPacket}. + +popSeqnum(Table, SeqNum) -> + FindPacket = ets:lookup(Table, SeqNum), + ets:delete(Table, SeqNum), + FindPacket. + +incrementTimedOuts(Table, Dtime) -> + ets:foldl(fun({SeqNum, Packet},AccIn) -> + #buffered_packet{data = Data, time = Time, totaltime = TotalTime, address = Address} = Packet, + ets:update_element(Table, SeqNum, + {2,#buffered_packet{data=Data,time = Time + Dtime, totaltime = TotalTime + Dtime, address = Address}}), + AccIn + end, end_list, Table). + +test_increment()-> + init(rpb_test), + test_insert(rpb_test), + print(rpb_test), + incrementTimedOuts(rpb_test, 35), + print(rpb_test). + +resetTimedOuts(Table, Timeout) -> + ets:foldl(fun({SeqNum, Packet},AccIn) -> + #buffered_packet{data = Data, time = Time, totaltime = TotalTime, address = Address} = Packet, + if Time >= Timeout -> + ets:update_element(Table, SeqNum, + {2,#buffered_packet{data=Data,time = 0, totaltime = TotalTime, address = Address}}); + true -> ok + end, + AccIn + end, end_list, Table). + + +anyTotaltimeReached(Table, Timeout) -> + %ets:select(Table, ets:fun2ms(fun({SeqNum,#buffered_packet{totaltime = TotalTime}}) when TotalTime >= Timeout -> true end)). + %ets:select(Table, + % ets:fun2ms(fun({Seq,#buffered_packet{data = _, time = _, totaltime = TotalTime, address = _}}) -> TotalTime end)). + + %TODO This construction need to refactoring + case ets:select(Table, [{{'_',#buffered_packet{data='_',time='_',totaltime='$1',address='_'}},[{'>=','$1',Timeout}],['$1']}]) of + [] -> false; + _ -> true % or we must check list size + end. + +%bool ReliablePacketBuffer::anyTotaltimeReached(float timeout) +%{ +% core::list::Iterator i; +% i = m_list.begin(); +% for(; i != m_list.end(); i++){ +% if(i->totaltime >= timeout) +% return true; +% } +% return false; +%} + + +getTimedOuts(Table, Timeout) -> + % TODO - Check the returning value: list of buffered_packet, or SeqNum + ets:select(Table, [{{'$1',#buffered_packet{data='_',time='$2',totaltime='_',address='_'}},[{'>=','$2',Timeout}],['$1']}]). + +% core::list ReliablePacketBuffer::getTimedOuts(float timeout) +%{ +% core::list timed_outs; +% core::list::Iterator i; +% i = m_list.begin(); +% for(; i != m_list.end(); i++) +% { +% if(i->time >= timeout) +% timed_outs.push_back(*i); +% } +% return timed_outs; +%}