From 7961a24c675ad2f57cc0ed86828e0e9fe79f3216 Mon Sep 17 00:00:00 2001 From: "Yanhong.Ma" Date: Thu, 11 Aug 2022 15:29:46 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BA=BF=E7=A8=8B=E6=B1=A0?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Example/Program.cs | 34 ++++++++++++----- src/IoTSharp.Data.Taos/ConcurrentTaosQueue.cs | 24 +++++++++--- src/IoTSharp.Data.Taos/TaosCommand.cs | 25 ++++++++++--- src/IoTSharp.Data.Taos/TaosConnection.cs | 8 ++-- .../TaosConnectionStringBuilder.cs | 37 +++++++++++++++---- src/IoTSharp.Data.Taos/TaosDataReader.cs | 11 +++--- 6 files changed, 104 insertions(+), 35 deletions(-) diff --git a/src/Example/Program.cs b/src/Example/Program.cs index 6742483..f1d48c0 100644 --- a/src/Example/Program.cs +++ b/src/Example/Program.cs @@ -85,19 +85,27 @@ static void Main(string[] args) connection.CreateCommand("CREATE TABLE IF NOT EXISTS telemetrydata (ts timestamp,value_type tinyint, value_boolean bool, value_string binary(10240), value_long bigint,value_datetime timestamp,value_double double) TAGS (deviceid binary(32),keyname binary(64));").ExecuteNonQuery(); var devid1 = $"{Guid.NewGuid():N}"; var devid2 = $"{Guid.NewGuid():N}"; + var devid3 = $"{Guid.NewGuid():N}"; + var devid4 = $"{Guid.NewGuid():N}"; DateTime dt = DateTime.Now; - UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 4000); - UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 4000); - + UploadTelemetryData(connection, devid1, "1#air-compressor-two-level-discharge-temperature", 5000); + UploadTelemetryData(connection, devid2, "1#air-compressor-load-rate", 5000); + Console.WriteLine(""); + Console.WriteLine(""); + Console.WriteLine(""); DateTime dt2 = DateTime.Now; - UploadTelemetryDataPool(connection, devid1, "1#air-compressor-two-level-discharge-temperature1", 4000); - UploadTelemetryDataPool(connection, devid2, "1#air-compressor-load-rate1", 4000); - Console.WriteLine($"UploadTelemetryData 耗时:{DateTime.Now.Subtract(dt).TotalSeconds}"); - Console.WriteLine($"UploadTelemetryDataPool 耗时:{DateTime.Now.Subtract(dt2).TotalSeconds}"); + UploadTelemetryDataPool(connection, devid3, "1#air-compressor-two-level-discharge-temperature1", 5000); + UploadTelemetryDataPool(connection, devid4, "1#air-compressor-load-rate1", 5000); + var t1 = DateTime.Now.Subtract(dt).TotalSeconds; + var t2 = DateTime.Now.Subtract(dt2).TotalSeconds; + Console.WriteLine("Done"); + Thread.Sleep(TimeSpan.FromSeconds(1)); + Console.WriteLine($"UploadTelemetryData 耗时:{t1}"); + Console.WriteLine($"UploadTelemetryDataPool 耗时:{t2}"); + Thread.Sleep(TimeSpan.FromSeconds(2)); var reader2 = connection.CreateCommand("select last_row(*) from telemetrydata group by deviceid,keyname ;").ExecuteReader(); ConsoleTableBuilder.From(reader2.ToDataTable()).WithFormat(ConsoleTableBuilderFormat.Default).ExportAndWriteLine(); var reader3 = connection.CreateCommand("select * from telemetrydata").ExecuteReader(); - List list = new List(); while (reader3.Read()) { @@ -258,7 +266,15 @@ static void UploadTelemetryDataPool(TaosConnection connection, string devid, str { Parallel.For(0, count,new ParallelOptions() { MaxDegreeOfParallelism=connection.PoolSize }, i => { - connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery(); + try + { + connection.CreateCommand($"INSERT INTO device_{devid} USING telemetrydata TAGS(\"{devid}\",\"{keyname}\") values (now,2,true,'{i}',{i},now,{i});").ExecuteNonQuery(); + Console.WriteLine($"线程:{Thread.CurrentThread.ManagedThreadId} 第{i}条数据, OK"); + } + catch (Exception ex) + { + Console.WriteLine($"线程:{Thread.CurrentThread.ManagedThreadId} 第{i}条数据, {ex.Message}"); + } }); } } diff --git a/src/IoTSharp.Data.Taos/ConcurrentTaosQueue.cs b/src/IoTSharp.Data.Taos/ConcurrentTaosQueue.cs index 64293ae..2fb9662 100644 --- a/src/IoTSharp.Data.Taos/ConcurrentTaosQueue.cs +++ b/src/IoTSharp.Data.Taos/ConcurrentTaosQueue.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; @@ -26,10 +27,10 @@ public void Return(IntPtr client) { Monitor.Enter(TaosQueue); TaosQueue.Enqueue(client); - System.Diagnostics.Debug.WriteLine($"TaosQueue Return:{client}"); + Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 归还 {client}"); Monitor.Pulse(TaosQueue); Monitor.Exit(TaosQueue); - + Thread.Sleep(0); } int _ref = 0; public void AddRef() @@ -50,16 +51,29 @@ public void RemoveRef() _ref--; } } + public int Timeout { get; set; } public IntPtr Take() { + IntPtr client = IntPtr.Zero; Monitor.Enter(TaosQueue); if (TaosQueue.IsEmpty) { - Monitor.Wait(TaosQueue); + Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 连接池已空,请等待 超时时长:{Timeout}"); + Monitor.Wait(TaosQueue, TimeSpan.FromSeconds(Timeout)); + } + if (!TaosQueue.TryDequeue(out client)) + { + Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 从连接池获取连接失败,等待并重试"); + } + else + { + Debug.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 拿走 {client}"); } - TaosQueue.TryDequeue(out var client); - System.Diagnostics.Debug.WriteLine($"TaosQueue Take:{client}"); Monitor.Exit(TaosQueue); + if (client == IntPtr.Zero) + { + throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!"); + } return client; } } diff --git a/src/IoTSharp.Data.Taos/TaosCommand.cs b/src/IoTSharp.Data.Taos/TaosCommand.cs index f846bbf..ace7a7a 100644 --- a/src/IoTSharp.Data.Taos/TaosCommand.cs +++ b/src/IoTSharp.Data.Taos/TaosCommand.cs @@ -289,6 +289,14 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos) /// A Taos error occurs during execution. public new virtual TaosDataReader ExecuteReader(CommandBehavior behavior) { + var _taos = _connection.TakeClient(); + var dr= ExecuteReader(behavior, _taos); + dr.OnDispose += (object sender, EventArgs e)=>_connection.ReturnClient(_taos); + return dr; + } + + private TaosDataReader ExecuteReader(CommandBehavior behavior,IntPtr _taos) + { if ((behavior & ~(CommandBehavior.Default | CommandBehavior.SequentialAccess | CommandBehavior.SingleResult | CommandBehavior.SingleRow | CommandBehavior.CloseConnection)) != 0) { @@ -322,7 +330,7 @@ internal long GetDateTimeFrom(DateTime dt, IntPtr _taos) var closeConnection = (behavior & CommandBehavior.CloseConnection) != 0; try { - var _taos = _connection.TakeClient(); + #if DEBUG Debug.WriteLine($"_commandText:{_commandText}"); #endif @@ -605,10 +613,14 @@ public override int ExecuteNonQuery() { throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteNonQuery)}"); } - using (var reader = ExecuteReader()) + var _taos= _connection.TakeClient(); + int result = -1; + using (var reader = ExecuteReader( CommandBehavior.Default,_taos)) { - return reader.RecordsAffected; + result= reader.RecordsAffected; } + _connection.ReturnClient(_taos); + return result; } /// @@ -626,13 +638,16 @@ public override object ExecuteScalar() { throw new InvalidOperationException($"CallRequiresSetCommandText{nameof(ExecuteScalar)}"); } - + var _taos = _connection.TakeClient(); + object result =null; using (var reader = ExecuteReader()) { - return reader.Read() + result= reader.Read() ? reader.GetValue(0) : null; } + _connection.ReturnClient(_taos); + return result; } /// diff --git a/src/IoTSharp.Data.Taos/TaosConnection.cs b/src/IoTSharp.Data.Taos/TaosConnection.cs index 17f1d50..6ddd509 100644 --- a/src/IoTSharp.Data.Taos/TaosConnection.cs +++ b/src/IoTSharp.Data.Taos/TaosConnection.cs @@ -101,6 +101,8 @@ public override string ConnectionString internal TaosConnectionStringBuilder ConnectionStringBuilder { get; set; } + public override int ConnectionTimeout => ConnectionStringBuilder.ConnectionTimeout; + /// /// Gets the path to the database file. Will be absolute for open connections. @@ -207,7 +209,7 @@ public override void Open() } if (!g_pool.ContainsKey(_connectionString)) { - g_pool.Add(_connectionString, new ConcurrentTaosQueue()); + g_pool.Add(_connectionString, new ConcurrentTaosQueue() { Timeout= ConnectionTimeout}); } _queue = g_pool[_connectionString]; _queue.AddRef(); @@ -215,10 +217,10 @@ public override void Open() { throw new InvalidOperationException("Open Requires Set ConnectionString"); } - for (int i = 0; i < ConnectionStringBuilder.PoolSize; i++) + for (int i = 0; i < ConnectionStringBuilder.PoolSize+1; i++) { var c = TDengine.Connect(this.DataSource, ConnectionStringBuilder.Username, ConnectionStringBuilder.Password, "", (short)ConnectionStringBuilder.Port); - if (c != null && c!=IntPtr.Zero) + if (c!=IntPtr.Zero) { _queue.Return(c); } diff --git a/src/IoTSharp.Data.Taos/TaosConnectionStringBuilder.cs b/src/IoTSharp.Data.Taos/TaosConnectionStringBuilder.cs index a72b0e6..ee233bc 100644 --- a/src/IoTSharp.Data.Taos/TaosConnectionStringBuilder.cs +++ b/src/IoTSharp.Data.Taos/TaosConnectionStringBuilder.cs @@ -26,7 +26,7 @@ public class TaosConnectionStringBuilder : DbConnectionStringBuilder private const string DataBaseKeyword = "DataBase"; private const string PortKeyword = "Port"; private const string PoolSizeKeyword = "PoolSize"; - + private const string TimeOutKeyword = "TimeOut"; private enum Keywords { DataSource, @@ -35,7 +35,9 @@ private enum Keywords Password, Port, Charset, - PoolSize + PoolSize, + TimeOut + } @@ -48,11 +50,12 @@ private enum Keywords private string _charset = System.Text.Encoding.UTF8.EncodingName; private string _password = string.Empty; private int _port =6030; - private int _PoolSize=8; + private int _PoolSize=Environment.ProcessorCount; + private int _timeout = 30; static TaosConnectionStringBuilder() { - var validKeywords = new string[7]; + var validKeywords = new string[8]; validKeywords[(int)Keywords.DataSource] = DataSourceKeyword; validKeywords[(int)Keywords.DataBase] = DataBaseKeyword; validKeywords[(int)Keywords.Username] = UserNameKeyword; @@ -60,9 +63,10 @@ static TaosConnectionStringBuilder() validKeywords[(int)Keywords.Charset] =CharsetKeyword; validKeywords[(int)Keywords.Port] = PortKeyword; validKeywords[(int)Keywords.PoolSize] = PoolSizeKeyword; + validKeywords[(int)Keywords.TimeOut] = TimeOutKeyword; _validKeywords = validKeywords; - _keywords = new Dictionary(7, StringComparer.OrdinalIgnoreCase) + _keywords = new Dictionary(8, StringComparer.OrdinalIgnoreCase) { [DataSourceKeyword] = Keywords.DataSource, [UserNameKeyword] = Keywords.Username, @@ -71,7 +75,8 @@ static TaosConnectionStringBuilder() [CharsetKeyword] = Keywords.Charset, [DataSourceNoSpaceKeyword] = Keywords.DataSource, [PortKeyword] = Keywords.Port, - [PoolSizeKeyword] = Keywords.PoolSize + [PoolSizeKeyword] = Keywords.PoolSize, + [TimeOutKeyword] = Keywords.TimeOut }; } @@ -100,6 +105,8 @@ public virtual string DataSource get => _dataSource; set => base[DataSourceKeyword] = _dataSource = value; } + + public virtual string Username { get => _userName; @@ -164,7 +171,13 @@ public virtual string DataBase } internal bool ForceDatabaseName { get; set; } = false; + + public int ConnectionTimeout + { + get => _timeout; + set => base[TimeOutKeyword] = _timeout = value; + } /// /// Gets or sets the value associated with the specified key. @@ -206,6 +219,9 @@ public override object this[string keyword] case Keywords.PoolSize: PoolSize = Convert.ToInt32(value, CultureInfo.InvariantCulture); return; + case Keywords.TimeOut: + ConnectionTimeout = Convert.ToInt32(value, CultureInfo.InvariantCulture); + return; default: Debug.Assert(false, "Unexpected keyword: " + keyword); return; @@ -330,6 +346,8 @@ private object GetAt(Keywords index) return Charset; case Keywords.PoolSize: return PoolSize; + case Keywords.TimeOut: + return ConnectionTimeout; default: Debug.Assert(false, "Unexpected keyword: " + index); return null; @@ -358,13 +376,16 @@ private void Reset(Keywords index) _dataBase = string.Empty; return; case Keywords.Port: - _port=6060; + _port=6030; return; case Keywords.Charset: _charset = System.Text.Encoding.UTF8.EncodingName; return; case Keywords.PoolSize: - _PoolSize = 8; + _PoolSize = Environment.ProcessorCount; + return; + case Keywords.TimeOut : + _timeout = 30; return; default: Debug.Assert(false, "Unexpected keyword: " + index); diff --git a/src/IoTSharp.Data.Taos/TaosDataReader.cs b/src/IoTSharp.Data.Taos/TaosDataReader.cs index 59bd671..2aaec5c 100644 --- a/src/IoTSharp.Data.Taos/TaosDataReader.cs +++ b/src/IoTSharp.Data.Taos/TaosDataReader.cs @@ -24,7 +24,6 @@ public class TaosDataReader : DbDataReader private readonly int _recordsAffected; private bool _closed; private bool _closeConnection; - private readonly IntPtr _taos; private IntPtr _taosResult; private int _fieldCount; IntPtr rowdata=IntPtr.Zero; @@ -33,11 +32,12 @@ public class TaosDataReader : DbDataReader private DateTime _dt1970; private TAOS_BIND[] _binds; + + internal TaosDataReader(TaosCommand taosCommand, taosField[] metas, bool closeConnection, IntPtr taos, IntPtr res, int recordsAffected, int fieldcount, TAOS_BIND[] binds) { _command = taosCommand; _closeConnection = closeConnection; - _taos = taos; _fieldCount = fieldcount; _hasRows = recordsAffected > 0; _recordsAffected = recordsAffected; @@ -145,6 +145,8 @@ public override bool NextResult() public override void Close() => Dispose(true); + + internal event EventHandler OnDispose; /// /// Releases any resources used by the data reader and closes it. /// @@ -160,8 +162,7 @@ protected override void Dispose(bool disposing) _command.DataReader = null; _closed = true; - - + TDengine.FreeResult(_taosResult); if (_binds != null) { @@ -172,7 +173,7 @@ protected override void Dispose(bool disposing) { TDengine.FreeResult(rowdata); } - _command.Connection.ReturnClient(_taos); + OnDispose?.Invoke(this, EventArgs.Empty); } ///