lock code changed

master
subhra74 2022-08-01 20:37:18 +05:30
parent bd69b8cc2f
commit ca11cdc302
6 changed files with 154 additions and 57 deletions

View File

@ -34,6 +34,8 @@ namespace XDM.Core.Downloader.Adaptive
public string Id { get; private set; }
public virtual long FileSize => this._state.FileSize;
public virtual double Duration => this._state.Duration;
protected ReaderWriterLockSlim rwLock = new(LockRecursionPolicy.SupportsRecursion);
public ReaderWriterLockSlim Lock => this.rwLock;
public FileNameFetchMode FileNameFetchMode
{
get { return _fileNameFetchMode; }
@ -231,14 +233,14 @@ namespace XDM.Core.Downloader.Adaptive
protected virtual void SaveChunkState()
{
if (_chunks == null) return;
lock (this)
try
{
rwLock.EnterWriteLock();
TransactedIO.WriteStream("chunks.db", _state.TempDirectory, ChunkStateToBytes);
//TransactedIO.WriteBytes(ChunkStateToBytes(), "chunks.db", _state.TempDirectory);
//TransactedIO.Write(JsonConvert.SerializeObject(_chunks), "chunks.json", _state.TempDirectory);
//File.WriteAllText(Path.Combine(_state.TempDirectory, "chunks.json"),
// JsonConvert.SerializeObject(_chunks));
}
finally
{
rwLock.ExitWriteLock();
}
}
@ -440,8 +442,9 @@ namespace XDM.Core.Downloader.Adaptive
private void ChunkDataReceived(object sender, ChunkDownloadedEventArgs args)
{
lock (this)
try
{
rwLock.EnterWriteLock();
long tick = Helpers.TickCount();
totalDownloadedBytes += args.Downloaded;
downloadedBytesSinceStartOrResume += args.Downloaded;
@ -469,14 +472,23 @@ namespace XDM.Core.Downloader.Adaptive
}
this.ThrottleIfNeeded();
}
finally
{
rwLock.ExitWriteLock();
}
}
private void MimeTypeReceived(object sender, MimeTypeReceivedEventArgs args)
{
lock (this)
try
{
rwLock.EnterWriteLock();
this.OnContentTypeReceived(args.Chunk, args.MimeType);
}
finally
{
rwLock.ExitWriteLock();
}
}
//private int CreateFileList()
@ -806,8 +818,9 @@ namespace XDM.Core.Downloader.Adaptive
public void UpdateSpeedLimit(bool enable, int limit)
{
lock (_state)
try
{
rwLock.EnterWriteLock();
if (!enable)
{
limit = 0;
@ -815,12 +828,11 @@ namespace XDM.Core.Downloader.Adaptive
_state.SpeedLimit = limit;
SaveState();
}
finally
{
rwLock.ExitWriteLock();
}
}
//protected static string GuessContainerFormatFromContentType(string contentType)
//{
// return
//}
}
public class MultiSourceChunk : Chunk

View File

@ -1,5 +1,5 @@
using System;
using System.Threading;
using XDM.Core;
namespace XDM.Core.Downloader
@ -14,6 +14,7 @@ namespace XDM.Core.Downloader
public string Type { get; }
public FileNameFetchMode FileNameFetchMode { get; }
public Uri? PrimaryUrl { get; }
public ReaderWriterLockSlim Lock { get; }
event EventHandler Probed;
event EventHandler Finished;

View File

@ -167,9 +167,10 @@ namespace XDM.Core.Downloader.Progressive.DualHttp
public override void PieceConnected(string pieceId, ProbeResult? result)
{
lock (this)
if (this.cancelFlag.IsCancellationRequested) return;
try
{
if (this.cancelFlag.IsCancellationRequested) return;
rwLock.EnterWriteLock();
var piece = this.pieces[pieceId];
if (result != null) //probe result is not null only for first request of each stream type, for subsequent requests its always null
{
@ -261,9 +262,13 @@ namespace XDM.Core.Downloader.Progressive.DualHttp
throw new AssembleFailedException(ErrorCode.DiskError);
}
}
CreatePiece();
}
finally
{
rwLock.ExitWriteLock();
}
CreatePiece();
}
public override (
@ -298,11 +303,16 @@ namespace XDM.Core.Downloader.Progressive.DualHttp
protected override void SaveChunkState()
{
lock (this)
try
{
rwLock.EnterWriteLock();
if (pieces.Count == 0) return;
TransactedIO.WriteStream("chunks.db", state!.TempDir!, base.ChunkStateToBytes);
}
finally
{
rwLock.ExitWriteLock();
}
}
protected override void SaveState()
@ -313,12 +323,6 @@ namespace XDM.Core.Downloader.Progressive.DualHttp
public override void RestoreState()
{
state = DownloadStateIO.LoadDualSourceHTTPDownloaderState(Id!);
//var bytes = TransactedIO.ReadBytes(Id + ".state", Config.DataDir);
//if (bytes == null)
//{
// throw new FileNotFoundException(Path.Combine(Config.DataDir, Id + ".state"));
//}
//state = DownloadStateStore.DualSourceHTTPDownloaderStateFromBytes(bytes);
try
{
if (!TransactedIO.ReadStream("chunks.db", state!.TempDir!, s =>
@ -328,12 +332,6 @@ namespace XDM.Core.Downloader.Progressive.DualHttp
{
throw new FileNotFoundException(Path.Combine(state.TempDir, "chunks.db"));
}
//var chunkBytes = TransactedIO.ReadBytes("chunks.db", state.TempDir);
//if (chunkBytes == null)
//{
// throw new FileNotFoundException(Path.Combine(state.TempDir, "chunks.json"));
//}
//pieces = ChunkStateFromBytes(chunkBytes);
}
catch
{
@ -347,8 +345,9 @@ namespace XDM.Core.Downloader.Progressive.DualHttp
{
Log.Debug("Assembling..." + this.Id);
lock (this)
try
{
rwLock.EnterWriteLock();
#if NET35
var buf = new byte[5 * 1024 * 1024];
#else
@ -463,6 +462,10 @@ namespace XDM.Core.Downloader.Progressive.DualHttp
#endif
}
}
finally
{
rwLock.ExitWriteLock();
}
}
private void AssemblePieces(IList<Piece> pieces, FileStream outfs, ref byte[] buf, ref long totalBytes)

View File

@ -8,6 +8,7 @@ using XDM.Core.Util;
using XDM.Core.Clients.Http;
using XDM.Core.MediaProcessor;
using System.Text;
using System.Threading;
#if !NET5_0_OR_GREATER
using XDM.Compatibility;
@ -35,6 +36,8 @@ namespace XDM.Core.Downloader.Progressive
protected SpeedLimiter speedLimiter = new();
protected BaseMediaProcessor? mediaProcessor;
protected long downloadedBytesSinceStartOrResume = 0L;
protected ReaderWriterLockSlim rwLock = new(LockRecursionPolicy.SupportsRecursion);
public ReaderWriterLockSlim Lock => this.rwLock;
private bool stopRequested = false;
public FileNameFetchMode FileNameFetchMode
@ -157,8 +160,9 @@ namespace XDM.Core.Downloader.Progressive
public virtual bool ContinueAdjacentPiece(string pieceId, long maxByteRange)
{
lock (this)
try
{
rwLock.EnterWriteLock();
var chunk = pieces[pieceId];
var position = chunk.Offset + chunk.Length;
foreach (var key in this.pieces.Keys)
@ -188,14 +192,23 @@ namespace XDM.Core.Downloader.Progressive
}
return false;
}
finally
{
rwLock.ExitWriteLock();
}
}
public virtual Piece GetPiece(string pieceId)
{
lock (this)
try
{
rwLock.EnterReadLock();
return this.pieces[pieceId];
}
finally
{
rwLock.ExitReadLock();
}
}
public virtual string GetPieceFile(string pieceId)
@ -214,9 +227,10 @@ namespace XDM.Core.Downloader.Progressive
public void PieceDownloadFailed(string pieceId, ErrorCode error)
{
lock (this)
if (this.cancelFlag.IsCancellationRequested) return;
try
{
if (this.cancelFlag.IsCancellationRequested) return;
rwLock.EnterWriteLock();
grabberDict.Remove(pieceId);
this.SaveChunkState();
if (grabberDict.Count == 0)
@ -224,13 +238,18 @@ namespace XDM.Core.Downloader.Progressive
OnFailed(error);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
public virtual void PieceDownloadFinished(string pieceId)
{
lock (this)
if (this.cancelFlag.IsCancellationRequested) return;
try
{
if (this.cancelFlag.IsCancellationRequested) return;
rwLock.EnterWriteLock();
var piece = this.pieces[pieceId];
piece.State = SegmentState.Finished;
grabberDict.Remove(pieceId);
@ -243,13 +262,18 @@ namespace XDM.Core.Downloader.Progressive
return;
}
}
finally
{
rwLock.ExitWriteLock();
}
this.CreatePiece();
}
public virtual void UpdateDownloadedBytesCount(string pieceId, long bytes)
{
lock (this)
try
{
rwLock.EnterWriteLock();
totalDownloadedBytes += bytes;
downloadedBytesSinceStartOrResume += bytes;
@ -274,16 +298,21 @@ namespace XDM.Core.Downloader.Progressive
progressResult.DownloadSpeed = instantSpeed;
progressResult.Downloaded = totalDownloadedBytes;
progressResult.Progress = FileSize > 0 ? (int)(totalDownloadedBytes * 100 / FileSize) : 0;
progressResult.Eta = FileSize > 0 ? (long)Math.Ceiling((FileSize - totalDownloadedBytes) / avgSpeed /*progressResult.DownloadSpeed*/) : 0;
progressResult.Eta = FileSize > 0 ? (long)Math.Ceiling((FileSize - totalDownloadedBytes) / avgSpeed) : 0;
ProgressChanged?.Invoke(this, progressResult);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
protected virtual void CreatePiece()
{
lock (this)
try
{
rwLock.EnterWriteLock();
if (this.cancelFlag.IsCancellationRequested) return;
if (this.grabberDict.Count == MAX_COUNT) return;
var rem = MAX_COUNT - this.grabberDict.Count;
@ -300,24 +329,34 @@ namespace XDM.Core.Downloader.Progressive
}
SaveChunkState();
}
finally
{
rwLock.ExitWriteLock();
}
}
protected virtual bool AllFinished()
{
lock (this)
try
{
rwLock.EnterReadLock();
foreach (var pi in this.pieces.Keys)
{
if (this.pieces[pi].State != SegmentState.Finished) return false;
}
return true;
}
finally
{
rwLock.ExitReadLock();
}
}
protected int RetryFailedPieces(int max)
{
lock (this)
try
{
rwLock.EnterWriteLock();
var count = 0;
for (var chunkToRetry = Math.Min(GetInactiveChunkCount(), max); chunkToRetry > 0; chunkToRetry--)
{
@ -329,21 +368,31 @@ namespace XDM.Core.Downloader.Progressive
}
return count;
}
finally
{
rwLock.ExitWriteLock();
}
}
protected int GetInactiveChunkCount()
{
lock (this)
try
{
rwLock.EnterReadLock();
return pieces.Keys.Where(chunkId => !(grabberDict.ContainsKey(chunkId)
|| pieces[chunkId].State == SegmentState.Finished)).Count();
|| pieces[chunkId].State == SegmentState.Finished)).Count();
}
finally
{
rwLock.ExitReadLock();
}
}
protected string? GetInactivePiece()
{
lock (this)
try
{
rwLock.EnterReadLock();
foreach (var chunkId in pieces.Keys)
{
if (!(grabberDict.ContainsKey(chunkId) || pieces[chunkId].State == SegmentState.Finished
@ -351,12 +400,17 @@ namespace XDM.Core.Downloader.Progressive
}
return null;
}
finally
{
rwLock.ExitReadLock();
}
}
protected string? FindMaxChunk()
{
lock (this)
try
{
rwLock.EnterReadLock();
var max = -1L;
string? pid = null;
foreach (var pieceId in this.pieces.Keys)
@ -371,12 +425,17 @@ namespace XDM.Core.Downloader.Progressive
}
return max > 256 * 1024 ? pid : null;
}
finally
{
rwLock.ExitReadLock();
}
}
protected string? SplitPiece(string chunkId)
{
lock (this)
try
{
rwLock.EnterWriteLock();
if (chunkId != null && !this.cancelFlag.IsCancellationRequested)
{
var chunk = pieces[chunkId];
@ -404,6 +463,10 @@ namespace XDM.Core.Downloader.Progressive
}
return null;
}
finally
{
rwLock.ExitWriteLock();
}
}
protected virtual void OnStarted()

View File

@ -152,11 +152,15 @@ namespace XDM.Core.Downloader.Progressive.SingleHttp
protected override void SaveChunkState()
{
lock (this)
try
{
rwLock.EnterWriteLock();
if (pieces.Count == 0) return;
TransactedIO.WriteStream("chunks.db", state!.TempDir!, base.ChunkStateToBytes);
//TransactedIO.WriteBytes(ChunkStateToBytes(), "chunks.db", state.TempDir);
}
finally
{
rwLock.ExitWriteLock();
}
}
@ -222,9 +226,10 @@ namespace XDM.Core.Downloader.Progressive.SingleHttp
public override void PieceConnected(string pieceId, ProbeResult? result)
{
lock (this)
if (this.cancelFlag.IsCancellationRequested) return;
try
{
if (this.cancelFlag.IsCancellationRequested) return;
rwLock.EnterWriteLock();
if (result != null) //probe result is not null for first request only, for subsequent requests its always null
{
Log.Debug("connected: " + result.ResourceSize + " init...");
@ -278,8 +283,12 @@ namespace XDM.Core.Downloader.Progressive.SingleHttp
throw new AssembleFailedException(ErrorCode.DiskError);
}
}
CreatePiece();
}
finally
{
rwLock.ExitWriteLock();
}
CreatePiece();
}
private List<Piece> SortAndValidatePieces()
@ -320,9 +329,9 @@ namespace XDM.Core.Downloader.Progressive.SingleHttp
protected override void AssemblePieces()
{
Log.Debug("Assembling...");
lock (this)
try
{
rwLock.EnterWriteLock();
try
{
var pieces = SortAndValidatePieces();
@ -466,6 +475,10 @@ namespace XDM.Core.Downloader.Progressive.SingleHttp
throw new AssembleFailedException(ex is DownloadException de ? de.ErrorCode : ErrorCode.Generic);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
public override (Dictionary<string, List<string>> Headers,

View File

@ -53,8 +53,9 @@ namespace XDM.Core.Downloader
lastTick = Helpers.TickCount();
return;
}
lock (downloader)
try
{
downloader.Lock.EnterWriteLock();
var maxBytesPerMS = (double)speedLimit * 1024 / 1000;
var now = Helpers.TickCount();
var actualTimeSpent = now - lastTick;
@ -74,6 +75,10 @@ namespace XDM.Core.Downloader
catch (Exception ex) { Log.Debug(ex, "Exception while throttling"); }
}
}
finally
{
downloader.Lock.ExitWriteLock();
}
}
private void sleep(int interval)